1#! python
2#
3# backend for Windows ("win32" incl. 32/64 bit support)
4#
5# (C) 2001-2020 Chris Liechti <[email protected]>
6#
7# This file is part of pySerial. https://github.com/pyserial/pyserial
8# SPDX-License-Identifier:    BSD-3-Clause
9#
10# Initial patch to use ctypes by Giovanni Bajo <[email protected]>
11
12from __future__ import absolute_import
13
14# pylint: disable=invalid-name,too-few-public-methods
15import ctypes
16import time
17from serial import win32
18
19import serial
20from serial.serialutil import SerialBase, SerialException, to_bytes, PortNotOpenError, SerialTimeoutException
21
22
23class Serial(SerialBase):
24    """Serial port implementation for Win32 based on ctypes."""
25
26    BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
27                 9600, 19200, 38400, 57600, 115200)
28
29    def __init__(self, *args, **kwargs):
30        self._port_handle = None
31        self._overlapped_read = None
32        self._overlapped_write = None
33        super(Serial, self).__init__(*args, **kwargs)
34
35    def open(self):
36        """\
37        Open port with current settings. This may throw a SerialException
38        if the port cannot be opened.
39        """
40        if self._port is None:
41            raise SerialException("Port must be configured before it can be used.")
42        if self.is_open:
43            raise SerialException("Port is already open.")
44        # the "\\.\COMx" format is required for devices other than COM1-COM8
45        # not all versions of windows seem to support this properly
46        # so that the first few ports are used with the DOS device name
47        port = self.name
48        try:
49            if port.upper().startswith('COM') and int(port[3:]) > 8:
50                port = '\\\\.\\' + port
51        except ValueError:
52            # for like COMnotanumber
53            pass
54        self._port_handle = win32.CreateFile(
55            port,
56            win32.GENERIC_READ | win32.GENERIC_WRITE,
57            0,  # exclusive access
58            None,  # no security
59            win32.OPEN_EXISTING,
60            win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
61            0)
62        if self._port_handle == win32.INVALID_HANDLE_VALUE:
63            self._port_handle = None    # 'cause __del__ is called anyway
64            raise SerialException("could not open port {!r}: {!r}".format(self.portstr, ctypes.WinError()))
65
66        try:
67            self._overlapped_read = win32.OVERLAPPED()
68            self._overlapped_read.hEvent = win32.CreateEvent(None, 1, 0, None)
69            self._overlapped_write = win32.OVERLAPPED()
70            #~ self._overlapped_write.hEvent = win32.CreateEvent(None, 1, 0, None)
71            self._overlapped_write.hEvent = win32.CreateEvent(None, 0, 0, None)
72
73            # Setup a 4k buffer
74            win32.SetupComm(self._port_handle, 4096, 4096)
75
76            # Save original timeout values:
77            self._orgTimeouts = win32.COMMTIMEOUTS()
78            win32.GetCommTimeouts(self._port_handle, ctypes.byref(self._orgTimeouts))
79
80            self._reconfigure_port()
81
82            # Clear buffers:
83            # Remove anything that was there
84            win32.PurgeComm(
85                self._port_handle,
86                win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
87                win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
88        except:
89            try:
90                self._close()
91            except:
92                # ignore any exception when closing the port
93                # also to keep original exception that happened when setting up
94                pass
95            self._port_handle = None
96            raise
97        else:
98            self.is_open = True
99
100    def _reconfigure_port(self):
101        """Set communication parameters on opened port."""
102        if not self._port_handle:
103            raise SerialException("Can only operate on a valid port handle")
104
105        # Set Windows timeout values
106        # timeouts is a tuple with the following items:
107        # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
108        #  ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
109        #  WriteTotalTimeoutConstant)
110        timeouts = win32.COMMTIMEOUTS()
111        if self._timeout is None:
112            pass  # default of all zeros is OK
113        elif self._timeout == 0:
114            timeouts.ReadIntervalTimeout = win32.MAXDWORD
115        else:
116            timeouts.ReadTotalTimeoutConstant = max(int(self._timeout * 1000), 1)
117        if self._timeout != 0 and self._inter_byte_timeout is not None:
118            timeouts.ReadIntervalTimeout = max(int(self._inter_byte_timeout * 1000), 1)
119
120        if self._write_timeout is None:
121            pass
122        elif self._write_timeout == 0:
123            timeouts.WriteTotalTimeoutConstant = win32.MAXDWORD
124        else:
125            timeouts.WriteTotalTimeoutConstant = max(int(self._write_timeout * 1000), 1)
126        win32.SetCommTimeouts(self._port_handle, ctypes.byref(timeouts))
127
128        win32.SetCommMask(self._port_handle, win32.EV_ERR)
129
130        # Setup the connection info.
131        # Get state and modify it:
132        comDCB = win32.DCB()
133        win32.GetCommState(self._port_handle, ctypes.byref(comDCB))
134        comDCB.BaudRate = self._baudrate
135
136        if self._bytesize == serial.FIVEBITS:
137            comDCB.ByteSize = 5
138        elif self._bytesize == serial.SIXBITS:
139            comDCB.ByteSize = 6
140        elif self._bytesize == serial.SEVENBITS:
141            comDCB.ByteSize = 7
142        elif self._bytesize == serial.EIGHTBITS:
143            comDCB.ByteSize = 8
144        else:
145            raise ValueError("Unsupported number of data bits: {!r}".format(self._bytesize))
146
147        if self._parity == serial.PARITY_NONE:
148            comDCB.Parity = win32.NOPARITY
149            comDCB.fParity = 0  # Disable Parity Check
150        elif self._parity == serial.PARITY_EVEN:
151            comDCB.Parity = win32.EVENPARITY
152            comDCB.fParity = 1  # Enable Parity Check
153        elif self._parity == serial.PARITY_ODD:
154            comDCB.Parity = win32.ODDPARITY
155            comDCB.fParity = 1  # Enable Parity Check
156        elif self._parity == serial.PARITY_MARK:
157            comDCB.Parity = win32.MARKPARITY
158            comDCB.fParity = 1  # Enable Parity Check
159        elif self._parity == serial.PARITY_SPACE:
160            comDCB.Parity = win32.SPACEPARITY
161            comDCB.fParity = 1  # Enable Parity Check
162        else:
163            raise ValueError("Unsupported parity mode: {!r}".format(self._parity))
164
165        if self._stopbits == serial.STOPBITS_ONE:
166            comDCB.StopBits = win32.ONESTOPBIT
167        elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
168            comDCB.StopBits = win32.ONE5STOPBITS
169        elif self._stopbits == serial.STOPBITS_TWO:
170            comDCB.StopBits = win32.TWOSTOPBITS
171        else:
172            raise ValueError("Unsupported number of stop bits: {!r}".format(self._stopbits))
173
174        comDCB.fBinary = 1  # Enable Binary Transmission
175        # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
176        if self._rs485_mode is None:
177            if self._rtscts:
178                comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
179            else:
180                comDCB.fRtsControl = win32.RTS_CONTROL_ENABLE if self._rts_state else win32.RTS_CONTROL_DISABLE
181            comDCB.fOutxCtsFlow = self._rtscts
182        else:
183            # checks for unsupported settings
184            # XXX verify if platform really does not have a setting for those
185            if not self._rs485_mode.rts_level_for_tx:
186                raise ValueError(
187                    'Unsupported value for RS485Settings.rts_level_for_tx: {!r} (only True is allowed)'.format(
188                        self._rs485_mode.rts_level_for_tx,))
189            if self._rs485_mode.rts_level_for_rx:
190                raise ValueError(
191                    'Unsupported value for RS485Settings.rts_level_for_rx: {!r} (only False is allowed)'.format(
192                        self._rs485_mode.rts_level_for_rx,))
193            if self._rs485_mode.delay_before_tx is not None:
194                raise ValueError(
195                    'Unsupported value for RS485Settings.delay_before_tx: {!r} (only None is allowed)'.format(
196                        self._rs485_mode.delay_before_tx,))
197            if self._rs485_mode.delay_before_rx is not None:
198                raise ValueError(
199                    'Unsupported value for RS485Settings.delay_before_rx: {!r} (only None is allowed)'.format(
200                        self._rs485_mode.delay_before_rx,))
201            if self._rs485_mode.loopback:
202                raise ValueError(
203                    'Unsupported value for RS485Settings.loopback: {!r} (only False is allowed)'.format(
204                        self._rs485_mode.loopback,))
205            comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
206            comDCB.fOutxCtsFlow = 0
207
208        if self._dsrdtr:
209            comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
210        else:
211            comDCB.fDtrControl = win32.DTR_CONTROL_ENABLE if self._dtr_state else win32.DTR_CONTROL_DISABLE
212        comDCB.fOutxDsrFlow = self._dsrdtr
213        comDCB.fOutX = self._xonxoff
214        comDCB.fInX = self._xonxoff
215        comDCB.fNull = 0
216        comDCB.fErrorChar = 0
217        comDCB.fAbortOnError = 0
218        comDCB.XonChar = serial.XON
219        comDCB.XoffChar = serial.XOFF
220
221        if not win32.SetCommState(self._port_handle, ctypes.byref(comDCB)):
222            raise SerialException(
223                'Cannot configure port, something went wrong. '
224                'Original message: {!r}'.format(ctypes.WinError()))
225
226    #~ def __del__(self):
227        #~ self.close()
228
229    def _close(self):
230        """internal close port helper"""
231        if self._port_handle is not None:
232            # Restore original timeout values:
233            win32.SetCommTimeouts(self._port_handle, self._orgTimeouts)
234            if self._overlapped_read is not None:
235                self.cancel_read()
236                win32.CloseHandle(self._overlapped_read.hEvent)
237                self._overlapped_read = None
238            if self._overlapped_write is not None:
239                self.cancel_write()
240                win32.CloseHandle(self._overlapped_write.hEvent)
241                self._overlapped_write = None
242            win32.CloseHandle(self._port_handle)
243            self._port_handle = None
244
245    def close(self):
246        """Close port"""
247        if self.is_open:
248            self._close()
249            self.is_open = False
250
251    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
252
253    @property
254    def in_waiting(self):
255        """Return the number of bytes currently in the input buffer."""
256        flags = win32.DWORD()
257        comstat = win32.COMSTAT()
258        if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
259            raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError()))
260        return comstat.cbInQue
261
262    def read(self, size=1):
263        """\
264        Read size bytes from the serial port. If a timeout is set it may
265        return less characters as requested. With no timeout it will block
266        until the requested number of bytes is read.
267        """
268        if not self.is_open:
269            raise PortNotOpenError()
270        if size > 0:
271            win32.ResetEvent(self._overlapped_read.hEvent)
272            flags = win32.DWORD()
273            comstat = win32.COMSTAT()
274            if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
275                raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError()))
276            n = min(comstat.cbInQue, size) if self.timeout == 0 else size
277            if n > 0:
278                buf = ctypes.create_string_buffer(n)
279                rc = win32.DWORD()
280                read_ok = win32.ReadFile(
281                    self._port_handle,
282                    buf,
283                    n,
284                    ctypes.byref(rc),
285                    ctypes.byref(self._overlapped_read))
286                if not read_ok and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
287                    raise SerialException("ReadFile failed ({!r})".format(ctypes.WinError()))
288                result_ok = win32.GetOverlappedResult(
289                    self._port_handle,
290                    ctypes.byref(self._overlapped_read),
291                    ctypes.byref(rc),
292                    True)
293                if not result_ok:
294                    if win32.GetLastError() != win32.ERROR_OPERATION_ABORTED:
295                        raise SerialException("GetOverlappedResult failed ({!r})".format(ctypes.WinError()))
296                read = buf.raw[:rc.value]
297            else:
298                read = bytes()
299        else:
300            read = bytes()
301        return bytes(read)
302
303    def write(self, data):
304        """Output the given byte string over the serial port."""
305        if not self.is_open:
306            raise PortNotOpenError()
307        #~ if not isinstance(data, (bytes, bytearray)):
308            #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
309        # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
310        data = to_bytes(data)
311        if data:
312            #~ win32event.ResetEvent(self._overlapped_write.hEvent)
313            n = win32.DWORD()
314            success = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write)
315            if self._write_timeout != 0:  # if blocking (None) or w/ write timeout (>0)
316                if not success and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
317                    raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))
318
319                # Wait for the write to complete.
320                #~ win32.WaitForSingleObject(self._overlapped_write.hEvent, win32.INFINITE)
321                win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True)
322                if win32.GetLastError() == win32.ERROR_OPERATION_ABORTED:
323                    return n.value  # canceled IO is no error
324                if n.value != len(data):
325                    raise SerialTimeoutException('Write timeout')
326                return n.value
327            else:
328                errorcode = win32.ERROR_SUCCESS if success else win32.GetLastError()
329                if errorcode in (win32.ERROR_INVALID_USER_BUFFER, win32.ERROR_NOT_ENOUGH_MEMORY,
330                                 win32.ERROR_OPERATION_ABORTED):
331                    return 0
332                elif errorcode in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
333                    # no info on true length provided by OS function in async mode
334                    return len(data)
335                else:
336                    raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))
337        else:
338            return 0
339
340    def flush(self):
341        """\
342        Flush of file like objects. In this case, wait until all data
343        is written.
344        """
345        while self.out_waiting:
346            time.sleep(0.05)
347        # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would
348        # require overlapped IO and it's also only possible to set a single mask
349        # on the port---
350
351    def reset_input_buffer(self):
352        """Clear input buffer, discarding all that is in the buffer."""
353        if not self.is_open:
354            raise PortNotOpenError()
355        win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
356
357    def reset_output_buffer(self):
358        """\
359        Clear output buffer, aborting the current output and discarding all
360        that is in the buffer.
361        """
362        if not self.is_open:
363            raise PortNotOpenError()
364        win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)
365
366    def _update_break_state(self):
367        """Set break: Controls TXD. When active, to transmitting is possible."""
368        if not self.is_open:
369            raise PortNotOpenError()
370        if self._break_state:
371            win32.SetCommBreak(self._port_handle)
372        else:
373            win32.ClearCommBreak(self._port_handle)
374
375    def _update_rts_state(self):
376        """Set terminal status line: Request To Send"""
377        if self._rts_state:
378            win32.EscapeCommFunction(self._port_handle, win32.SETRTS)
379        else:
380            win32.EscapeCommFunction(self._port_handle, win32.CLRRTS)
381
382    def _update_dtr_state(self):
383        """Set terminal status line: Data Terminal Ready"""
384        if self._dtr_state:
385            win32.EscapeCommFunction(self._port_handle, win32.SETDTR)
386        else:
387            win32.EscapeCommFunction(self._port_handle, win32.CLRDTR)
388
389    def _GetCommModemStatus(self):
390        if not self.is_open:
391            raise PortNotOpenError()
392        stat = win32.DWORD()
393        win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat))
394        return stat.value
395
396    @property
397    def cts(self):
398        """Read terminal status line: Clear To Send"""
399        return win32.MS_CTS_ON & self._GetCommModemStatus() != 0
400
401    @property
402    def dsr(self):
403        """Read terminal status line: Data Set Ready"""
404        return win32.MS_DSR_ON & self._GetCommModemStatus() != 0
405
406    @property
407    def ri(self):
408        """Read terminal status line: Ring Indicator"""
409        return win32.MS_RING_ON & self._GetCommModemStatus() != 0
410
411    @property
412    def cd(self):
413        """Read terminal status line: Carrier Detect"""
414        return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0
415
416    # - - platform specific - - - -
417
418    def set_buffer_size(self, rx_size=4096, tx_size=None):
419        """\
420        Recommend a buffer size to the driver (device driver can ignore this
421        value). Must be called after the port is opened.
422        """
423        if tx_size is None:
424            tx_size = rx_size
425        win32.SetupComm(self._port_handle, rx_size, tx_size)
426
427    def set_output_flow_control(self, enable=True):
428        """\
429        Manually control flow - when software flow control is enabled.
430        This will do the same as if XON (true) or XOFF (false) are received
431        from the other device and control the transmission accordingly.
432        WARNING: this function is not portable to different platforms!
433        """
434        if not self.is_open:
435            raise PortNotOpenError()
436        if enable:
437            win32.EscapeCommFunction(self._port_handle, win32.SETXON)
438        else:
439            win32.EscapeCommFunction(self._port_handle, win32.SETXOFF)
440
441    @property
442    def out_waiting(self):
443        """Return how many bytes the in the outgoing buffer"""
444        flags = win32.DWORD()
445        comstat = win32.COMSTAT()
446        if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
447            raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError()))
448        return comstat.cbOutQue
449
450    def _cancel_overlapped_io(self, overlapped):
451        """Cancel a blocking read operation, may be called from other thread"""
452        # check if read operation is pending
453        rc = win32.DWORD()
454        err = win32.GetOverlappedResult(
455            self._port_handle,
456            ctypes.byref(overlapped),
457            ctypes.byref(rc),
458            False)
459        if not err and win32.GetLastError() in (win32.ERROR_IO_PENDING, win32.ERROR_IO_INCOMPLETE):
460            # cancel, ignoring any errors (e.g. it may just have finished on its own)
461            win32.CancelIoEx(self._port_handle, overlapped)
462
463    def cancel_read(self):
464        """Cancel a blocking read operation, may be called from other thread"""
465        self._cancel_overlapped_io(self._overlapped_read)
466
467    def cancel_write(self):
468        """Cancel a blocking write operation, may be called from other thread"""
469        self._cancel_overlapped_io(self._overlapped_write)
470
471    @SerialBase.exclusive.setter
472    def exclusive(self, exclusive):
473        """Change the exclusive access setting."""
474        if exclusive is not None and not exclusive:
475            raise ValueError('win32 only supports exclusive access (not: {})'.format(exclusive))
476        else:
477            serial.SerialBase.exclusive.__set__(self, exclusive)
478