#!/usr/bin/env python
#
# Very simple serial terminal
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C)2002-2020 Chris Liechti <[email protected]>
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import absolute_import

import codecs
import os
import sys
import threading

import serial
from serial.tools.list_ports import comports
from serial.tools import hexlify_codec

# pylint: disable=wrong-import-order,wrong-import-position

codecs.register(lambda c: hexlify_codec.getregentry() if c == 'hexlify' else None)

try:
    raw_input
except NameError:
    # pylint: disable=redefined-builtin,invalid-name
    raw_input = input   # in python3 it's "raw"
    unichr = chr


def key_description(character):
    """generate a readable description for a key"""
    ascii_code = ord(character)
    if ascii_code < 32:
        # control characters are shown as Ctrl+<letter>, e.g. 0x01 -> Ctrl+A
        return 'Ctrl+{:c}'.format(ord('@') + ascii_code)
    else:
        return repr(character)


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class ConsoleBase(object):
    """OS abstraction for console (input/output codec, no echo)"""

    def __init__(self):
        # byte_output takes already encoded data, output takes text
        if sys.version_info >= (3, 0):
            self.byte_output = sys.stdout.buffer
        else:
            self.byte_output = sys.stdout
        self.output = sys.stdout

    def setup(self):
        """Set console to read single characters, no echo"""

    def cleanup(self):
        """Restore default console settings"""

    def getkey(self):
        """Read a single key from the console"""
        return None

    def write_bytes(self, byte_string):
        """Write bytes (already encoded)"""
        self.byte_output.write(byte_string)
        self.byte_output.flush()

    def write(self, text):
        """Write string"""
        self.output.write(text)
        self.output.flush()

    def cancel(self):
        """Cancel getkey operation"""

    #  - - - - - - - - - - - - - - - - - - - - - - - -
    # context manager:
    # switch terminal temporary to normal mode (e.g. to get user input)

    def __enter__(self):
        self.cleanup()
        return self

    def __exit__(self, *args, **kwargs):
        self.setup()


if os.name == 'nt':  # noqa
    import msvcrt
    import ctypes
    import platform

    class Out(object):
        """file-like wrapper that uses os.write"""

        def __init__(self, fd):
            self.fd = fd

        def flush(self):
            pass

        def write(self, s):
            os.write(self.fd, s)

    class Console(ConsoleBase):
        """Windows console: UTF-8 code pages, special keys mapped to escape sequences"""

        # second code of a two-part key (first part 0x00) -> escape sequence
        fncodes = {
            ';': '\x1bOP',  # F1
            '<': '\x1bOQ',  # F2
            '=': '\x1bOR',  # F3
            '>': '\x1bOS',  # F4
            '?': '\x1b[15~',  # F5
            '@': '\x1b[17~',  # F6
            'A': '\x1b[18~',  # F7
            'B': '\x1b[19~',  # F8
            'C': '\x1b[20~',  # F9
            'D': '\x1b[21~',  # F10
        }
        # second code of a two-part key (first part 0xe0) -> escape sequence
        navcodes = {
            'H': '\x1b[A',  # UP
            'P': '\x1b[B',  # DOWN
            'K': '\x1b[D',  # LEFT
            'M': '\x1b[C',  # RIGHT
            'G': '\x1b[H',  # HOME
            'O': '\x1b[F',  # END
            'R': '\x1b[2~',  # INSERT
            'S': '\x1b[3~',  # DELETE
            'I': '\x1b[5~',  # PGUP
            'Q': '\x1b[6~',  # PGDN
        }

        def __init__(self):
            super(Console, self).__init__()
            # remember the code pages so that __del__ can restore them
            self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP()
            self._saved_icp = ctypes.windll.kernel32.GetConsoleCP()
            ctypes.windll.kernel32.SetConsoleOutputCP(65001)
            ctypes.windll.kernel32.SetConsoleCP(65001)
            # ANSI handling available through SetConsoleMode since Windows 10 v1511
            # https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1
            if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586:
                ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
                import ctypes.wintypes as wintypes
                if not hasattr(wintypes, 'LPDWORD'):  # PY2
                    wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
                SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
                GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
                GetStdHandle = ctypes.windll.kernel32.GetStdHandle
                mode = wintypes.DWORD()
                GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode))
                if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0:
                    SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
                    # only saved when the mode was changed, see __del__
                    self._saved_cm = mode
            self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace')
            # the change of the code page is not propagated to Python, manually fix it
            sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace')
            sys.stdout = self.output
            self.output.encoding = 'UTF-8'  # needed for input

        def __del__(self):
            """Restore the code pages (and console mode, if it was changed)"""
            ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp)
            ctypes.windll.kernel32.SetConsoleCP(self._saved_icp)
            try:
                ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm)
            except AttributeError:  # in case no _saved_cm
                pass

        def getkey(self):
            """Read a single key; CR is returned as LF, special keys as escape sequences"""
            while True:
                z = msvcrt.getwch()
                if z == unichr(13):
                    return unichr(10)
                elif z in (unichr(0), unichr(0xe0)):
                    # two-part key: the prefix selects the table, the second
                    # read delivers the code. compare by value, identity of
                    # equal strings is not guaranteed.
                    code = msvcrt.getwch()
                    table = self.fncodes if z == unichr(0) else self.navcodes
                    try:
                        return table[code]
                    except KeyError:
                        pass    # unknown special key: ignore, read next key
                else:
                    return z

        def cancel(self):
            # CancelIo, CancelSynchronousIo do not seem to work when using
            # getwch, so instead, send a key to the window with the console
            hwnd = ctypes.windll.kernel32.GetConsoleWindow()
            ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0d, 0)

