1import os 2import string 3import unittest 4import shutil 5from test.support import reap_children, unix_shell 6from test.support.os_helper import TESTFN, unlink 7from test.support.warnings_helper import import_deprecated 8 9pipes = import_deprecated("pipes") 10 11 12if os.name != 'posix': 13 raise unittest.SkipTest('pipes module only works on posix') 14 15if not (unix_shell and os.path.exists(unix_shell)): 16 raise unittest.SkipTest('pipes module requires a shell') 17 18TESTFN2 = TESTFN + "2" 19 20# tr a-z A-Z is not portable, so make the ranges explicit 21s_command = 'tr %s %s' % (string.ascii_lowercase, string.ascii_uppercase) 22 23class SimplePipeTests(unittest.TestCase): 24 def tearDown(self): 25 for f in (TESTFN, TESTFN2): 26 unlink(f) 27 28 def testSimplePipe1(self): 29 if shutil.which('tr') is None: 30 self.skipTest('tr is not available') 31 t = pipes.Template() 32 t.append(s_command, pipes.STDIN_STDOUT) 33 with t.open(TESTFN, 'w') as f: 34 f.write('hello world #1') 35 with open(TESTFN) as f: 36 self.assertEqual(f.read(), 'HELLO WORLD #1') 37 38 def testSimplePipe2(self): 39 if shutil.which('tr') is None: 40 self.skipTest('tr is not available') 41 with open(TESTFN, 'w') as f: 42 f.write('hello world #2') 43 t = pipes.Template() 44 t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT) 45 t.copy(TESTFN, TESTFN2) 46 with open(TESTFN2) as f: 47 self.assertEqual(f.read(), 'HELLO WORLD #2') 48 49 def testSimplePipe3(self): 50 if shutil.which('tr') is None: 51 self.skipTest('tr is not available') 52 with open(TESTFN, 'w') as f: 53 f.write('hello world #2') 54 t = pipes.Template() 55 t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT) 56 f = t.open(TESTFN, 'r') 57 try: 58 self.assertEqual(f.read(), 'HELLO WORLD #2') 59 finally: 60 f.close() 61 62 def testEmptyPipeline1(self): 63 # copy through empty pipe 64 d = 'empty pipeline test COPY' 65 with open(TESTFN, 'w') as f: 66 f.write(d) 67 with open(TESTFN2, 'w') as f: 68 f.write('') 69 t=pipes.Template() 70 t.copy(TESTFN, TESTFN2) 71 with open(TESTFN2) as f: 72 self.assertEqual(f.read(), d) 73 74 def testEmptyPipeline2(self): 75 # read through empty pipe 76 d = 'empty pipeline test READ' 77 with open(TESTFN, 'w') as f: 78 f.write(d) 79 t=pipes.Template() 80 f = t.open(TESTFN, 'r') 81 try: 82 self.assertEqual(f.read(), d) 83 finally: 84 f.close() 85 86 def testEmptyPipeline3(self): 87 # write through empty pipe 88 d = 'empty pipeline test WRITE' 89 t = pipes.Template() 90 with t.open(TESTFN, 'w') as f: 91 f.write(d) 92 with open(TESTFN) as f: 93 self.assertEqual(f.read(), d) 94 95 def testRepr(self): 96 t = pipes.Template() 97 self.assertEqual(repr(t), "<Template instance, steps=[]>") 98 t.append('tr a-z A-Z', pipes.STDIN_STDOUT) 99 self.assertEqual(repr(t), 100 "<Template instance, steps=[('tr a-z A-Z', '--')]>") 101 102 def testSetDebug(self): 103 t = pipes.Template() 104 t.debug(False) 105 self.assertEqual(t.debugging, False) 106 t.debug(True) 107 self.assertEqual(t.debugging, True) 108 109 def testReadOpenSink(self): 110 # check calling open('r') on a pipe ending with 111 # a sink raises ValueError 112 t = pipes.Template() 113 t.append('boguscmd', pipes.SINK) 114 self.assertRaises(ValueError, t.open, 'bogusfile', 'r') 115 116 def testWriteOpenSource(self): 117 # check calling open('w') on a pipe ending with 118 # a source raises ValueError 119 t = pipes.Template() 120 t.prepend('boguscmd', pipes.SOURCE) 121 self.assertRaises(ValueError, t.open, 'bogusfile', 'w') 122 123 def testBadAppendOptions(self): 124 t = pipes.Template() 125 126 # try a non-string command 127 self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT) 128 129 # try a type that isn't recognized 130 self.assertRaises(ValueError, t.append, 'boguscmd', 'xx') 131 132 # shouldn't be able to append a source 133 self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE) 134 135 # check appending two sinks 136 t = pipes.Template() 137 t.append('boguscmd', pipes.SINK) 138 self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK) 139 140 # command needing file input but with no $IN 141 t = pipes.Template() 142 self.assertRaises(ValueError, t.append, 'boguscmd $OUT', 143 pipes.FILEIN_FILEOUT) 144 t = pipes.Template() 145 self.assertRaises(ValueError, t.append, 'boguscmd', 146 pipes.FILEIN_STDOUT) 147 148 # command needing file output but with no $OUT 149 t = pipes.Template() 150 self.assertRaises(ValueError, t.append, 'boguscmd $IN', 151 pipes.FILEIN_FILEOUT) 152 t = pipes.Template() 153 self.assertRaises(ValueError, t.append, 'boguscmd', 154 pipes.STDIN_FILEOUT) 155 156 157 def testBadPrependOptions(self): 158 t = pipes.Template() 159 160 # try a non-string command 161 self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT) 162 163 # try a type that isn't recognized 164 self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx') 165 166 # shouldn't be able to prepend a sink 167 self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK) 168 169 # check prepending two sources 170 t = pipes.Template() 171 t.prepend('boguscmd', pipes.SOURCE) 172 self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE) 173 174 # command needing file input but with no $IN 175 t = pipes.Template() 176 self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT', 177 pipes.FILEIN_FILEOUT) 178 t = pipes.Template() 179 self.assertRaises(ValueError, t.prepend, 'boguscmd', 180 pipes.FILEIN_STDOUT) 181 182 # command needing file output but with no $OUT 183 t = pipes.Template() 184 self.assertRaises(ValueError, t.prepend, 'boguscmd $IN', 185 pipes.FILEIN_FILEOUT) 186 t = pipes.Template() 187 self.assertRaises(ValueError, t.prepend, 'boguscmd', 188 pipes.STDIN_FILEOUT) 189 190 def testBadOpenMode(self): 191 t = pipes.Template() 192 self.assertRaises(ValueError, t.open, 'bogusfile', 'x') 193 194 def testClone(self): 195 t = pipes.Template() 196 t.append('tr a-z A-Z', pipes.STDIN_STDOUT) 197 198 u = t.clone() 199 self.assertNotEqual(id(t), id(u)) 200 self.assertEqual(t.steps, u.steps) 201 self.assertNotEqual(id(t.steps), id(u.steps)) 202 self.assertEqual(t.debugging, u.debugging) 203 204 205def tearDownModule(): 206 reap_children() 207 208 209if __name__ == "__main__": 210 unittest.main() 211