1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#      http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import io
23import os
24import posixpath
25import random
26import re
27import shlex
28import shutil
29import signal
30import socket
31import string
32import subprocess
33import sys
34import tempfile
35import threading
36import time
37import unittest
38
39import adb_host_pb2 as adb_host_proto
40
41from datetime import datetime
42
43import adb
44
def requires_non_root(func):
    """Decorate a test method so that it runs while adbd is not root.

    The wrapper asks the device for the current user name (``id -un``). If
    that is ``root`` it calls ``unroot()`` and ``wait()`` on ``self.device``
    before running *func*, and calls ``root()`` and ``wait()`` afterwards,
    even when *func* raises.

    Args:
        func: A method whose first argument is an object exposing a
            ``device`` attribute.

    Returns:
        The wrapping function. It forwards positional and keyword arguments
        to *func* and returns whatever *func* returns.
    """
    # Function-scope import keeps this helper self-contained.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore the original privilege level for later tests.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
60
61
class DeviceTest(unittest.TestCase):
    """Base test case that exposes a device handle as ``self.device``."""

    def setUp(self) -> None:
        """Store the result of ``adb.get_device()`` for use by the test."""
        handle = adb.get_device()
        self.device = handle
65
66
class AbbTest(DeviceTest):
    """Smoke test comparing ``adb abb`` with ``adb shell cmd``."""

    def test_smoke(self):
        """Run both commands and compare success, stdout and stderr."""
        abb_run = subprocess.run(['adb', 'abb'], capture_output=True)
        cmd_run = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True)

        # abb squashes all failures to 1, so only compare success/failure
        # rather than the exact exit codes.
        abb_succeeded = abb_run.returncode == 0
        cmd_succeeded = cmd_run.returncode == 0
        self.assertEqual(abb_succeeded, cmd_succeeded)

        self.assertEqual(abb_run.stdout, cmd_run.stdout)
        self.assertEqual(abb_run.stderr, cmd_run.stderr)
76
class ForwardReverseTest(DeviceTest):
    """Tests for ``adb forward`` and ``adb reverse``.

    Most tests require the relevant binding list to be empty when they
    start. Bindings created by a test are removed in a ``finally`` clause so
    that one failing test does not break that precondition for later tests.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                       direction_no_rebind, direction_remove_all):
        """Exercise --no-rebind semantics for one direction.

        Args:
            description: Word used in the precondition failure message
                (e.g. 'forward' or 'reverse').
            direction_list: Callable returning the current binding list text.
            direction: Callable(local, remote) that creates or rebinds.
            direction_no_rebind: Callable(local, remote) that must fail with
                subprocess.CalledProcessError when the binding exists.
            direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            # Remove the bindings even when an assertion above failed.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """Run the --no-rebind scenario against ``adb forward``."""
        self._test_no_rebind('forward', self.device.forward_list,
                            self.device.forward, self.device.forward_no_rebind,
                            self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Run the --no-rebind scenario against ``adb reverse``."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                            self.device.reverse, self.device.reverse_no_rebind,
                            self.device.reverse_remove_all)

    def test_forward(self):
        """Add two forwards, remove one, then remove all."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Remove the forwards even when an assertion above failed.
            self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_old_protocol(self):
        """Create a forward by sending a raw request to localhost:5037."""
        serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            with socket.create_connection(("localhost", 5037)) as s:
                service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno
                # The request is the service string prefixed by its length
                # as four hexadecimal digits.
                cmd = b"%04x%s" % (len(service), service)
                s.sendall(cmd)

            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
        finally:
            # Remove the forward even when the assertion above failed.
            self.device.forward_remove_all()

        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_tcp_port_0(self):
        """Forward from tcp:0 and check the reported port is listed."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Add two reverse bindings, remove one, then remove all."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Remove the bindings even when an assertion above failed.
            self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    def test_reverse_tcp_port_0(self):
        """Reverse from tcp:0 and check the reported port is listed."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        # Track which bindings were created so cleanup only removes those.
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile().read().encode("utf8"))
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
256
257
class ShellTest(DeviceTest):
    """Tests for ``adb shell``: output, exit codes, PTYs, stdin and signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: bytes input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
                self.device.adb_cmd + ['shell'] + shell_args,
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + b'; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """A failing command must raise adb.ShellError from shell()."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """shell() must return stdout including the trailing line ending."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_command_length(self):
        """Check that a 16 KiB command line round-trips through the shell.

        Raises:
          unittest.SkipTest: The device doesn't support the shell protocol.
        """
        # Devices that have shell_v2 should be able to handle long commands.
        # Skip (rather than silently pass) elsewhere so the result is visible.
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        rc, out, _ = self.device.shell_nocheck(['echo', 'x' * 16384])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'x' * 16384 + '\n')

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() must return stdout including the line ending."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit code must not be mistaken for one."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None makes the child inherit this process's stdin.
            terminal = subprocess.Popen(
                    test_cmd, stdin=None,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                    test_cmd, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
                shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], b'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
                shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
                self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        # The remote shell prints its own PID ($$) as the first output line.
        remote_pid = sleep_proc.stdout.readline().strip().decode("utf8")
        self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = b'foo'
        characters = [c.encode("utf8") for c in string.ascii_letters + string.digits]
        large_input = b'\n'.join(characters)

        for payload in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(payload)
            self.assertEqual(payload.splitlines(), stdout.splitlines())
            self.assertEqual(b'', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            read
        """.format(path=log_path)

        # Collapse the script to a single ';'-separated command line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        with self.device.shell_popen([script], kill_atexit=False,
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE) as process:

            self.assertEqual(b"Waiting\n", process.stdout.readline())
            process.send_signal(signal.SIGINT)
            process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(1)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")

    # Temporarily disabled because it seems to cause later instability.
    # http://b/228114748
    def disabled_test_exit_stress(self):
        """Hammer `adb shell exit 42` with multiple threads."""
        thread_count = 48
        result = dict()

        def hammer(thread_idx, thread_count, result):
            """Run `adb shell exit N` for this thread's share of 0..239."""
            success = True
            for i in range(thread_idx, 240, thread_count):
                ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)])
                if ret != i % 256:
                    success = False
                    break
            result[thread_idx] = success

        threads = []
        for i in range(thread_count):
            thread = threading.Thread(target=hammer, args=(i, thread_count, result))
            thread.start()
            threads.append(thread)
        for thread in threads:
            thread.join()

        # A worker that died with an exception never records a result; make
        # sure that is reported instead of passing vacuously.
        self.assertEqual(thread_count, len(result))
        for success in result.values():
            self.assertTrue(success)

    def disabled_test_parallel(self):
        """Spawn a bunch of `adb shell` instances in parallel.

        This was broken historically due to the use of select, which only works
        for fds that are numerically less than 1024.

        Bug: http://b/141955761"""

        n_procs = 2048
        procs = []
        try:
            for _ in range(n_procs):
                procs.append(subprocess.Popen(
                    ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE
                ))

            # The pipes are binary streams, so all traffic must be bytes, and
            # buffered writes must be flushed before waiting for the reply.
            for i, proc in enumerate(procs):
                proc.stdin.write(b"%d\n" % i)
                proc.stdin.flush()

            for i, proc in enumerate(procs):
                self.assertEqual(b"%d\n" % i, proc.stdout.readline())

            for i, proc in enumerate(procs):
                proc.stdin.write(b"%d\n" % (i % 256))
                proc.stdin.flush()

            for i, proc in enumerate(procs):
                self.assertEqual(i % 256, proc.wait())
        finally:
            # Do not leave thousands of adb processes or pipes behind when an
            # assertion fails part-way through.
            for proc in procs:
                if proc.poll() is None:
                    proc.kill()
                    proc.wait()
                proc.stdout.close()
                try:
                    proc.stdin.close()
                except OSError:
                    # Closing flushes buffered input; the peer may be gone.
                    pass