elif os.name == 'posix':
    import atexit
    import termios
    import fcntl

    class Console(ConsoleBase):
        """POSIX console: uses termios to switch stdin to single character mode"""

        def __init__(self):
            super(Console, self).__init__()
            self.fd = sys.stdin.fileno()
            # original terminal attributes, restored by cleanup()
            self.old = termios.tcgetattr(self.fd)
            atexit.register(self.cleanup)
            if sys.version_info < (3, 0):
                self.enc_stdin = codecs.getreader(sys.stdin.encoding)(sys.stdin)
            else:
                self.enc_stdin = sys.stdin

        def setup(self):
            """Disable canonical mode, echo and signal keys; read returns after 1 byte"""
            new = termios.tcgetattr(self.fd)
            new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
            new[6][termios.VMIN] = 1
            new[6][termios.VTIME] = 0
            termios.tcsetattr(self.fd, termios.TCSANOW, new)

        def getkey(self):
            """Read a single character from stdin"""
            c = self.enc_stdin.read(1)
            if c == unichr(0x7f):
                c = unichr(8)    # map the BS key (which yields DEL) to backspace
            return c

        def cancel(self):
            """Unblock a pending getkey() by injecting a NUL byte into the terminal input"""
            fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0')

        def cleanup(self):
            """Restore the terminal attributes saved in __init__"""
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)

else:
    raise NotImplementedError(
        'Sorry no implementation for your platform ({}) available.'.format(sys.platform))


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

class Transform(object):
    """do-nothing: forward all data unchanged"""
    def rx(self, text):
        """text received from serial port"""
        return text

    def tx(self, text):
        """text to be sent to serial port"""
        return text

    def echo(self, text):
        """text to be sent but displayed on console"""
        return text


class CRLF(Transform):
    """ENTER sends CR+LF"""

    def tx(self, text):
        return text.replace('\n', '\r\n')


class CR(Transform):
    """ENTER sends CR"""

    def rx(self, text):
        return text.replace('\r', '\n')

    def tx(self, text):
        return text.replace('\n', '\r')


class LF(Transform):
    """ENTER sends LF"""


class NoTerminal(Transform):
    """remove typical terminal control codes from input"""

    # control codes (except CR, LF, BS, TAB) -> Unicode "control picture" symbols
    REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32) if unichr(x) not in '\r\n\b\t')
    REPLACEMENT_MAP.update(
        {
            0x7F: 0x2421,  # DEL
            0x9B: 0x2425,  # CSI
        })

    def rx(self, text):
        return text.translate(self.REPLACEMENT_MAP)

    echo = rx


class NoControls(NoTerminal):
    """Remove all control codes, incl. CR+LF"""

    REPLACEMENT_MAP = dict((x, 0x2400 + x) for x in range(32))
    REPLACEMENT_MAP.update(
        {
            0x20: 0x2423,  # visual space
            0x7F: 0x2421,  # DEL
            0x9B: 0x2425,  # CSI
        })


