1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# 4# Copyright (C) 2015 The Android Open Source Project 5# 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18from __future__ import print_function 19 20import contextlib 21import hashlib 22import io 23import os 24import posixpath 25import random 26import re 27import shlex 28import shutil 29import signal 30import socket 31import string 32import subprocess 33import sys 34import tempfile 35import threading 36import time 37import unittest 38 39import adb_host_pb2 as adb_host_proto 40 41from datetime import datetime 42 43import adb 44 45def requires_non_root(func): 46 def wrapper(self, *args): 47 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' 48 if was_root: 49 self.device.unroot() 50 self.device.wait() 51 52 try: 53 func(self, *args) 54 finally: 55 if was_root: 56 self.device.root() 57 self.device.wait() 58 59 return wrapper 60 61 62class DeviceTest(unittest.TestCase): 63 def setUp(self) -> None: 64 self.device = adb.get_device() 65 66 67class AbbTest(DeviceTest): 68 def test_smoke(self): 69 abb = subprocess.run(['adb', 'abb'], capture_output=True) 70 cmd = subprocess.run(['adb', 'shell', 'cmd'], capture_output=True) 71 72 # abb squashes all failures to 1. 
73 self.assertEqual(abb.returncode == 0, cmd.returncode == 0) 74 self.assertEqual(abb.stdout, cmd.stdout) 75 self.assertEqual(abb.stderr, cmd.stderr) 76 77class ForwardReverseTest(DeviceTest): 78 def _test_no_rebind(self, description, direction_list, direction, 79 direction_no_rebind, direction_remove_all): 80 msg = direction_list() 81 self.assertEqual('', msg.strip(), 82 description + ' list must be empty to run this test.') 83 84 # Use --no-rebind with no existing binding 85 direction_no_rebind('tcp:5566', 'tcp:6655') 86 msg = direction_list() 87 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 88 89 # Use --no-rebind with existing binding 90 with self.assertRaises(subprocess.CalledProcessError): 91 direction_no_rebind('tcp:5566', 'tcp:6677') 92 msg = direction_list() 93 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg)) 94 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 95 96 # Use the absence of --no-rebind with existing binding 97 direction('tcp:5566', 'tcp:6677') 98 msg = direction_list() 99 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 100 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg)) 101 102 direction_remove_all() 103 msg = direction_list() 104 self.assertEqual('', msg.strip()) 105 106 def test_forward_no_rebind(self): 107 self._test_no_rebind('forward', self.device.forward_list, 108 self.device.forward, self.device.forward_no_rebind, 109 self.device.forward_remove_all) 110 111 def test_reverse_no_rebind(self): 112 self._test_no_rebind('reverse', self.device.reverse_list, 113 self.device.reverse, self.device.reverse_no_rebind, 114 self.device.reverse_remove_all) 115 116 def test_forward(self): 117 msg = self.device.forward_list() 118 self.assertEqual('', msg.strip(), 119 'Forwarding list must be empty to run this test.') 120 self.device.forward('tcp:5566', 'tcp:6655') 121 msg = self.device.forward_list() 122 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 123 self.device.forward('tcp:7788', 'tcp:8877') 124 msg = 
self.device.forward_list() 125 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 126 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 127 self.device.forward_remove('tcp:5566') 128 msg = self.device.forward_list() 129 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 130 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 131 self.device.forward_remove_all() 132 msg = self.device.forward_list() 133 self.assertEqual('', msg.strip()) 134 135 def test_forward_old_protocol(self): 136 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 137 138 msg = self.device.forward_list() 139 self.assertEqual('', msg.strip(), 140 'Forwarding list must be empty to run this test.') 141 142 with socket.create_connection(("localhost", 5037)) as s: 143 service = b"host-serial:%s:forward:tcp:5566;tcp:6655" % serialno 144 cmd = b"%04x%s" % (len(service), service) 145 s.sendall(cmd) 146 147 msg = self.device.forward_list() 148 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 149 150 self.device.forward_remove_all() 151 msg = self.device.forward_list() 152 self.assertEqual('', msg.strip()) 153 154 def test_forward_tcp_port_0(self): 155 self.assertEqual('', self.device.forward_list().strip(), 156 'Forwarding list must be empty to run this test.') 157 158 try: 159 # If resolving TCP port 0 is supported, `adb forward` will print 160 # the actual port number. 
161 port = self.device.forward('tcp:0', 'tcp:8888').strip() 162 if not port: 163 raise unittest.SkipTest('Forwarding tcp:0 is not available.') 164 165 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 166 self.device.forward_list())) 167 finally: 168 self.device.forward_remove_all() 169 170 def test_reverse(self): 171 msg = self.device.reverse_list() 172 self.assertEqual('', msg.strip(), 173 'Reverse forwarding list must be empty to run this test.') 174 self.device.reverse('tcp:5566', 'tcp:6655') 175 msg = self.device.reverse_list() 176 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 177 self.device.reverse('tcp:7788', 'tcp:8877') 178 msg = self.device.reverse_list() 179 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg)) 180 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 181 self.device.reverse_remove('tcp:5566') 182 msg = self.device.reverse_list() 183 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg)) 184 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg)) 185 self.device.reverse_remove_all() 186 msg = self.device.reverse_list() 187 self.assertEqual('', msg.strip()) 188 189 def test_reverse_tcp_port_0(self): 190 self.assertEqual('', self.device.reverse_list().strip(), 191 'Reverse list must be empty to run this test.') 192 193 try: 194 # If resolving TCP port 0 is supported, `adb reverse` will print 195 # the actual port number. 
196 port = self.device.reverse('tcp:0', 'tcp:8888').strip() 197 if not port: 198 raise unittest.SkipTest('Reversing tcp:0 is not available.') 199 200 self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port), 201 self.device.reverse_list())) 202 finally: 203 self.device.reverse_remove_all() 204 205 def test_forward_reverse_echo(self): 206 """Send data through adb forward and read it back via adb reverse""" 207 forward_port = 12345 208 reverse_port = forward_port + 1 209 forward_spec = 'tcp:' + str(forward_port) 210 reverse_spec = 'tcp:' + str(reverse_port) 211 forward_setup = False 212 reverse_setup = False 213 214 try: 215 # listen on localhost:forward_port, connect to remote:forward_port 216 self.device.forward(forward_spec, forward_spec) 217 forward_setup = True 218 # listen on remote:forward_port, connect to localhost:reverse_port 219 self.device.reverse(forward_spec, reverse_spec) 220 reverse_setup = True 221 222 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 223 with contextlib.closing(listener): 224 # Use SO_REUSEADDR so that subsequent runs of the test can grab 225 # the port even if it is in TIME_WAIT. 226 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 227 228 # Listen on localhost:reverse_port before connecting to 229 # localhost:forward_port because that will cause adb to connect 230 # back to localhost:reverse_port. 231 listener.bind(('127.0.0.1', reverse_port)) 232 listener.listen(4) 233 234 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 235 with contextlib.closing(client): 236 # Connect to the listener. 237 client.connect(('127.0.0.1', forward_port)) 238 239 # Accept the client connection. 240 accepted_connection, addr = listener.accept() 241 with contextlib.closing(accepted_connection) as server: 242 data = b'hello' 243 244 # Send data into the port setup by adb forward. 245 client.sendall(data) 246 # Explicitly close() so that server gets EOF. 
247 client.close() 248 249 # Verify that the data came back via adb reverse. 250 self.assertEqual(data, server.makefile().read().encode("utf8")) 251 finally: 252 if reverse_setup: 253 self.device.reverse_remove(forward_spec) 254 if forward_setup: 255 self.device.forward_remove(forward_spec) 256 257 258class ShellTest(DeviceTest): 259 def _interactive_shell(self, shell_args, input): 260 """Runs an interactive adb shell. 261 262 Args: 263 shell_args: List of string arguments to `adb shell`. 264 input: bytes input to send to the interactive shell. 265 266 Returns: 267 The remote exit code. 268 269 Raises: 270 unittest.SkipTest: The device doesn't support exit codes. 271 """ 272 if not self.device.has_shell_protocol(): 273 raise unittest.SkipTest('exit codes are unavailable on this device') 274 275 proc = subprocess.Popen( 276 self.device.adb_cmd + ['shell'] + shell_args, 277 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 278 stderr=subprocess.PIPE) 279 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need 280 # to explicitly add an exit command to close the session from the device 281 # side, plus the necessary newline to complete the interactive command. 282 proc.communicate(input + b'; exit\n') 283 return proc.returncode 284 285 def test_cat(self): 286 """Check that we can at least cat a file.""" 287 out = self.device.shell(['cat', '/proc/uptime'])[0].strip() 288 elements = out.split() 289 self.assertEqual(len(elements), 2) 290 291 uptime, idle = elements 292 self.assertGreater(float(uptime), 0.0) 293 self.assertGreater(float(idle), 0.0) 294 295 def test_throws_on_failure(self): 296 self.assertRaises(adb.ShellError, self.device.shell, ['false']) 297 298 def test_output_not_stripped(self): 299 out = self.device.shell(['echo', 'foo'])[0] 300 self.assertEqual(out, 'foo' + self.device.linesep) 301 302 def test_shell_command_length(self): 303 # Devices that have shell_v2 should be able to handle long commands. 
304 if self.device.has_shell_protocol(): 305 rc, out, err = self.device.shell_nocheck(['echo', 'x' * 16384]) 306 self.assertEqual(rc, 0) 307 self.assertTrue(out == ('x' * 16384 + '\n')) 308 309 def test_shell_nocheck_failure(self): 310 rc, out, _ = self.device.shell_nocheck(['false']) 311 self.assertNotEqual(rc, 0) 312 self.assertEqual(out, '') 313 314 def test_shell_nocheck_output_not_stripped(self): 315 rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) 316 self.assertEqual(rc, 0) 317 self.assertEqual(out, 'foo' + self.device.linesep) 318 319 def test_can_distinguish_tricky_results(self): 320 # If result checking on ADB shell is naively implemented as 321 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the 322 # output from the result for a cmd of `echo -n 1`. 323 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) 324 self.assertEqual(rc, 0) 325 self.assertEqual(out, '1') 326 327 def test_line_endings(self): 328 """Ensure that line ending translation is not happening in the pty. 329 330 Bug: http://b/19735063 331 """ 332 output = self.device.shell(['uname'])[0] 333 self.assertEqual(output, 'Linux' + self.device.linesep) 334 335 def test_pty_logic(self): 336 """Tests that a PTY is allocated when it should be. 337 338 PTY allocation behavior should match ssh. 339 """ 340 def check_pty(args): 341 """Checks adb shell PTY allocation. 342 343 Tests |args| for terminal and non-terminal stdin. 344 345 Args: 346 args: -Tt args in a list (e.g. ['-t', '-t']). 347 348 Returns: 349 A tuple (<terminal>, <non-terminal>). True indicates 350 the corresponding shell allocated a remote PTY. 
351 """ 352 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]'] 353 354 terminal = subprocess.Popen( 355 test_cmd, stdin=None, 356 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 357 terminal.communicate() 358 359 non_terminal = subprocess.Popen( 360 test_cmd, stdin=subprocess.PIPE, 361 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 362 non_terminal.communicate() 363 364 return (terminal.returncode == 0, non_terminal.returncode == 0) 365 366 # -T: never allocate PTY. 367 self.assertEqual((False, False), check_pty(['-T'])) 368 369 # These tests require a new device. 370 if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()): 371 # No args: PTY only if stdin is a terminal and shell is interactive, 372 # which is difficult to reliably test from a script. 373 self.assertEqual((False, False), check_pty([])) 374 375 # -t: PTY if stdin is a terminal. 376 self.assertEqual((True, False), check_pty(['-t'])) 377 378 # -t -t: always allocate PTY. 379 self.assertEqual((True, True), check_pty(['-t', '-t'])) 380 381 # -tt: always allocate PTY, POSIX style (http://b/32216152). 382 self.assertEqual((True, True), check_pty(['-tt'])) 383 384 # -ttt: ssh has weird even/odd behavior with multiple -t flags, but 385 # we follow the man page instead. 386 self.assertEqual((True, True), check_pty(['-ttt'])) 387 388 # -ttx: -x and -tt aren't incompatible (though -Tx would be an error). 389 self.assertEqual((True, True), check_pty(['-ttx'])) 390 391 # -Ttt: -tt cancels out -T. 392 self.assertEqual((True, True), check_pty(['-Ttt'])) 393 394 # -ttT: -T cancels out -tt. 395 self.assertEqual((False, False), check_pty(['-ttT'])) 396 397 def test_shell_protocol(self): 398 """Tests the shell protocol on the device. 399 400 If the device supports shell protocol, this gives us the ability 401 to separate stdout/stderr and return the exit code directly. 
402 403 Bug: http://b/19734861 404 """ 405 if not self.device.has_shell_protocol(): 406 raise unittest.SkipTest('shell protocol unsupported on this device') 407 408 # Shell protocol should be used by default. 409 result = self.device.shell_nocheck( 410 shlex.split('echo foo; echo bar >&2; exit 17')) 411 self.assertEqual(17, result[0]) 412 self.assertEqual('foo' + self.device.linesep, result[1]) 413 self.assertEqual('bar' + self.device.linesep, result[2]) 414 415 self.assertEqual(17, self._interactive_shell([], b'exit 17')) 416 417 # -x flag should disable shell protocol. 418 result = self.device.shell_nocheck( 419 shlex.split('-x echo foo; echo bar >&2; exit 17')) 420 self.assertEqual(0, result[0]) 421 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1]) 422 self.assertEqual('', result[2]) 423 424 self.assertEqual(0, self._interactive_shell(['-x'], b'exit 17')) 425 426 def test_non_interactive_sigint(self): 427 """Tests that SIGINT in a non-interactive shell kills the process. 428 429 This requires the shell protocol in order to detect the broken 430 pipe; raw data transfer mode will only see the break once the 431 subprocess tries to read or write. 432 433 Bug: http://b/23825725 434 """ 435 if not self.device.has_shell_protocol(): 436 raise unittest.SkipTest('shell protocol unsupported on this device') 437 438 # Start a long-running process. 439 sleep_proc = subprocess.Popen( 440 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), 441 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 442 stderr=subprocess.STDOUT) 443 remote_pid = sleep_proc.stdout.readline().strip().decode("utf8") 444 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') 445 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) 446 447 # Verify that the process is running, send signal, verify it stopped. 
448 self.device.shell(proc_query) 449 os.kill(sleep_proc.pid, signal.SIGINT) 450 sleep_proc.communicate() 451 452 # It can take some time for the process to receive the signal and die. 453 end_time = time.time() + 3 454 while self.device.shell_nocheck(proc_query)[0] != 1: 455 self.assertFalse(time.time() > end_time, 456 'subprocess failed to terminate in time') 457 458 def test_non_interactive_stdin(self): 459 """Tests that non-interactive shells send stdin.""" 460 if not self.device.has_shell_protocol(): 461 raise unittest.SkipTest('non-interactive stdin unsupported ' 462 'on this device') 463 464 # Test both small and large inputs. 465 small_input = b'foo' 466 characters = [c.encode("utf8") for c in string.ascii_letters + string.digits] 467 large_input = b'\n'.join(characters) 468 469 470 for input in (small_input, large_input): 471 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'], 472 stdin=subprocess.PIPE, 473 stdout=subprocess.PIPE, 474 stderr=subprocess.PIPE) 475 stdout, stderr = proc.communicate(input) 476 self.assertEqual(input.splitlines(), stdout.splitlines()) 477 self.assertEqual(b'', stderr) 478 479 def test_sighup(self): 480 """Ensure that SIGHUP gets sent upon non-interactive ctrl-c""" 481 log_path = "/data/local/tmp/adb_signal_test.log" 482 483 # Clear the output file. 484 self.device.shell_nocheck(["echo", ">", log_path]) 485 486 script = """ 487 trap "echo SIGINT > {path}; exit 0" SIGINT 488 trap "echo SIGHUP > {path}; exit 0" SIGHUP 489 echo Waiting 490 read 491 """.format(path=log_path) 492 493 script = ";".join([x.strip() for x in script.strip().splitlines()]) 494 495 with self.device.shell_popen([script], kill_atexit=False, 496 stdin=subprocess.PIPE, 497 stdout=subprocess.PIPE) as process: 498 499 self.assertEqual(b"Waiting\n", process.stdout.readline()) 500 process.send_signal(signal.SIGINT) 501 process.wait() 502 503 # Waiting for the local adb to finish is insufficient, since it hangs 504 # up immediately. 
505 time.sleep(1) 506 507 stdout, _ = self.device.shell(["cat", log_path]) 508 self.assertEqual(stdout.strip(), "SIGHUP") 509 510 # Temporarily disabled because it seems to cause later instability. 511 # http://b/228114748 512 def disabled_test_exit_stress(self): 513 """Hammer `adb shell exit 42` with multiple threads.""" 514 thread_count = 48 515 result = dict() 516 def hammer(thread_idx, thread_count, result): 517 success = True 518 for i in range(thread_idx, 240, thread_count): 519 ret = subprocess.call(['adb', 'shell', 'exit {}'.format(i)]) 520 if ret != i % 256: 521 success = False 522 break 523 result[thread_idx] = success 524 525 threads = [] 526 for i in range(thread_count): 527 thread = threading.Thread(target=hammer, args=(i, thread_count, result)) 528 thread.start() 529 threads.append(thread) 530 for thread in threads: 531 thread.join() 532 for i, success in result.items(): 533 self.assertTrue(success) 534 535 def disabled_test_parallel(self): 536 """Spawn a bunch of `adb shell` instances in parallel. 537 538 This was broken historically due to the use of select, which only works 539 for fds that are numerically less than 1024. 
540 541 Bug: http://b/141955761""" 542 543 n_procs = 2048 544 procs = dict() 545 for i in range(0, n_procs): 546 procs[i] = subprocess.Popen( 547 ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], 548 stdin=subprocess.PIPE, 549 stdout=subprocess.PIPE 550 ) 551 552 for i in range(0, n_procs): 553 procs[i].stdin.write("%d\n" % i) 554 555 for i in range(0, n_procs): 556 response = procs[i].stdout.readline() 557 assert(response == "%d\n" % i) 558 559 for i in range(0, n_procs): 560 procs[i].stdin.write("%d\n" % (i % 256)) 561 562 for i in range(0, n_procs): 563 assert(procs[i].wait() == i % 256) 564 565 566class ArgumentEscapingTest(DeviceTest): 567 def test_shell_escaping(self): 568 """Make sure that argument escaping is somewhat sane.""" 569 570 # http://b/19734868 571 # Note that this actually matches ssh(1)'s behavior --- it's 572 # converted to `sh -c echo hello; echo world` which sh interprets 573 # as `sh -c echo` (with an argument to that shell of "hello"), 574 # and then `echo world` back in the first shell. 
575 result = self.device.shell( 576 shlex.split("sh -c 'echo hello; echo world'"))[0] 577 result = result.splitlines() 578 self.assertEqual(['', 'world'], result) 579 # If you really wanted "hello" and "world", here's what you'd do: 580 result = self.device.shell( 581 shlex.split(r'echo hello\;echo world'))[0].splitlines() 582 self.assertEqual(['hello', 'world'], result) 583 584 # http://b/15479704 585 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() 586 self.assertEqual('t', result) 587 result = self.device.shell( 588 shlex.split("sh -c 'true && echo t'"))[0].strip() 589 self.assertEqual('t', result) 590 591 # http://b/20564385 592 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() 593 self.assertEqual('t', result) 594 result = self.device.shell( 595 shlex.split(r'echo -n 123\;uname'))[0].strip() 596 self.assertEqual('123Linux', result) 597 598 def test_install_argument_escaping(self): 599 """Make sure that install argument escaping works.""" 600 # http://b/20323053, http://b/3090932. 601 for file_suffix in (b'-text;ls;1.apk', b"-Live Hold'em.apk"): 602 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix, 603 delete=False) 604 tf.close() 605 606 # Installing bogus .apks fails if the device supports exit codes. 
607 try: 608 output = self.device.install(tf.name.decode("utf8")) 609 except subprocess.CalledProcessError as e: 610 output = e.output 611 612 self.assertIn(file_suffix, output) 613 os.remove(tf.name) 614 615 616class RootUnrootTest(DeviceTest): 617 def _test_root(self): 618 message = self.device.root() 619 if 'adbd cannot run as root in production builds' in message: 620 return 621 self.device.wait() 622 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) 623 624 def _test_unroot(self): 625 self.device.unroot() 626 self.device.wait() 627 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) 628 629 def test_root_unroot(self): 630 """Make sure that adb root and adb unroot work, using id(1).""" 631 if self.device.get_prop('ro.debuggable') != '1': 632 raise unittest.SkipTest('requires rootable build') 633 634 original_user = self.device.shell(['id', '-un'])[0].strip() 635 try: 636 if original_user == 'root': 637 self._test_unroot() 638 self._test_root() 639 elif original_user == 'shell': 640 self._test_root() 641 self._test_unroot() 642 finally: 643 if original_user == 'root': 644 self.device.root() 645 else: 646 self.device.unroot() 647 self.device.wait() 648 649 650class TcpIpTest(DeviceTest): 651 def test_tcpip_failure_raises(self): 652 """adb tcpip requires a port. 
653 654 Bug: http://b/22636927 655 """ 656 self.assertRaises( 657 subprocess.CalledProcessError, self.device.tcpip, '') 658 self.assertRaises( 659 subprocess.CalledProcessError, self.device.tcpip, 'foo') 660 661 662class SystemPropertiesTest(DeviceTest): 663 def test_get_prop(self): 664 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') 665 666 def test_set_prop(self): 667 # debug.* prop does not require root privileges 668 prop_name = 'debug.foo' 669 self.device.shell(['setprop', prop_name, '""']) 670 671 val = random.random() 672 self.device.set_prop(prop_name, str(val)) 673 self.assertEqual( 674 self.device.shell(['getprop', prop_name])[0].strip(), str(val)) 675 676 677def compute_md5(string): 678 hsh = hashlib.md5() 679 hsh.update(string) 680 return hsh.hexdigest() 681 682 683class HostFile(object): 684 def __init__(self, handle, checksum): 685 self.handle = handle 686 self.checksum = checksum 687 self.full_path = handle.name 688 self.base_name = os.path.basename(self.full_path) 689 690 691class DeviceFile(object): 692 def __init__(self, checksum, full_path): 693 self.checksum = checksum 694 self.full_path = full_path 695 self.base_name = posixpath.basename(self.full_path) 696 697 698def make_random_host_files(in_dir, num_files): 699 min_size = 1 * (1 << 10) 700 max_size = 16 * (1 << 10) 701 702 files = [] 703 for _ in range(num_files): 704 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) 705 706 size = random.randrange(min_size, max_size, 1024) 707 rand_str = os.urandom(size) 708 file_handle.write(rand_str) 709 file_handle.flush() 710 file_handle.close() 711 712 md5 = compute_md5(rand_str) 713 files.append(HostFile(file_handle, md5)) 714 return files 715 716 717def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'): 718 min_size = 1 * (1 << 10) 719 max_size = 16 * (1 << 10) 720 721 files = [] 722 for file_num in range(num_files): 723 size = random.randrange(min_size, max_size, 1024) 724 725 base_name 
= prefix + str(file_num) 726 full_path = posixpath.join(in_dir, base_name) 727 728 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), 729 'bs={}'.format(size), 'count=1']) 730 dev_md5, _ = device.shell(['md5sum', full_path])[0].split() 731 732 files.append(DeviceFile(dev_md5, full_path)) 733 return files 734 735 736class FileOperationsTest: 737 class Base(DeviceTest): 738 SCRATCH_DIR = '/data/local/tmp' 739 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' 740 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' 741 742 def setUp(self): 743 super().setUp() 744 self.previous_env = os.environ.get("ADB_COMPRESSION") 745 os.environ["ADB_COMPRESSION"] = self.compression 746 747 def tearDown(self): 748 if self.previous_env is None: 749 del os.environ["ADB_COMPRESSION"] 750 else: 751 os.environ["ADB_COMPRESSION"] = self.previous_env 752 753 def _verify_remote(self, checksum, remote_path): 754 dev_md5, _ = self.device.shell(['md5sum', remote_path])[0].split() 755 self.assertEqual(checksum, dev_md5) 756 757 def _verify_local(self, checksum, local_path): 758 with open(local_path, 'rb') as host_file: 759 host_md5 = compute_md5(host_file.read()) 760 self.assertEqual(host_md5, checksum) 761 762 def test_push(self): 763 """Push a randomly generated file to specified device.""" 764 kbytes = 512 765 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) 766 rand_str = os.urandom(1024 * kbytes) 767 tmp.write(rand_str) 768 tmp.close() 769 770 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 771 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE) 772 773 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE) 774 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 775 776 os.remove(tmp.name) 777 778 def test_push_dir(self): 779 """Push a randomly generated directory of files to the device.""" 780 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 781 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 782 783 try: 784 host_dir = 
tempfile.mkdtemp() 785 786 # Make sure the temp directory isn't setuid, or else adb will complain. 787 os.chmod(host_dir, 0o700) 788 789 # Create 32 random files. 790 temp_files = make_random_host_files(in_dir=host_dir, num_files=32) 791 self.device.push(host_dir, self.DEVICE_TEMP_DIR) 792 793 for temp_file in temp_files: 794 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 795 os.path.basename(host_dir), 796 temp_file.base_name) 797 self._verify_remote(temp_file.checksum, remote_path) 798 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 799 finally: 800 if host_dir is not None: 801 shutil.rmtree(host_dir) 802 803 def disabled_test_push_empty(self): 804 """Push an empty directory to the device.""" 805 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 806 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 807 808 try: 809 host_dir = tempfile.mkdtemp() 810 811 # Make sure the temp directory isn't setuid, or else adb will complain. 812 os.chmod(host_dir, 0o700) 813 814 # Create an empty directory. 815 empty_dir_path = os.path.join(host_dir, 'empty') 816 os.mkdir(empty_dir_path); 817 818 self.device.push(empty_dir_path, self.DEVICE_TEMP_DIR) 819 820 remote_path = os.path.join(self.DEVICE_TEMP_DIR, "empty") 821 test_empty_cmd = ["[", "-d", remote_path, "]"] 822 rc, _, _ = self.device.shell_nocheck(test_empty_cmd) 823 824 self.assertEqual(rc, 0) 825 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 826 finally: 827 if host_dir is not None: 828 shutil.rmtree(host_dir) 829 830 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows") 831 def test_push_symlink(self): 832 """Push a symlink. 833 834 Bug: http://b/31491920 835 """ 836 try: 837 host_dir = tempfile.mkdtemp() 838 839 # Make sure the temp directory isn't setuid, or else adb will 840 # complain. 
841 os.chmod(host_dir, 0o700) 842 843 with open(os.path.join(host_dir, 'foo'), 'w') as f: 844 f.write('foo') 845 846 symlink_path = os.path.join(host_dir, 'symlink') 847 os.symlink('foo', symlink_path) 848 849 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 850 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 851 self.device.push(symlink_path, self.DEVICE_TEMP_DIR) 852 rc, out, _ = self.device.shell_nocheck( 853 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')]) 854 self.assertEqual(0, rc) 855 self.assertEqual(out.strip(), 'foo') 856 finally: 857 if host_dir is not None: 858 shutil.rmtree(host_dir) 859 860 def test_multiple_push(self): 861 """Push multiple files to the device in one adb push command. 862 863 Bug: http://b/25324823 864 """ 865 866 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 867 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR]) 868 869 try: 870 host_dir = tempfile.mkdtemp() 871 872 # Create some random files and a subdirectory containing more files. 
873 temp_files = make_random_host_files(in_dir=host_dir, num_files=4) 874 875 subdir = os.path.join(host_dir, 'subdir') 876 os.mkdir(subdir) 877 subdir_temp_files = make_random_host_files(in_dir=subdir, 878 num_files=4) 879 880 paths = [x.full_path for x in temp_files] 881 paths.append(subdir) 882 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR]) 883 884 for temp_file in temp_files: 885 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 886 temp_file.base_name) 887 self._verify_remote(temp_file.checksum, remote_path) 888 889 for subdir_temp_file in subdir_temp_files: 890 remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 891 # BROKEN: http://b/25394682 892 # 'subdir'; 893 temp_file.base_name) 894 self._verify_remote(temp_file.checksum, remote_path) 895 896 897 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 898 finally: 899 if host_dir is not None: 900 shutil.rmtree(host_dir) 901 902 @requires_non_root 903 def test_push_error_reporting(self): 904 """Make sure that errors that occur while pushing a file get reported 905 906 Bug: http://b/26816782 907 """ 908 with tempfile.NamedTemporaryFile() as tmp_file: 909 tmp_file.write(b'\0' * 1024 * 1024) 910 tmp_file.flush() 911 try: 912 self.device.push(local=tmp_file.name, remote='/system/') 913 self.fail('push should not have succeeded') 914 except subprocess.CalledProcessError as e: 915 output = e.output 916 917 self.assertTrue(b'Permission denied' in output or 918 b'Read-only file system' in output) 919 920 @requires_non_root 921 def test_push_directory_creation(self): 922 """Regression test for directory creation. 
923 924 Bug: http://b/110953234 925 """ 926 with tempfile.NamedTemporaryFile() as tmp_file: 927 tmp_file.write(b'\0' * 1024 * 1024) 928 tmp_file.flush() 929 remote_path = self.DEVICE_TEMP_DIR + '/test_push_directory_creation' 930 self.device.shell(['rm', '-rf', remote_path]) 931 932 remote_path += '/filename' 933 self.device.push(local=tmp_file.name, remote=remote_path) 934 935 def disabled_test_push_multiple_slash_root(self): 936 """Regression test for pushing to //data/local/tmp. 937 938 Bug: http://b/141311284 939 940 Disabled because this broken on the adbd side as well: b/141943968 941 """ 942 with tempfile.NamedTemporaryFile() as tmp_file: 943 tmp_file.write(b'\0' * 1024 * 1024) 944 tmp_file.flush() 945 remote_path = '/' + self.DEVICE_TEMP_DIR + '/test_push_multiple_slash_root' 946 self.device.shell(['rm', '-rf', remote_path]) 947 self.device.push(local=tmp_file.name, remote=remote_path) 948 949 def _test_pull(self, remote_file, checksum): 950 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) 951 tmp_write.close() 952 self.device.pull(remote=remote_file, local=tmp_write.name) 953 with open(tmp_write.name, 'rb') as tmp_read: 954 host_contents = tmp_read.read() 955 host_md5 = compute_md5(host_contents) 956 self.assertEqual(checksum, host_md5) 957 os.remove(tmp_write.name) 958 959 @requires_non_root 960 def test_pull_error_reporting(self): 961 self.device.shell(['touch', self.DEVICE_TEMP_FILE]) 962 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE]) 963 964 try: 965 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x') 966 except subprocess.CalledProcessError as e: 967 output = e.output 968 969 self.assertIn(b'Permission denied', output) 970 971 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) 972 973 def test_pull(self): 974 """Pull a randomly generated file from specified device.""" 975 kbytes = 512 976 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) 977 cmd = ['dd', 'if=/dev/urandom', 978 
'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', 979 'count={}'.format(kbytes)] 980 self.device.shell(cmd) 981 dev_md5, _ = self.device.shell(['md5sum', self.DEVICE_TEMP_FILE])[0].split() 982 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) 983 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) 984 985 def test_pull_dir(self): 986 """Pull a randomly generated directory of files from the device.""" 987 try: 988 host_dir = tempfile.mkdtemp() 989 990 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 991 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 992 993 # Populate device directory with random files. 994 temp_files = make_random_device_files( 995 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 996 997 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 998 999 for temp_file in temp_files: 1000 host_path = os.path.join( 1001 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1002 temp_file.base_name) 1003 self._verify_local(temp_file.checksum, host_path) 1004 1005 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1006 finally: 1007 if host_dir is not None: 1008 shutil.rmtree(host_dir) 1009 1010 def test_pull_dir_symlink(self): 1011 """Pull a directory into a symlink to a directory. 1012 1013 Bug: http://b/27362811 1014 """ 1015 if os.name != 'posix': 1016 raise unittest.SkipTest('requires POSIX') 1017 1018 try: 1019 host_dir = tempfile.mkdtemp() 1020 real_dir = os.path.join(host_dir, 'dir') 1021 symlink = os.path.join(host_dir, 'symlink') 1022 os.mkdir(real_dir) 1023 os.symlink(real_dir, symlink) 1024 1025 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1026 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1027 1028 # Populate device directory with random files. 
1029 temp_files = make_random_device_files( 1030 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1031 1032 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink) 1033 1034 for temp_file in temp_files: 1035 host_path = os.path.join( 1036 real_dir, posixpath.basename(self.DEVICE_TEMP_DIR), 1037 temp_file.base_name) 1038 self._verify_local(temp_file.checksum, host_path) 1039 1040 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1041 finally: 1042 if host_dir is not None: 1043 shutil.rmtree(host_dir) 1044 1045 def test_pull_dir_symlink_collision(self): 1046 """Pull a directory into a colliding symlink to directory.""" 1047 if os.name != 'posix': 1048 raise unittest.SkipTest('requires POSIX') 1049 1050 try: 1051 host_dir = tempfile.mkdtemp() 1052 real_dir = os.path.join(host_dir, 'real') 1053 tmp_dirname = os.path.basename(self.DEVICE_TEMP_DIR) 1054 symlink = os.path.join(host_dir, tmp_dirname) 1055 os.mkdir(real_dir) 1056 os.symlink(real_dir, symlink) 1057 1058 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1059 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1060 1061 # Populate device directory with random files. 
1062 temp_files = make_random_device_files( 1063 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1064 1065 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) 1066 1067 for temp_file in temp_files: 1068 host_path = os.path.join(real_dir, temp_file.base_name) 1069 self._verify_local(temp_file.checksum, host_path) 1070 1071 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1072 finally: 1073 if host_dir is not None: 1074 shutil.rmtree(host_dir) 1075 1076 def test_pull_dir_nonexistent(self): 1077 """Pull a directory of files from the device to a nonexistent path.""" 1078 try: 1079 host_dir = tempfile.mkdtemp() 1080 dest_dir = os.path.join(host_dir, 'dest') 1081 1082 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1083 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) 1084 1085 # Populate device directory with random files. 1086 temp_files = make_random_device_files( 1087 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) 1088 1089 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir) 1090 1091 for temp_file in temp_files: 1092 host_path = os.path.join(dest_dir, temp_file.base_name) 1093 self._verify_local(temp_file.checksum, host_path) 1094 1095 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1096 finally: 1097 if host_dir is not None: 1098 shutil.rmtree(host_dir) 1099 1100 # selinux prevents adbd from accessing symlinks on /data/local/tmp. 
1101 def disabled_test_pull_symlink_dir(self): 1102 """Pull a symlink to a directory of symlinks to files.""" 1103 try: 1104 host_dir = tempfile.mkdtemp() 1105 1106 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents') 1107 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links') 1108 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink') 1109 1110 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1111 self.device.shell(['mkdir', '-p', remote_dir, remote_links]) 1112 self.device.shell(['ln', '-s', remote_links, remote_symlink]) 1113 1114 # Populate device directory with random files. 1115 temp_files = make_random_device_files( 1116 self.device, in_dir=remote_dir, num_files=32) 1117 1118 for temp_file in temp_files: 1119 self.device.shell( 1120 ['ln', '-s', '../contents/{}'.format(temp_file.base_name), 1121 posixpath.join(remote_links, temp_file.base_name)]) 1122 1123 self.device.pull(remote=remote_symlink, local=host_dir) 1124 1125 for temp_file in temp_files: 1126 host_path = os.path.join( 1127 host_dir, 'symlink', temp_file.base_name) 1128 self._verify_local(temp_file.checksum, host_path) 1129 1130 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1131 finally: 1132 if host_dir is not None: 1133 shutil.rmtree(host_dir) 1134 1135 def test_pull_empty(self): 1136 """Pull a directory containing an empty directory from the device.""" 1137 try: 1138 host_dir = tempfile.mkdtemp() 1139 1140 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty') 1141 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1142 self.device.shell(['mkdir', '-p', remote_empty_path]) 1143 1144 self.device.pull(remote=remote_empty_path, local=host_dir) 1145 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty'))) 1146 finally: 1147 if host_dir is not None: 1148 shutil.rmtree(host_dir) 1149 1150 def test_multiple_pull(self): 1151 """Pull a randomly generated directory of files from the device.""" 1152 1153 try: 1154 host_dir = tempfile.mkdtemp() 
1155 1156 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir') 1157 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1158 self.device.shell(['mkdir', '-p', subdir]) 1159 1160 # Create some random files and a subdirectory containing more files. 1161 temp_files = make_random_device_files( 1162 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4) 1163 1164 subdir_temp_files = make_random_device_files( 1165 self.device, in_dir=subdir, num_files=4, prefix='subdir_') 1166 1167 paths = [x.full_path for x in temp_files] 1168 paths.append(subdir) 1169 self.device._simple_call(['pull'] + paths + [host_dir]) 1170 1171 for temp_file in temp_files: 1172 local_path = os.path.join(host_dir, temp_file.base_name) 1173 self._verify_local(temp_file.checksum, local_path) 1174 1175 for subdir_temp_file in subdir_temp_files: 1176 local_path = os.path.join(host_dir, 1177 'subdir', 1178 subdir_temp_file.base_name) 1179 self._verify_local(subdir_temp_file.checksum, local_path) 1180 1181 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1182 finally: 1183 if host_dir is not None: 1184 shutil.rmtree(host_dir) 1185 1186 def verify_sync(self, device, temp_files, device_dir): 1187 """Verifies that a list of temp files was synced to the device.""" 1188 # Confirm that every file on the device mirrors that on the host. 1189 for temp_file in temp_files: 1190 device_full_path = posixpath.join( 1191 device_dir, temp_file.base_name) 1192 dev_md5, _ = device.shell(['md5sum', device_full_path])[0].split() 1193 self.assertEqual(temp_file.checksum, dev_md5) 1194 1195 def test_sync(self): 1196 """Sync a host directory to the data partition.""" 1197 1198 try: 1199 base_dir = tempfile.mkdtemp() 1200 1201 # Create mirror device directory hierarchy within base_dir. 1202 full_dir_path = base_dir + self.DEVICE_TEMP_DIR 1203 os.makedirs(full_dir_path) 1204 1205 # Create 32 random files within the host mirror. 
1206 temp_files = make_random_host_files( 1207 in_dir=full_dir_path, num_files=32) 1208 1209 # Clean up any stale files on the device. 1210 device = adb.get_device() # pylint: disable=no-member 1211 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1212 1213 old_product_out = os.environ.get('ANDROID_PRODUCT_OUT') 1214 os.environ['ANDROID_PRODUCT_OUT'] = base_dir 1215 device.sync('data') 1216 if old_product_out is None: 1217 del os.environ['ANDROID_PRODUCT_OUT'] 1218 else: 1219 os.environ['ANDROID_PRODUCT_OUT'] = old_product_out 1220 1221 self.verify_sync(device, temp_files, self.DEVICE_TEMP_DIR) 1222 1223 #self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1224 finally: 1225 if base_dir is not None: 1226 shutil.rmtree(base_dir) 1227 1228 def test_push_sync(self): 1229 """Sync a host directory to a specific path.""" 1230 1231 try: 1232 temp_dir = tempfile.mkdtemp() 1233 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1234 1235 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1236 1237 # Clean up any stale files on the device. 1238 device = adb.get_device() # pylint: disable=no-member 1239 device.shell(['rm', '-rf', device_dir]) 1240 1241 device.push(temp_dir, device_dir, sync=True) 1242 1243 self.verify_sync(device, temp_files, device_dir) 1244 1245 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1246 finally: 1247 if temp_dir is not None: 1248 shutil.rmtree(temp_dir) 1249 1250 def test_push_sync_multiple(self): 1251 """Sync multiple host directories to a specific path.""" 1252 1253 try: 1254 temp_dir = tempfile.mkdtemp() 1255 temp_files = make_random_host_files(in_dir=temp_dir, num_files=32) 1256 1257 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'sync_src_dst') 1258 1259 # Clean up any stale files on the device. 
1260 device = adb.get_device() # pylint: disable=no-member 1261 device.shell(['rm', '-rf', device_dir]) 1262 device.shell(['mkdir', '-p', device_dir]) 1263 1264 host_paths = [os.path.join(temp_dir, x.base_name) for x in temp_files] 1265 device.push(host_paths, device_dir, sync=True) 1266 1267 self.verify_sync(device, temp_files, device_dir) 1268 1269 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1270 finally: 1271 if temp_dir is not None: 1272 shutil.rmtree(temp_dir) 1273 1274 1275 def test_push_dry_run_nonexistent_file(self): 1276 """Push with dry run (non-existent file).""" 1277 1278 for file_size in [8, 1024 * 1024]: 1279 try: 1280 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1281 device_file = posixpath.join(device_dir, 'file') 1282 1283 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1284 self.device.shell(['mkdir', '-p', device_dir]) 1285 1286 host_dir = tempfile.mkdtemp() 1287 host_file = posixpath.join(host_dir, 'file') 1288 1289 with open(host_file, "w") as f: 1290 f.write('x' * file_size) 1291 1292 self.device._simple_call(['push', '-n', host_file, device_file]) 1293 rc, _, _ = self.device.shell_nocheck(['[', '-e', device_file, ']']) 1294 self.assertNotEqual(0, rc) 1295 1296 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1297 finally: 1298 if host_dir is not None: 1299 shutil.rmtree(host_dir) 1300 1301 def test_push_dry_run_existent_file(self): 1302 """Push with dry run.""" 1303 1304 for file_size in [8, 1024 * 1024]: 1305 try: 1306 host_dir = tempfile.mkdtemp() 1307 device_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'push_dry_run') 1308 device_file = posixpath.join(device_dir, 'file') 1309 1310 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1311 self.device.shell(['mkdir', '-p', device_dir]) 1312 self.device.shell(['echo', 'foo', '>', device_file]) 1313 1314 host_file = posixpath.join(host_dir, 'file') 1315 1316 with open(host_file, "w") as f: 1317 f.write('x' * file_size) 1318 1319 
self.device._simple_call(['push', '-n', host_file, device_file]) 1320 stdout, stderr = self.device.shell(['cat', device_file]) 1321 self.assertEqual(stdout.strip(), "foo") 1322 1323 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) 1324 finally: 1325 if host_dir is not None: 1326 shutil.rmtree(host_dir) 1327 1328 def test_unicode_paths(self): 1329 """Ensure that we can support non-ASCII paths, even on Windows.""" 1330 name = u'로보카 폴리' 1331 1332 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1333 remote_path = u'/data/local/tmp/adb-test-{}'.format(name) 1334 1335 ## push. 1336 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) 1337 tf.close() 1338 self.device.push(tf.name, remote_path) 1339 os.remove(tf.name) 1340 self.assertFalse(os.path.exists(tf.name)) 1341 1342 # Verify that the device ended up with the expected UTF-8 path 1343 output = self.device.shell( 1344 ['ls', '/data/local/tmp/adb-test-*'])[0].strip() 1345 self.assertEqual(remote_path, output) 1346 1347 # pull. 
1348 self.device.pull(remote_path, tf.name) 1349 self.assertTrue(os.path.exists(tf.name)) 1350 os.remove(tf.name) 1351 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) 1352 1353 1354class FileOperationsTestUncompressed(FileOperationsTest.Base): 1355 compression = "none" 1356 1357 1358class FileOperationsTestBrotli(FileOperationsTest.Base): 1359 compression = "brotli" 1360 1361 1362class FileOperationsTestLZ4(FileOperationsTest.Base): 1363 compression = "lz4" 1364 1365 1366class FileOperationsTestZstd(FileOperationsTest.Base): 1367 compression = "zstd" 1368 1369 1370class DeviceOfflineTest(DeviceTest): 1371 def _get_device_state(self, serialno): 1372 output = subprocess.check_output(self.device.adb_cmd + ['devices']) 1373 for line in output.split('\n'): 1374 m = re.match('(\S+)\s+(\S+)', line) 1375 if m and m.group(1) == serialno: 1376 return m.group(2) 1377 return None 1378 1379 def disabled_test_killed_when_pushing_a_large_file(self): 1380 """ 1381 While running adb push with a large file, kill adb server. 1382 Occasionally the device becomes offline. Because the device is still 1383 reading data without realizing that the adb server has been restarted. 1384 Test if we can bring the device online automatically now. 1385 http://b/32952319 1386 """ 1387 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1388 # 1. Push a large file 1389 file_path = 'tmp_large_file' 1390 try: 1391 fh = open(file_path, 'w') 1392 fh.write('\0' * (100 * 1024 * 1024)) 1393 fh.close() 1394 subproc = subprocess.Popen(self.device.adb_cmd + ['push', file_path, '/data/local/tmp']) 1395 time.sleep(0.1) 1396 # 2. Kill the adb server 1397 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1398 subproc.terminate() 1399 finally: 1400 try: 1401 os.unlink(file_path) 1402 except: 1403 pass 1404 # 3. See if the device still exist. 1405 # Sleep to wait for the adb server exit. 1406 time.sleep(0.5) 1407 # 4. 
The device should be online 1408 self.assertEqual(self._get_device_state(serialno), 'device') 1409 1410 def disabled_test_killed_when_pulling_a_large_file(self): 1411 """ 1412 While running adb pull with a large file, kill adb server. 1413 Occasionally the device can't be connected. Because the device is trying to 1414 send a message larger than what is expected by the adb server. 1415 Test if we can bring the device online automatically now. 1416 """ 1417 serialno = subprocess.check_output(self.device.adb_cmd + ['get-serialno']).strip() 1418 file_path = 'tmp_large_file' 1419 try: 1420 # 1. Create a large file on device. 1421 self.device.shell(['dd', 'if=/dev/zero', 'of=/data/local/tmp/tmp_large_file', 1422 'bs=1000000', 'count=100']) 1423 # 2. Pull the large file on host. 1424 subproc = subprocess.Popen(self.device.adb_cmd + 1425 ['pull','/data/local/tmp/tmp_large_file', file_path]) 1426 time.sleep(0.1) 1427 # 3. Kill the adb server 1428 subprocess.check_call(self.device.adb_cmd + ['kill-server']) 1429 subproc.terminate() 1430 finally: 1431 try: 1432 os.unlink(file_path) 1433 except: 1434 pass 1435 # 4. See if the device still exist. 1436 # Sleep to wait for the adb server exit. 1437 time.sleep(0.5) 1438 self.assertEqual(self._get_device_state(serialno), 'device') 1439 1440 1441 def test_packet_size_regression(self): 1442 """Test for http://b/37783561 1443 1444 Receiving packets of a length divisible by 512 but not 1024 resulted in 1445 the adb client waiting indefinitely for more input. 1446 """ 1447 # The values that trigger things are 507 (512 - 5 bytes from shell protocol) + 1024*n 1448 # Probe some surrounding values as well, for the hell of it. 
1449 for base in [512] + list(range(1024, 1024 * 16, 1024)): 1450 for offset in [-6, -5, -4]: 1451 length = base + offset 1452 cmd = ['dd', 'if=/dev/zero', 'bs={}'.format(length), 'count=1', '2>/dev/null;' 1453 'echo', 'foo'] 1454 rc, stdout, _ = self.device.shell_nocheck(cmd) 1455 1456 self.assertEqual(0, rc) 1457 1458 # Output should be '\0' * length, followed by "foo\n" 1459 self.assertEqual(length, len(stdout) - 4) 1460 self.assertEqual(stdout, "\0" * length + "foo\n") 1461 1462 def test_zero_packet(self): 1463 """Test for http://b/113070258 1464 1465 Make sure that we don't blow up when sending USB transfers that line up 1466 exactly with the USB packet size. 1467 """ 1468 1469 local_port = int(self.device.forward("tcp:0", "tcp:12345")) 1470 try: 1471 for size in [512, 1024]: 1472 def listener(): 1473 cmd = ["echo foo | nc -l -p 12345; echo done"] 1474 rc, stdout, stderr = self.device.shell_nocheck(cmd) 1475 1476 thread = threading.Thread(target=listener) 1477 thread.start() 1478 1479 # Wait a bit to let the shell command start. 1480 time.sleep(0.25) 1481 1482 sock = socket.create_connection(("localhost", local_port)) 1483 with contextlib.closing(sock): 1484 bytesWritten = sock.send(b"a" * size) 1485 self.assertEqual(size, bytesWritten) 1486 readBytes = sock.recv(4096) 1487 self.assertEqual(b"foo\n", readBytes) 1488 1489 thread.join() 1490 finally: 1491 self.device.forward_remove("tcp:{}".format(local_port)) 1492 1493 1494class SocketTest(DeviceTest): 1495 def test_socket_flush(self): 1496 """Test that we handle socket closure properly. 1497 1498 If we're done writing to a socket, closing before the other end has 1499 closed will send a TCP_RST if we have incoming data queued up, which 1500 may result in data that we've written being discarded. 
1501 1502 Bug: http://b/74616284 1503 """ 1504 def adb_length_prefixed(string): 1505 encoded = string.encode("utf8") 1506 result = b"%04x%s" % (len(encoded), encoded) 1507 return result 1508 1509 if "ANDROID_SERIAL" in os.environ: 1510 transport_string = "host:transport:" + os.environ["ANDROID_SERIAL"] 1511 else: 1512 transport_string = "host:transport-any" 1513 1514 with socket.create_connection(("localhost", 5037)) as s: 1515 1516 s.sendall(adb_length_prefixed(transport_string)) 1517 response = s.recv(4) 1518 self.assertEqual(b"OKAY", response) 1519 1520 shell_string = "shell:sleep 0.5; dd if=/dev/zero bs=1m count=1 status=none; echo foo" 1521 s.sendall(adb_length_prefixed(shell_string)) 1522 1523 response = s.recv(4) 1524 self.assertEqual(b"OKAY", response) 1525 1526 # Spawn a thread that dumps garbage into the socket until failure. 1527 def spam(): 1528 buf = b"\0" * 16384 1529 try: 1530 while True: 1531 s.sendall(buf) 1532 except Exception as ex: 1533 print(ex) 1534 1535 thread = threading.Thread(target=spam) 1536 thread.start() 1537 1538 time.sleep(1) 1539 1540 received = b"" 1541 while True: 1542 read = s.recv(512) 1543 if len(read) == 0: 1544 break 1545 received += read 1546 1547 self.assertEqual(1024 * 1024 + len("foo\n"), len(received)) 1548 thread.join() 1549 1550 1551class FramebufferTest(DeviceTest): 1552 def test_framebuffer(self): 1553 """Test that we get something from the framebuffer service.""" 1554 output = subprocess.check_output(self.device.adb_cmd + ["raw", "framebuffer:"]) 1555 self.assertFalse(len(output) == 0) 1556 1557 1558if sys.platform == "win32": 1559 # From https://stackoverflow.com/a/38749458 1560 import os 1561 import contextlib 1562 import msvcrt 1563 import ctypes 1564 from ctypes import wintypes 1565 1566 kernel32 = ctypes.WinDLL('kernel32', use_last_error=True) 1567 1568 GENERIC_READ = 0x80000000 1569 GENERIC_WRITE = 0x40000000 1570 FILE_SHARE_READ = 1 1571 FILE_SHARE_WRITE = 2 1572 CONSOLE_TEXTMODE_BUFFER = 1 1573 
INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value 1574 STD_OUTPUT_HANDLE = wintypes.DWORD(-11) 1575 STD_ERROR_HANDLE = wintypes.DWORD(-12) 1576 1577 def _check_zero(result, func, args): 1578 if not result: 1579 raise ctypes.WinError(ctypes.get_last_error()) 1580 return args 1581 1582 def _check_invalid(result, func, args): 1583 if result == INVALID_HANDLE_VALUE: 1584 raise ctypes.WinError(ctypes.get_last_error()) 1585 return args 1586 1587 if not hasattr(wintypes, 'LPDWORD'): # Python 2 1588 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 1589 wintypes.PSMALL_RECT = ctypes.POINTER(wintypes.SMALL_RECT) 1590 1591 class COORD(ctypes.Structure): 1592 _fields_ = (('X', wintypes.SHORT), 1593 ('Y', wintypes.SHORT)) 1594 1595 class CONSOLE_SCREEN_BUFFER_INFOEX(ctypes.Structure): 1596 _fields_ = (('cbSize', wintypes.ULONG), 1597 ('dwSize', COORD), 1598 ('dwCursorPosition', COORD), 1599 ('wAttributes', wintypes.WORD), 1600 ('srWindow', wintypes.SMALL_RECT), 1601 ('dwMaximumWindowSize', COORD), 1602 ('wPopupAttributes', wintypes.WORD), 1603 ('bFullscreenSupported', wintypes.BOOL), 1604 ('ColorTable', wintypes.DWORD * 16)) 1605 def __init__(self, *args, **kwds): 1606 super(CONSOLE_SCREEN_BUFFER_INFOEX, self).__init__( 1607 *args, **kwds) 1608 self.cbSize = ctypes.sizeof(self) 1609 1610 PCONSOLE_SCREEN_BUFFER_INFOEX = ctypes.POINTER( 1611 CONSOLE_SCREEN_BUFFER_INFOEX) 1612 LPSECURITY_ATTRIBUTES = wintypes.LPVOID 1613 1614 kernel32.GetStdHandle.errcheck = _check_invalid 1615 kernel32.GetStdHandle.restype = wintypes.HANDLE 1616 kernel32.GetStdHandle.argtypes = ( 1617 wintypes.DWORD,) # _In_ nStdHandle 1618 1619 kernel32.CreateConsoleScreenBuffer.errcheck = _check_invalid 1620 kernel32.CreateConsoleScreenBuffer.restype = wintypes.HANDLE 1621 kernel32.CreateConsoleScreenBuffer.argtypes = ( 1622 wintypes.DWORD, # _In_ dwDesiredAccess 1623 wintypes.DWORD, # _In_ dwShareMode 1624 LPSECURITY_ATTRIBUTES, # _In_opt_ lpSecurityAttributes 1625 wintypes.DWORD, # _In_ dwFlags 1626 
wintypes.LPVOID) # _Reserved_ lpScreenBufferData 1627 1628 kernel32.GetConsoleScreenBufferInfoEx.errcheck = _check_zero 1629 kernel32.GetConsoleScreenBufferInfoEx.argtypes = ( 1630 wintypes.HANDLE, # _In_ hConsoleOutput 1631 PCONSOLE_SCREEN_BUFFER_INFOEX) # _Out_ lpConsoleScreenBufferInfo 1632 1633 kernel32.SetConsoleScreenBufferInfoEx.errcheck = _check_zero 1634 kernel32.SetConsoleScreenBufferInfoEx.argtypes = ( 1635 wintypes.HANDLE, # _In_ hConsoleOutput 1636 PCONSOLE_SCREEN_BUFFER_INFOEX) # _In_ lpConsoleScreenBufferInfo 1637 1638 kernel32.SetConsoleWindowInfo.errcheck = _check_zero 1639 kernel32.SetConsoleWindowInfo.argtypes = ( 1640 wintypes.HANDLE, # _In_ hConsoleOutput 1641 wintypes.BOOL, # _In_ bAbsolute 1642 wintypes.PSMALL_RECT) # _In_ lpConsoleWindow 1643 1644 kernel32.FillConsoleOutputCharacterW.errcheck = _check_zero 1645 kernel32.FillConsoleOutputCharacterW.argtypes = ( 1646 wintypes.HANDLE, # _In_ hConsoleOutput 1647 wintypes.WCHAR, # _In_ cCharacter 1648 wintypes.DWORD, # _In_ nLength 1649 COORD, # _In_ dwWriteCoord 1650 wintypes.LPDWORD) # _Out_ lpNumberOfCharsWritten 1651 1652 kernel32.ReadConsoleOutputCharacterW.errcheck = _check_zero 1653 kernel32.ReadConsoleOutputCharacterW.argtypes = ( 1654 wintypes.HANDLE, # _In_ hConsoleOutput 1655 wintypes.LPWSTR, # _Out_ lpCharacter 1656 wintypes.DWORD, # _In_ nLength 1657 COORD, # _In_ dwReadCoord 1658 wintypes.LPDWORD) # _Out_ lpNumberOfCharsRead 1659 1660 @contextlib.contextmanager 1661 def allocate_console(): 1662 allocated = kernel32.AllocConsole() 1663 try: 1664 yield allocated 1665 finally: 1666 if allocated: 1667 kernel32.FreeConsole() 1668 1669 @contextlib.contextmanager 1670 def console_screen(ncols=None, nrows=None): 1671 info = CONSOLE_SCREEN_BUFFER_INFOEX() 1672 new_info = CONSOLE_SCREEN_BUFFER_INFOEX() 1673 nwritten = (wintypes.DWORD * 1)() 1674 hStdOut = kernel32.GetStdHandle(STD_OUTPUT_HANDLE) 1675 kernel32.GetConsoleScreenBufferInfoEx( 1676 hStdOut, ctypes.byref(info)) 1677 if ncols is 
None: 1678 ncols = info.dwSize.X 1679 if nrows is None: 1680 nrows = info.dwSize.Y 1681 elif nrows > 9999: 1682 raise ValueError('nrows must be 9999 or less') 1683 fd_screen = None 1684 hScreen = kernel32.CreateConsoleScreenBuffer( 1685 GENERIC_READ | GENERIC_WRITE, 1686 FILE_SHARE_READ | FILE_SHARE_WRITE, 1687 None, CONSOLE_TEXTMODE_BUFFER, None) 1688 try: 1689 fd_screen = msvcrt.open_osfhandle( 1690 hScreen, os.O_RDWR | os.O_BINARY) 1691 kernel32.GetConsoleScreenBufferInfoEx( 1692 hScreen, ctypes.byref(new_info)) 1693 new_info.dwSize = COORD(ncols, nrows) 1694 new_info.srWindow = wintypes.SMALL_RECT( 1695 Left=0, Top=0, Right=(ncols - 1), 1696 Bottom=(info.srWindow.Bottom - info.srWindow.Top)) 1697 kernel32.SetConsoleScreenBufferInfoEx( 1698 hScreen, ctypes.byref(new_info)) 1699 kernel32.SetConsoleWindowInfo(hScreen, True, 1700 ctypes.byref(new_info.srWindow)) 1701 kernel32.FillConsoleOutputCharacterW( 1702 hScreen, u'\0', ncols * nrows, COORD(0,0), nwritten) 1703 kernel32.SetConsoleActiveScreenBuffer(hScreen) 1704 try: 1705 yield fd_screen 1706 finally: 1707 kernel32.SetConsoleScreenBufferInfoEx( 1708 hStdOut, ctypes.byref(info)) 1709 kernel32.SetConsoleWindowInfo(hStdOut, True, 1710 ctypes.byref(info.srWindow)) 1711 kernel32.SetConsoleActiveScreenBuffer(hStdOut) 1712 finally: 1713 if fd_screen is not None: 1714 os.close(fd_screen) 1715 else: 1716 kernel32.CloseHandle(hScreen) 1717 1718 def read_screen(fd): 1719 hScreen = msvcrt.get_osfhandle(fd) 1720 csbi = CONSOLE_SCREEN_BUFFER_INFOEX() 1721 kernel32.GetConsoleScreenBufferInfoEx( 1722 hScreen, ctypes.byref(csbi)) 1723 ncols = csbi.dwSize.X 1724 pos = csbi.dwCursorPosition 1725 length = ncols * pos.Y + pos.X + 1 1726 buf = (ctypes.c_wchar * length)() 1727 n = (wintypes.DWORD * 1)() 1728 kernel32.ReadConsoleOutputCharacterW( 1729 hScreen, buf, length, COORD(0,0), n) 1730 lines = [buf[i:i+ncols].rstrip(u'\0') 1731 for i in range(0, n[0], ncols)] 1732 return u'\n'.join(lines) 1733 
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class WindowsConsoleTest(DeviceTest):
    def test_unicode_output(self):
        """Test Unicode command line parameters and Unicode console window output.

        Bug: https://issuetracker.google.com/issues/111972753
        """
        # If we don't have a console window, allocate one. This isn't necessary if we're already
        # being run from a console window, which is typical.
        with allocate_console() as allocated_console:
            # Create a temporary console buffer and switch to it. We could also pass a parameter of
            # ncols=len(unicode_string), but it causes the window to flash as it is resized and
            # likely unnecessary given the typical console window size.
            with console_screen(nrows=1000) as screen:
                unicode_string = u'로보카 폴리'
                # Run adb and allow it to detect that stdout is a console, not a pipe, by using
                # device.shell_popen() which does not use a pipe, unlike device.shell().
                process = self.device.shell_popen(['echo', '"' + unicode_string + '"'])
                process.wait()
                # Read what was written by adb to the temporary console buffer.
                console_output = read_screen(screen)
                self.assertEqual(unicode_string, console_output)


