1#! python 2# 3# backend for Windows ("win32" incl. 32/64 bit support) 4# 5# (C) 2001-2020 Chris Liechti <[email protected]> 6# 7# This file is part of pySerial. https://github.com/pyserial/pyserial 8# SPDX-License-Identifier: BSD-3-Clause 9# 10# Initial patch to use ctypes by Giovanni Bajo <[email protected]> 11 12from __future__ import absolute_import 13 14# pylint: disable=invalid-name,too-few-public-methods 15import ctypes 16import time 17from serial import win32 18 19import serial 20from serial.serialutil import SerialBase, SerialException, to_bytes, PortNotOpenError, SerialTimeoutException 21 22 23class Serial(SerialBase): 24 """Serial port implementation for Win32 based on ctypes.""" 25 26 BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, 27 9600, 19200, 38400, 57600, 115200) 28 29 def __init__(self, *args, **kwargs): 30 self._port_handle = None 31 self._overlapped_read = None 32 self._overlapped_write = None 33 super(Serial, self).__init__(*args, **kwargs) 34 35 def open(self): 36 """\ 37 Open port with current settings. This may throw a SerialException 38 if the port cannot be opened. 39 """ 40 if self._port is None: 41 raise SerialException("Port must be configured before it can be used.") 42 if self.is_open: 43 raise SerialException("Port is already open.") 44 # the "\\.\COMx" format is required for devices other than COM1-COM8 45 # not all versions of windows seem to support this properly 46 # so that the first few ports are used with the DOS device name 47 port = self.name 48 try: 49 if port.upper().startswith('COM') and int(port[3:]) > 8: 50 port = '\\\\.\\' + port 51 except ValueError: 52 # for like COMnotanumber 53 pass 54 self._port_handle = win32.CreateFile( 55 port, 56 win32.GENERIC_READ | win32.GENERIC_WRITE, 57 0, # exclusive access 58 None, # no security 59 win32.OPEN_EXISTING, 60 win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED, 61 0) 62 if self._port_handle == win32.INVALID_HANDLE_VALUE: 63 self._port_handle = None # 'cause __del__ is called anyway 64 raise SerialException("could not open port {!r}: {!r}".format(self.portstr, ctypes.WinError())) 65 66 try: 67 self._overlapped_read = win32.OVERLAPPED() 68 self._overlapped_read.hEvent = win32.CreateEvent(None, 1, 0, None) 69 self._overlapped_write = win32.OVERLAPPED() 70 #~ self._overlapped_write.hEvent = win32.CreateEvent(None, 1, 0, None) 71 self._overlapped_write.hEvent = win32.CreateEvent(None, 0, 0, None) 72 73 # Setup a 4k buffer 74 win32.SetupComm(self._port_handle, 4096, 4096) 75 76 # Save original timeout values: 77 self._orgTimeouts = win32.COMMTIMEOUTS() 78 win32.GetCommTimeouts(self._port_handle, ctypes.byref(self._orgTimeouts)) 79 80 self._reconfigure_port() 81 82 # Clear buffers: 83 # Remove anything that was there 84 win32.PurgeComm( 85 self._port_handle, 86 win32.PURGE_TXCLEAR | win32.PURGE_TXABORT | 87 win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) 88 except: 89 try: 90 self._close() 91 except: 92 # ignore any exception when closing the port 93 # also to keep original exception that happened when setting up 94 pass 95 self._port_handle = None 96 raise 97 else: 98 self.is_open = True 99 100 def _reconfigure_port(self): 101 """Set communication parameters on opened port.""" 102 if not self._port_handle: 103 raise SerialException("Can only operate on a valid port handle") 104 105 # Set Windows timeout values 106 # timeouts is a tuple with the following items: 107 # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier, 108 # ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier, 109 # WriteTotalTimeoutConstant) 110 timeouts = win32.COMMTIMEOUTS() 111 if self._timeout is None: 112 pass # default of all zeros is OK 113 elif self._timeout == 0: 114 timeouts.ReadIntervalTimeout = win32.MAXDWORD 115 else: 116 timeouts.ReadTotalTimeoutConstant = max(int(self._timeout * 1000), 1) 117 if self._timeout != 0 and self._inter_byte_timeout is not None: 118 timeouts.ReadIntervalTimeout = max(int(self._inter_byte_timeout * 1000), 1) 119 120 if self._write_timeout is None: 121 pass 122 elif self._write_timeout == 0: 123 timeouts.WriteTotalTimeoutConstant = win32.MAXDWORD 124 else: 125 timeouts.WriteTotalTimeoutConstant = max(int(self._write_timeout * 1000), 1) 126 win32.SetCommTimeouts(self._port_handle, ctypes.byref(timeouts)) 127 128 win32.SetCommMask(self._port_handle, win32.EV_ERR) 129 130 # Setup the connection info. 131 # Get state and modify it: 132 comDCB = win32.DCB() 133 win32.GetCommState(self._port_handle, ctypes.byref(comDCB)) 134 comDCB.BaudRate = self._baudrate 135 136 if self._bytesize == serial.FIVEBITS: 137 comDCB.ByteSize = 5 138 elif self._bytesize == serial.SIXBITS: 139 comDCB.ByteSize = 6 140 elif self._bytesize == serial.SEVENBITS: 141 comDCB.ByteSize = 7 142 elif self._bytesize == serial.EIGHTBITS: 143 comDCB.ByteSize = 8 144 else: 145 raise ValueError("Unsupported number of data bits: {!r}".format(self._bytesize)) 146 147 if self._parity == serial.PARITY_NONE: 148 comDCB.Parity = win32.NOPARITY 149 comDCB.fParity = 0 # Disable Parity Check 150 elif self._parity == serial.PARITY_EVEN: 151 comDCB.Parity = win32.EVENPARITY 152 comDCB.fParity = 1 # Enable Parity Check 153 elif self._parity == serial.PARITY_ODD: 154 comDCB.Parity = win32.ODDPARITY 155 comDCB.fParity = 1 # Enable Parity Check 156 elif self._parity == serial.PARITY_MARK: 157 comDCB.Parity = win32.MARKPARITY 158 comDCB.fParity = 1 # Enable Parity Check 159 elif self._parity == serial.PARITY_SPACE: 160 comDCB.Parity = win32.SPACEPARITY 161 comDCB.fParity = 1 # Enable Parity Check 162 else: 163 raise ValueError("Unsupported parity mode: {!r}".format(self._parity)) 164 165 if self._stopbits == serial.STOPBITS_ONE: 166 comDCB.StopBits = win32.ONESTOPBIT 167 elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE: 168 comDCB.StopBits = win32.ONE5STOPBITS 169 elif self._stopbits == serial.STOPBITS_TWO: 170 comDCB.StopBits = win32.TWOSTOPBITS 171 else: 172 raise ValueError("Unsupported number of stop bits: {!r}".format(self._stopbits)) 173 174 comDCB.fBinary = 1 # Enable Binary Transmission 175 # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE) 176 if self._rs485_mode is None: 177 if self._rtscts: 178 comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE 179 else: 180 comDCB.fRtsControl = win32.RTS_CONTROL_ENABLE if self._rts_state else win32.RTS_CONTROL_DISABLE 181 comDCB.fOutxCtsFlow = self._rtscts 182 else: 183 # checks for unsupported settings 184 # XXX verify if platform really does not have a setting for those 185 if not self._rs485_mode.rts_level_for_tx: 186 raise ValueError( 187 'Unsupported value for RS485Settings.rts_level_for_tx: {!r} (only True is allowed)'.format( 188 self._rs485_mode.rts_level_for_tx,)) 189 if self._rs485_mode.rts_level_for_rx: 190 raise ValueError( 191 'Unsupported value for RS485Settings.rts_level_for_rx: {!r} (only False is allowed)'.format( 192 self._rs485_mode.rts_level_for_rx,)) 193 if self._rs485_mode.delay_before_tx is not None: 194 raise ValueError( 195 'Unsupported value for RS485Settings.delay_before_tx: {!r} (only None is allowed)'.format( 196 self._rs485_mode.delay_before_tx,)) 197 if self._rs485_mode.delay_before_rx is not None: 198 raise ValueError( 199 'Unsupported value for RS485Settings.delay_before_rx: {!r} (only None is allowed)'.format( 200 self._rs485_mode.delay_before_rx,)) 201 if self._rs485_mode.loopback: 202 raise ValueError( 203 'Unsupported value for RS485Settings.loopback: {!r} (only False is allowed)'.format( 204 self._rs485_mode.loopback,)) 205 comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE 206 comDCB.fOutxCtsFlow = 0 207 208 if self._dsrdtr: 209 comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE 210 else: 211 comDCB.fDtrControl = win32.DTR_CONTROL_ENABLE if self._dtr_state else win32.DTR_CONTROL_DISABLE 212 comDCB.fOutxDsrFlow = self._dsrdtr 213 comDCB.fOutX = self._xonxoff 214 comDCB.fInX = self._xonxoff 215 comDCB.fNull = 0 216 comDCB.fErrorChar = 0 217 comDCB.fAbortOnError = 0 218 comDCB.XonChar = serial.XON 219 comDCB.XoffChar = serial.XOFF 220 221 if not win32.SetCommState(self._port_handle, ctypes.byref(comDCB)): 222 raise SerialException( 223 'Cannot configure port, something went wrong. ' 224 'Original message: {!r}'.format(ctypes.WinError())) 225 226 #~ def __del__(self): 227 #~ self.close() 228 229 def _close(self): 230 """internal close port helper""" 231 if self._port_handle is not None: 232 # Restore original timeout values: 233 win32.SetCommTimeouts(self._port_handle, self._orgTimeouts) 234 if self._overlapped_read is not None: 235 self.cancel_read() 236 win32.CloseHandle(self._overlapped_read.hEvent) 237 self._overlapped_read = None 238 if self._overlapped_write is not None: 239 self.cancel_write() 240 win32.CloseHandle(self._overlapped_write.hEvent) 241 self._overlapped_write = None 242 win32.CloseHandle(self._port_handle) 243 self._port_handle = None 244 245 def close(self): 246 """Close port""" 247 if self.is_open: 248 self._close() 249 self.is_open = False 250 251 # - - - - - - - - - - - - - - - - - - - - - - - - 252 253 @property 254 def in_waiting(self): 255 """Return the number of bytes currently in the input buffer.""" 256 flags = win32.DWORD() 257 comstat = win32.COMSTAT() 258 if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)): 259 raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError())) 260 return comstat.cbInQue 261 262 def read(self, size=1): 263 """\ 264 Read size bytes from the serial port. If a timeout is set it may 265 return less characters as requested. With no timeout it will block 266 until the requested number of bytes is read. 267 """ 268 if not self.is_open: 269 raise PortNotOpenError() 270 if size > 0: 271 win32.ResetEvent(self._overlapped_read.hEvent) 272 flags = win32.DWORD() 273 comstat = win32.COMSTAT() 274 if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)): 275 raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError())) 276 n = min(comstat.cbInQue, size) if self.timeout == 0 else size 277 if n > 0: 278 buf = ctypes.create_string_buffer(n) 279 rc = win32.DWORD() 280 read_ok = win32.ReadFile( 281 self._port_handle, 282 buf, 283 n, 284 ctypes.byref(rc), 285 ctypes.byref(self._overlapped_read)) 286 if not read_ok and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING): 287 raise SerialException("ReadFile failed ({!r})".format(ctypes.WinError())) 288 result_ok = win32.GetOverlappedResult( 289 self._port_handle, 290 ctypes.byref(self._overlapped_read), 291 ctypes.byref(rc), 292 True) 293 if not result_ok: 294 if win32.GetLastError() != win32.ERROR_OPERATION_ABORTED: 295 raise SerialException("GetOverlappedResult failed ({!r})".format(ctypes.WinError())) 296 read = buf.raw[:rc.value] 297 else: 298 read = bytes() 299 else: 300 read = bytes() 301 return bytes(read) 302 303 def write(self, data): 304 """Output the given byte string over the serial port.""" 305 if not self.is_open: 306 raise PortNotOpenError() 307 #~ if not isinstance(data, (bytes, bytearray)): 308 #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data))) 309 # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview 310 data = to_bytes(data) 311 if data: 312 #~ win32event.ResetEvent(self._overlapped_write.hEvent) 313 n = win32.DWORD() 314 success = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write) 315 if self._write_timeout != 0: # if blocking (None) or w/ write timeout (>0) 316 if not success and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING): 317 raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError())) 318 319 # Wait for the write to complete. 320 #~ win32.WaitForSingleObject(self._overlapped_write.hEvent, win32.INFINITE) 321 win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True) 322 if win32.GetLastError() == win32.ERROR_OPERATION_ABORTED: 323 return n.value # canceled IO is no error 324 if n.value != len(data): 325 raise SerialTimeoutException('Write timeout') 326 return n.value 327 else: 328 errorcode = win32.ERROR_SUCCESS if success else win32.GetLastError() 329 if errorcode in (win32.ERROR_INVALID_USER_BUFFER, win32.ERROR_NOT_ENOUGH_MEMORY, 330 win32.ERROR_OPERATION_ABORTED): 331 return 0 332 elif errorcode in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING): 333 # no info on true length provided by OS function in async mode 334 return len(data) 335 else: 336 raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError())) 337 else: 338 return 0 339 340 def flush(self): 341 """\ 342 Flush of file like objects. In this case, wait until all data 343 is written. 344 """ 345 while self.out_waiting: 346 time.sleep(0.05) 347 # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would 348 # require overlapped IO and it's also only possible to set a single mask 349 # on the port--- 350 351 def reset_input_buffer(self): 352 """Clear input buffer, discarding all that is in the buffer.""" 353 if not self.is_open: 354 raise PortNotOpenError() 355 win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT) 356 357 def reset_output_buffer(self): 358 """\ 359 Clear output buffer, aborting the current output and discarding all 360 that is in the buffer. 361 """ 362 if not self.is_open: 363 raise PortNotOpenError() 364 win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT) 365 366 def _update_break_state(self): 367 """Set break: Controls TXD. When active, to transmitting is possible.""" 368 if not self.is_open: 369 raise PortNotOpenError() 370 if self._break_state: 371 win32.SetCommBreak(self._port_handle) 372 else: 373 win32.ClearCommBreak(self._port_handle) 374 375 def _update_rts_state(self): 376 """Set terminal status line: Request To Send""" 377 if self._rts_state: 378 win32.EscapeCommFunction(self._port_handle, win32.SETRTS) 379 else: 380 win32.EscapeCommFunction(self._port_handle, win32.CLRRTS) 381 382 def _update_dtr_state(self): 383 """Set terminal status line: Data Terminal Ready""" 384 if self._dtr_state: 385 win32.EscapeCommFunction(self._port_handle, win32.SETDTR) 386 else: 387 win32.EscapeCommFunction(self._port_handle, win32.CLRDTR) 388 389 def _GetCommModemStatus(self): 390 if not self.is_open: 391 raise PortNotOpenError() 392 stat = win32.DWORD() 393 win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat)) 394 return stat.value 395 396 @property 397 def cts(self): 398 """Read terminal status line: Clear To Send""" 399 return win32.MS_CTS_ON & self._GetCommModemStatus() != 0 400 401 @property 402 def dsr(self): 403 """Read terminal status line: Data Set Ready""" 404 return win32.MS_DSR_ON & self._GetCommModemStatus() != 0 405 406 @property 407 def ri(self): 408 """Read terminal status line: Ring Indicator""" 409 return win32.MS_RING_ON & self._GetCommModemStatus() != 0 410 411 @property 412 def cd(self): 413 """Read terminal status line: Carrier Detect""" 414 return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0 415 416 # - - platform specific - - - - 417 418 def set_buffer_size(self, rx_size=4096, tx_size=None): 419 """\ 420 Recommend a buffer size to the driver (device driver can ignore this 421 value). Must be called after the port is opened. 422 """ 423 if tx_size is None: 424 tx_size = rx_size 425 win32.SetupComm(self._port_handle, rx_size, tx_size) 426 427 def set_output_flow_control(self, enable=True): 428 """\ 429 Manually control flow - when software flow control is enabled. 430 This will do the same as if XON (true) or XOFF (false) are received 431 from the other device and control the transmission accordingly. 432 WARNING: this function is not portable to different platforms! 433 """ 434 if not self.is_open: 435 raise PortNotOpenError() 436 if enable: 437 win32.EscapeCommFunction(self._port_handle, win32.SETXON) 438 else: 439 win32.EscapeCommFunction(self._port_handle, win32.SETXOFF) 440 441 @property 442 def out_waiting(self): 443 """Return how many bytes the in the outgoing buffer""" 444 flags = win32.DWORD() 445 comstat = win32.COMSTAT() 446 if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)): 447 raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError())) 448 return comstat.cbOutQue 449 450 def _cancel_overlapped_io(self, overlapped): 451 """Cancel a blocking read operation, may be called from other thread""" 452 # check if read operation is pending 453 rc = win32.DWORD() 454 err = win32.GetOverlappedResult( 455 self._port_handle, 456 ctypes.byref(overlapped), 457 ctypes.byref(rc), 458 False) 459 if not err and win32.GetLastError() in (win32.ERROR_IO_PENDING, win32.ERROR_IO_INCOMPLETE): 460 # cancel, ignoring any errors (e.g. it may just have finished on its own) 461 win32.CancelIoEx(self._port_handle, overlapped) 462 463 def cancel_read(self): 464 """Cancel a blocking read operation, may be called from other thread""" 465 self._cancel_overlapped_io(self._overlapped_read) 466 467 def cancel_write(self): 468 """Cancel a blocking write operation, may be called from other thread""" 469 self._cancel_overlapped_io(self._overlapped_write) 470 471 @SerialBase.exclusive.setter 472 def exclusive(self, exclusive): 473 """Change the exclusive access setting.""" 474 if exclusive is not None and not exclusive: 475 raise ValueError('win32 only supports exclusive access (not: {})'.format(exclusive)) 476 else: 477 serial.SerialBase.exclusive.__set__(self, exclusive) 478