1#! /usr/bin/env python
2#
3# This file is part of pySerial - Cross platform serial port support for Python
4# (C) 2001-2015 Chris Liechti <[email protected]>
5#
6# SPDX-License-Identifier:    BSD-3-Clause
7"""\
8Some tests for the serial module.
9Part of pyserial (http://pyserial.sf.net)  (C)2001-2015 [email protected]
10
11Intended to be run on different platforms, to ensure portability of
12the code.
13
14For all these tests a simple hardware is required.
15Loopback HW adapter:
16Shortcut these pin pairs:
17 TX  <-> RX
18 RTS <-> CTS
19 DTR <-> DSR
20
21On a 9 pole DSUB these are the pins (2-3) (4-6) (7-8)
22"""
23
24import unittest
25import threading
26import time
27import sys
28import serial
29
30# on which port should the tests be performed:
31PORT = 'loop://'
32
# indirection via bytearray b/c bytes(range(256)) does something else in Python 2.7
34bytes_0to255 = bytes(bytearray(range(256)))
35
36
def segments(data, size=16):
    """Yield consecutive slices of ``data``, each at most ``size`` items long.

    The last slice is shorter when ``len(data)`` is not a multiple of
    ``size``; nothing is yielded for empty ``data``.
    """
    total = len(data)
    for start in range(0, total, size):
        stop = start + size
        yield data[start:stop]
40
41
class Test4_Nonblocking(unittest.TestCase):
    """Loopback read/write checks on a port opened with ``timeout = 0``.

    The ``timeout`` class attribute is passed to ``serial_for_url`` in
    ``setUp``, so a subclass can re-run the same tests with another value.
    """
    timeout = 0

    def setUp(self):
        # a fresh port for every test, opened with the class-level timeout
        self.s = serial.serial_for_url(PORT, timeout=self.timeout)

    def tearDown(self):
        self.s.close()

    def test0_Messy(self):
        """NonBlocking (timeout=0)"""
        # Intentionally empty: the docstring is what shows up in verbose
        # mode and tells the Test3 and Test4 runs apart, since both print
        # the same messages for the remaining tests.

    def test1_ReadEmpty(self):
        """timeout: After port open, the input buffer must be empty"""
        received = self.s.read(1)
        self.assertEqual(received, b'', "expected empty buffer")

    def test2_Loopback(self):
        """timeout: each sent character should return (binary test).
           this is also a test for the binary capability of a port."""
        for chunk in segments(bytes_0to255):
            count = len(chunk)
            self.s.write(chunk)
            # give the data a moment to arrive (especially on win32)
            time.sleep(0.05)
            pending = self.s.in_waiting
            self.assertEqual(pending, count, "expected exactly {} character for inWainting()".format(count))
            echoed = self.s.read(count)
            self.assertEqual(echoed, chunk)
        leftover = self.s.read(1)
        self.assertEqual(leftover, b'', "expected empty buffer after all sent chars are read")

    def test2_LoopbackTimeout(self):
        """timeout: test the timeout/immediate return.
        partial results should be returned."""
        self.s.write(b"HELLO")
        # give the data a moment to arrive (especially on win32 and rfc2217)
        time.sleep(0.1)
        # ask for more characters than were written so the read runs into the timeout
        partial = self.s.read(10)
        self.assertEqual(partial, b'HELLO', "expected the 'HELLO' which was written before")
        leftover = self.s.read(1)
        self.assertEqual(leftover, b'', "expected empty buffer after all sent chars are read")
81
82
class Test3_Timeout(Test4_Nonblocking):
    """Re-run the inherited Test4_Nonblocking tests with ``timeout = 1``."""
    timeout = 1

    def test0_Messy(self):
        """Blocking (timeout=1)"""
        # Intentionally empty: the docstring is what shows up in verbose
        # mode and tells the Test3 and Test4 runs apart, since both print
        # the same messages for the remaining tests.
91
92
class SendEvent(threading.Thread):
    """Helper thread that writes a single ``b"E"`` marker after a delay.

    The thread starts itself from the constructor. Once the delay is over
    (or has been cut short by ``stop()``) the event ``x`` is set and, unless
    ``stop()`` was called, ``b"E"`` is written to the port and flushed.

    Arguments:
        serial: object providing ``write(data)`` and ``flush()``.
        delay:  seconds to wait before the marker is sent (default 3).
    """
    def __init__(self, serial, delay=3):
        threading.Thread.__init__(self)
        self.serial = serial
        self.delay = delay
        self.x = threading.Event()             # set when the delay phase has ended
        self._abort_delay = threading.Event()  # set by stop() to end the delay early
        self.stopped = 0
        self.start()

    def run(self):
        # Event.wait() instead of time.sleep() so that stop() does not have
        # to sit out the remaining delay before the thread notices it.
        self._abort_delay.wait(self.delay)
        self.x.set()
        if not self.stopped:
            self.serial.write(b"E")
            self.serial.flush()

    def isSet(self):
        """Return True once the delay phase has ended."""
        # is_set() is the non-deprecated spelling of Event.isSet()
        return self.x.is_set()

    def stop(self):
        """Suppress the marker (if not yet sent) and wait for the delay phase to end."""
        # the flag must be set before waking the thread so run() sees it
        self.stopped = 1
        self._abort_delay.set()
        self.x.wait()
