xref: /aosp_15_r20/external/autotest/server/cros/cellular/abstract_inst.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Copyright 2021 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Python module for Abstract Instrument Library."""
5
6import logging
7import requests
8import socket
9
10
11class SocketInstrumentError(Exception):
12    """Abstract Instrument Error Class, via Socket and SCPI."""
13
14    def __init__(self, error, command=None):
15        """Init method for Socket Instrument Error.
16
17        Args:
18            error: Exception error.
19            command: Additional information on command,
20                Type, Str.
21        """
22        super(SocketInstrumentError, self).__init__(error)
23        self._error_code = error
24        self._error_message = self._error_code
25        if command is not None:
26            self._error_message = 'Command {} returned the error: {}.'.format(
27                    repr(command), repr(self._error_message))
28
29    def __str__(self):
30        return self._error_message
31
32
33class SocketInstrument(object):
34    """Abstract Instrument Class, via Socket and SCPI."""
35
36    def __init__(self, ip_addr, ip_port):
37        """Init method for Socket Instrument.
38
39        Args:
40            ip_addr: IP Address.
41                Type, str.
42            ip_port: TCPIP Port.
43                Type, str.
44        """
45        self._logger = logging.getLogger(__name__)
46        self._socket_timeout = 120
47        self._socket_buffer_size = 1024
48
49        self._ip_addr = ip_addr
50        self._ip_port = ip_port
51
52        self._escseq = '\n'
53        self._codefmt = 'utf-8'
54
55        self._socket = None
56
57    def _connect_socket(self):
58        """Init and Connect to socket."""
59        try:
60            self._socket = socket.create_connection(
61                    (self._ip_addr, self._ip_port),
62                    timeout=self._socket_timeout)
63
64            infmsg = 'Opened Socket connection to {}:{} with handle {}.'.format(
65                    repr(self._ip_addr), repr(self._ip_port),
66                    repr(self._socket))
67
68        except socket.timeout:
69            errmsg = 'Socket timeout while connecting to instrument.'
70            raise SocketInstrumentError(errmsg)
71
72        except socket.error:
73            errmsg = 'Socket error while connecting to instrument.'
74            raise SocketInstrumentError(errmsg)
75
76    def _send(self, cmd):
77        """Send command via Socket.
78
79        Args:
80            cmd: Command to send,
81                Type, Str.
82        """
83        if not self._socket:
84            self._connect_socket()
85
86        cmd_es = cmd + self._escseq
87
88        try:
89            self._socket.sendall(cmd_es.encode(self._codefmt))
90
91        except socket.timeout:
92            errmsg = ('Socket timeout while sending command {} '
93                      'to instrument.').format(repr(cmd))
94            raise SocketInstrumentError(errmsg)
95
96        except socket.error:
97            errmsg = ('Socket error while sending command {} '
98                      'to instrument.').format(repr(cmd))
99            raise SocketInstrumentError(errmsg)
100
101        except Exception as err:
102            errmsg = ('Error {} while sending command {} '
103                      'to instrument.').format(repr(cmd), repr(err))
104            raise SocketInstrumentError(errmsg)
105
106    def _recv(self):
107        """Receive response via Socket.
108
109        Returns:
110            resp: Response from Instrument via Socket,
111                Type, Str.
112        """
113        if not self._socket:
114            self._connect_socket()
115
116        resp = ''
117
118        try:
119            while True:
120                resp_tmp = self._socket.recv(self._socket_buffer_size)
121                resp_tmp = resp_tmp.decode(self._codefmt)
122                resp += resp_tmp
123                if len(resp_tmp) < self._socket_buffer_size:
124                    break
125
126        except socket.timeout:
127            errmsg = 'Socket timeout while receiving response from instrument.'
128            raise SocketInstrumentError(errmsg)
129
130        except socket.error:
131            errmsg = 'Socket error while receiving response from instrument.'
132            raise SocketInstrumentError(errmsg)
133
134        except Exception as err:
135            errmsg = ('Error {} while receiving response '
136                      'from instrument').format(repr(err))
137            raise SocketInstrumentError(errmsg)
138
139        resp = resp.rstrip(self._escseq)
140
141        return resp
142
143    def _close_socket(self):
144        """Close Socket Instrument."""
145        if not self._socket:
146            return
147
148        try:
149            self._socket.shutdown(socket.SHUT_RDWR)
150            self._socket.close()
151            self._socket = None
152
153        except Exception as err:
154            errmsg = 'Error {} while closing instrument.'.format(repr(err))
155            raise SocketInstrumentError(errmsg)
156
157    def _query(self, cmd):
158        """query instrument via Socket.
159
160        Args:
161            cmd: Command to send,
162                Type, Str.
163
164        Returns:
165            resp: Response from Instrument via Socket,
166                Type, Str.
167        """
168        self._send(cmd + ';*OPC?')
169        resp = self._recv()
170        return resp
171
172
173class RequestInstrument(object):
174    """Abstract Instrument Class, via Request."""
175
176    def __init__(self, ip_addr):
177        """Init method for request instrument.
178
179        Args:
180            ip_addr: IP Address.
181                Type, Str.
182        """
183        self._request_timeout = 120
184        self._request_protocol = 'http'
185        self._ip_addr = ip_addr
186        self._escseq = '\r\n'
187
188    def _query(self, cmd):
189        """query instrument via request.
190
191        Args:
192            cmd: Command to send,
193                Type, Str.
194
195        Returns:
196            resp: Response from Instrument via request,
197                Type, Str.
198        """
199        request_cmd = '{}://{}/{}'.format(self._request_protocol,
200                                          self._ip_addr, cmd)
201        resp_raw = requests.get(request_cmd, timeout=self._request_timeout)
202
203        resp = resp_raw.text
204        for char_del in self._escseq:
205            resp = resp.replace(char_del, '')
206
207        return resp
208