#! python
#
# Python Serial Port Extension for Win32, Linux, BSD, Jython
# see __init__.py
#
# This module implements a URL dummy handler for serial_for_url.
#
# (C) 2011 Chris Liechti <[email protected]>
# this is distributed under a free software license, see license.txt
#
# URL format: test://[option[/option...]]
# options:
# - "logging=<level>" with <level> one of: debug, info, warning, error

from serial.serialutil import *
import time
import socket
import logging

# map log level names to constants. used in fromURL()
LOGGER_LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    }


class DummySerial(SerialBase):
    """Dummy serial port implementation for the test:// URL scheme.

    No hardware or network resource is accessed: configuration changes and
    control line changes are ignored (optionally logged), writes are
    discarded and reads return fixed dummy data.
    """

    # Class-level default so that methods which check ``self.logger`` do not
    # fail with AttributeError when they are reached before open() ran.
    logger = None

    def open(self):
        """Open port with current settings. This may throw a SerialException
        if the port cannot be opened (no port configured or the URL contains
        invalid options)."""
        self.logger = None
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        # process the options given in the URL (e.g. enable logging)
        self.fromURL(self._port)
        # not that there is anything to configure...
        self._reconfigurePort()
        # all things set up, now a clean start
        self._isOpen = True

    def _reconfigurePort(self):
        """Set communication parameters on opened port. For the test://
        protocol all settings are ignored!"""
        if self.logger:
            self.logger.info('ignored port configuration change')

    def close(self):
        """Close port"""
        if self._isOpen:
            self._isOpen = False

    def makeDeviceName(self, port):
        """Always raises SerialException: port numbers can not be mapped
        to URLs."""
        raise SerialException("there is no sensible way to turn numbers into URLs")

    def fromURL(self, url):
        """Process the options of an URL string, directly altering self.

        The expected form is "[test://][option[/option...]]". The only
        supported option is "logging=<level>" where <level> is one of the
        keys of LOGGER_LEVELS. Empty path segments are skipped.

        Returns None. Raises SerialException on unknown options or unknown
        logging levels.
        """
        if url.lower().startswith("test://"):
            url = url[7:]
        try:
            # everything after the scheme is a '/' separated list of options
            for option in url.split('/'):
                if not option:
                    # tolerate "test://", trailing and doubled slashes
                    continue
                if '=' in option:
                    option, value = option.split('=', 1)
                else:
                    value = None
                if option == 'logging':
                    if value not in LOGGER_LEVELS:
                        # report as ValueError so that it is converted to a
                        # SerialException below instead of leaking a KeyError
                        raise ValueError('unknown logging level: {!r}'.format(value))
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.test')
                    self.logger.setLevel(LOGGER_LEVELS[value])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException('expected a string in the form "[test://][option[/option...]]": {}'.format(e))

    # - - - - - - - - - - - - - - - - - - - - - - - -

    def inWaiting(self):
        """Return the number of characters currently in the input buffer."""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            # set this one to debug as the function could be called often...
            self.logger.debug('WARNING: inWaiting returns dummy value')
        # dummy value: there is no real input buffer, read() always has data
        return 0

    def read(self, size=1):
        """Read up to size bytes from the serial port.

        This dummy implementation never blocks: it returns at most size
        bytes of the fixed dummy data b'123'. A negative size (or None)
        returns all of the dummy data.
        """
        if not self._isOpen:
            raise PortNotOpenError()
        # bytes literal: bytes('123') would raise TypeError on Python 3
        data = b'123'   # dummy data
        if size is None or size < 0:
            return data
        return data[:size]

    def write(self, data):
        """Output the given string over the serial port. The data is
        discarded; the number of bytes "written" is returned. Raises
        PortNotOpenError if the port is closed."""
        if not self._isOpen:
            raise PortNotOpenError()
        # nothing done
        return len(data)

    def flushInput(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('ignored flushInput')

    def flushOutput(self):
        """Clear output buffer, aborting the current output and
        discarding all that is in the buffer."""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('ignored flushOutput')

    def sendBreak(self, duration=0.25):
        """Send break condition. Timed, returns to idle state after given
        duration."""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('ignored sendBreak({!r})'.format(duration))

    def setBreak(self, level=True):
        """Set break: Controls TXD. When active, no transmitting is
        possible."""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('ignored setBreak({!r})'.format(level))

    def setRTS(self, level=True):
        """Set terminal status line: Request To Send"""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('ignored setRTS({!r})'.format(level))

    def setDTR(self, level=True):
        """Set terminal status line: Data Terminal Ready"""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('ignored setDTR({!r})'.format(level))

    def getCTS(self):
        """Read terminal status line: Clear To Send"""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for getCTS()')
        return True

    def getDSR(self):
        """Read terminal status line: Data Set Ready"""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for getDSR()')
        return True

    def getRI(self):
        """Read terminal status line: Ring Indicator"""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for getRI()')
        return False

    def getCD(self):
        """Read terminal status line: Carrier Detect"""
        if not self._isOpen:
            raise PortNotOpenError()
        if self.logger:
            self.logger.info('returning dummy for getCD()')
        return True

    # - - - platform specific - - -
    # None so far


# assemble Serial class with the platform specific implementation and the base
# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
# library, derive from io.RawIOBase
try:
    import io
except ImportError:
    # classic version with our own file-like emulation
    class Serial(DummySerial, FileLike):
        pass
else:
    # io library present
    class Serial(DummySerial, io.RawIOBase):
        pass


# simple client test
if __name__ == '__main__':
    import sys
    s = Serial('test://logging=debug')
    sys.stdout.write('{}\n'.format(s))

    sys.stdout.write("write...\n")
    s.write("hello\n")
    s.flush()
    sys.stdout.write("read: {}\n".format(s.read(5)))

    s.close()