1# Copyright 2016 Google Inc. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14""" 15Helper module for common telnet capability to communicate with 16AttenuatorDevice(s). 17 18User code shouldn't need to directly access this class. 19""" 20 21import telnetlib 22from mobly.controllers import attenuator 23 24 25def _ascii_string(uc_string): 26 return str(uc_string).encode("ASCII") 27 28 29class TelnetScpiClient: 30 """This is an internal helper class for Telnet+SCPI command-based 31 instruments. It should only be used by those implemention control libraries 32 and not by any user code directly. 33 """ 34 35 def __init__(self, tx_cmd_separator="\n", rx_cmd_separator="\n", prompt=""): 36 self._tn = None 37 self.tx_cmd_separator = tx_cmd_separator 38 self.rx_cmd_separator = rx_cmd_separator 39 self.prompt = prompt 40 self.host = None 41 self.port = None 42 43 def open(self, host, port=23): 44 if self._tn: 45 self._tn.close() 46 self.host = host 47 self.port = port 48 self._tn = telnetlib.Telnet() 49 self._tn.open(host, port, 10) 50 51 @property 52 def is_open(self): 53 return bool(self._tn) 54 55 def close(self): 56 if self._tn: 57 self._tn.close() 58 self._tn = None 59 60 def cmd(self, cmd_str, wait_ret=True): 61 if not isinstance(cmd_str, str): 62 raise TypeError("Invalid command string", cmd_str) 63 if not self.is_open: 64 raise attenuator.Error("Telnet connection not open for commands") 65 66 cmd_str.strip(self.tx_cmd_separator) 67 self._tn.read_until(_ascii_string(self.prompt), 2) 68 self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator)) 69 if wait_ret is False: 70 return None 71 72 match_idx, match_val, ret_text = self._tn.expect( 73 [_ascii_string("\S+" + self.rx_cmd_separator)], 1 74 ) 75 76 if match_idx == -1: 77 raise attenuator.Error("Telnet command failed to return valid data") 78 79 ret_text = ret_text.decode() 80 ret_text = ret_text.strip( 81 self.tx_cmd_separator + self.rx_cmd_separator + self.prompt 82 ) 83 84 return ret_text 85