class DevicesListing(DeviceTest):
    """Tests for `adb devices`, `adb track-devices` and `adb track-app`.

    This was previously two classes that were both named DevicesListing; the
    second definition rebound the name, so the tests of the first were never
    collected. They are merged here so every test runs under the original
    class name.
    """

    # Resolved once, when the class body is executed at import time.
    serial = subprocess.check_output(['adb', 'get-serialno']).strip().decode("utf-8")

    def test_devices(self):
        with subprocess.Popen(['adb', 'devices'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            lines = [raw.decode("utf-8") for raw in proc.stdout.readlines()]
            self.assertEqual(len(lines), 3)
            line = lines[1]
            self.assertIn(self.serial, line)
            self.assertNotIn("{", line)
            self.assertNotIn("}", line)
            self.assertIn("device", line)
            self.assertNotIn("product", line)
            self.assertNotIn("transport", line)

    def test_devices_l(self):
        with subprocess.Popen(['adb', 'devices', '-l'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            lines = [raw.decode("utf-8") for raw in proc.stdout.readlines()]
            self.assertEqual(len(lines), 3)
            line = lines[1]
            self.assertIn(self.serial, line)
            self.assertNotIn("{", line)
            self.assertNotIn("}", line)
            self.assertIn("device", line)
            self.assertIn("product", line)
            self.assertIn("transport", line)

    def test_track_devices(self):
        with subprocess.Popen(['adb', 'track-devices'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader:
                    # The payload is preceded by its length as four hex digits.
                    output_size = int(reader.read(4), 16)
                    output = reader.read(output_size)
                    self.assertNotIn("{", output)
                    self.assertNotIn("}", output)
                    self.assertIn(self.serial, output)
                    self.assertIn("device", output)
                    self.assertNotIn("product", output)
                    self.assertNotIn("transport", output)
            finally:
                # Leaving the Popen context waits for the child, so it must be
                # terminated even when an assertion above fails; otherwise a
                # failing test would block here.
                proc.terminate()

    def test_track_devices_l(self):
        with subprocess.Popen(['adb', 'track-devices', '-l'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader:
                    output_size = int(reader.read(4), 16)
                    output = reader.read(output_size)
                    self.assertNotIn("{", output)
                    self.assertNotIn("}", output)
                    self.assertIn(self.serial, output)
                    self.assertIn("device", output)
                    self.assertIn("product", output)
                    self.assertIn("transport", output)
            finally:
                # See test_track_devices: terminate before the context waits.
                proc.terminate()

    def test_track_devices_proto_text(self):
        with subprocess.Popen(['adb', 'track-devices', '--proto-text'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                with io.TextIOWrapper(proc.stdout, encoding='utf8') as reader:
                    output_size = int(reader.read(4), 16)
                    output = reader.read(output_size)
                    self.assertIn("{", output)
                    self.assertIn("}", output)
                    self.assertIn(self.serial, output)
                    self.assertIn("device", output)
                    self.assertIn("product", output)
                    self.assertIn("connection_type", output)
            finally:
                # See test_track_devices: terminate before the context waits.
                proc.terminate()

    def test_track_devices_proto_binary(self):
        with subprocess.Popen(['adb', 'track-devices', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                output_size = int(proc.stdout.read(4).decode("utf-8"), 16)
                proto = proc.stdout.read(output_size)

                devices = adb_host_proto.Devices()
                devices.ParseFromString(proto)

                device = devices.device[0]
                self.assertTrue(device.serial == self.serial)
                self.assertFalse(device.bus_address == "")
                self.assertFalse(device.product == "")
                self.assertFalse(device.model == "")
                self.assertFalse(device.device == "")
                self.assertTrue(device.negotiated_speed == int(device.negotiated_speed))
                self.assertTrue(device.max_speed == int(device.max_speed))
                self.assertTrue(device.transport_id == int(device.transport_id))
            finally:
                # See test_track_devices: terminate before the context waits.
                proc.terminate()

    def test_track_app_appinfo(self):
        # check_output() is used only to capture (and so silence) the
        # commands' output; a non-zero exit status still raises.
        subprocess.check_output(['adb', 'install', '-r', '-t', 'adb_test_app1.apk'])
        subprocess.check_output(['adb', 'install', '-r', '-t', 'adb_test_app2.apk'])
        subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.MainActivity'])
        subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app2/.MainActivity'])
        subprocess.check_output(['adb', 'shell', 'am', 'start', '-W', 'adb.test.app1/.OwnProcessActivity'])
        with subprocess.Popen(['adb', 'track-app', '--proto-binary'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            try:
                output_size = int(proc.stdout.read(4).decode("utf-8"), 16)
                proto = proc.stdout.read(output_size)

                # NOTE(review): proto_track_app is not among the imports at
                # the top of this file; confirm it is bound somewhere before
                # this test runs.
                apps = proto_track_app.AppProcesses()
                apps.ParseFromString(proto)

                foundAdbAppDefProc = False
                foundAdbAppOwnProc = False
                for app in apps.process:
                    if (app.process_name == "adb.test.process.name"):
                        foundAdbAppDefProc = True
                        self.assertTrue(app.debuggable)
                        self.assertIn("adb.test.app1", app.package_names)
                        self.assertIn("adb.test.app2", app.package_names)

                    if (app.process_name == "adb.test.own.process"):
                        foundAdbAppOwnProc = True
                        self.assertTrue(app.debuggable)
                        self.assertIn("adb.test.app1", app.package_names)

                self.assertTrue(foundAdbAppDefProc)
                self.assertTrue(foundAdbAppOwnProc)
            finally:
                # See test_track_devices: terminate before the context waits.
                proc.terminate()


class ServerStatus(unittest.TestCase):
    def test_server_status(self):
        with subprocess.Popen(['adb', 'server-status'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) as proc:
            lines = [raw.decode("utf-8") for raw in proc.stdout.readlines()]
            self.assertIn("usb_backend", lines[0])
            self.assertIn("mdns_backend", lines[1])
            self.assertIn("version", lines[2])
            self.assertIn("build", lines[3])
            self.assertIn("executable_absolute_path", lines[4])
            self.assertIn("log_absolute_path", lines[5])


def invoke(*args):
    """Run the command given by args and return its stripped, UTF-8 decoded stdout.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    return subprocess.check_output(args).strip().decode("utf-8")


class OneDevice(unittest.TestCase):

    # Resolved once, when the class body is executed at import time.
    serial = invoke("adb", "get-serialno")
    owner_server_port = "14424"

    def test_one_device(self):
        invoke("adb", "kill-server")
        invoke("adb", "--one-device", self.serial, "-P", self.owner_server_port, "start-server")
        devices = invoke("adb", "devices")
        # Query the owning server on the same port it was started on.
        owned_devices = invoke("adb", "-P", self.owner_server_port, "devices")
        self.assertIn(self.serial, owned_devices)
        self.assertNotIn(self.serial, devices)

    def tearDown(self):
        invoke("adb", "-P", self.owner_server_port, "kill-server")
        invoke("adb", "kill-server")


if __name__ == '__main__':
    random.seed(0)
    unittest.main()