xref: /aosp_15_r20/external/autotest/client/common_lib/pxssh.py (revision 9c5db1993ded3edbeafc8092d69fe5de2ee02df7)
1# Lint as: python2, python3
2"""This class extends pexpect.spawn to specialize setting up SSH connections.
3This adds methods for login, logout, and expecting the shell prompt.
4
5$Id: pxssh.py 487 2007-08-29 22:33:29Z noah $
6"""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12from six.moves import range
13import time
14
15from autotest_lib.client.common_lib.pexpect import *
16from autotest_lib.client.common_lib import pexpect
17
18
19__all__ = ['ExceptionPxssh', 'pxssh']
20
21# Exception classes used by this module.
22class ExceptionPxssh(ExceptionPexpect):
23    """Raised for pxssh exceptions.
24    """
25
26class pxssh (spawn):
27
28    """This class extends pexpect.spawn to specialize setting up SSH
29    connections. This adds methods for login, logout, and expecting the shell
30    prompt. It does various tricky things to handle many situations in the SSH
31    login process. For example, if the session is your first login, then pxssh
32    automatically accepts the remote certificate; or if you have public key
33    authentication setup then pxssh won't wait for the password prompt.
34
35    pxssh uses the shell prompt to synchronize output from the remote host. In
36    order to make this more robust it sets the shell prompt to something more
37    unique than just $ or #. This should work on most Borne/Bash or Csh style
38    shells.
39
40    Example that runs a few commands on a remote server and prints the result::
41
42        import pxssh
43        import getpass
44        try:
45            s = pxssh.pxssh()
46            hostname = raw_input('hostname: ')
47            username = raw_input('username: ')
48            password = getpass.getpass('password: ')
49            s.login (hostname, username, password)
50            s.sendline ('uptime')  # run a command
51            s.prompt()             # match the prompt
52            print s.before         # print everything before the prompt.
53            s.sendline ('ls -l')
54            s.prompt()
55            print s.before
56            s.sendline ('df')
57            s.prompt()
58            print s.before
59            s.logout()
60        except pxssh.ExceptionPxssh, e:
61            print "pxssh failed on login."
62            print str(e)
63
64    Note that if you have ssh-agent running while doing development with pxssh
65    then this can lead to a lot of confusion. Many X display managers (xdm,
66    gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
67    dialog box popup asking for a password during development. You should turn
68    off any key agents during testing. The 'force_password' attribute will turn
69    off public key authentication. This will only work if the remote SSH server
70    is configured to allow password logins. Example of using 'force_password'
71    attribute::
72
73            s = pxssh.pxssh()
74            s.force_password = True
75            hostname = raw_input('hostname: ')
76            username = raw_input('username: ')
77            password = getpass.getpass('password: ')
78            s.login (hostname, username, password)
79    """
80
81    def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):
82        spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)
83
84        self.name = '<pxssh>'
85
86        #SUBTLE HACK ALERT! Note that the command to set the prompt uses a
87        #slightly different string than the regular expression to match it. This
88        #is because when you set the prompt the command will echo back, but we
89        #don't want to match the echoed command. So if we make the set command
90        #slightly different than the regex we eliminate the problem. To make the
91        #set command different we add a backslash in front of $. The $ doesn't
92        #need to be escaped, but it doesn't hurt and serves to make the set
93        #prompt command different than the regex.
94
95        # used to match the command-line prompt
96        self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] "
97        self.PROMPT = self.UNIQUE_PROMPT
98
99        # used to set shell command-line prompt to UNIQUE_PROMPT.
100        self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
101        self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
102        self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
103        # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
104        # displaying a GUI password dialog. I have not figured out how to
105        # disable only SSH_ASKPASS without also disabling X11 forwarding.
106        # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
107        #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
108        self.force_password = False
109        self.auto_prompt_reset = True
110
111    def levenshtein_distance(self, a,b):
112
113        """This calculates the Levenshtein distance between a and b.
114        """
115
116        n, m = len(a), len(b)
117        if n > m:
118            a,b = b,a
119            n,m = m,n
120        current = list(range(n+1))
121        for i in range(1,m+1):
122            previous, current = current, [i]+[0]*n
123            for j in range(1,n+1):
124                add, delete = previous[j]+1, current[j-1]+1
125                change = previous[j-1]
126                if a[j-1] != b[i-1]:
127                    change = change + 1
128                current[j] = min(add, delete, change)
129        return current[n]
130
131    def synch_original_prompt (self):
132
133        """This attempts to find the prompt. Basically, press enter and record
134        the response; press enter again and record the response; if the two
135        responses are similar then assume we are at the original prompt. """
136
137        # All of these timing pace values are magic.
138        # I came up with these based on what seemed reliable for
139        # connecting to a heavily loaded machine I have.
140        # If latency is worse than these values then this will fail.
141
142        self.sendline()
143        time.sleep(0.5)
144        self.read_nonblocking(size=10000,timeout=1) # GAS: Clear out the cache before getting the prompt
145        time.sleep(0.1)
146        self.sendline()
147        time.sleep(0.5)
148        x = self.read_nonblocking(size=1000,timeout=1)
149        time.sleep(0.1)
150        self.sendline()
151        time.sleep(0.5)
152        a = self.read_nonblocking(size=1000,timeout=1)
153        time.sleep(0.1)
154        self.sendline()
155        time.sleep(0.5)
156        b = self.read_nonblocking(size=1000,timeout=1)
157        ld = self.levenshtein_distance(a,b)
158        len_a = len(a)
159        if len_a == 0:
160            return False
161        if float(ld)/len_a < 0.4:
162            return True
163        return False
164
165    ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
166    ### TODO: I need to draw a flow chart for this.
167    def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True):
168
169        """This logs the user into the given server. It uses the
170        'original_prompt' to try to find the prompt right after login. When it
171        finds the prompt it immediately tries to reset the prompt to something
172        more easily matched. The default 'original_prompt' is very optimistic
173        and is easily fooled. It's more reliable to try to match the original
174        prompt as exactly as possible to prevent false matches by server
175        strings such as the "Message Of The Day". On many systems you can
176        disable the MOTD on the remote server by creating a zero-length file
177        called "~/.hushlogin" on the remote server. If a prompt cannot be found
178        then this will not necessarily cause the login to fail. In the case of
179        a timeout when looking for the prompt we assume that the original
180        prompt was so weird that we could not match it, so we use a few tricks
181        to guess when we have reached the prompt. Then we hope for the best and
182        blindly try to reset the prompt to something more unique. If that fails
183        then login() raises an ExceptionPxssh exception.
184
185        In some situations it is not possible or desirable to reset the
186        original prompt. In this case, set 'auto_prompt_reset' to False to
187        inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
188        uses a unique prompt in the prompt() method. If the original prompt is
189        not reset then this will disable the prompt() method unless you
190        manually set the PROMPT attribute. """
191
192        ssh_options = '-q'
193        if self.force_password:
194            ssh_options = ssh_options + ' ' + self.SSH_OPTS
195        if port is not None:
196            ssh_options = ssh_options + ' -p %s'%(str(port))
197        cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
198
199        # This does not distinguish between a remote server 'password' prompt
200        # and a local ssh 'passphrase' prompt (for unlocking a private key).
201        spawn._spawn(self, cmd)
202        i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout)
203
204        # First phase
205        if i==0:
206            # New certificate -- always accept it.
207            # This is what you get if SSH does not have the remote host's
208            # public key stored in the 'known_hosts' cache.
209            self.sendline("yes")
210            i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
211        if i==2: # password or passphrase
212            self.sendline(password)
213            i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
214        if i==4:
215            self.sendline(terminal_type)
216            i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
217
218        # Second phase
219        if i==0:
220            # This is weird. This should not happen twice in a row.
221            self.close()
222            raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.')
223        elif i==1: # can occur if you have a public key pair set to authenticate.
224            ### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
225            pass
226        elif i==2: # password prompt again
227            # For incorrect passwords, some ssh servers will
228            # ask for the password again, others return 'denied' right away.
229            # If we get the password prompt again then this means
230            # we didn't get the password right the first time.
231            self.close()
232            raise ExceptionPxssh ('password refused')
233        elif i==3: # permission denied -- password was bad.
234            self.close()
235            raise ExceptionPxssh ('permission denied')
236        elif i == 4:  # terminal type again?
237            self.close()
238            raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.')
239        elif i==5: # Timeout
240            #This is tricky... I presume that we are at the command-line prompt.
241            #It may be that the shell prompt was so weird that we couldn't match
242            #it. Or it may be that we couldn't log in for some other reason. I
243            #can't be sure, but it's safe to guess that we did login because if
244            #I presume wrong and we are not logged in then this should be caught
245            #later when I try to set the shell prompt.
246            pass
247        elif i==6: # Connection closed by remote host
248            self.close()
249            raise ExceptionPxssh ('connection closed')
250        else: # Unexpected
251            self.close()
252            raise ExceptionPxssh ('unexpected login response')
253        if not self.synch_original_prompt():
254            self.close()
255            raise ExceptionPxssh ('could not synchronize with original prompt')
256        # We appear to be in.
257        # set shell prompt to something unique.
258        if auto_prompt_reset:
259            if not self.set_unique_prompt():
260                self.close()
261                raise ExceptionPxssh ('could not set shell prompt\n'+self.before)
262        return True
263
264    def logout (self):
265
266        """This sends exit to the remote shell. If there are stopped jobs then
267        this automatically sends exit twice. """
268
269        self.sendline("exit")
270        index = self.expect([EOF, "(?i)there are stopped jobs"])
271        if index==1:
272            self.sendline("exit")
273            self.expect(EOF)
274        self.close()
275
276    def prompt (self, timeout=20):
277
278        """This matches the shell prompt. This is little more than a short-cut
279        to the expect() method. This returns True if the shell prompt was
280        matched. This returns False if there was a timeout. Note that if you
281        called login() with auto_prompt_reset set to False then you should have
282        manually set the PROMPT attribute to a regex pattern for matching the
283        prompt. """
284
285        i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
286        if i==1:
287            return False
288        return True
289
290    def set_unique_prompt (self):
291
292        """This sets the remote prompt to something more unique than # or $.
293        This makes it easier for the prompt() method to match the shell prompt
294        unambiguously. This method is called automatically by the login()
295        method, but you may want to call it manually if you somehow reset the
296        shell prompt. For example, if you 'su' to a different user then you
297        will need to manually reset the prompt. This sends shell commands to
298        the remote host to set the prompt, so this assumes the remote host is
299        ready to receive commands.
300
301        Alternatively, you may use your own prompt pattern. Just set the PROMPT
302        attribute to a regular expression that matches it. In this case you
303        should call login() with auto_prompt_reset=False; then set the PROMPT
304        attribute. After that the prompt() method will try to match your prompt
305        pattern."""
306
307        self.sendline ("unset PROMPT_COMMAND")
308        self.sendline (self.PROMPT_SET_SH) # sh-style
309        i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
310        if i == 0: # csh-style
311            self.sendline (self.PROMPT_SET_CSH)
312            i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
313            if i == 0:
314                return False
315        return True
316
317# vi:ts=4:sw=4:expandtab:ft=python:
318