1#! python
2#
3# Python Serial Port Extension for Win32, Linux, BSD, Jython
4# see __init__.py
5#
6# This module implements a URL dummy handler for serial_for_url.
7#
8# (C) 2011 Chris Liechti <[email protected]>
9# this is distributed under a free software license, see license.txt
10#
11# URL format:    test://
12
13from serial.serialutil import *
14import time
15import socket
16import logging
17
18# map log level names to constants. used in fromURL()
19LOGGER_LEVELS = {
20    'debug': logging.DEBUG,
21    'info': logging.INFO,
22    'warning': logging.WARNING,
23    'error': logging.ERROR,
24    }
25
26class DummySerial(SerialBase):
27    """Serial port implementation for plain sockets."""
28
29    def open(self):
30        """Open port with current settings. This may throw a SerialException
31           if the port cannot be opened."""
32        self.logger = None
33        if self._port is None:
34            raise SerialException("Port must be configured before it can be used.")
35        # not that there anything to configure...
36        self._reconfigurePort()
37        # all things set up get, now a clean start
38        self._isOpen = True
39
40    def _reconfigurePort(self):
41        """Set communication parameters on opened port. for the test://
42        protocol all settings are ignored!"""
43        if self.logger:
44            self.logger.info('ignored port configuration change')
45
46    def close(self):
47        """Close port"""
48        if self._isOpen:
49            self._isOpen = False
50
51    def makeDeviceName(self, port):
52        raise SerialException("there is no sensible way to turn numbers into URLs")
53
54    def fromURL(self, url):
55        """extract host and port from an URL string"""
56        if url.lower().startswith("test://"): url = url[7:]
57        try:
58            # is there a "path" (our options)?
59            if '/' in url:
60                # cut away options
61                url, options = url.split('/', 1)
62                # process options now, directly altering self
63                for option in options.split('/'):
64                    if '=' in option:
65                        option, value = option.split('=', 1)
66                    else:
67                        value = None
68                    if option == 'logging':
69                        logging.basicConfig()   # XXX is that good to call it here?
70                        self.logger = logging.getLogger('pySerial.test')
71                        self.logger.setLevel(LOGGER_LEVELS[value])
72                        self.logger.debug('enabled logging')
73                    else:
74                        raise ValueError('unknown option: {!r}'.format(option))
75        except ValueError as e:
76            raise SerialException('expected a string in the form "[test://][option[/option...]]": {}'.format(e))
77        return (host, port)
78
79    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
80
81    def inWaiting(self):
82        """Return the number of characters currently in the input buffer."""
83        if not self._isOpen: raise PortNotOpenError()
84        if self.logger:
85            # set this one to debug as the function could be called often...
86            self.logger.debug('WARNING: inWaiting returns dummy value')
87        return 0 # hmmm, see comment in read()
88
89    def read(self, size=1):
90        """Read size bytes from the serial port. If a timeout is set it may
91        return less characters as requested. With no timeout it will block
92        until the requested number of bytes is read."""
93        if not self._isOpen: raise PortNotOpenError()
94        data = '123' # dummy data
95        return bytes(data)
96
97    def write(self, data):
98        """Output the given string over the serial port. Can block if the
99        connection is blocked. May raise SerialException if the connection is
100        closed."""
101        if not self._isOpen: raise PortNotOpenError()
102        # nothing done
103        return len(data)
104
105    def flushInput(self):
106        """Clear input buffer, discarding all that is in the buffer."""
107        if not self._isOpen: raise PortNotOpenError()
108        if self.logger:
109            self.logger.info('ignored flushInput')
110
111    def flushOutput(self):
112        """Clear output buffer, aborting the current output and
113        discarding all that is in the buffer."""
114        if not self._isOpen: raise PortNotOpenError()
115        if self.logger:
116            self.logger.info('ignored flushOutput')
117
118    def sendBreak(self, duration=0.25):
119        """Send break condition. Timed, returns to idle state after given
120        duration."""
121        if not self._isOpen: raise PortNotOpenError()
122        if self.logger:
123            self.logger.info('ignored sendBreak({!r})'.format(duration))
124
125    def setBreak(self, level=True):
126        """Set break: Controls TXD. When active, to transmitting is
127        possible."""
128        if not self._isOpen: raise PortNotOpenError()
129        if self.logger:
130            self.logger.info('ignored setBreak({!r})'.format(level))
131
132    def setRTS(self, level=True):
133        """Set terminal status line: Request To Send"""
134        if not self._isOpen: raise PortNotOpenError()
135        if self.logger:
136            self.logger.info('ignored setRTS({!r})'.format(level))
137
138    def setDTR(self, level=True):
139        """Set terminal status line: Data Terminal Ready"""
140        if not self._isOpen: raise PortNotOpenError()
141        if self.logger:
142            self.logger.info('ignored setDTR({!r})'.format(level))
143
144    def getCTS(self):
145        """Read terminal status line: Clear To Send"""
146        if not self._isOpen: raise PortNotOpenError()
147        if self.logger:
148            self.logger.info('returning dummy for getCTS()')
149        return True
150
151    def getDSR(self):
152        """Read terminal status line: Data Set Ready"""
153        if not self._isOpen: raise PortNotOpenError()
154        if self.logger:
155            self.logger.info('returning dummy for getDSR()')
156        return True
157
158    def getRI(self):
159        """Read terminal status line: Ring Indicator"""
160        if not self._isOpen: raise PortNotOpenError()
161        if self.logger:
162            self.logger.info('returning dummy for getRI()')
163        return False
164
165    def getCD(self):
166        """Read terminal status line: Carrier Detect"""
167        if not self._isOpen: raise PortNotOpenError()
168        if self.logger:
169            self.logger.info('returning dummy for getCD()')
170        return True
171
172    # - - - platform specific - - -
173    # None so far
174
175
176# assemble Serial class with the platform specific implementation and the base
177# for file-like behavior. for Python 2.6 and newer, that provide the new I/O
178# library, derive from io.RawIOBase
179try:
180    import io
181except ImportError:
182    # classic version with our own file-like emulation
183    class Serial(DummySerial, FileLike):
184        pass
185else:
186    # io library present
187    class Serial(DummySerial, io.RawIOBase):
188        pass
189
190
191# simple client test
192if __name__ == '__main__':
193    import sys
194    s = Serial('test://logging=debug')
195    sys.stdout.write('{}\n'.format(s))
196
197    sys.stdout.write("write...\n")
198    s.write("hello\n")
199    s.flush()
200    sys.stdout.write("read: {}\n".format(s.read(5)))
201
202    s.close()
203