class Printable(Transform):
    """Show decimal code for all non-ASCII characters and replace most control codes"""

    def rx(self, text):
        r = []
        for c in text:
            if ' ' <= c < '\x7f' or c in '\r\n\b\t':
                r.append(c)
            elif c < ' ':
                r.append(unichr(0x2400 + ord(c)))
            else:
                # decimal code point, written with subscript digits (U+2080..)
                r.extend(unichr(0x2080 + ord(d) - 48) for d in '{:d}'.format(ord(c)))
                r.append(' ')
        return ''.join(r)

    echo = rx


class Colorize(Transform):
    """Apply different colors for received and echo"""

    def __init__(self):
        # XXX make it configurable, use colorama?
        self.input_color = '\x1b[37m'
        self.echo_color = '\x1b[31m'

    def rx(self, text):
        return self.input_color + text

    def echo(self, text):
        return self.echo_color + text


class DebugIO(Transform):
    """Print what is sent and received"""

    def rx(self, text):
        sys.stderr.write(' [RX:{!r}] '.format(text))
        sys.stderr.flush()
        return text

    def tx(self, text):
        sys.stderr.write(' [TX:{!r}] '.format(text))
        sys.stderr.flush()
        return text


# other ideas:
# - add date/time for each newline
# - insert newline after: a) timeout b) packet end character

EOL_TRANSFORMATIONS = {
    'crlf': CRLF,
    'cr': CR,
    'lf': LF,
}

TRANSFORMATIONS = {
    'direct': Transform,    # no transformation
    'default': NoTerminal,
    'nocontrol': NoControls,
    'printable': Printable,
    'colorize': Colorize,
    'debug': DebugIO,
}


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def ask_for_port():
    """\
    Show a list of ports and ask the user for a choice. To make selection
    easier on systems with long device names, also allow the input of an
    index.
    """
    sys.stderr.write('\n--- Available ports:\n')
    ports = []
    for n, (port, desc, hwid) in enumerate(sorted(comports()), 1):
        sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(n, port, desc))
        ports.append(port)
    while True:
        port = raw_input('--- Enter port index or full name: ')
        try:
            index = int(port) - 1
            if not 0 <= index < len(ports):
                sys.stderr.write('--- Invalid index!\n')
                continue
        except ValueError:
            # not a number: the input is used as port name
            pass
        else:
            port = ports[index]
        return port


