1 | # -*- coding: utf-8 -*- |
---|
2 | # |
---|
3 | # Copyright (C) 2005-2007 Christopher Lenz <cmlenz@gmx.de> |
---|
4 | # Copyright (C) 2007-2010 Edgewall Software |
---|
5 | # All rights reserved. |
---|
6 | # |
---|
7 | # This software is licensed as described in the file COPYING, which |
---|
8 | # you should have received as part of this distribution. The terms |
---|
9 | # are also available at http://bitten.edgewall.org/wiki/License. |
---|
10 | |
---|
11 | import os |
---|
12 | import sys |
---|
13 | import shutil |
---|
14 | import tempfile |
---|
15 | import unittest |
---|
16 | |
---|
17 | from bitten.slave import BuildSlave, ExitSlave |
---|
18 | from bitten.util import xmlio |
---|
19 | from bitten.slave import encode_multipart_formdata |
---|
20 | |
---|
class DummyResponse(object):
    """Bare response object that carries nothing but a status code.

    ``TestSlave._gather`` returns an instance of this class in place of
    the result of a real request.
    """

    def __init__(self, code):
        # Stored as given; no validation or conversion is applied.
        self.code = code
---|
24 | |
---|
class TestSlave(BuildSlave):
    """BuildSlave variant that captures step reports instead of sending them.

    While a step is being executed, ``self.request`` is replaced by
    ``_gather``, which parses every request body with ``xmlio.parse`` and
    stores the result in ``self.results``.
    """

    def __init__(self, filename, work_dir):
        BuildSlave.__init__(self, [filename], work_dir=work_dir)
        # Parsed request bodies, appended in the order they are received.
        self.results = []

    def _gather(self, method, url, body=None, headers=None):
        """Parse ``body``, remember it, and answer with a 201 dummy response.

        ``method``, ``url`` and ``headers`` are accepted but not used.
        """
        parsed = xmlio.parse(body)
        self.results.append(parsed)
        return DummyResponse(201)

    def _execute_step(self, _build_url, recipe, step):
        """Delegate to ``BuildSlave._execute_step`` with requests captured.

        The ``_build_url`` argument is ignored; ``'dummy_build'`` is passed
        to the base class instead. ``self.local`` and ``self.request`` are
        put back to their previous values afterwards, even on error.
        """
        saved_state = (self.local, self.request)
        try:
            self.local = False
            self.request = self._gather
            return BuildSlave._execute_step(self, 'dummy_build', recipe, step)
        finally:
            self.local, self.request = saved_state
---|
42 | |
---|
class BuildSlaveTestCase(unittest.TestCase):
    """Run small recipes through ``TestSlave`` and check the captured results."""

    def setUp(self):
        # Scratch directory for recipe files; removed again in tearDown.
        self.work_dir = tempfile.mkdtemp(prefix='bitten_test')
        # The interpreter path is passed through xmlio._escape_attr because
        # it is interpolated into the ``executable`` attribute of the
        # recipe XML used by the tests below.
        self.python_path = xmlio._escape_attr(sys.executable)

    def tearDown(self):
        shutil.rmtree(self.work_dir)

    def _create_file(self, *path):
        """Create an empty file below ``work_dir`` and return its full path.

        ``path`` holds the path components relative to ``self.work_dir``.
        """
        filename = os.path.join(self.work_dir, *path)
        # open() is the documented way to open files; opening in 'w' mode
        # and closing immediately leaves an empty file behind.
        open(filename, 'w').close()
        return filename

    def _run_slave(self, recipe):
        """Write ``recipe`` to ``recipe.xml``, run a ``TestSlave`` on it and
        return the slave's ``results`` list.
        """
        filename = self._create_file("recipe.xml")
        recipe_file = open(filename, "wb")
        try:
            recipe_file.write(recipe)
        finally:
            # Close the handle even when the write fails.
            recipe_file.close()
        slave = TestSlave(filename, self.work_dir)
        slave.run()
        return slave.results

    def test_quit_raises(self):
        self.slave = BuildSlave([], work_dir=self.work_dir)
        self.assertRaises(ExitSlave, self.slave.quit)

    def test_simple_recipe(self):
        results = self._run_slave("""
            <build xmlns:sh="http://bitten.edgewall.org/tools/sh"
            >
                <step id="print">
                    <sh:exec executable="%s" args='-c "print (\\"Hello\\")"' />
                </step>
            </build>""" % self.python_path)

        result = results[0]
        self.assertEqual(result.attr["step"], "print")
        self.assertEqual(result.attr["status"], "success")
        # First child of the result, then the first child of that element.
        log = list(result)[0]
        msg = list(log)[0]
        self.assertEqual(str(msg), '<message level="info">Hello</message>')

    def test_non_utf8(self):
        # The recipe text contains the literal characters \xe9; the escape
        # is only interpreted by the child interpreter running the -c code.
        results = self._run_slave("""
            <build xmlns:sh="http://bitten.edgewall.org/tools/sh"
            >
                <step id="print">
                    <sh:exec executable="%s" args='-c "print (\\"\\xe9\\")"' />
                </step>
            </build>""" % self.python_path)

        result = results[0]
        self.assertEqual(result.attr["step"], "print")
        self.assertEqual(result.attr["status"], "success")
        log = list(result)[0]
        msg = list(log)[0]
        # check replacement character (\uFFFD) was generated correctly
        self.assertEqual(str(msg).decode("utf-8"),
                         u'<message level="info">\uFFFD</message>')
---|
105 | |
---|
class MultiPartEncodeTestCase(unittest.TestCase):
    """Checks the output of ``encode_multipart_formdata``."""

    def setUp(self):
        self.work_dir = tempfile.mkdtemp(prefix='bitten_test')

    def tearDown(self):
        shutil.rmtree(self.work_dir)

    def test_mutlipart_encode_one(self):
        """Encode one plain field and one file field and compare the body.

        The method name (including its spelling) is kept unchanged so that
        existing references to this test keep working.
        """
        fields = {
            'foo': 'bar',
            'foofile': ('test.txt', 'contents of foofile'),
        }
        body, content_type = encode_multipart_formdata(fields)
        # The boundary is taken from the returned content type header, i.e.
        # the part after "boundary=" in "multipart/form-data; boundary=...".
        boundary = content_type.split(';')[1].strip().split('=')[1]
        self.assertEqual('multipart/form-data; boundary=%s' % boundary,
                         content_type)
        expected = (
            '--%s\r\nContent-Disposition: form-data; '
            'name="foo"\r\n\r\nbar\r\n--%s\r\nContent-Disposition: '
            'form-data; name="foofile"; filename="test.txt"\r\n'
            'Content-Type: application/octet-stream\r\n\r\n'
            'contents of foofile\r\n--%s--\r\n'
        ) % (boundary, boundary, boundary)
        self.assertEqual(expected, body)
---|
129 | |
---|
def suite():
    """Return a ``unittest.TestSuite`` with the tests of both test case
    classes defined in this module.

    Tests are collected with the default test loader, which picks up the
    methods whose names start with ``test``.
    """
    # loadTestsFromTestCase replaces the deprecated unittest.makeSuite
    # helper and selects the same methods for the default 'test' prefix.
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    # Named ``tests`` so the local does not shadow this function's name.
    tests = unittest.TestSuite()
    tests.addTest(load(BuildSlaveTestCase))
    tests.addTest(load(MultiPartEncodeTestCase))
    return tests
---|
135 | |
---|
# When executed directly as a script, run the tests returned by suite().
if __name__ == "__main__":
    unittest.main(defaultTest="suite")
---|