"""Tests for the deprecated ``telnetlib`` module.

``GeneralTests`` talks to a throw-away local TCP server; every other test
case replaces the network with ``SocketStub`` (and, where needed, the
selector with ``MockSelector``) so that reads and writes are deterministic.
"""
import socket
import selectors
import threading
import contextlib

from test import support
from test.support import socket_helper, warnings_helper
import unittest

support.requires_working_socket(module=True)

telnetlib = warnings_helper.import_deprecated('telnetlib')

HOST = socket_helper.HOST


def server(evt, serv):
    """Accept one connection on *serv* and close it immediately.

    *evt* is set as soon as the socket is listening, which is what
    ``GeneralTests.setUp`` waits for.  The listening socket is always
    closed before the function returns.
    """
    serv.listen()
    evt.set()
    try:
        conn, _ = serv.accept()
        conn.close()
    except TimeoutError:
        # The socket timeout set by the caller is only a safety net; a
        # test that never connects is not an error for the server thread.
        pass
    finally:
        serv.close()


class GeneralTests(unittest.TestCase):
    """Connection-level tests run against a real local listening socket."""

    def setUp(self):
        self.evt = threading.Event()
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(60)  # Safety net. Look issue 11812
        self.port = socket_helper.bind_port(self.sock)
        self.thread = threading.Thread(target=server,
                                       args=(self.evt, self.sock))
        self.thread.daemon = True
        self.thread.start()
        # Do not connect before the server thread is listening.
        self.evt.wait()

    def tearDown(self):
        self.thread.join()
        del self.thread  # Clear out any dangling Thread objects.

    def testBasic(self):
        # connects
        telnet = telnetlib.Telnet(HOST, self.port)
        telnet.sock.close()

    def testContextManager(self):
        with telnetlib.Telnet(HOST, self.port) as tn:
            self.assertIsNotNone(tn.get_socket())
        self.assertIsNone(tn.get_socket())

    def testTimeoutDefault(self):
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port)
        finally:
            socket.setdefaulttimeout(None)
        # Registered before asserting so a failure cannot leak the socket.
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutNone(self):
        # None, having other default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
        finally:
            socket.setdefaulttimeout(None)
        self.addCleanup(telnet.sock.close)
        self.assertIsNone(telnet.sock.gettimeout())

    def testTimeoutValue(self):
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testTimeoutOpen(self):
        telnet = telnetlib.Telnet()
        telnet.open(HOST, self.port, timeout=30)
        self.addCleanup(telnet.sock.close)
        self.assertEqual(telnet.sock.gettimeout(), 30)

    def testGetters(self):
        # Test telnet getter methods
        telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
        t_sock = telnet.sock
        self.addCleanup(t_sock.close)
        self.assertEqual(telnet.get_socket(), t_sock)
        self.assertEqual(telnet.fileno(), t_sock.fileno())


class SocketStub(object):
    ''' a socket proxy that re-defines sendall() '''

    def __init__(self, reads=()):
        self.reads = list(reads)  # Intentionally make a copy.
        self.writes = []
        self.block = False

    def sendall(self, data):
        """Record *data* in ``self.writes`` instead of sending it."""
        self.writes.append(data)

    def recv(self, size):
        """Return at most *size* bytes taken from the queued reads.

        Queued chunks are concatenated until *size* bytes are available;
        any surplus is pushed back to the front of the queue.  Returns
        ``b''`` once the queue is empty.
        """
        out = b''
        while self.reads and len(out) < size:
            out += self.reads.pop(0)
        if len(out) > size:
            self.reads.insert(0, out[size:])
            out = out[:size]
        return out


class TelnetAlike(telnetlib.Telnet):
    """Telnet subclass whose I/O hooks are safe to use with SocketStub."""

    def fileno(self):
        raise NotImplementedError()

    def close(self):
        pass

    def sock_avail(self):
        return not self.sock.block

    def msg(self, msg, *args):
        # Capture what Telnet.msg() prints and accumulate it in
        # self._messages so tests can assert on the debug output.
        with support.captured_stdout() as out:
            telnetlib.Telnet.msg(self, msg, *args)
        self._messages += out.getvalue()