564
565
class ArgumentEscapingTest(DeviceTest):
    """Tests for how adb passes arguments to the device shell and installer."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name.decode("utf8"))
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the bogus .apk even when the assertion fails.
                os.remove(tf.name)
614
615
class RootUnrootTest(DeviceTest):
    """Tests for ``adb root`` and ``adb unroot``."""

    def _test_root(self):
        """Switch to root and verify it, unless the build refuses."""
        message = self.device.root()
        refused = 'adbd cannot run as root in production builds' in message
        if not refused:
            self.device.wait()
            current_user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', current_user)

    def _test_unroot(self):
        """Switch to non-root and verify the user is 'shell'."""
        self.device.unroot()
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        debuggable = self.device.get_prop('ro.debuggable')
        if debuggable != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        # Leave the starting state first, then return to it. Any other
        # starting user runs no steps at all.
        plans = {
            'root': (self._test_unroot, self._test_root),
            'shell': (self._test_root, self._test_unroot),
        }
        try:
            for step in plans.get(original_user, ()):
                step()
        finally:
            # Put the device back the way it was found.
            if original_user == 'root':
                restore = self.device.root
            else:
                restore = self.device.unroot
            restore()
            self.device.wait()
648
649
class TcpIpTest(DeviceTest):
    """Tests for ``adb tcpip`` argument validation."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # Both an empty and a non-numeric port must be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
660
661
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing system properties."""

    def test_get_prop(self):
        """The adbd service property must read back as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    def test_set_prop(self):
        """A random value written with set_prop must read back via getprop."""
        # debug.* prop does not require root privileges
        prop_name = 'debug.foo'
        self.device.shell(['setprop', prop_name, '""'])

        expected = str(random.random())
        self.device.set_prop(prop_name, expected)

        observed = self.device.shell(['getprop', prop_name])[0].strip()
        self.assertEqual(observed, expected)
675
676
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the bytes-like *string*."""
    return hashlib.md5(string).hexdigest()
681
682
class HostFile(object):
    """Record of a host-side file together with its expected checksum."""

    def __init__(self, handle, checksum):
        """Remember *handle*, *checksum* and the path taken from handle.name."""
        self.handle, self.checksum = handle, checksum
        path = handle.name
        self.full_path = path
        # Final path component, using the host's path rules.
        self.base_name = os.path.split(path)[1]
689
690
class DeviceFile(object):
    """Record of a device-side file together with its expected checksum."""

    def __init__(self, checksum, full_path):
        """Remember *checksum* and *full_path*, and derive the base name."""
        self.checksum = checksum
        self.full_path = full_path
        # Device paths are always split with POSIX rules, whatever the host.
        _, tail = posixpath.split(self.full_path)
        self.base_name = tail
696
697
def make_random_host_files(in_dir, num_files):
    """Create *num_files* files of random bytes inside *in_dir*.

    Each file's size is a multiple of 1 KiB, from 1 KiB up to but not
    including 16 KiB. The files are left on disk (``delete=False``).

    Returns:
        A list of HostFile objects, one per file, each carrying the closed
        file handle and the MD5 of the bytes written.
    """
    kib = 1 << 10
    smallest = 1 * kib
    largest = 16 * kib

    def create_one():
        handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)

        payload = os.urandom(random.randrange(smallest, largest, 1024))
        handle.write(payload)
        handle.flush()
        handle.close()

        return HostFile(handle, compute_md5(payload))

    return [create_one() for _ in range(num_files)]
715
716
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create *num_files* random files under *in_dir* on the device.

    Files are named ``prefix + index`` and filled from /dev/urandom with a
    single ``dd`` block whose size is a multiple of 1 KiB, from 1 KiB up to
    but not including 16 KiB. The checksum is the first field printed by the
    device's ``md5sum``.

    Returns:
        A list of DeviceFile objects, one per file created.
    """
    kib = 1 << 10
    smallest = 1 * kib
    largest = 16 * kib

    created = []
    for index in range(num_files):
        byte_count = random.randrange(smallest, largest, 1024)
        remote_path = posixpath.join(in_dir, prefix + str(index))

        dd_command = [
            'dd',
            'if=/dev/urandom',
            'of={}'.format(remote_path),
            'bs={}'.format(byte_count),
            'count=1',
        ]
        device.shell(dd_command)

        md5sum_stdout = device.shell(['md5sum', remote_path])[0]
        digest, _ = md5sum_stdout.split()

        created.append(DeviceFile(digest, remote_path))
    return created
734
735
class FileOperationsTest:
    # Plain (non-TestCase) container, so the nested Base class is not a
    # module-level name.
    class Base(DeviceTest):
        """File transfer tests run with a given ADB_COMPRESSION setting.

        setUp() reads ``self.compression``, which is not defined in this
        part of the class -- presumably supplied by subclasses (TODO confirm).
        """

        SCRATCH_DIR = '/data/local/tmp'
        DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
        DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'

        def setUp(self):
            """Export ADB_COMPRESSION, remembering its previous value."""
            super().setUp()
            self.previous_env = os.environ.get("ADB_COMPRESSION")
            os.environ["ADB_COMPRESSION"] = self.compression

        def tearDown(self):
            """Restore ADB_COMPRESSION to the value it had before setUp()."""
            if self.previous_env is None:
                del os.environ["ADB_COMPRESSION"]
            else:
                os.environ["ADB_COMPRESSION"] = self.previous_env

        def _verify_remote(self, checksum, remote_path):
            """Assert that the device's md5sum of *remote_path* is *checksum*."""
            dev_md5, _ = self.device.shell(['md5sum', remote_path])[0].split()
            self.assertEqual(checksum, dev_md5)

        def _verify_local(self, checksum, local_path):
            """Assert that the MD5 of host file *local_path* is *checksum*."""
            with open(local_path, 'rb') as host_file:
                host_md5 = compute_md5(host_file.read())
            self.assertEqual(host_md5, checksum)

        def test_push(self):
            """Push a randomly generated file to specified device."""
            kbytes = 512
            tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
            try:
                rand_str = os.urandom(1024 * kbytes)
                tmp.write(rand_str)
                tmp.close()

                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
                self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)

                self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
                self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
            finally:
                # Remove the host temp file even when the push or the
                # verification fails. close() is a no-op if already closed.
                tmp.close()
                os.remove(tmp.name)

        def test_push_dir(self):
            """Push a randomly generated directory of files to the device."""
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

            # Bind the name before the try block so the finally clause can
            # test it even if mkdtemp() itself raises.
            host_dir = None
            try:
                host_dir = tempfile.mkdtemp()

                # Make sure the temp directory isn't setuid, or else adb will complain.
                os.chmod(host_dir, 0o700)

                # Create 32 random files.
                temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
                self.device.push(host_dir, self.DEVICE_TEMP_DIR)

                for temp_file in temp_files:
                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                                 os.path.basename(host_dir),
                                                 temp_file.base_name)
                    self._verify_remote(temp_file.checksum, remote_path)
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                if host_dir is not None:
                    shutil.rmtree(host_dir)

        def disabled_test_push_empty(self):
            """Push an empty directory to the device."""
            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

            # Bind the name before the try block so the finally clause can
            # test it even if mkdtemp() itself raises.
            host_dir = None
            try:
                host_dir = tempfile.mkdtemp()

                # Make sure the temp directory isn't setuid, or else adb will complain.
                os.chmod(host_dir, 0o700)

                # Create an empty directory.
                empty_dir_path = os.path.join(host_dir, 'empty')
                os.mkdir(empty_dir_path)

                self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR)

                # Device paths use '/' regardless of the host platform.
                remote_path = posixpath.join(self.DEVICE_TEMP_DIR, "empty")
                test_empty_cmd = ["[", "-d", remote_path, "]"]
                rc, _, _ = self.device.shell_nocheck(test_empty_cmd)

                self.assertEqual(rc, 0)
                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
            finally:
                if host_dir is not None:
                    shutil.rmtree(host_dir)
