1# Copyright 2016 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15Helper module for common telnet capability to communicate with
16AttenuatorDevice(s).
17
18User code shouldn't need to directly access this class.
19"""
20
21import telnetlib
22from mobly.controllers import attenuator
23
24
25def _ascii_string(uc_string):
26  return str(uc_string).encode("ASCII")
27
28
29class TelnetScpiClient:
30  """This is an internal helper class for Telnet+SCPI command-based
31  instruments. It should only be used by those implemention control libraries
32  and not by any user code directly.
33  """
34
35  def __init__(self, tx_cmd_separator="\n", rx_cmd_separator="\n", prompt=""):
36    self._tn = None
37    self.tx_cmd_separator = tx_cmd_separator
38    self.rx_cmd_separator = rx_cmd_separator
39    self.prompt = prompt
40    self.host = None
41    self.port = None
42
43  def open(self, host, port=23):
44    if self._tn:
45      self._tn.close()
46    self.host = host
47    self.port = port
48    self._tn = telnetlib.Telnet()
49    self._tn.open(host, port, 10)
50
51  @property
52  def is_open(self):
53    return bool(self._tn)
54
55  def close(self):
56    if self._tn:
57      self._tn.close()
58      self._tn = None
59
60  def cmd(self, cmd_str, wait_ret=True):
61    if not isinstance(cmd_str, str):
62      raise TypeError("Invalid command string", cmd_str)
63    if not self.is_open:
64      raise attenuator.Error("Telnet connection not open for commands")
65
66    cmd_str.strip(self.tx_cmd_separator)
67    self._tn.read_until(_ascii_string(self.prompt), 2)
68    self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator))
69    if wait_ret is False:
70      return None
71
72    match_idx, match_val, ret_text = self._tn.expect(
73        [_ascii_string("\S+" + self.rx_cmd_separator)], 1
74    )
75
76    if match_idx == -1:
77      raise attenuator.Error("Telnet command failed to return valid data")
78
79    ret_text = ret_text.decode()
80    ret_text = ret_text.strip(
81        self.tx_cmd_separator + self.rx_cmd_separator + self.prompt
82    )
83
84    return ret_text
85