#! /usr/bin/env python
#
# This file is part of pySerial - Cross platform serial port support for Python
# (C) 2001-2015 Chris Liechti <[email protected]>
#
# SPDX-License-Identifier:    BSD-3-Clause
"""\
Some tests for the serial module.
Part of pyserial (http://pyserial.sf.net)  (C)2001-2015 [email protected]

Intended to be run on different platforms, to ensure portability of
the code.

For all these tests a simple hardware is required.
Loopback HW adapter:
Shortcut these pin pairs:
 TX <-> RX
 RTS <-> CTS
 DTR <-> DSR

On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
"""

import unittest
import threading
import time
import sys
import serial

# on which port should the tests be performed:
PORT = 'loop://'

# indirection via bytearray b/c bytes(range(256)) does something else in Python 2.7
bytes_0to255 = bytes(bytearray(range(256)))


def segments(data, size=16):
    """Yield consecutive slices of *data*, each at most *size* items long.

    The final slice is shorter when ``len(data)`` is not a multiple of
    *size*; nothing is yielded for empty *data*.
    """
    for a in range(0, len(data), size):
        yield data[a:a + size]


class Test4_Nonblocking(unittest.TestCase):
    """Test with timeouts"""
    # read timeout passed to serial_for_url(); subclasses override it
    timeout = 0

    def setUp(self):
        """Open PORT with the class-level read timeout."""
        self.s = serial.serial_for_url(PORT, timeout=self.timeout)

    def tearDown(self):
        """Close the port opened in setUp()."""
        self.s.close()

    def test0_Messy(self):
        """NonBlocking (timeout=0)"""
        # this is only here to write out the message in verbose mode
        # because Test3 and Test4 print the same messages

    def test1_ReadEmpty(self):
        """timeout: After port open, the input buffer must be empty"""
        self.assertEqual(self.s.read(1), b'', "expected empty buffer")

    def test2_Loopback(self):
        """timeout: each sent character should return (binary test).
           this is also a test for the binary capability of a port."""
        for block in segments(bytes_0to255):
            length = len(block)
            self.s.write(block)
            # there might be a small delay until the character is ready (especially on win32)
            time.sleep(0.05)
            self.assertEqual(self.s.in_waiting, length, "expected exactly {} character for in_waiting".format(length))
            self.assertEqual(self.s.read(length), block)  #, "expected a %r which was written before" % block)
        self.assertEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read")

    def test2_LoopbackTimeout(self):
        """timeout: test the timeout/immediate return.
        partial results should be returned."""
        self.s.write(b"HELLO")
        time.sleep(0.1)    # there might be a small delay until the character is ready (especially on win32 and rfc2217)
        # read more characters as are available to run in the timeout
        self.assertEqual(self.s.read(10), b'HELLO', "expected the 'HELLO' which was written before")
        self.assertEqual(self.s.read(1), b'', "expected empty buffer after all sent chars are read")


class Test3_Timeout(Test4_Nonblocking):
    """Same tests as the NonBlocking ones but this time with timeout"""
    timeout = 1

    def test0_Messy(self):
        """Blocking (timeout=1)"""
        # this is only here to write out the message in verbose mode
        # because Test3 and Test4 print the same messages


class SendEvent(threading.Thread):
    """Helper thread that writes a marker byte to a port after a delay.

    The thread starts itself on construction.  After *delay* seconds it
    sets an internal event and, unless stop() was called before that
    point, writes ``b"E"`` to *serial* and flushes it.
    """

    def __init__(self, serial, delay=3):
        """Store the port object and the delay (seconds), then start the thread."""
        threading.Thread.__init__(self)
        self.serial = serial
        self.delay = delay
        self.x = threading.Event()
        self.stopped = False
        self.start()

    def run(self):
        """Sleep for the delay, signal the event and send the marker if not stopped."""
        time.sleep(self.delay)
        self.x.set()
        if not self.stopped:
            self.serial.write(b"E")
            self.serial.flush()

    def isSet(self):
        """Return True once the delay has elapsed (the internal event is set)."""
        # Event.isSet() is a deprecated alias; is_set() is the supported spelling
        return self.x.is_set()

    def stop(self):
        """Suppress a not-yet-sent marker and block until the event is set."""
        self.stopped = True
        self.x.wait()