class Miniterm(object):
    """\
    Terminal application. Copy data from serial port to console and vice versa.
    Handle special keys from the console to show menu etc.
    """

    def __init__(self, serial_instance, echo=False, eol='crlf', filters=()):
        self.console = Console()
        self.serial = serial_instance
        self.echo = echo
        self.raw = False
        self.input_encoding = 'UTF-8'
        self.output_encoding = 'UTF-8'
        self.eol = eol
        self.filters = filters
        self.update_transformations()
        self.exit_character = unichr(0x1d)  # GS/CTRL+]
        self.menu_character = unichr(0x14)  # Menu: CTRL+T
        self.alive = None
        self._reader_alive = None
        self.receiver_thread = None
        self.transmitter_thread = None
        self.rx_decoder = None
        # set by set_tx_encoding(), used by writer() and handle_menu_key()
        self.tx_encoder = None
        # misnamed predecessor of tx_encoder, kept for backward compatibility
        self.tx_decoder = None

    def _start_reader(self):
        """Start reader thread"""
        self._reader_alive = True
        # start serial->console thread
        self.receiver_thread = threading.Thread(target=self.reader, name='rx')
        self.receiver_thread.daemon = True
        self.receiver_thread.start()

    def _stop_reader(self):
        """Stop reader thread only, wait for clean exit of thread"""
        self._reader_alive = False
        if hasattr(self.serial, 'cancel_read'):
            self.serial.cancel_read()
        self.receiver_thread.join()

    def start(self):
        """start worker threads"""
        self.alive = True
        self._start_reader()
        # enter console->serial loop
        self.transmitter_thread = threading.Thread(target=self.writer, name='tx')
        self.transmitter_thread.daemon = True
        self.transmitter_thread.start()
        self.console.setup()

    def stop(self):
        """set flag to stop worker threads"""
        self.alive = False

    def join(self, transmit_only=False):
        """wait for worker threads to terminate"""
        self.transmitter_thread.join()
        if not transmit_only:
            if hasattr(self.serial, 'cancel_read'):
                self.serial.cancel_read()
            self.receiver_thread.join()

    def close(self):
        """close the serial port"""
        self.serial.close()

    def update_transformations(self):
        """take list of transformation classes and instantiate them for rx and tx"""
        transformations = [EOL_TRANSFORMATIONS[self.eol]] + [TRANSFORMATIONS[f]
                                                             for f in self.filters]
        self.tx_transformations = [t() for t in transformations]
        # received data passes the same instances in reverse order
        self.rx_transformations = list(reversed(self.tx_transformations))

    def set_rx_encoding(self, encoding, errors='replace'):
        """set encoding for received data"""
        self.input_encoding = encoding
        self.rx_decoder = codecs.getincrementaldecoder(encoding)(errors)

    def set_tx_encoding(self, encoding, errors='replace'):
        """set encoding for transmitted data"""
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)

    def dump_port_settings(self):
        """Write current settings to sys.stderr"""
        sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
            p=self.serial))
        sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
            ('active' if self.serial.rts else 'inactive'),
            ('active' if self.serial.dtr else 'inactive'),
            ('active' if self.serial.break_condition else 'inactive')))
        try:
            sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
                ('active' if self.serial.cts else 'inactive'),
                ('active' if self.serial.dsr else 'inactive'),
                ('active' if self.serial.ri else 'inactive'),
                ('active' if self.serial.cd else 'inactive')))
        except serial.SerialException:
            # on RFC 2217 ports, it can happen if no modem state notification was
            # yet received. ignore this error.
            pass
        sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
        sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))

    def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            # unblock the writer thread that is waiting in getkey()
            self.console.cancel()
            raise       # XXX handle instead of re-raise?

    def writer(self):
        """\
        Loop and copy console->serial until self.exit_character character is
        found. When self.menu_character is found, interpret the next key
        locally.
        """
        menu_active = False
        try:
            while self.alive:
                try:
                    c = self.console.getkey()
                except KeyboardInterrupt:
                    c = '\x03'
                if not self.alive:
                    break
                if menu_active:
                    self.handle_menu_key(c)
                    menu_active = False
                elif c == self.menu_character:
                    menu_active = True      # next char will be for menu
                elif c == self.exit_character:
                    self.stop()             # exit app
                    break
                else:
                    text = c
                    for transformation in self.tx_transformations:
                        text = transformation.tx(text)
                    self.serial.write(self.tx_encoder.encode(text))
                    if self.echo:
                        echo_text = c
                        for transformation in self.tx_transformations:
                            echo_text = transformation.echo(echo_text)
                        self.console.write(echo_text)
        except BaseException:
            # whatever ends this loop unexpectedly must also stop the reader
            self.alive = False
            raise

    def handle_menu_key(self, c):
        """Implement a simple menu / settings"""
        if c == self.menu_character or c == self.exit_character:
            # Menu/exit character again -> send itself
            self.serial.write(self.tx_encoder.encode(c))
            if self.echo:
                self.console.write(c)
        elif c == '\x15':                       # CTRL+U -> upload file
            self.upload_file()
        elif c in '\x08hH?':                    # CTRL+H, h, H, ? -> Show help
            sys.stderr.write(self.get_help_text())
        elif c == '\x12':                       # CTRL+R -> Toggle RTS
            self.serial.rts = not self.serial.rts
            sys.stderr.write('--- RTS {} ---\n'.format('active' if self.serial.rts else 'inactive'))
        elif c == '\x04':                       # CTRL+D -> Toggle DTR
            self.serial.dtr = not self.serial.dtr
            sys.stderr.write('--- DTR {} ---\n'.format('active' if self.serial.dtr else 'inactive'))
        elif c == '\x02':                       # CTRL+B -> toggle BREAK condition
            self.serial.break_condition = not self.serial.break_condition
            sys.stderr.write('--- BREAK {} ---\n'.format('active' if self.serial.break_condition else 'inactive'))
        elif c == '\x05':                       # CTRL+E -> toggle local echo
            self.echo = not self.echo
            sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive'))
        elif c == '\x06':                       # CTRL+F -> edit filters
            self.change_filter()
        elif c == '\x0c':                       # CTRL+L -> EOL mode
            modes = list(EOL_TRANSFORMATIONS)   # keys
            eol = modes.index(self.eol) + 1
            if eol >= len(modes):
                eol = 0
            self.eol = modes[eol]
            sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
            self.update_transformations()
        elif c == '\x01':                       # CTRL+A -> set encoding
            self.change_encoding()
        elif c == '\x09':                       # CTRL+I -> info
            self.dump_port_settings()
        elif c in 'pP':                         # P -> change port
            self.change_port()
        elif c in 'zZ':                         # Z -> suspend / open port temporarily
            self.suspend_port()
        elif c in 'bB':                         # B -> change baudrate
            self.change_baudrate()
        elif c == '8':                          # 8 -> change to 8 bits
            self.serial.bytesize = serial.EIGHTBITS
            self.dump_port_settings()
        elif c == '7':                          # 7 -> change to 7 bits
            self.serial.bytesize = serial.SEVENBITS
            self.dump_port_settings()
        elif c in 'eE':                         # E -> change to even parity
            self.serial.parity = serial.PARITY_EVEN
            self.dump_port_settings()
        elif c in 'oO':                         # O -> change to odd parity
            self.serial.parity = serial.PARITY_ODD
            self.dump_port_settings()
        elif c in 'mM':                         # M -> change to mark parity
            self.serial.parity = serial.PARITY_MARK
            self.dump_port_settings()
        elif c in 'sS':                         # S -> change to space parity
            self.serial.parity = serial.PARITY_SPACE
            self.dump_port_settings()
        elif c in 'nN':                         # N -> change to no parity
            self.serial.parity = serial.PARITY_NONE
            self.dump_port_settings()
        elif c == '1':                          # 1 -> change to 1 stop bits
            self.serial.stopbits = serial.STOPBITS_ONE
            self.dump_port_settings()
        elif c == '2':                          # 2 -> change to 2 stop bits
            self.serial.stopbits = serial.STOPBITS_TWO
            self.dump_port_settings()
        elif c == '3':                          # 3 -> change to 1.5 stop bits
            self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE
            self.dump_port_settings()
        elif c in 'xX':                         # X -> change software flow control
            self.serial.xonxoff = (c == 'X')
            self.dump_port_settings()
        elif c in 'rR':                         # R -> change hardware flow control
            self.serial.rtscts = (c == 'R')
            self.dump_port_settings()
        elif c in 'qQ':
            self.stop()                         # Q -> exit app
        else:
            sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))

    def upload_file(self):
        """Ask user for filename and send its contents"""
        sys.stderr.write('\n--- File to upload: ')
        sys.stderr.flush()
        with self.console:
            filename = sys.stdin.readline().rstrip('\r\n')
            if filename:
                try:
                    with open(filename, 'rb') as f:
                        sys.stderr.write('--- Sending file {} ---\n'.format(filename))
                        while True:
                            block = f.read(1024)
                            if not block:
                                break
                            self.serial.write(block)
                            # Wait for output buffer to drain.
                            self.serial.flush()
                            sys.stderr.write('.')   # Progress indicator.
                    sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
                except IOError as e:
                    sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))

    def change_filter(self):
        """change the i/o transformations"""
        sys.stderr.write('\n--- Available Filters:\n')
        sys.stderr.write('\n'.join(
            '--- {:<10} = {.__doc__}'.format(k, v)
            for k, v in sorted(TRANSFORMATIONS.items())))
        sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
        with self.console:
            new_filters = sys.stdin.readline().lower().split()
        if new_filters:
            for f in new_filters:
                if f not in TRANSFORMATIONS:
                    sys.stderr.write('--- unknown filter: {!r}\n'.format(f))
                    break
            else:
                # only applied when every given name is known
                self.filters = new_filters
                self.update_transformations()
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))

    def change_encoding(self):
        """change encoding on the serial port"""
        sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
        with self.console:
            new_encoding = sys.stdin.readline().strip()
        if new_encoding:
            try:
                codecs.lookup(new_encoding)
            except LookupError:
                sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
            else:
                self.set_rx_encoding(new_encoding)
                self.set_tx_encoding(new_encoding)
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))

    def change_baudrate(self):
        """change the baudrate"""
        sys.stderr.write('\n--- Baudrate: ')
        sys.stderr.flush()
        with self.console:
            backup = self.serial.baudrate
            try:
                self.serial.baudrate = int(sys.stdin.readline().strip())
            except ValueError as e:
                sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
                self.serial.baudrate = backup
            else:
                self.dump_port_settings()

    def change_port(self):
        """Have a conversation with the user to change the serial port"""
        with self.console:
            try:
                port = ask_for_port()
            except KeyboardInterrupt:
                port = None
        if port and port != self.serial.port:
            # reader thread needs to be shut down
            self._stop_reader()
            # save settings
            settings = self.serial.getSettingsDict()
            # stays None when serial_for_url() itself fails, so that the
            # error path does not touch an unbound name
            new_serial = None
            try:
                new_serial = serial.serial_for_url(port, do_not_open=True)
                # restore settings and open
                new_serial.applySettingsDict(settings)
                new_serial.rts = self.serial.rts
                new_serial.dtr = self.serial.dtr
                new_serial.open()
                new_serial.break_condition = self.serial.break_condition
            except Exception as e:
                sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
                if new_serial is not None:
                    new_serial.close()
            else:
                self.serial.close()
                self.serial = new_serial
                sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
            # and restart the reader thread
            self._start_reader()

    def suspend_port(self):
        """\
        open port temporarily, allow reconnect, exit and port change to get
        out of the loop
        """
        # reader thread needs to be shut down
        self._stop_reader()
        self.serial.close()
        sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
        do_change_port = False
        while not self.serial.is_open:
            sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
                exit=key_description(self.exit_character)))
            k = self.console.getkey()
            if k == self.exit_character:
                self.stop()             # exit app
                break
            elif k in 'pP':
                do_change_port = True
                break
            try:
                self.serial.open()
            except Exception as e:
                sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
        if do_change_port:
            self.change_port()
        elif self.serial.is_open:
            # and restart the reader thread
            # (not done when the user quit while the port was still closed)
            self._start_reader()
            sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))

    def get_help_text(self):
        """return the help text"""
        # help text, starts with blank line!
        return """
--- pySerial ({version}) - miniterm - help
---
--- {exit:8} Exit program (alias {menu} Q)
--- {menu:8} Menu escape key, followed by:
--- Menu keys:
--- {menu:7} Send the menu character itself to remote
--- {exit:7} Send the exit character itself to remote
--- {info:7} Show info
--- {upload:7} Upload file (prompt will be shown)
--- {repr:7} encoding
--- {filter:7} edit filters
--- Toggles:
--- {rts:7} RTS {dtr:7} DTR {brk:7} BREAK
--- {echo:7} echo {eol:7} EOL
---
--- Port settings ({menu} followed by the following):
--- p change port
--- 7 8 set data bits
--- N E O S M change parity (None, Even, Odd, Space, Mark)
--- 1 2 3 set stop bits (1, 2, 1.5)
--- b change baud rate
--- x X disable/enable software flow control
--- r R disable/enable hardware flow control
""".format(version=getattr(serial, 'VERSION', 'unknown version'),
           exit=key_description(self.exit_character),
           menu=key_description(self.menu_character),
           rts=key_description('\x12'),
           dtr=key_description('\x04'),
           brk=key_description('\x02'),
           echo=key_description('\x05'),
           info=key_description('\x09'),
           upload=key_description('\x15'),
           repr=key_description('\x01'),
           filter=key_description('\x06'),
           eol=key_description('\x0c'))


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# default args can be used to override when calling main() from an other script
# e.g to create a miniterm-my-device.py
def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr=None, serial_instance=None):
    """Command line tool, entry point"""

    import argparse

    parser = argparse.ArgumentParser(
        description='Miniterm - A simple terminal program for the serial port.')

    parser.add_argument(
        'port',
        nargs='?',
        help='serial port name ("-" to show port list)',
        default=default_port)

    parser.add_argument(
        'baudrate',
        nargs='?',
        type=int,
        help='set baud rate, default: %(default)s',
        default=default_baudrate)

    group = parser.add_argument_group('port settings')

    group.add_argument(
        '--parity',
        choices=['N', 'E', 'O', 'S', 'M'],
        type=lambda c: c.upper(),
        help='set parity, one of {N E O S M}, default: N',
        default='N')

    group.add_argument(
        '--rtscts',
        action='store_true',
        help='enable RTS/CTS flow control (default off)',
        default=False)

    group.add_argument(
        '--xonxoff',
        action='store_true',
        help='enable software flow control (default off)',
        default=False)

    group.add_argument(
        '--rts',
        type=int,
        help='set initial RTS line state (possible values: 0, 1)',
        default=default_rts)

    group.add_argument(
        '--dtr',
        type=int,
        help='set initial DTR line state (possible values: 0, 1)',
        default=default_dtr)

    group.add_argument(
        '--non-exclusive',
        dest='exclusive',
        action='store_false',
        help='disable locking for native ports',
        default=True)

    group.add_argument(
        '--ask',
        action='store_true',
        help='ask again for port when open fails',
        default=False)

    group = parser.add_argument_group('data handling')

    group.add_argument(
        '-e', '--echo',
        action='store_true',
        help='enable local echo (default off)',
        default=False)

    group.add_argument(
        '--encoding',
        dest='serial_port_encoding',
        metavar='CODEC',
        help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s',
        default='UTF-8')

    group.add_argument(
        '-f', '--filter',
        action='append',
        metavar='NAME',
        help='add text transformation',
        default=[])

    group.add_argument(
        '--eol',
        choices=['CR', 'LF', 'CRLF'],
        type=lambda c: c.upper(),
        help='end of line mode',
        default='CRLF')

    group.add_argument(
        '--raw',
        action='store_true',
        help='Do no apply any encodings/transformations',
        default=False)

    group = parser.add_argument_group('hotkeys')

    group.add_argument(
        '--exit-char',
        type=int,
        metavar='NUM',
        help='Unicode of special character that is used to exit the application, default: %(default)s',
        default=0x1d)  # GS/CTRL+]

    group.add_argument(
        '--menu-char',
        type=int,
        metavar='NUM',
        help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s',
        default=0x14)  # Menu: CTRL+T

    group = parser.add_argument_group('diagnostics')

    group.add_argument(
        '-q', '--quiet',
        action='store_true',
        help='suppress non-error messages',
        default=False)

    group.add_argument(
        '--develop',
        action='store_true',
        help='show Python traceback on error',
        default=False)

    args = parser.parse_args()

    if args.menu_char == args.exit_char:
        parser.error('--exit-char can not be the same as --menu-char')

    if args.filter:
        if 'help' in args.filter:
            sys.stderr.write('Available filters:\n')
            sys.stderr.write('\n'.join(
                '{:<10} = {.__doc__}'.format(k, v)
                for k, v in sorted(TRANSFORMATIONS.items())))
            sys.stderr.write('\n')
            sys.exit(1)
        filters = args.filter
    else:
        filters = ['default']

    while serial_instance is None:
        # no port given on command line -> ask user now
        if args.port is None or args.port == '-':
            try:
                args.port = ask_for_port()
            except KeyboardInterrupt:
                sys.stderr.write('\n')
                parser.error('user aborted and port is not given')
            else:
                if not args.port:
                    parser.error('port is not given')
        try:
            serial_instance = serial.serial_for_url(
                args.port,
                args.baudrate,
                parity=args.parity,
                rtscts=args.rtscts,
                xonxoff=args.xonxoff,
                do_not_open=True)

            if not hasattr(serial_instance, 'cancel_read'):
                # enable timeout for alive flag polling if cancel_read is not available
                serial_instance.timeout = 1

            if args.dtr is not None:
                if not args.quiet:
                    sys.stderr.write('--- forcing DTR {}\n'.format('active' if args.dtr else 'inactive'))
                serial_instance.dtr = args.dtr
            if args.rts is not None:
                if not args.quiet:
                    sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive'))
                serial_instance.rts = args.rts

            if isinstance(serial_instance, serial.Serial):
                serial_instance.exclusive = args.exclusive

            serial_instance.open()
        except serial.SerialException as e:
            sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, e))
            if args.develop:
                raise
            if not args.ask:
                sys.exit(1)
            else:
                # drop the instance that failed to open, otherwise the loop
                # condition ends the loop and --ask never asks again
                serial_instance = None
                args.port = '-'
        else:
            break

    miniterm = Miniterm(
        serial_instance,
        echo=args.echo,
        eol=args.eol.lower(),
        filters=filters)
    miniterm.exit_character = unichr(args.exit_char)
    miniterm.menu_character = unichr(args.menu_char)
    miniterm.raw = args.raw
    miniterm.set_rx_encoding(args.serial_port_encoding)
    miniterm.set_tx_encoding(args.serial_port_encoding)

    if not args.quiet:
        sys.stderr.write('--- Miniterm on {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n'.format(
            p=miniterm.serial))
        sys.stderr.write('--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n'.format(
            key_description(miniterm.exit_character),
            key_description(miniterm.menu_character),
            key_description(miniterm.menu_character),
            key_description('\x08')))

    miniterm.start()
    try:
        miniterm.join(True)
    except KeyboardInterrupt:
        pass
    if not args.quiet:
        sys.stderr.write('\n--- exit ---\n')
    miniterm.join()
    miniterm.close()

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
if __name__ == '__main__':
    main()