829
830        @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
831        def test_push_symlink(self):
832            """Push a symlink.
833
834            Bug: http://b/31491920
835            """
836            try:
837                host_dir = tempfile.mkdtemp()
838
839                # Make sure the temp directory isn't setuid, or else adb will
840                # complain.
841                os.chmod(host_dir, 0o700)
842
843                with open(os.path.join(host_dir, 'foo'), 'w') as f:
844                    f.write('foo')
845
846                symlink_path = os.path.join(host_dir, 'symlink')
847                os.symlink('foo', symlink_path)
848
849                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
850                self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
851                self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
852                rc, out, _ = self.device.shell_nocheck(
853                    ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
854                self.assertEqual(0, rc)
855                self.assertEqual(out.strip(), 'foo')
856            finally:
857                if host_dir is not None:
858                    shutil.rmtree(host_dir)
859
860        def test_multiple_push(self):
861            """Push multiple files to the device in one adb push command.
862
863            Bug: http://b/25324823
864            """
865
866            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
867            self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
868
869            try:
870                host_dir = tempfile.mkdtemp()
871
872                # Create some random files and a subdirectory containing more files.
873                temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
874
875                subdir = os.path.join(host_dir, 'subdir')
876                os.mkdir(subdir)
877                subdir_temp_files = make_random_host_files(in_dir=subdir,
878                                                           num_files=4)
879
880                paths = [x.full_path for x in temp_files]
881                paths.append(subdir)
882                self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
883
884                for temp_file in temp_files:
885                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
886                                                 temp_file.base_name)
887                    self._verify_remote(temp_file.checksum, remote_path)
888
889                for subdir_temp_file in subdir_temp_files:
890                    remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
891                                                 # BROKEN: http://b/25394682
892                                                 # 'subdir';
893                                                 temp_file.base_name)
894                    self._verify_remote(temp_file.checksum, remote_path)
895
896
897                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
898            finally:
899                if host_dir is not None:
900                    shutil.rmtree(host_dir)
901
902        @requires_non_root
903        def test_push_error_reporting(self):
904            """Make sure that errors that occur while pushing a file get reported
905
906            Bug: http://b/26816782
907            """
908            with tempfile.NamedTemporaryFile() as tmp_file:
909                tmp_file.write(b'\0' * 1024 * 1024)
910                tmp_file.flush()
911                try:
912                    self.device.push(local=tmp_file.name, remote='/system/')
913                    self.fail('push should not have succeeded')
914                except subprocess.CalledProcessError as e:
915                    output = e.output
916
917                self.assertTrue(b'Permission denied' in output or
918                                b'Read-only file system' in output)
919
920        @requires_non_root
921        def test_push_directory_creation(self):
922            """Regression test for directory creation.
923
924            Bug: http://b/110953234
925            """
926            with tempfile.NamedTemporaryFile() as tmp_file:
927                tmp_file.write(b'\0' * 1024 * 1024)
928                tmp_file.flush()
929                remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation'
930                self.device.shell(['rm', '-rf', remote_path])
931
932                remote_path += '/filename'
933                self.device.push(local=tmp_file.name, remote=remote_path)
934
935        def disabled_test_push_multiple_slash_root(self):
936            """Regression test for pushing to //data/local/tmp.
937
938            Bug: http://b/141311284
939
940            Disabled because this broken on the adbd side as well: b/141943968
941            """
942            with tempfile.NamedTemporaryFile() as tmp_file:
943                tmp_file.write(b'\0' * 1024 * 1024)
944                tmp_file.flush()
945                remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root'
946                self.device.shell(['rm', '-rf', remote_path])
947                self.device.push(local=tmp_file.name, remote=remote_path)
948
949        def _test_pull(self, remote_file, checksum):
950            tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
951            tmp_write.close()
952            self.device.pull(remote=remote_file, local=tmp_write.name)
953            with open(tmp_write.name, 'rb') as tmp_read:
954                host_contents = tmp_read.read()
955                host_md5 = compute_md5(host_contents)
956            self.assertEqual(checksum, host_md5)
957            os.remove(tmp_write.name)
958
959        @requires_non_root
960        def test_pull_error_reporting(self):
961            self.device.shell(['touch', self.DEVICE_TEMP_FILE])
962            self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
963
964            try:
965                output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
966            except subprocess.CalledProcessError as e:
967                output = e.output
968
969            self.assertIn(b'Permission denied', output)
970
971            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
972
973        def test_pull(self):
974            """Pull a randomly generated file from specified device."""
975            kbytes = 512
976            self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
977            cmd = ['dd', 'if=/dev/urandom',
978                   'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
979                   'count={}'.format(kbytes)]
980            self.device.shell(cmd)
981            dev_md5, _ = self.device.shell(['md5sum', self.DEVICE_TEMP_FILE])[0].split()
982            self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
983            self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
984
985        def test_pull_dir(self):
986            """Pull a randomly generated directory of files from the device."""
987            try:
988                host_dir = tempfile.mkdtemp()
989
990                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
991                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
992
993                # Populate device directory with random files.
994                temp_files = make_random_device_files(
995                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
996
997                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
998
999                for temp_file in temp_files:
1000                    host_path = os.path.join(
1001                        host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1002                        temp_file.base_name)
1003                    self._verify_local(temp_file.checksum, host_path)
1004
1005                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1006            finally:
1007                if host_dir is not None:
1008                    shutil.rmtree(host_dir)
1009
1010        def test_pull_dir_symlink(self):
1011            """Pull a directory into a symlink to a directory.
1012
1013            Bug: http://b/27362811
1014            """
1015            if os.name != 'posix':
1016                raise unittest.SkipTest('requires POSIX')
1017
1018            try:
1019                host_dir = tempfile.mkdtemp()
1020                real_dir = os.path.join(host_dir, 'dir')
1021                symlink = os.path.join(host_dir, 'symlink')
1022                os.mkdir(real_dir)
1023                os.symlink(real_dir, symlink)
1024
1025                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1026                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1027
1028                # Populate device directory with random files.
1029                temp_files = make_random_device_files(
1030                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1031
1032                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)
1033
1034                for temp_file in temp_files:
1035                    host_path = os.path.join(
1036                        real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
1037                        temp_file.base_name)
1038                    self._verify_local(temp_file.checksum, host_path)
1039
1040                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1041            finally:
1042                if host_dir is not None:
1043                    shutil.rmtree(host_dir)
1044
1045        def test_pull_dir_symlink_collision(self):
1046            """Pull a directory into a colliding symlink to directory."""
1047            if os.name != 'posix':
1048                raise unittest.SkipTest('requires POSIX')
1049
1050            try:
1051                host_dir = tempfile.mkdtemp()
1052                real_dir = os.path.join(host_dir, 'real')
1053                tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR)
1054                symlink = os.path.join(host_dir, tmp_dirname)
1055                os.mkdir(real_dir)
1056                os.symlink(real_dir, symlink)
1057
1058                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1059                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1060
1061                # Populate device directory with random files.
1062                temp_files = make_random_device_files(
1063                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1064
1065                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
1066
1067                for temp_file in temp_files:
1068                    host_path = os.path.join(real_dir, temp_file.base_name)
1069                    self._verify_local(temp_file.checksum, host_path)
1070
1071                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1072            finally:
1073                if host_dir is not None:
1074                    shutil.rmtree(host_dir)
1075
1076        def test_pull_dir_nonexistent(self):
1077            """Pull a directory of files from the device to a nonexistent path."""
1078            try:
1079                host_dir = tempfile.mkdtemp()
1080                dest_dir = os.path.join(host_dir, 'dest')
1081
1082                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1083                self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
1084
1085                # Populate device directory with random files.
1086                temp_files = make_random_device_files(
1087                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
1088
1089                self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)
1090
1091                for temp_file in temp_files:
1092                    host_path = os.path.join(dest_dir, temp_file.base_name)
1093                    self._verify_local(temp_file.checksum, host_path)
1094
1095                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1096            finally:
1097                if host_dir is not None:
1098                    shutil.rmtree(host_dir)
1099
1100        # selinux prevents adbd from accessing symlinks on /data/local/tmp.
1101        def disabled_test_pull_symlink_dir(self):
1102            """Pull a symlink to a directory of symlinks to files."""
1103            try:
1104                host_dir = tempfile.mkdtemp()
1105
1106                remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
1107                remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
1108                remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
1109
1110                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1111                self.device.shell(['mkdir', '-p', remote_dir, remote_links])
1112                self.device.shell(['ln', '-s', remote_links, remote_symlink])
1113
1114                # Populate device directory with random files.
1115                temp_files = make_random_device_files(
1116                    self.device, in_dir=remote_dir, num_files=32)
1117
1118                for temp_file in temp_files:
1119                    self.device.shell(
1120                        ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
1121                         posixpath.join(remote_links, temp_file.base_name)])
1122
1123                self.device.pull(remote=remote_symlink, local=host_dir)
1124
1125                for temp_file in temp_files:
1126                    host_path = os.path.join(
1127                        host_dir, 'symlink', temp_file.base_name)
1128                    self._verify_local(temp_file.checksum, host_path)
1129
1130                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1131            finally:
1132                if host_dir is not None:
1133                    shutil.rmtree(host_dir)
1134
1135        def test_pull_empty(self):
1136            """Pull a directory containing an empty directory from the device."""
1137            try:
1138                host_dir = tempfile.mkdtemp()
1139
1140                remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
1141                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1142                self.device.shell(['mkdir', '-p', remote_empty_path])
1143
1144                self.device.pull(remote=remote_empty_path, local=host_dir)
1145                self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
1146            finally:
1147                if host_dir is not None:
1148                    shutil.rmtree(host_dir)
1149
1150        def test_multiple_pull(self):
1151            """Pull a randomly generated directory of files from the device."""
1152
1153            try:
1154                host_dir = tempfile.mkdtemp()
1155
1156                subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
1157                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1158                self.device.shell(['mkdir', '-p', subdir])
1159
1160                # Create some random files and a subdirectory containing more files.
1161                temp_files = make_random_device_files(
1162                    self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1163
1164                subdir_temp_files = make_random_device_files(
1165                    self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1166
1167                paths = [x.full_path for x in temp_files]
1168                paths.append(subdir)
1169                self.device._simple_call(['pull'] + paths + [host_dir])
1170
1171                for temp_file in temp_files:
1172                    local_path = os.path.join(host_dir, temp_file.base_name)
1173                    self._verify_local(temp_file.checksum, local_path)
1174
1175                for subdir_temp_file in subdir_temp_files:
1176                    local_path = os.path.join(host_dir,
1177                                              'subdir',
1178                                              subdir_temp_file.base_name)
1179                    self._verify_local(subdir_temp_file.checksum, local_path)
1180
1181                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1182            finally:
1183                if host_dir is not None:
1184                    shutil.rmtree(host_dir)
1185
1186        def verify_sync(self, device, temp_files, device_dir):
1187            """Verifies that a list of temp files was synced to the device."""
1188            # Confirm that every file on the device mirrors that on the host.
1189            for temp_file in temp_files:
1190                device_full_path = posixpath.join(
1191                    device_dir, temp_file.base_name)
1192                dev_md5, _ = device.shell(['md5sum', device_full_path])[0].split()
1193                self.assertEqual(temp_file.checksum, dev_md5)
1194
1195        def test_sync(self):
1196            """Sync a host directory to the data partition."""
1197
1198            try:
1199                base_dir = tempfile.mkdtemp()
1200
1201                # Create mirror device directory hierarchy within base_dir.
1202                full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1203                os.makedirs(full_dir_path)
1204
1205                # Create 32 random files within the host mirror.
1206                temp_files = make_random_host_files(
1207                    in_dir=full_dir_path, num_files=32)
1208
1209                # Clean up any stale files on the device.
1210                device = adb.get_device()  # pylint: disable=no-member
1211                device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1212
1213                old_product_out = os.environ.get('ANDROID_PRODUCT_OUT')
1214                os.environ['ANDROID_PRODUCT_OUT'] = base_dir
1215                device.sync('data')
1216                if old_product_out is None:
1217                    del os.environ['ANDROID_PRODUCT_OUT']
1218                else:
1219                    os.environ['ANDROID_PRODUCT_OUT'] = old_product_out
1220
1221                self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR)
1222
1223                #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1224            finally:
1225                if base_dir is not None:
1226                    shutil.rmtree(base_dir)
1227
1228        def test_push_sync(self):
1229            """Sync a host directory to a specific path."""
1230
1231            try:
1232                temp_dir = tempfile.mkdtemp()
1233                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1234
1235                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1236
1237                # Clean up any stale files on the device.
1238                device = adb.get_device()  # pylint: disable=no-member
1239                device.shell(['rm', '-rf', device_dir])
1240
1241                device.push(temp_dir, device_dir, sync=True)
1242
1243                self.verify_sync(device, temp_files, device_dir)
1244
1245                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1246            finally:
1247                if temp_dir is not None:
1248                    shutil.rmtree(temp_dir)
1249
1250        def test_push_sync_multiple(self):
1251            """Sync multiple host directories to a specific path."""
1252
1253            try:
1254                temp_dir = tempfile.mkdtemp()
1255                temp_files = make_random_host_files(in_dir=temp_dir, num_files=32)
1256
1257                device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst')
1258
1259                # Clean up any stale files on the device.
1260                device = adb.get_device()  # pylint: disable=no-member
1261                device.shell(['rm', '-rf', device_dir])
1262                device.shell(['mkdir', '-p', device_dir])
1263
1264                host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files]
1265                device.push(host_paths, device_dir, sync=True)
1266
1267                self.verify_sync(device, temp_files, device_dir)
1268
1269                self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1270            finally:
1271                if temp_dir is not None:
1272                    shutil.rmtree(temp_dir)
1273
1274
1275        def test_push_dry_run_nonexistent_file(self):
1276            """Push with dry run (non-existent file)."""
1277
1278            for file_size in [8, 1024 * 1024]:
1279                try:
1280                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1281                    device_file = posixpath.join(device_dir, 'file')
1282
1283                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1284                    self.device.shell(['mkdir', '-p', device_dir])
1285
1286                    host_dir = tempfile.mkdtemp()
1287                    host_file = posixpath.join(host_dir, 'file')
1288
1289                    with open(host_file, "w") as f:
1290                        f.write('x' * file_size)
1291
1292                    self.device._simple_call(['push', '-n', host_file, device_file])
1293                    rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']'])
1294                    self.assertNotEqual(0, rc)
1295
1296                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1297                finally:
1298                    if host_dir is not None:
1299                        shutil.rmtree(host_dir)
1300
1301        def test_push_dry_run_existent_file(self):
1302            """Push with dry run."""
1303
1304            for file_size in [8, 1024 * 1024]:
1305                try:
1306                    host_dir = tempfile.mkdtemp()
1307                    device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run')
1308                    device_file = posixpath.join(device_dir, 'file')
1309
1310                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1311                    self.device.shell(['mkdir', '-p', device_dir])
1312                    self.device.shell(['echo', 'foo', '>', device_file])
1313
1314                    host_file = posixpath.join(host_dir, 'file')
1315
1316                    with open(host_file, "w") as f:
1317                        f.write('x' * file_size)
1318
1319                    self.device._simple_call(['push', '-n', host_file, device_file])
1320                    stdout, stderr = self.device.shell(['cat', device_file])
1321                    self.assertEqual(stdout.strip(), "foo")
1322
1323                    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1324                finally:
1325                    if host_dir is not None:
1326                        shutil.rmtree(host_dir)
1327
1328        def test_unicode_paths(self):
1329            """Ensure that we can support non-ASCII paths, even on Windows."""
1330            name = u'로보카 폴리'
1331
1332            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1333            remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
1334
1335            ## push.
1336            tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
1337            tf.close()
1338            self.device.push(tf.name, remote_path)
1339            os.remove(tf.name)
1340            self.assertFalse(os.path.exists(tf.name))
1341
1342            # Verify that the device ended up with the expected UTF-8 path
1343            output = self.device.shell(
1344                    ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
1345            self.assertEqual(remote_path, output)
1346
1347            # pull.
1348            self.device.pull(remote_path, tf.name)
1349            self.assertTrue(os.path.exists(tf.name))
1350            os.remove(tf.name)
1351            self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1352
1353
class FileOperationsTestUncompressed(FileOperationsTest.Base):
    """Runs the FileOperationsTest.Base tests with compression "none"."""
    # Read as self.compression by the setUp above, which exports it as
    # ADB_COMPRESSION.
    compression = "none"
1356
1357
class FileOperationsTestBrotli(FileOperationsTest.Base):
    """Runs the FileOperationsTest.Base tests with compression "brotli"."""
    # Read as self.compression by the setUp above, which exports it as
    # ADB_COMPRESSION.
    compression = "brotli"
1360
1361
class FileOperationsTestLZ4(FileOperationsTest.Base):
    """Runs the FileOperationsTest.Base tests with compression "lz4"."""
    # Read as self.compression by the setUp above, which exports it as
    # ADB_COMPRESSION.
    compression = "lz4"
1364
1365
class FileOperationsTestZstd(FileOperationsTest.Base):
    """Runs the FileOperationsTest.Base tests with compression "zstd"."""
    # Read as self.compression by the setUp above, which exports it as
    # ADB_COMPRESSION.
    compression = "zstd"
1368
1369
class DeviceOfflineTest(DeviceTest):
    """Tests that the device stays usable around server kills and odd transfer sizes."""

    def _get_device_state(self, serialno):
        """Return the state `adb devices` lists for serialno, or None.

        Args:
            serialno: Device serial number, as str or bytes.

        Returns:
            The second whitespace-separated column of the first output line
            whose first column equals serialno, or None if no line matches.
        """
        if isinstance(serialno, bytes):
            serialno = serialno.decode('utf-8')
        # check_output returns bytes; decode so the str regex and the str
        # comparison below work on Python 3.
        output = subprocess.check_output(
            self.device.adb_cmd + ['devices']).decode('utf-8')
        for line in output.splitlines():
            m = re.match(r'(\S+)\s+(\S+)', line)
            if m and m.group(1) == serialno:
                return m.group(2)
        return None

    def disabled_test_killed_when_pushing_a_large_file(self):
        """
           While running adb push with a large file, kill adb server.
           Occasionally the device becomes offline. Because the device is still
           reading data without realizing that the adb server has been restarted.
           Test if we can bring the device online automatically now.
           http://b/32952319
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno']).strip()
        # 1. Push a large file
        file_path = 'tmp_large_file'
        try:
            with open(file_path, 'wb') as fh:
                fh.write(b'\0' * (100 * 1024 * 1024))
            subproc = subprocess.Popen(
                self.device.adb_cmd + ['push', file_path, '/data/local/tmp'])
            try:
                time.sleep(0.1)
                # 2. Kill the adb server
                subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            finally:
                # Stop and reap the push client even if kill-server failed.
                subproc.terminate()
                subproc.wait()
        finally:
            # Best-effort removal of the host file.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 3. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        # 4. The device should be online
        self.assertEqual(self._get_device_state(serialno), 'device')

    def disabled_test_killed_when_pulling_a_large_file(self):
        """
           While running adb pull with a large file, kill adb server.
           Occasionally the device can't be connected. Because the device is trying to
           send a message larger than what is expected by the adb server.
           Test if we can bring the device online automatically now.
        """
        serialno = subprocess.check_output(
            self.device.adb_cmd + ['get-serialno']).strip()
        file_path = 'tmp_large_file'
        try:
            # 1. Create a large file on device.
            self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file',
                               'bs=1000000', 'count=100'])
            # 2. Pull the large file on host.
            subproc = subprocess.Popen(self.device.adb_cmd +
                                       ['pull', '/data/local/tmp/tmp_large_file', file_path])
            try:
                time.sleep(0.1)
                # 3. Kill the adb server
                subprocess.check_call(self.device.adb_cmd + ['kill-server'])
            finally:
                # Stop and reap the pull client even if kill-server failed.
                subproc.terminate()
                subproc.wait()
        finally:
            # Best-effort removal; the host file may never have been created.
            try:
                os.unlink(file_path)
            except OSError:
                pass
        # 4. See if the device still exist.
        # Sleep to wait for the adb server exit.
        time.sleep(0.5)
        self.assertEqual(self._get_device_state(serialno), 'device')

    def test_packet_size_regression(self):
        """Test for http://b/37783561

        Receiving packets of a length divisible by 512 but not 1024 resulted in
        the adb client waiting indefinitely for more input.
        """
        # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n
        # Probe some surrounding values as well, for the hell of it.
        for base in [512] + list(range(1024, 1024 * 16, 1024)):
            for offset in [-6, -5, -4]:
                length = base + offset
                # '2>/dev/null;echo' is a single list element: the original
                # spelled it as two adjacent string literals, which Python
                # concatenates. The ';' ends the dd command before echo.
                cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1',
                       '2>/dev/null;echo', 'foo']
                rc, stdout, _ = self.device.shell_nocheck(cmd)

                self.assertEqual(0, rc)

                # Output should be '\0' * length, followed by "foo\n"
                self.assertEqual(length, len(stdout) - 4)
                self.assertEqual(stdout, "\0" * length + "foo\n")

    def test_zero_packet(self):
        """Test for http://b/113070258

        Make sure that we don't blow up when sending USB transfers that line up
        exactly with the USB packet size.
        """

        def listener():
            # Device side: nc answers the first connection on port 12345 with
            # "foo"; the shell command's result is not inspected.
            cmd = ["echo foo | nc -l -p 12345; echo done"]
            self.device.shell_nocheck(cmd)

        local_port = int(self.device.forward("tcp:0", "tcp:12345"))
        try:
            for size in [512, 1024]:
                thread = threading.Thread(target=listener)
                thread.start()

                # Wait a bit to let the shell command start.
                time.sleep(0.25)

                sock = socket.create_connection(("localhost", local_port))
                with contextlib.closing(sock):
                    bytesWritten = sock.send(b"a" * size)
                    self.assertEqual(size, bytesWritten)
                    readBytes = sock.recv(4096)
                    self.assertEqual(b"foo\n", readBytes)

                thread.join()
        finally:
            self.device.forward_remove("tcp:{}".format(local_port))
1492
1493
class SocketTest(DeviceTest):
    def test_socket_flush(self):
        """Test that we handle socket closure properly.

        If we're done writing to a socket, closing before the other end has
        closed will send a TCP_RST if we have incoming data queued up, which
        may result in data that we've written being discarded.

        Bug: http://b/74616284
        """
        def adb_length_prefixed(message):
            """Return `message` as UTF-8, preceded by its byte length in hex (%04x)."""
            encoded = message.encode("utf8")
            return b"%04x%s" % (len(encoded), encoded)

        if "ANDROID_SERIAL" in os.environ:
            transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"]
        else:
            transport_string = "host:transport-any"

        with socket.create_connection(("localhost", 5037)) as s:

            s.sendall(adb_length_prefixed(transport_string))
            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo"
            s.sendall(adb_length_prefixed(shell_string))

            response = s.recv(4)
            self.assertEqual(b"OKAY", response)

            # Spawn a thread that dumps garbage into the socket until failure.
            def spam():
                buf = b"\0" * 16384
                try:
                    while True:
                        s.sendall(buf)
                except Exception as ex:
                    # Failure is the expected way out of the loop; just report it.
                    print(ex)

            thread = threading.Thread(target=spam)
            thread.start()

            time.sleep(1)

            # bytearray grows in place; `bytes +=` would copy the whole buffer
            # for every 512-byte chunk of the ~1 MiB reply.
            received = bytearray()
            while True:
                read = s.recv(512)
                if len(read) == 0:
                    break
                received += read

        # Join before asserting so the spam thread is reaped even when the
        # length check fails.
        thread.join()
        self.assertEqual(1024 * 1024 + len("foo\n"), len(received))
1549
1550
class FramebufferTest(DeviceTest):
    def test_framebuffer(self):
        """Check that the raw framebuffer service returns a non-empty payload."""
        framebuffer_cmd = self.device.adb_cmd + ["raw", "framebuffer:"]
        output = subprocess.check_output(framebuffer_cmd)
        self.assertFalse(len(output) == 0)
1556
1557
1558if sys.platform == "win32":
1559    # From https://stackoverflow.com/a/38749458
1560    import os
1561    import contextlib
1562    import msvcrt
1563    import ctypes
1564    from ctypes import wintypes
1565
    # use_last_error=True makes the failure code of a call available through
    # ctypes.get_last_error(), which the errcheck hooks below rely on.
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    # Access and share-mode flags passed to CreateConsoleScreenBuffer in
    # console_screen().
    GENERIC_READ  = 0x80000000
    GENERIC_WRITE = 0x40000000
    FILE_SHARE_READ  = 1
    FILE_SHARE_WRITE = 2
    # Buffer type passed as dwFlags to CreateConsoleScreenBuffer.
    CONSOLE_TEXTMODE_BUFFER = 1
    # Failure sentinel compared against by _check_invalid.
    INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
    # Selectors for GetStdHandle. STD_ERROR_HANDLE is not referenced in the
    # code visible in this section.
    STD_OUTPUT_HANDLE = wintypes.DWORD(-11)
    STD_ERROR_HANDLE = wintypes.DWORD(-12)
1576
1577    def _check_zero(result, func, args):
1578        if not result:
1579            raise ctypes.WinError(ctypes.get_last_error())
1580        return args
1581
1582    def _check_invalid(result, func, args):
1583        if result == INVALID_HANDLE_VALUE:
1584            raise ctypes.WinError(ctypes.get_last_error())
1585        return args
1586
    # Compatibility shim: when ctypes.wintypes lacks LPDWORD (attributed to
    # Python 2 by the original author), define the pointer aliases that the
    # kernel32 prototypes below refer to.
    if not hasattr(wintypes, 'LPDWORD'): # Python 2
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
        wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT)
1590
    class COORD(ctypes.Structure):
        """Pair of SHORT coordinates (X, Y) used here for console sizes and cell positions."""
        _fields_ = (('X', wintypes.SHORT),
                    ('Y', wintypes.SHORT))
1594
1595    class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure):
1596        _fields_ = (('cbSize',               wintypes.ULONG),
1597                    ('dwSize',               COORD),
1598                    ('dwCursorPosition',     COORD),
1599                    ('wAttributes',          wintypes.WORD),
1600                    ('srWindow',             wintypes.SMALL_RECT),
1601                    ('dwMaximumWindowSize',  COORD),
1602                    ('wPopupAttributes',     wintypes.WORD),
1603                    ('bFullscreenSupported', wintypes.BOOL),
1604                    ('ColorTable',           wintypes.DWORD * 16))
1605        def __init__(self, *args, **kwds):
1606            super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__(
1607                    *args, **kwds)
1608            self.cbSize = ctypes.sizeof(self)
1609
1610    PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER(
1611                                        CONSOLE_SCREEN_BUFFER_INFOEX)
1612    LPSECURITY_ATTRIBUTES = wintypes.LPVOID
1613
1614    kernel32.GetStdHandle.errcheck = _check_invalid
1615    kernel32.GetStdHandle.restype = wintypes.HANDLE
1616    kernel32.GetStdHandle.argtypes = (
1617        wintypes.DWORD,) # _In_ nStdHandle
1618
1619    kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid
1620    kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE
1621    kernel32.CreateConsoleScreenBuffer.argtypes = (
1622        wintypes.DWORD,        # _In_       dwDesiredAccess
1623        wintypes.DWORD,        # _In_       dwShareMode
1624        LPSECURITY_ATTRIBUTES, # _In_opt_   lpSecurityAttributes
1625        wintypes.DWORD,        # _In_       dwFlags
1626        wintypes.LPVOID)       # _Reserved_ lpScreenBufferData
1627
1628    kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero
1629    kernel32.GetConsoleScreenBufferInfoEx.argtypes = (
1630        wintypes.HANDLE,               # _In_  hConsoleOutput
1631        PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo
1632
1633    kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero
1634    kernel32.SetConsoleScreenBufferInfoEx.argtypes = (
1635        wintypes.HANDLE,               # _In_  hConsoleOutput
1636        PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_  lpConsoleScreenBufferInfo
1637
1638    kernel32.SetConsoleWindowInfo.errcheck = _check_zero
1639    kernel32.SetConsoleWindowInfo.argtypes = (
1640        wintypes.HANDLE,      # _In_ hConsoleOutput
1641        wintypes.BOOL,        # _In_ bAbsolute
1642        wintypes.PSMALL_RECT) # _In_ lpConsoleWindow
1643
1644    kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero
1645    kernel32.FillConsoleOutputCharacterW.argtypes = (
1646        wintypes.HANDLE,  # _In_  hConsoleOutput
1647        wintypes.WCHAR,   # _In_  cCharacter
1648        wintypes.DWORD,   # _In_  nLength
1649        COORD,            # _In_  dwWriteCoord
1650        wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten
1651
1652    kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero
1653    kernel32.ReadConsoleOutputCharacterW.argtypes = (
1654        wintypes.HANDLE,  # _In_  hConsoleOutput
1655        wintypes.LPWSTR,  # _Out_ lpCharacter
1656        wintypes.DWORD,   # _In_  nLength
1657        COORD,            # _In_  dwReadCoord
1658        wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead
1659
1660    @contextlib.contextmanager
1661    def allocate_console():
1662        allocated = kernel32.AllocConsole()
1663        try:
1664            yield allocated
1665        finally:
1666            if allocated:
1667                kernel32.FreeConsole()
1668
    @contextlib.contextmanager
    def console_screen(ncols=None, nrows=None):
        """Switch the console to a temporary screen buffer for the `with` block.

        Creates a new console screen buffer sized `ncols` x `nrows`, makes it
        the active buffer, and yields a C runtime file descriptor opened on
        it. On exit the saved settings of the standard output buffer are
        re-applied and that buffer is made active again.

        Args:
            ncols: Buffer width; defaults to the current buffer's width.
            nrows: Buffer height; defaults to the current buffer's height.

        Raises:
            ValueError: if an explicit `nrows` is greater than 9999.
        """
        info = CONSOLE_SCREEN_BUFFER_INFOEX()
        new_info = CONSOLE_SCREEN_BUFFER_INFOEX()
        nwritten = (wintypes.DWORD * 1)()
        hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
        # Snapshot the current buffer's settings; they supply the defaults
        # below and are restored in the inner `finally`.
        kernel32.GetConsoleScreenBufferInfoEx(
               hStdOut, ctypes.byref(info))
        if ncols is None:
            ncols = info.dwSize.X
        if nrows is None:
            nrows = info.dwSize.Y
        elif nrows > 9999:
            raise ValueError('nrows must be 9999 or less')
        fd_screen = None
        hScreen = kernel32.CreateConsoleScreenBuffer(
                    GENERIC_READ | GENERIC_WRITE,
                    FILE_SHARE_READ | FILE_SHARE_WRITE,
                    None, CONSOLE_TEXTMODE_BUFFER, None)
        try:
            fd_screen = msvcrt.open_osfhandle(
                            hScreen, os.O_RDWR | os.O_BINARY)
            kernel32.GetConsoleScreenBufferInfoEx(
                   hScreen, ctypes.byref(new_info))
            # Resize the new buffer and give it a window anchored at the
            # top-left corner with the same height as the original window.
            new_info.dwSize = COORD(ncols, nrows)
            new_info.srWindow = wintypes.SMALL_RECT(
                    Left=0, Top=0, Right=(ncols - 1),
                    Bottom=(info.srWindow.Bottom - info.srWindow.Top))
            kernel32.SetConsoleScreenBufferInfoEx(
                    hScreen, ctypes.byref(new_info))
            kernel32.SetConsoleWindowInfo(hScreen, True,
                    ctypes.byref(new_info.srWindow))
            # Fill every cell with NUL; read_screen() strips trailing NULs
            # from each row it returns.
            kernel32.FillConsoleOutputCharacterW(
                    hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten)
            kernel32.SetConsoleActiveScreenBuffer(hScreen)
            try:
                yield fd_screen
            finally:
                # Put the original buffer back, whatever happened in the body.
                kernel32.SetConsoleScreenBufferInfoEx(
                    hStdOut, ctypes.byref(info))
                kernel32.SetConsoleWindowInfo(hStdOut, True,
                        ctypes.byref(info.srWindow))
                kernel32.SetConsoleActiveScreenBuffer(hStdOut)
        finally:
            # Once the fd exists it is the only thing closed here (presumably
            # closing it also releases the handle it wraps -- confirm against
            # the msvcrt docs); the raw handle is closed directly only when
            # open_osfhandle never succeeded.
            if fd_screen is not None:
                os.close(fd_screen)
            else:
                kernel32.CloseHandle(hScreen)
1717
1718    def read_screen(fd):
1719        hScreen = msvcrt.get_osfhandle(fd)
1720        csbi = CONSOLE_SCREEN_BUFFER_INFOEX()
1721        kernel32.GetConsoleScreenBufferInfoEx(
1722            hScreen, ctypes.byref(csbi))
1723        ncols = csbi.dwSize.X
1724        pos = csbi.dwCursorPosition
1725        length = ncols * pos.Y + pos.X + 1
1726        buf = (ctypes.c_wchar * length)()
1727        n = (wintypes.DWORD * 1)()
1728        kernel32.ReadConsoleOutputCharacterW(
1729            hScreen, buf, length, COORD(0,0), n)
1730        lines = [buf[i:i+ncols].rstrip(u'\0')
1731                    for i in range(0, n[0], ncols)]
1732        return u'\n'.join(lines)
1733
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    def test_unicode_output(self):
        """Test Unicode command line parameters and Unicode console window output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        unicode_string = u'로보카 폴리'
        quoted_string = '"' + unicode_string + '"'

        # allocate_console(): if we don't have a console window, allocate one.
        # This isn't necessary if we're already being run from a console
        # window, which is typical.
        #
        # console_screen(): create a temporary console buffer and switch to
        # it. We could also pass a parameter of ncols=len(unicode_string), but
        # it causes the window to flash as it is resized and is likely
        # unnecessary given the typical console window size.
        with allocate_console(), console_screen(nrows=1000) as screen:
            # Run adb and allow it to detect that stdout is a console, not a
            # pipe, by using device.shell_popen() which does not use a pipe,
            # unlike device.shell().
            process = self.device.shell_popen(['echo', quoted_string])
            process.wait()
            # Read what was written by adb to the temporary console buffer.
            console_output = read_screen(screen)
            self.assertEqual(unicode_string, console_output)
1756
class DevicesListing(DeviceTest):
    """Checks the output of `adb devices`, `adb track-devices` and `adb track-app`.

    All of these tests live in a single class: a second class statement with
    the same name would rebind the module-level name and hide the first
    class's tests from unittest.
    """

    # Output of `adb get-serialno`, fetched once when the class body runs.
    serial = subprocess.check_output(['adb', 'get-serialno']).strip().decode("utf-8")

    def _device_line(self, *args):
        """Run `adb <args>` and return the second of its three output lines."""
        with subprocess.Popen(['adb', *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            lines = [raw.decode("utf-8") for raw in proc.stdout.readlines()]
        self.assertEqual(len(lines), 3)
        return lines[1]

    def _first_frame(self, *args):
        """Run `adb <args>` and return the payload of the first frame it prints.

        A frame is read as four hexadecimal digits giving the payload length,
        followed by that many bytes. The payload is returned as bytes.
        """
        with subprocess.Popen(['adb', *args], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                payload_size = int(proc.stdout.read(4).decode("utf-8"), 16)
                return proc.stdout.read(payload_size)
            finally:
                # Leaving the `with` block waits for the child, and these
                # tracking commands are ended here rather than by themselves,
                # so terminate unconditionally -- also when reading fails.
                proc.terminate()

    def test_devices(self):
        line = self._device_line('devices')
        self.assertIn(self.serial, line)
        self.assertNotIn("{", line)
        self.assertNotIn("}", line)
        self.assertIn("device", line)
        self.assertNotIn("product", line)
        self.assertNotIn("transport", line)

    def test_devices_l(self):
        line = self._device_line('devices', '-l')
        self.assertIn(self.serial, line)
        self.assertNotIn("{", line)
        self.assertNotIn("}", line)
        self.assertIn("device", line)
        self.assertIn("product", line)
        self.assertIn("transport", line)

    def test_track_devices(self):
        output = self._first_frame('track-devices').decode("utf-8")
        self.assertNotIn("{", output)
        self.assertNotIn("}", output)
        self.assertIn(self.serial, output)
        self.assertIn("device", output)
        self.assertNotIn("product", output)
        self.assertNotIn("transport", output)

    def test_track_devices_l(self):
        output = self._first_frame('track-devices', '-l').decode("utf-8")
        self.assertNotIn("{", output)
        self.assertNotIn("}", output)
        self.assertIn(self.serial, output)
        self.assertIn("device", output)
        self.assertIn("product", output)
        self.assertIn("transport", output)

    def test_track_devices_proto_text(self):
        output = self._first_frame('track-devices', '--proto-text').decode("utf-8")
        self.assertIn("{", output)
        self.assertIn("}", output)
        self.assertIn(self.serial, output)
        self.assertIn("device", output)
        self.assertIn("product", output)
        self.assertIn("connection_type", output)

    def test_track_devices_proto_binary(self):
        proto = self._first_frame('track-devices', '--proto-binary')

        devices = adb_host_proto.Devices()
        devices.ParseFromString(proto)

        device = devices.device[0]
        self.assertEqual(device.serial, self.serial)
        self.assertNotEqual(device.bus_address, "")
        self.assertNotEqual(device.product, "")
        self.assertNotEqual(device.model, "")
        self.assertNotEqual(device.device, "")
        # These only check that the fields compare equal to their int() value.
        self.assertEqual(device.negotiated_speed, int(device.negotiated_speed))
        self.assertEqual(device.max_speed, int(device.max_speed))
        self.assertEqual(device.transport_id, int(device.transport_id))

    def test_track_app_appinfo(self):
        # Install both test apps and start the activities whose processes the
        # assertions below look for. Output is captured and discarded; a
        # non-zero exit status raises.
        setup_commands = [
            ['adb', 'install', '-r', '-t', 'adb_test_app1.apk'],
            ['adb', 'install', '-r', '-t', 'adb_test_app2.apk'],
            ['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.MainActivity'],
            ['adb', 'shell', 'am', 'start', '-W', 'adb.test.app2/.MainActivity'],
            ['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.OwnProcessActivity'],
        ]
        for command in setup_commands:
            subprocess.check_output(command)

        proto = self._first_frame('track-app', '--proto-binary')

        # NOTE(review): `proto_track_app` is not among the imports at the top
        # of this file -- confirm where it is bound.
        apps = proto_track_app.AppProcesses()
        apps.ParseFromString(proto)

        found_default_process = False
        found_own_process = False
        for app in apps.process:
            if app.process_name == "adb.test.process.name":
                found_default_process = True
                self.assertTrue(app.debuggable)
                self.assertIn("adb.test.app1", app.package_names)
                self.assertIn("adb.test.app2", app.package_names)

            if app.process_name == "adb.test.own.process":
                found_own_process = True
                self.assertTrue(app.debuggable)
                self.assertIn("adb.test.app1", app.package_names)

        self.assertTrue(found_default_process)
        self.assertTrue(found_own_process)
1879
class ServerStatus(unittest.TestCase):
    def test_server_status(self):
        """Check that `adb server-status` prints its fields in the expected order."""
        # One expected field name per output line, in order.
        expected_fields = [
            "usb_backend",
            "mdns_backend",
            "version",
            "build",
            "executable_absolute_path",
            "log_absolute_path",
        ]
        with subprocess.Popen(['adb', 'server-status'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            lines = [raw.decode("utf-8") for raw in proc.stdout.readlines()]

        # Fail with the captured output instead of an IndexError when the
        # command printed fewer lines than expected.
        self.assertGreaterEqual(len(lines), len(expected_fields), lines)
        for field, line in zip(expected_fields, lines):
            self.assertIn(field, line)
1890
def invoke(*args):
    """Run `args` as a command and return its stdout, stripped and decoded as UTF-8.

    Raises subprocess.CalledProcessError when the command exits non-zero.
    """
    raw_output = subprocess.check_output(args)
    trimmed = raw_output.strip()
    return trimmed.decode("utf-8")
1893
class OneDevice(unittest.TestCase):
    """Checks that a server started with --one-device owns the device exclusively."""

    # Output of `adb get-serialno`, fetched once when the class body runs.
    serial = invoke("adb", "get-serialno")
    # Port of the server started with --one-device in the test.
    owner_server_port = "14424"

    def test_one_device(self):
        invoke("adb", "kill-server")
        invoke("adb", "--one-device", self.serial, "-P", self.owner_server_port, "start-server")
        # The first listing uses adb's default server, the second the server
        # started above on owner_server_port.
        devices = invoke("adb", "devices")
        owned_devices = invoke("adb", "-P", self.owner_server_port, "devices")
        self.assertIn(self.serial, owned_devices)
        self.assertNotIn(self.serial, devices)

    def tearDown(self):
        invoke("adb", "-P", self.owner_server_port, "kill-server")
        invoke("adb", "kill-server")
1910
if __name__ == '__main__':
    # Seed the random module with a fixed value before the tests run, so any
    # values drawn from it repeat from one run to the next.
    random.seed(0)
    unittest.main()
1914