class Test1_Forever(unittest.TestCase):
    """Tests a port with no timeout. These tests require that a
    character is sent after some time to stop the test, this is done
    through the SendEvent class and the Loopback HW."""

    def setUp(self):
        """Open PORT without a read timeout and start the delayed marker sender."""
        self.s = serial.serial_for_url(PORT, timeout=None)
        self.event = SendEvent(self.s)

    def tearDown(self):
        """Wait for the sender's event before closing the port."""
        self.event.stop()
        self.s.close()

    def test2_ReadEmpty(self):
        """no timeout: after port open, the input buffer must be empty (read).
        a character is sent after some time to terminate the test (SendEvent)."""
        c = self.s.read(1)
        if not (self.event.isSet() and c == b'E'):
            self.fail("expected marker (evt={!r}, c={!r})".format(self.event.isSet(), c))


class Test2_Forever(unittest.TestCase):
    """Tests a port with no timeout"""

    def setUp(self):
        """Open PORT without a read timeout."""
        self.s = serial.serial_for_url(PORT, timeout=None)

    def tearDown(self):
        """Close the port opened in setUp()."""
        self.s.close()

    def test1_inWaitingEmpty(self):
        """no timeout: after port open, the input buffer must be empty (in_waiting)"""
        self.assertEqual(self.s.in_waiting, 0, "expected empty buffer")

    def test2_Loopback(self):
        """no timeout: each sent character should return (binary test).
           this is also a test for the binary capability of a port."""
        for block in segments(bytes_0to255):
            length = len(block)
            self.s.write(block)
            # there might be a small delay until the character is ready (especially on win32 and rfc2217)
            time.sleep(0.05)
            self.assertEqual(self.s.in_waiting, length)  #, "expected exactly %d character for in_waiting" % length)
            self.assertEqual(self.s.read(length), block)  #, "expected %r which was written before" % block)
        self.assertEqual(self.s.in_waiting, 0, "expected empty buffer after all sent chars are read")


class Test0_DataWires(unittest.TestCase):
    """Test modem control lines"""

    def setUp(self):
        """Open PORT with default settings."""
        self.s = serial.serial_for_url(PORT)

    def tearDown(self):
        """Close the port opened in setUp()."""
        self.s.close()

    def test1_RTS(self):
        """Test RTS/CTS"""
        self.s.rts = False
        time.sleep(1.1)
        self.assertFalse(self.s.cts, "CTS -> 0")
        self.s.rts = True
        time.sleep(1.1)
        self.assertTrue(self.s.cts, "CTS -> 1")

    def test2_DTR(self):
        """Test DTR/DSR"""
        self.s.dtr = False
        time.sleep(1.1)
        self.assertFalse(self.s.dsr, "DSR -> 0")
        self.s.dtr = True
        time.sleep(1.1)
        self.assertTrue(self.s.dsr, "DSR -> 1")

    def test3_RI(self):
        """Test RI"""
        self.assertFalse(self.s.ri, "RI -> 0")


class Test_MoreTimeouts(unittest.TestCase):
    """Test with timeouts"""

    def setUp(self):
        """Create the port object without opening it; the test opens it itself."""
        # create a closed serial port
        self.s = serial.serial_for_url(PORT, do_not_open=True)

    def tearDown(self):
        """Discard pending output, then reopen briefly to drain leftover input."""
        self.s.reset_output_buffer()
        self.s.flush()
        #~ self.s.write(serial.XON)
        self.s.close()
        # reopen... some faulty USB-serial adapter make next test fail otherwise...
        self.s.timeout = 1
        self.s.xonxoff = False
        self.s.open()
        self.s.read(3000)
        self.s.close()

    def test_WriteTimeout(self):
        """Test write() timeout."""
        # use xonxoff setting and the loop-back adapter to switch traffic on hold
        self.s.port = PORT
        self.s.write_timeout = 1.0
        self.s.xonxoff = True
        self.s.open()
        self.s.write(serial.XOFF)
        time.sleep(0.5)  # some systems need a little delay so that they can react on XOFF
        t1 = time.time()
        self.assertRaises(serial.SerialTimeoutException, self.s.write, b"timeout please" * 200)
        t2 = time.time()
        self.assertTrue(0.9 <= (t2 - t1) < 2.1, "Timeout not in the given interval ({})".format(t2 - t1))


if __name__ == '__main__':
    sys.stdout.write(__doc__)
    # an optional first command-line argument overrides the port under test
    if len(sys.argv) > 1:
        PORT = sys.argv[1]
    sys.stdout.write("Testing port: {!r}\n".format(PORT))
    # replace the arguments so unittest only sees the verbose flag
    sys.argv[1:] = ['-v']
    # When this module is executed from the command-line, it runs all its tests
    unittest.main()