115
116
class Test1_Forever(unittest.TestCase):
    """Blocking read on a port opened with ``timeout=None``.

    A SendEvent helper is started in ``setUp``; it writes a marker character
    after some time so that the read in the test can finish. This relies on
    the loopback hardware described in the module docstring.
    """
    def setUp(self):
        self.s = serial.serial_for_url(PORT, timeout=None)
        self.event = SendEvent(self.s)

    def tearDown(self):
        # stop the sender first, then release the port it writes to
        self.event.stop()
        self.s.close()

    def test2_ReadEmpty(self):
        """no timeout: after port open, the input buffer must be empty (read).
        a character is sent after some time to terminate the test (SendEvent)."""
        c = self.s.read(1)
        # success only if the sender has fired and the marker is what was read
        if self.event.isSet() and c == b'E':
            return
        self.fail("expected marker (evt={!r}, c={!r})".format(self.event.isSet(), c))
135
136
class Test2_Forever(unittest.TestCase):
    """Loopback checks on a port opened with ``timeout=None``."""
    def setUp(self):
        self.s = serial.serial_for_url(PORT, timeout=None)

    def tearDown(self):
        self.s.close()

    def test1_inWaitingEmpty(self):
        """no timeout: after port open, the input buffer must be empty (in_waiting)"""
        pending = self.s.in_waiting
        self.assertEqual(pending, 0, "expected empty buffer")

    def test2_Loopback(self):
        """no timeout: each sent character should return (binary test).
           this is also a test for the binary capability of a port."""
        for chunk in segments(bytes_0to255):
            count = len(chunk)
            self.s.write(chunk)
            # give the data a moment to arrive (especially on win32 and rfc2217)
            time.sleep(0.05)
            # everything written must be waiting, and must read back unchanged
            pending = self.s.in_waiting
            self.assertEqual(pending, count)
            echoed = self.s.read(count)
            self.assertEqual(echoed, chunk)
        remaining = self.s.in_waiting
        self.assertEqual(remaining, 0, "expected empty buffer after all sent chars are read")
160
161
class Test0_DataWires(unittest.TestCase):
    """Check the modem control lines (RTS/CTS, DTR/DSR, RI).

    The module docstring describes the loopback wiring these tests expect.
    """
    def setUp(self):
        self.s = serial.serial_for_url(PORT)

    def tearDown(self):
        self.s.close()

    def test1_RTS(self):
        """Test RTS/CTS"""
        port = self.s
        # drive RTS low, wait, then expect CTS to read low
        port.rts = False
        time.sleep(1.1)
        cts_is_low = not port.cts
        self.assertTrue(cts_is_low, "CTS -> 0")
        # drive RTS high, wait, then expect CTS to read high
        port.rts = True
        time.sleep(1.1)
        cts_state = port.cts
        self.assertTrue(cts_state, "CTS -> 1")

    def test2_DTR(self):
        """Test DTR/DSR"""
        port = self.s
        # drive DTR low, wait, then expect DSR to read low
        port.dtr = False
        time.sleep(1.1)
        dsr_is_low = not port.dsr
        self.assertTrue(dsr_is_low, "DSR -> 0")
        # drive DTR high, wait, then expect DSR to read high
        port.dtr = True
        time.sleep(1.1)
        dsr_state = port.dsr
        self.assertTrue(dsr_state, "DSR -> 1")

    def test3_RI(self):
        """Test RI"""
        ri_is_low = not self.s.ri
        self.assertTrue(ri_is_low, "RI -> 0")
191
192
class Test_MoreTimeouts(unittest.TestCase):
    """Write-timeout check that uses XON/XOFF to put traffic on hold."""
    def setUp(self):
        # the port object is created closed; the test configures and opens it
        self.s = serial.serial_for_url(PORT, do_not_open=True)

    def tearDown(self):
        port = self.s
        port.reset_output_buffer()
        port.flush()
        # (sending serial.XON at this point is disabled)
        port.close()
        # reopen... some faulty USB-serial adapter make next test fail otherwise...
        port.timeout = 1
        port.xonxoff = False
        port.open()
        port.read(3000)
        port.close()

    def test_WriteTimeout(self):
        """Test write() timeout."""
        # use xonxoff setting and the loop-back adapter to switch traffic on hold
        port = self.s
        port.port = PORT
        port.write_timeout = 1.0
        port.xonxoff = True
        port.open()
        port.write(serial.XOFF)
        # some systems need a little delay so that they can react on XOFF
        time.sleep(0.5)
        started = time.time()
        self.assertRaises(serial.SerialTimeoutException, port.write, b"timeout please" * 200)
        elapsed = time.time() - started
        # the write must give up roughly after the configured 1.0 s
        self.assertTrue(0.9 <= elapsed < 2.1, "Timeout not in the given interval ({})".format(elapsed))
224
225
if __name__ == '__main__':
    # show the hardware requirements from the module docstring first
    sys.stdout.write(__doc__)
    # an optional first command-line argument selects the port under test
    if len(sys.argv) >= 2:
        PORT = sys.argv[1]
    sys.stdout.write("Testing port: {!r}\n".format(PORT))
    # replace all arguments with '-v' so unittest does not see the port name
    sys.argv[1:] = ['-v']
    # run every test defined in this module
    unittest.main()
234