class MockSelector(selectors.BaseSelector):
    """Selector that reports readiness based on the stub socket's flag.

    Every registered key is reported as ready unless the first registered
    ``TelnetAlike`` object has ``sock.block`` set, in which case nothing
    is ready.
    """

    def __init__(self):
        self.keys = {}

    @property
    def resolution(self):
        return 1e-3

    def register(self, fileobj, events, data=None):
        key = selectors.SelectorKey(fileobj, 0, events, data)
        self.keys[fileobj] = key
        return key

    def unregister(self, fileobj):
        return self.keys.pop(fileobj)

    def select(self, timeout=None):
        for fileobj in self.keys:
            if isinstance(fileobj, TelnetAlike):
                if fileobj.sock.block:
                    return []
                break
        return [(key, key.events) for key in self.keys.values()]

    def get_map(self):
        return self.keys


@contextlib.contextmanager
def test_socket(reads):
    """Temporarily make ``socket.create_connection`` return a SocketStub.

    Each stub created while the context is active is seeded with *reads*.
    The original ``socket.create_connection`` is restored on exit.
    """
    def new_conn(*ignored):
        return SocketStub(reads)

    # Save and patch before entering the ``try`` so that ``finally`` can
    # never run with ``old_conn`` unbound.
    old_conn = socket.create_connection
    socket.create_connection = new_conn
    try:
        yield None
    finally:
        socket.create_connection = old_conn


def test_telnet(reads=(), cls=TelnetAlike):
    ''' return a telnetlib.Telnet object that uses a SocketStub with
        reads queued up to be read '''
    for x in reads:
        assert type(x) is bytes, x
    with test_socket(reads):
        telnet = cls('dummy', 0)
        telnet._messages = ''  # debuglevel output
    return telnet


class ExpectAndReadTestCase(unittest.TestCase):
    """Base class that swaps in MockSelector for the duration of a test."""

    def setUp(self):
        self.old_selector = telnetlib._TelnetSelector
        telnetlib._TelnetSelector = MockSelector

    def tearDown(self):
        telnetlib._TelnetSelector = self.old_selector


class ReadTests(ExpectAndReadTestCase):
    def test_read_until(self):
        """
        read_until(expected, timeout=None)
        test the blocking version of read_until
        """
        want = [b'xxxmatchyyy']
        telnet = test_telnet(want)
        data = telnet.read_until(b'match')
        self.assertEqual(data, b'xxxmatch',
                         msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads))

        reads = [b'x' * 50, b'match', b'y' * 50]
        expect = b''.join(reads[:-1])
        telnet = test_telnet(reads)
        data = telnet.read_until(b'match')
        self.assertEqual(data, expect)

    def test_read_all(self):
        """
        read_all()
          Read all data until EOF; may block.
        """
        reads = [b'x' * 500, b'y' * 500, b'z' * 500]
        expect = b''.join(reads)
        telnet = test_telnet(reads)
        data = telnet.read_all()
        self.assertEqual(data, expect)

    def test_read_some(self):
        """
        read_some()
          Read at least one byte or EOF; may block.
        """
        # test 'at least one byte'
        telnet = test_telnet([b'x' * 500])
        data = telnet.read_some()
        self.assertGreaterEqual(len(data), 1)
        # test EOF
        telnet = test_telnet()
        data = telnet.read_some()
        self.assertEqual(b'', data)

    def _read_eager(self, func_name):
        """
        read_*_eager()
          Read all data available already queued or on the socket,
          without blocking.
        """
        want = b'x' * 100
        telnet = test_telnet([want])
        func = getattr(telnet, func_name)
        # While the stub reports "would block", nothing may be returned.
        telnet.sock.block = True
        self.assertEqual(b'', func())
        telnet.sock.block = False
        data = b''
        while True:
            try:
                data += func()
            except EOFError:
                break
        self.assertEqual(data, want)

    def test_read_eager(self):
        # read_eager and read_very_eager make the same guarantees
        # (they behave differently but we only test the guarantees)
        self._read_eager('read_eager')
        self._read_eager('read_very_eager')
        # NB -- we need to test the IAC block which is mentioned in the
        # docstring but not in the module docs

    # NOTE(review): this method has no ``test_`` prefix, so unittest's
    # default loader does not collect it.  Presumably deliberate -- confirm
    # that it passes before renaming it.
    def read_very_lazy(self):
        want = b'x' * 100
        telnet = test_telnet([want])
        self.assertEqual(b'', telnet.read_very_lazy())
        while telnet.sock.reads:
            telnet.fill_rawq()
        data = telnet.read_very_lazy()
        self.assertEqual(want, data)
        self.assertRaises(EOFError, telnet.read_very_lazy)

    def test_read_lazy(self):
        want = b'x' * 100
        telnet = test_telnet([want])
        self.assertEqual(b'', telnet.read_lazy())
        data = b''
        while True:
            try:
                read_data = telnet.read_lazy()
                data += read_data
                if not read_data:
                    # Nothing cooked yet: pull more raw data in by hand.
                    telnet.fill_rawq()
            except EOFError:
                break
            self.assertTrue(want.startswith(data))
        self.assertEqual(data, want)


