1#!/usr/bin/env python
2#
3# Very simple serial terminal
4#
5# This file is part of pySerial. https://github.com/pyserial/pyserial
6# (C)2002-2020 Chris Liechti <[email protected]>
7#
8# SPDX-License-Identifier:    BSD-3-Clause
9
10from __future__ import absolute_import
11
12import codecs
13import os
14import sys
15import threading
16
17import serial
18from serial.tools.list_ports import comports
19from serial.tools import hexlify_codec
20
21# pylint: disable=wrong-import-order,wrong-import-position
22
23codecs.register(lambda c: hexlify_codec.getregentry() if c == 'hexlify' else None)
24
25try:
26    raw_input
27except NameError:
28    # pylint: disable=redefined-builtin,invalid-name
29    raw_input = input   # in python3 it's "raw"
30    unichr = chr
31
32
33def key_description(character):
34    """generate a readable description for a key"""
35    ascii_code = ord(character)
36    if ascii_code < 32:
37        return 'Ctrl+{:c}'.format(ord('@') + ascii_code)
38    else:
39        return repr(character)
40
41
42# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
43class ConsoleBase(object):
44    """OS abstraction for console (input/output codec, no echo)"""
45
46    def __init__(self):
47        if sys.version_info >= (3, 0):
48            self.byte_output = sys.stdout.buffer
49        else:
50            self.byte_output = sys.stdout
51        self.output = sys.stdout
52
53    def setup(self):
54        """Set console to read single characters, no echo"""
55
56    def cleanup(self):
57        """Restore default console settings"""
58
59    def getkey(self):
60        """Read a single key from the console"""
61        return None
62
63    def write_bytes(self, byte_string):
64        """Write bytes (already encoded)"""
65        self.byte_output.write(byte_string)
66        self.byte_output.flush()
67
68    def write(self, text):
69        """Write string"""
70        self.output.write(text)
71        self.output.flush()
72
73    def cancel(self):
74        """Cancel getkey operation"""
75
76    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
77    # context manager:
78    # switch terminal temporary to normal mode (e.g. to get user input)
79
80    def __enter__(self):
81        self.cleanup()
82        return self
83
84    def __exit__(self, *args, **kwargs):
85        self.setup()
86
87
88if os.name == 'nt':  # noqa
89    import msvcrt
90    import ctypes
91    import platform
92
93    class Out(object):
94        """file-like wrapper that uses os.write"""
95
96        def __init__(self, fd):
97            self.fd = fd
98
99        def flush(self):
100            pass
101
102        def write(self, s):
103            os.write(self.fd, s)
104
105    class Console(ConsoleBase):
106        fncodes = {
107            ';': '\x1bOP',  # F1
108            '<': '\x1bOQ',  # F2
109            '=': '\x1bOR',  # F3
110            '>': '\x1bOS',  # F4
111            '?': '\x1b[15~',  # F5
112            '@': '\x1b[17~',  # F6
113            'A': '\x1b[18~',  # F7
114            'B': '\x1b[19~',  # F8
115            'C': '\x1b[20~',  # F9
116            'D': '\x1b[21~',  # F10
117        }
118        navcodes = {
119            'H': '\x1b[A',  # UP
120            'P': '\x1b[B',  # DOWN
121            'K': '\x1b[D',  # LEFT
122            'M': '\x1b[C',  # RIGHT
123            'G': '\x1b[H',  # HOME
124            'O': '\x1b[F',  # END
125            'R': '\x1b[2~',  # INSERT
126            'S': '\x1b[3~',  # DELETE
127            'I': '\x1b[5~',  # PGUP
128            'Q': '\x1b[6~',  # PGDN
129        }
130
131        def __init__(self):
132            super(Console, self).__init__()
133            self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP()
134            self._saved_icp = ctypes.windll.kernel32.GetConsoleCP()
135            ctypes.windll.kernel32.SetConsoleOutputCP(65001)
136            ctypes.windll.kernel32.SetConsoleCP(65001)
137            # ANSI handling available through SetConsoleMode since Windows 10 v1511
138            # https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1
139            if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586:
140                ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
141                import ctypes.wintypes as wintypes
142                if not hasattr(wintypes, 'LPDWORD'): # PY2
143                    wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
144                SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
145                GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
146                GetStdHandle = ctypes.windll.kernel32.GetStdHandle
147                mode = wintypes.DWORD()
148                GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode))
149                if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0:
150                    SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
151                    self._saved_cm = mode
152            self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace')
153            # the change of the code page is not propagated to Python, manually fix it
154            sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace')
155            sys.stdout = self.output
156            self.output.encoding = 'UTF-8'  # needed for input
157
158        def __del__(self):
159            ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp)
160            ctypes.windll.kernel32.SetConsoleCP(self._saved_icp)
161            try:
162                ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm)
163            except AttributeError: # in case no _saved_cm
164                pass
165
166        def getkey(self):
167            while True:
168                z = msvcrt.getwch()
169                if z == unichr(13):
170                    return unichr(10)
171                elif z is unichr(0) or z is unichr(0xe0):
172                    try:
173                        code = msvcrt.getwch()
174                        if z is unichr(0):
175                            return self.fncodes[code]
176                        else:
177                            return self.navcodes[code]
178                    except KeyError:
179                        pass
180                else:
181                    return z
182
183        def cancel(self):
184            # CancelIo, CancelSynchronousIo do not seem to work when using
185            # getwch, so instead, send a key to the window with the console
186            hwnd = ctypes.windll.kernel32.GetConsoleWindow()
187            ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0d, 0)
188
189elif os.name == 'posix':
190    import atexit
191    import termios
192    import fcntl
193
194    class Console(ConsoleBase):
195        def __init__(self):
196            super(Console, self).__init__()
197            self.fd = sys.stdin.fileno()
198            self.old = termios.tcgetattr(self.fd)
199            atexit.register(self.cleanup)
200            if sys.version_info < (3, 0):
201                self.enc_stdin = codecs.getreader(sys.stdin.encoding)(sys.stdin)
202            else:
203                self.enc_stdin = sys.stdin
204
205        def setup(self):
206            new = termios.tcgetattr(self.fd)
207            new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
208            new[6][termios.VMIN] = 1
209            new[6][termios.VTIME] = 0
210            termios.tcsetattr(self.fd, termios.TCSANOW, new)
211
212        def getkey(self):
213            c = self.enc_stdin.read(1)
214            if c == unichr(0x7f):
215                c = unichr(8)    # map the BS key (which yields DEL) to backspace
216            return c
217
218        def cancel(self):
219            fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0')
220
221        def cleanup(self):
222            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)
223
224else:
225    raise NotImplementedError(
226        'Sorry no implementation for your platform ({}) available.'.format(sys.platform))
227
228
229# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
230
231class Transform(object):
232    """do-nothing: forward all data unchanged"""
233    def rx(self, text):
234        """text received from serial port"""
235        return text
236
237    def tx(self, text):
238        """text to be sent to serial port"""
239        return text
240
241    def echo(self, text):
242        """text to be sent but displayed on console"""
243        return text
244
245
246class CRLF(Transform):
247    """ENTER sends CR+LF"""
248
249    def tx(self, text):
250        return text.replace('\n', '\r\n')
251
252
253class CR(Transform):
254    """ENTER sends CR"""
255
256    def rx(self, text):
257        return text.replace('\r', '\n')
258
259    def tx(self, text):
260        return text.replace('\n', '\r')
261
262
263class LF(Transform):
264    """ENTER sends LF"""
265
266
267class NoTerminal(Transform):
268    """remove typical terminal control codes from input"""
269
270    REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32) if unichr(x) not in '\r\n\b\t')
271    REPLACEMENT_MAP.update(
272        {
273            0x7F: 0x2421,  # DEL
274            0x9B: 0x2425,  # CSI
275        })
276
277    def rx(self, text):
278        return text.translate(self.REPLACEMENT_MAP)
279
280    echo = rx
281
282
283class NoControls(NoTerminal):
284    """Remove all control codes, incl. CR+LF"""
285
286    REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32))
287    REPLACEMENT_MAP.update(
288        {
289            0x20: 0x2423,  # visual space
290            0x7F: 0x2421,  # DEL
291            0x9B: 0x2425,  # CSI
292        })
293
294
295class Printable(Transform):
296    """Show decimal code for all non-ASCII characters and replace most control codes"""
297
298    def rx(self, text):
299        r = []
300        for c in text:
301            if ' ' <= c < '\x7f' or c in '\r\n\b\t':
302                r.append(c)
303            elif c < ' ':
304                r.append(unichr(0x2400 + ord(c)))
305            else:
306                r.extend(unichr(0x2080 + ord(d) - 48) for d in '{:d}'.format(ord(c)))
307                r.append(' ')
308        return ''.join(r)
309
310    echo = rx
311
312
313class Colorize(Transform):
314    """Apply different colors for received and echo"""
315
316    def __init__(self):
317        # XXX make it configurable, use colorama?
318        self.input_color = '\x1b[37m'
319        self.echo_color = '\x1b[31m'
320
321    def rx(self, text):
322        return self.input_color + text
323
324    def echo(self, text):
325        return self.echo_color + text
326
327
328class DebugIO(Transform):
329    """Print what is sent and received"""
330
331    def rx(self, text):
332        sys.stderr.write(' [RX:{!r}] '.format(text))
333        sys.stderr.flush()
334        return text
335
336    def tx(self, text):
337        sys.stderr.write(' [TX:{!r}] '.format(text))
338        sys.stderr.flush()
339        return text
340
341
342# other ideas:
343# - add date/time for each newline
344# - insert newline after: a) timeout b) packet end character
345
346EOL_TRANSFORMATIONS = {
347    'crlf': CRLF,
348    'cr': CR,
349    'lf': LF,
350}
351
352TRANSFORMATIONS = {
353    'direct': Transform,    # no transformation
354    'default': NoTerminal,
355    'nocontrol': NoControls,
356    'printable': Printable,
357    'colorize': Colorize,
358    'debug': DebugIO,
359}
360
361
362# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
363def ask_for_port():
364    """\
365    Show a list of ports and ask the user for a choice. To make selection
366    easier on systems with long device names, also allow the input of an
367    index.
368    """
369    sys.stderr.write('\n--- Available ports:\n')
370    ports = []
371    for n, (port, desc, hwid) in enumerate(sorted(comports()), 1):
372        sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc))
373        ports.append(port)
374    while True:
375        port = raw_input('--- Enter port index or full name: ')
376        try:
377            index = int(port) - 1
378            if not 0 <= index < len(ports):
379                sys.stderr.write('--- Invalid index!\n')
380                continue
381        except ValueError:
382            pass
383        else:
384            port = ports[index]
385        return port
386
387
388class Miniterm(object):
389    """\
390    Terminal application. Copy data from serial port to console and vice versa.
391    Handle special keys from the console to show menu etc.
392    """
393
394    def __init__(self, serial_instance, echo=False, eol='crlf', filters=()):
395        self.console = Console()
396        self.serial = serial_instance
397        self.echo = echo
398        self.raw = False
399        self.input_encoding = 'UTF-8'
400        self.output_encoding = 'UTF-8'
401        self.eol = eol
402        self.filters = filters
403        self.update_transformations()
404        self.exit_character = unichr(0x1d)  # GS/CTRL+]
405        self.menu_character = unichr(0x14)  # Menu: CTRL+T
406        self.alive = None
407        self._reader_alive = None
408        self.receiver_thread = None
409        self.rx_decoder = None
410        self.tx_decoder = None
411
412    def _start_reader(self):
413        """Start reader thread"""
414        self._reader_alive = True
415        # start serial->console thread
416        self.receiver_thread = threading.Thread(target=self.reader, name='rx')
417        self.receiver_thread.daemon = True
418        self.receiver_thread.start()
419
420    def _stop_reader(self):
421        """Stop reader thread only, wait for clean exit of thread"""
422        self._reader_alive = False
423        if hasattr(self.serial, 'cancel_read'):
424            self.serial.cancel_read()
425        self.receiver_thread.join()
426
427    def start(self):
428        """start worker threads"""
429        self.alive = True
430        self._start_reader()
431        # enter console->serial loop
432        self.transmitter_thread = threading.Thread(target=self.writer, name='tx')
433        self.transmitter_thread.daemon = True
434        self.transmitter_thread.start()
435        self.console.setup()
436
437    def stop(self):
438        """set flag to stop worker threads"""
439        self.alive = False
440
441    def join(self, transmit_only=False):
442        """wait for worker threads to terminate"""
443        self.transmitter_thread.join()
444        if not transmit_only:
445            if hasattr(self.serial, 'cancel_read'):
446                self.serial.cancel_read()
447            self.receiver_thread.join()
448
449    def close(self):
450        self.serial.close()
451
452    def update_transformations(self):
453        """take list of transformation classes and instantiate them for rx and tx"""
454        transformations = [EOL_TRANSFORMATIONS[self.eol]] + [TRANSFORMATIONS[f]
455                                                             for f in self.filters]
456        self.tx_transformations = [t() for t in transformations]
457        self.rx_transformations = list(reversed(self.tx_transformations))
458
459    def set_rx_encoding(self, encoding, errors='replace'):
460        """set encoding for received data"""
461        self.input_encoding = encoding
462        self.rx_decoder = codecs.getincrementaldecoder(encoding)(errors)
463
464    def set_tx_encoding(self, encoding, errors='replace'):
465        """set encoding for transmitted data"""
466        self.output_encoding = encoding
467        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
468
469    def dump_port_settings(self):
470        """Write current settings to sys.stderr"""
471        sys.stderr.write("\n--- Settings: {p.name}  {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
472            p=self.serial))
473        sys.stderr.write('--- RTS: {:8}  DTR: {:8}  BREAK: {:8}\n'.format(
474            ('active' if self.serial.rts else 'inactive'),
475            ('active' if self.serial.dtr else 'inactive'),
476            ('active' if self.serial.break_condition else 'inactive')))
477        try:
478            sys.stderr.write('--- CTS: {:8}  DSR: {:8}  RI: {:8}  CD: {:8}\n'.format(
479                ('active' if self.serial.cts else 'inactive'),
480                ('active' if self.serial.dsr else 'inactive'),
481                ('active' if self.serial.ri else 'inactive'),
482                ('active' if self.serial.cd else 'inactive')))
483        except serial.SerialException:
484            # on RFC 2217 ports, it can happen if no modem state notification was
485            # yet received. ignore this error.
486            pass
487        sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
488        sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
489        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
490        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
491        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
492        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
493
494    def reader(self):
495        """loop and copy serial->console"""
496        try:
497            while self.alive and self._reader_alive:
498                # read all that is there or wait for one byte
499                data = self.serial.read(self.serial.in_waiting or 1)
500                if data:
501                    if self.raw:
502                        self.console.write_bytes(data)
503                    else:
504                        text = self.rx_decoder.decode(data)
505                        for transformation in self.rx_transformations:
506                            text = transformation.rx(text)
507                        self.console.write(text)
508        except serial.SerialException:
509            self.alive = False
510            self.console.cancel()
511            raise       # XXX handle instead of re-raise?
512
513    def writer(self):
514        """\
515        Loop and copy console->serial until self.exit_character character is
516        found. When self.menu_character is found, interpret the next key
517        locally.
518        """
519        menu_active = False
520        try:
521            while self.alive:
522                try:
523                    c = self.console.getkey()
524                except KeyboardInterrupt:
525                    c = '\x03'
526                if not self.alive:
527                    break
528                if menu_active:
529                    self.handle_menu_key(c)
530                    menu_active = False
531                elif c == self.menu_character:
532                    menu_active = True      # next char will be for menu
533                elif c == self.exit_character:
534                    self.stop()             # exit app
535                    break
536                else:
537                    #~ if self.raw:
538                    text = c
539                    for transformation in self.tx_transformations:
540                        text = transformation.tx(text)
541                    self.serial.write(self.tx_encoder.encode(text))
542                    if self.echo:
543                        echo_text = c
544                        for transformation in self.tx_transformations:
545                            echo_text = transformation.echo(echo_text)
546                        self.console.write(echo_text)
547        except:
548            self.alive = False
549            raise
550
551    def handle_menu_key(self, c):
552        """Implement a simple menu / settings"""
553        if c == self.menu_character or c == self.exit_character:
554            # Menu/exit character again -> send itself
555            self.serial.write(self.tx_encoder.encode(c))
556            if self.echo:
557                self.console.write(c)
558        elif c == '\x15':                       # CTRL+U -> upload file
559            self.upload_file()
560        elif c in '\x08hH?':                    # CTRL+H, h, H, ? -> Show help
561            sys.stderr.write(self.get_help_text())
562        elif c == '\x12':                       # CTRL+R -> Toggle RTS
563            self.serial.rts = not self.serial.rts
564            sys.stderr.write('--- RTS {} ---\n'.format('active' if self.serial.rts else 'inactive'))
565        elif c == '\x04':                       # CTRL+D -> Toggle DTR
566            self.serial.dtr = not self.serial.dtr
567            sys.stderr.write('--- DTR {} ---\n'.format('active' if self.serial.dtr else 'inactive'))
568        elif c == '\x02':                       # CTRL+B -> toggle BREAK condition
569            self.serial.break_condition = not self.serial.break_condition
570            sys.stderr.write('--- BREAK {} ---\n'.format('active' if self.serial.break_condition else 'inactive'))
571        elif c == '\x05':                       # CTRL+E -> toggle local echo
572            self.echo = not self.echo
573            sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive'))
574        elif c == '\x06':                       # CTRL+F -> edit filters
575            self.change_filter()
576        elif c == '\x0c':                       # CTRL+L -> EOL mode
577            modes = list(EOL_TRANSFORMATIONS)   # keys
578            eol = modes.index(self.eol) + 1
579            if eol >= len(modes):
580                eol = 0
581            self.eol = modes[eol]
582            sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
583            self.update_transformations()
584        elif c == '\x01':                       # CTRL+A -> set encoding
585            self.change_encoding()
586        elif c == '\x09':                       # CTRL+I -> info
587            self.dump_port_settings()
588        #~ elif c == '\x01':                       # CTRL+A -> cycle escape mode
589        #~ elif c == '\x0c':                       # CTRL+L -> cycle linefeed mode
590        elif c in 'pP':                         # P -> change port
591            self.change_port()
592        elif c in 'zZ':                         # S -> suspend / open port temporarily
593            self.suspend_port()
594        elif c in 'bB':                         # B -> change baudrate
595            self.change_baudrate()
596        elif c == '8':                          # 8 -> change to 8 bits
597            self.serial.bytesize = serial.EIGHTBITS
598            self.dump_port_settings()
599        elif c == '7':                          # 7 -> change to 8 bits
600            self.serial.bytesize = serial.SEVENBITS
601            self.dump_port_settings()
602        elif c in 'eE':                         # E -> change to even parity
603            self.serial.parity = serial.PARITY_EVEN
604            self.dump_port_settings()
605        elif c in 'oO':                         # O -> change to odd parity
606            self.serial.parity = serial.PARITY_ODD
607            self.dump_port_settings()
608        elif c in 'mM':                         # M -> change to mark parity
609            self.serial.parity = serial.PARITY_MARK
610            self.dump_port_settings()
611        elif c in 'sS':                         # S -> change to space parity
612            self.serial.parity = serial.PARITY_SPACE
613            self.dump_port_settings()
614        elif c in 'nN':                         # N -> change to no parity
615            self.serial.parity = serial.PARITY_NONE
616            self.dump_port_settings()
617        elif c == '1':                          # 1 -> change to 1 stop bits
618            self.serial.stopbits = serial.STOPBITS_ONE
619            self.dump_port_settings()
620        elif c == '2':                          # 2 -> change to 2 stop bits
621            self.serial.stopbits = serial.STOPBITS_TWO
622            self.dump_port_settings()
623        elif c == '3':                          # 3 -> change to 1.5 stop bits
624            self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE
625            self.dump_port_settings()
626        elif c in 'xX':                         # X -> change software flow control
627            self.serial.xonxoff = (c == 'X')
628            self.dump_port_settings()
629        elif c in 'rR':                         # R -> change hardware flow control
630            self.serial.rtscts = (c == 'R')
631            self.dump_port_settings()
632        elif c in 'qQ':
633            self.stop()                         # Q -> exit app
634        else:
635            sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))
636
637    def upload_file(self):
638        """Ask user for filename and send its contents"""
639        sys.stderr.write('\n--- File to upload: ')
640        sys.stderr.flush()
641        with self.console:
642            filename = sys.stdin.readline().rstrip('\r\n')
643            if filename:
644                try:
645                    with open(filename, 'rb') as f:
646                        sys.stderr.write('--- Sending file {} ---\n'.format(filename))
647                        while True:
648                            block = f.read(1024)
649                            if not block:
650                                break
651                            self.serial.write(block)
652                            # Wait for output buffer to drain.
653                            self.serial.flush()
654                            sys.stderr.write('.')   # Progress indicator.
655                    sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
656                except IOError as e:
657                    sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))
658
659    def change_filter(self):
660        """change the i/o transformations"""
661        sys.stderr.write('\n--- Available Filters:\n')
662        sys.stderr.write('\n'.join(
663            '---   {:<10} = {.__doc__}'.format(k, v)
664            for k, v in sorted(TRANSFORMATIONS.items())))
665        sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
666        with self.console:
667            new_filters = sys.stdin.readline().lower().split()
668        if new_filters:
669            for f in new_filters:
670                if f not in TRANSFORMATIONS:
671                    sys.stderr.write('--- unknown filter: {!r}\n'.format(f))
672                    break
673            else:
674                self.filters = new_filters
675                self.update_transformations()
676        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))
677
678    def change_encoding(self):
679        """change encoding on the serial port"""
680        sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
681        with self.console:
682            new_encoding = sys.stdin.readline().strip()
683        if new_encoding:
684            try:
685                codecs.lookup(new_encoding)
686            except LookupError:
687                sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
688            else:
689                self.set_rx_encoding(new_encoding)
690                self.set_tx_encoding(new_encoding)
691        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
692        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
693
694    def change_baudrate(self):
695        """change the baudrate"""
696        sys.stderr.write('\n--- Baudrate: ')
697        sys.stderr.flush()
698        with self.console:
699            backup = self.serial.baudrate
700            try:
701                self.serial.baudrate = int(sys.stdin.readline().strip())
702            except ValueError as e:
703                sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
704                self.serial.baudrate = backup
705            else:
706                self.dump_port_settings()
707
708    def change_port(self):
709        """Have a conversation with the user to change the serial port"""
710        with self.console:
711            try:
712                port = ask_for_port()
713            except KeyboardInterrupt:
714                port = None
715        if port and port != self.serial.port:
716            # reader thread needs to be shut down
717            self._stop_reader()
718            # save settings
719            settings = self.serial.getSettingsDict()
720            try:
721                new_serial = serial.serial_for_url(port, do_not_open=True)
722                # restore settings and open
723                new_serial.applySettingsDict(settings)
724                new_serial.rts = self.serial.rts
725                new_serial.dtr = self.serial.dtr
726                new_serial.open()
727                new_serial.break_condition = self.serial.break_condition
728            except Exception as e:
729                sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
730                new_serial.close()
731            else:
732                self.serial.close()
733                self.serial = new_serial
734                sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
735            # and restart the reader thread
736            self._start_reader()
737
738    def suspend_port(self):
739        """\
740        open port temporarily, allow reconnect, exit and port change to get
741        out of the loop
742        """
743        # reader thread needs to be shut down
744        self._stop_reader()
745        self.serial.close()
746        sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
747        do_change_port = False
748        while not self.serial.is_open:
749            sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
750                exit=key_description(self.exit_character)))
751            k = self.console.getkey()
752            if k == self.exit_character:
753                self.stop()             # exit app
754                break
755            elif k in 'pP':
756                do_change_port = True
757                break
758            try:
759                self.serial.open()
760            except Exception as e:
761                sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
762        if do_change_port:
763            self.change_port()
764        else:
765            # and restart the reader thread
766            self._start_reader()
767            sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))
768
769    def get_help_text(self):
770        """return the help text"""
771        # help text, starts with blank line!
772        return """
773--- pySerial ({version}) - miniterm - help
774---
775--- {exit:8} Exit program (alias {menu} Q)
776--- {menu:8} Menu escape key, followed by:
777--- Menu keys:
778---    {menu:7} Send the menu character itself to remote
779---    {exit:7} Send the exit character itself to remote
780---    {info:7} Show info
781---    {upload:7} Upload file (prompt will be shown)
782---    {repr:7} encoding
783---    {filter:7} edit filters
784--- Toggles:
785---    {rts:7} RTS   {dtr:7} DTR   {brk:7} BREAK
786---    {echo:7} echo  {eol:7} EOL
787---
788--- Port settings ({menu} followed by the following):
789---    p          change port
790---    7 8        set data bits
791---    N E O S M  change parity (None, Even, Odd, Space, Mark)
792---    1 2 3      set stop bits (1, 2, 1.5)
793---    b          change baud rate
794---    x X        disable/enable software flow control
795---    r R        disable/enable hardware flow control
796""".format(version=getattr(serial, 'VERSION', 'unknown version'),
797           exit=key_description(self.exit_character),
798           menu=key_description(self.menu_character),
799           rts=key_description('\x12'),
800           dtr=key_description('\x04'),
801           brk=key_description('\x02'),
802           echo=key_description('\x05'),
803           info=key_description('\x09'),
804           upload=key_description('\x15'),
805           repr=key_description('\x01'),
806           filter=key_description('\x06'),
807           eol=key_description('\x0c'))
808
809
810# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
811# default args can be used to override when calling main() from an other script
812# e.g to create a miniterm-my-device.py
813def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr=None, serial_instance=None):
814    """Command line tool, entry point"""
815
816    import argparse
817
818    parser = argparse.ArgumentParser(
819        description='Miniterm - A simple terminal program for the serial port.')
820
821    parser.add_argument(
822        'port',
823        nargs='?',
824        help='serial port name ("-" to show port list)',
825        default=default_port)
826
827    parser.add_argument(
828        'baudrate',
829        nargs='?',
830        type=int,
831        help='set baud rate, default: %(default)s',
832        default=default_baudrate)
833
834    group = parser.add_argument_group('port settings')
835
836    group.add_argument(
837        '--parity',
838        choices=['N', 'E', 'O', 'S', 'M'],
839        type=lambda c: c.upper(),
840        help='set parity, one of {N E O S M}, default: N',
841        default='N')
842
843    group.add_argument(
844        '--rtscts',
845        action='store_true',
846        help='enable RTS/CTS flow control (default off)',
847        default=False)
848
849    group.add_argument(
850        '--xonxoff',
851        action='store_true',
852        help='enable software flow control (default off)',
853        default=False)
854
855    group.add_argument(
856        '--rts',
857        type=int,
858        help='set initial RTS line state (possible values: 0, 1)',
859        default=default_rts)
860
861    group.add_argument(
862        '--dtr',
863        type=int,
864        help='set initial DTR line state (possible values: 0, 1)',
865        default=default_dtr)
866
867    group.add_argument(
868        '--non-exclusive',
869        dest='exclusive',
870        action='store_false',
871        help='disable locking for native ports',
872        default=True)
873
874    group.add_argument(
875        '--ask',
876        action='store_true',
877        help='ask again for port when open fails',
878        default=False)
879
880    group = parser.add_argument_group('data handling')
881
882    group.add_argument(
883        '-e', '--echo',
884        action='store_true',
885        help='enable local echo (default off)',
886        default=False)
887
888    group.add_argument(
889        '--encoding',
890        dest='serial_port_encoding',
891        metavar='CODEC',
892        help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s',
893        default='UTF-8')
894
895    group.add_argument(
896        '-f', '--filter',
897        action='append',
898        metavar='NAME',
899        help='add text transformation',
900        default=[])
901
902    group.add_argument(
903        '--eol',
904        choices=['CR', 'LF', 'CRLF'],
905        type=lambda c: c.upper(),
906        help='end of line mode',
907        default='CRLF')
908
909    group.add_argument(
910        '--raw',
911        action='store_true',
912        help='Do no apply any encodings/transformations',
913        default=False)
914
915    group = parser.add_argument_group('hotkeys')
916
917    group.add_argument(
918        '--exit-char',
919        type=int,
920        metavar='NUM',
921        help='Unicode of special character that is used to exit the application, default: %(default)s',
922        default=0x1d)  # GS/CTRL+]
923
924    group.add_argument(
925        '--menu-char',
926        type=int,
927        metavar='NUM',
928        help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s',
929        default=0x14)  # Menu: CTRL+T
930
931    group = parser.add_argument_group('diagnostics')
932
933    group.add_argument(
934        '-q', '--quiet',
935        action='store_true',
936        help='suppress non-error messages',
937        default=False)
938
939    group.add_argument(
940        '--develop',
941        action='store_true',
942        help='show Python traceback on error',
943        default=False)
944
945    args = parser.parse_args()
946
947    if args.menu_char == args.exit_char:
948        parser.error('--exit-char can not be the same as --menu-char')
949
950    if args.filter:
951        if 'help' in args.filter:
952            sys.stderr.write('Available filters:\n')
953            sys.stderr.write('\n'.join(
954                '{:<10} = {.__doc__}'.format(k, v)
955                for k, v in sorted(TRANSFORMATIONS.items())))
956            sys.stderr.write('\n')
957            sys.exit(1)
958        filters = args.filter
959    else:
960        filters = ['default']
961
962    while serial_instance is None:
963        # no port given on command line -> ask user now
964        if args.port is None or args.port == '-':
965            try:
966                args.port = ask_for_port()
967            except KeyboardInterrupt:
968                sys.stderr.write('\n')
969                parser.error('user aborted and port is not given')
970            else:
971                if not args.port:
972                    parser.error('port is not given')
973        try:
974            serial_instance = serial.serial_for_url(
975                args.port,
976                args.baudrate,
977                parity=args.parity,
978                rtscts=args.rtscts,
979                xonxoff=args.xonxoff,
980                do_not_open=True)
981
982            if not hasattr(serial_instance, 'cancel_read'):
983                # enable timeout for alive flag polling if cancel_read is not available
984                serial_instance.timeout = 1
985
986            if args.dtr is not None:
987                if not args.quiet:
988                    sys.stderr.write('--- forcing DTR {}\n'.format('active' if args.dtr else 'inactive'))
989                serial_instance.dtr = args.dtr
990            if args.rts is not None:
991                if not args.quiet:
992                    sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive'))
993                serial_instance.rts = args.rts
994
995            if isinstance(serial_instance, serial.Serial):
996                serial_instance.exclusive = args.exclusive
997
998            serial_instance.open()
999        except serial.SerialException as e:
1000            sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e))
1001            if args.develop:
1002                raise
1003            if not args.ask:
1004                sys.exit(1)
1005            else:
1006                args.port = '-'
1007        else:
1008            break
1009
1010    miniterm = Miniterm(
1011        serial_instance,
1012        echo=args.echo,
1013        eol=args.eol.lower(),
1014        filters=filters)
1015    miniterm.exit_character = unichr(args.exit_char)
1016    miniterm.menu_character = unichr(args.menu_char)
1017    miniterm.raw = args.raw
1018    miniterm.set_rx_encoding(args.serial_port_encoding)
1019    miniterm.set_tx_encoding(args.serial_port_encoding)
1020
1021    if not args.quiet:
1022        sys.stderr.write('--- Miniterm on {p.name}  {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n'.format(
1023            p=miniterm.serial))
1024        sys.stderr.write('--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n'.format(
1025            key_description(miniterm.exit_character),
1026            key_description(miniterm.menu_character),
1027            key_description(miniterm.menu_character),
1028            key_description('\x08')))
1029
1030    miniterm.start()
1031    try:
1032        miniterm.join(True)
1033    except KeyboardInterrupt:
1034        pass
1035    if not args.quiet:
1036        sys.stderr.write('\n--- exit ---\n')
1037    miniterm.join()
1038    miniterm.close()
1039
1040# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
1041if __name__ == '__main__':
1042    main()
1043