1import os
2import string
3import unittest
4import shutil
5from test.support import reap_children, unix_shell
6from test.support.os_helper import TESTFN, unlink
7from test.support.warnings_helper import import_deprecated
8
# Load the module under test through the test-suite helper rather than a
# plain import (presumably so its deprecation warning is handled -- see
# test.support.warnings_helper).
pipes = import_deprecated("pipes")


# Skip the whole module on platforms where pipes cannot work.
if os.name != 'posix':
    raise unittest.SkipTest('pipes module only works on posix')

# A configured shell that actually exists on disk is required as well.
if not unix_shell or not os.path.exists(unix_shell):
    raise unittest.SkipTest('pipes module requires a shell')

# Second scratch file, used as the destination of copy() tests.
TESTFN2 = TESTFN + "2"

# tr a-z A-Z is not portable, so make the ranges explicit
s_command = 'tr {} {}'.format(string.ascii_lowercase, string.ascii_uppercase)
22
class SimplePipeTests(unittest.TestCase):
    """Tests for ``pipes.Template``.

    Covers running real pipelines (upper-casing text with the ``tr``-based
    ``s_command``), the behaviour of an empty pipeline, ``repr``/``debug``/
    ``clone``, and the validation errors raised by ``append``, ``prepend``
    and ``open``.

    Test methods are described with ``#`` comments rather than docstrings,
    so the names unittest reports stay the plain method names.
    """

    def tearDown(self):
        """Remove both scratch files after every test."""
        # Not every test creates both files; unlink() is called for each
        # regardless (presumably it tolerates a missing file -- see
        # test.support.os_helper).
        for f in (TESTFN, TESTFN2):
            unlink(f)

    def _require_tr(self):
        """Skip the calling test when no ``tr`` executable is on PATH."""
        if shutil.which('tr') is None:
            self.skipTest('tr is not available')

    def testSimplePipe1(self):
        # write through a STDIN_STDOUT step and check the file contents
        self._require_tr()
        t = pipes.Template()
        t.append(s_command, pipes.STDIN_STDOUT)
        with t.open(TESTFN, 'w') as f:
            f.write('hello world #1')
        with open(TESTFN) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #1')

    def testSimplePipe2(self):
        # copy file to file through a FILEIN_FILEOUT step ($IN / $OUT)
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testSimplePipe3(self):
        # read through a FILEIN_STDOUT step ($IN only)
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
        # Same context-manager usage as the write-mode tests; the stream
        # is closed on exit even if the assertion fails.
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testEmptyPipeline1(self):
        # copy through empty pipe
        d = 'empty pipeline test COPY'
        with open(TESTFN, 'w') as f:
            f.write(d)
        # pre-create the destination as an empty file
        with open(TESTFN2, 'w') as f:
            f.write('')
        t = pipes.Template()
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline2(self):
        # read through empty pipe
        d = 'empty pipeline test READ'
        with open(TESTFN, 'w') as f:
            f.write(d)
        t = pipes.Template()
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline3(self):
        # write through empty pipe
        d = 'empty pipeline test WRITE'
        t = pipes.Template()
        with t.open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN) as f:
            self.assertEqual(f.read(), d)

    def testRepr(self):
        # repr() lists the steps as (command, kind) pairs
        t = pipes.Template()
        self.assertEqual(repr(t), "<Template instance, steps=[]>")
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
        self.assertEqual(
            repr(t),
            "<Template instance, steps=[('tr a-z A-Z', '--')]>")

    def testSetDebug(self):
        # debug(flag) is reflected in the debugging attribute
        t = pipes.Template()
        t.debug(False)
        self.assertEqual(t.debugging, False)
        t.debug(True)
        self.assertEqual(t.debugging, True)

    def testReadOpenSink(self):
        # check calling open('r') on a pipe ending with
        # a sink raises ValueError
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'r')

    def testWriteOpenSource(self):
        # check calling open('w') on a pipe beginning with
        # a source raises ValueError
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'w')

    def testBadAppendOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')

        # shouldn't be able to append a source
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)

        # check appending two sinks
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadPrependOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')

        # shouldn't be able to prepend a sink
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)

        # check prepending two sources
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadOpenMode(self):
        # an open mode other than 'r' or 'w' is rejected
        t = pipes.Template()
        self.assertRaises(ValueError, t.open, 'bogusfile', 'x')

    def testClone(self):
        # clone() returns a distinct template with an equal, but not
        # shared, list of steps
        t = pipes.Template()
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)

        u = t.clone()
        self.assertIsNot(t, u)
        self.assertEqual(t.steps, u.steps)
        self.assertIsNot(t.steps, u.steps)
        self.assertEqual(t.debugging, u.debugging)
203
204
def tearDownModule():
    """Module-level unittest fixture, run once after this module's tests.

    Delegates to ``reap_children()`` from ``test.support``; presumably this
    collects child processes left behind by the shell pipelines the tests
    start -- confirm against the ``test.support`` documentation.
    """
    reap_children()
207
208
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
211