class nego_collector(object):
    """Option-negotiation callback that records what it is called with.

    ``seen`` accumulates ``cmd + opt`` for every callback invocation.
    When *sb_getter* is given, it is called on every ``SE`` command and
    its result is appended to ``sb_seen``.
    """

    def __init__(self, sb_getter=None):
        self.seen = b''
        self.sb_getter = sb_getter
        self.sb_seen = b''

    def do_nego(self, sock, cmd, opt):
        self.seen += cmd + opt
        if cmd == tl.SE and self.sb_getter:
            sb_data = self.sb_getter()
            self.sb_seen += sb_data


tl = telnetlib


class WriteTests(unittest.TestCase):
    '''The only thing that write does is replace each tl.IAC for
    tl.IAC+tl.IAC'''

    def test_write(self):
        data_sample = [b'data sample without IAC',
                       b'data sample with' + tl.IAC + b' one IAC',
                       b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
                       tl.IAC,
                       b'']
        for data in data_sample:
            telnet = test_telnet()
            telnet.write(data)
            written = b''.join(telnet.sock.writes)
            self.assertEqual(data.replace(tl.IAC, tl.IAC + tl.IAC), written)


class OptionTests(unittest.TestCase):
    # RFC 854 commands
    cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]

    def _test_command(self, data):
        """ helper for testing IAC + cmd """
        telnet = test_telnet(data)
        data_len = len(b''.join(data))
        nego = nego_collector()
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        cmd = nego.seen
        self.assertGreater(len(cmd), 0)  # we expect at least one command
        self.assertIn(cmd[:1], self.cmds)
        self.assertEqual(cmd[1:2], tl.NOOPT)
        self.assertEqual(data_len, len(txt + cmd))
        nego.sb_getter = None  # break the nego => telnet cycle

    def test_IAC_commands(self):
        for cmd in self.cmds:
            self._test_command([tl.IAC, cmd])
            self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100])
            self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10])
        # all at once
        self._test_command([tl.IAC + cmd for cmd in self.cmds])

    def test_SB_commands(self):
        # RFC 855, subnegotiations portion
        send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
                tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
                tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
                ]
        telnet = test_telnet(send)
        nego = nego_collector(telnet.read_sb_data)
        telnet.set_option_negotiation_callback(nego.do_nego)
        txt = telnet.read_all()
        self.assertEqual(txt, b'')
        want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd'
        self.assertEqual(nego.sb_seen, want_sb_data)
        self.assertEqual(b'', telnet.read_sb_data())
        nego.sb_getter = None  # break the nego => telnet cycle

    def test_debuglevel_reads(self):
        # test all the various places that self.msg(...) is called
        given_a_expect_b = [
            # Telnet.fill_rawq
            (b'a', ": recv b''\n"),
            # Telnet.process_rawq
            (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
            (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
            (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
            (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
            (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
        ]
        for a, b in given_a_expect_b:
            telnet = test_telnet([a])
            telnet.set_debuglevel(1)
            # Only the debug output matters; the data read is discarded.
            telnet.read_all()
            self.assertIn(b, telnet._messages)

    def test_debuglevel_write(self):
        telnet = test_telnet()
        telnet.set_debuglevel(1)
        telnet.write(b'xxx')
        expected = "send b'xxx'\n"
        self.assertIn(expected, telnet._messages)

    def test_debug_accepts_str_port(self):
        # Issue 10695
        with test_socket([]):
            telnet = TelnetAlike('dummy', '0')
            telnet._messages = ''
        telnet.set_debuglevel(1)
        telnet.msg('test')
        self.assertRegex(telnet._messages, r'0.*test')


class ExpectTests(ExpectAndReadTestCase):
    def test_expect(self):
        """
        expect(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        """
        want = [b'x' * 10, b'match', b'y' * 10]
        telnet = test_telnet(want)
        (_, _, data) = telnet.expect([b'match'])
        self.assertEqual(data, b''.join(want[:-1]))


if __name__ == '__main__':
    unittest.main()