1import unittest 2from test import support 3from test.support import os_helper 4from test.support import socket_helper 5from test.support import threading_helper 6 7import errno 8import io 9import itertools 10import socket 11import select 12import tempfile 13import time 14import traceback 15import queue 16import sys 17import os 18import platform 19import array 20import contextlib 21from weakref import proxy 22import signal 23import math 24import pickle 25import struct 26import random 27import shutil 28import string 29import _thread as thread 30import threading 31try: 32 import multiprocessing 33except ImportError: 34 multiprocessing = False 35try: 36 import fcntl 37except ImportError: 38 fcntl = None 39 40support.requires_working_socket(module=True) 41 42HOST = socket_helper.HOST 43# test unicode string and carriage return 44MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8') 45 46VSOCKPORT = 1234 47AIX = platform.system() == "AIX" 48 49try: 50 import _socket 51except ImportError: 52 _socket = None 53 54def get_cid(): 55 if fcntl is None: 56 return None 57 if not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'): 58 return None 59 try: 60 with open("/dev/vsock", "rb") as f: 61 r = fcntl.ioctl(f, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID, " ") 62 except OSError: 63 return None 64 else: 65 return struct.unpack("I", r)[0] 66 67def _have_socket_can(): 68 """Check whether CAN sockets are supported on this host.""" 69 try: 70 s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 71 except (AttributeError, OSError): 72 return False 73 else: 74 s.close() 75 return True 76 77def _have_socket_can_isotp(): 78 """Check whether CAN ISOTP sockets are supported on this host.""" 79 try: 80 s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) 81 except (AttributeError, OSError): 82 return False 83 else: 84 s.close() 85 return True 86 87def _have_socket_can_j1939(): 88 """Check whether CAN J1939 sockets are supported on this host.""" 89 try: 90 s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) 91 except (AttributeError, OSError): 92 return False 93 else: 94 s.close() 95 return True 96 97def _have_socket_rds(): 98 """Check whether RDS sockets are supported on this host.""" 99 try: 100 s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 101 except (AttributeError, OSError): 102 return False 103 else: 104 s.close() 105 return True 106 107def _have_socket_alg(): 108 """Check whether AF_ALG sockets are supported on this host.""" 109 try: 110 s = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 111 except (AttributeError, OSError): 112 return False 113 else: 114 s.close() 115 return True 116 117def _have_socket_qipcrtr(): 118 """Check whether AF_QIPCRTR sockets are supported on this host.""" 119 try: 120 s = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0) 121 except (AttributeError, OSError): 122 return False 123 else: 124 s.close() 125 return True 126 127def _have_socket_vsock(): 128 """Check whether AF_VSOCK sockets are supported on this host.""" 129 ret = get_cid() is not None 130 return ret 131 132 133def _have_socket_bluetooth(): 134 """Check whether AF_BLUETOOTH sockets are supported on this host.""" 135 try: 136 # RFCOMM is supported by all platforms with bluetooth support. Windows 137 # does not support omitting the protocol. 138 s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) 139 except (AttributeError, OSError): 140 return False 141 else: 142 s.close() 143 return True 144 145 146@contextlib.contextmanager 147def socket_setdefaulttimeout(timeout): 148 old_timeout = socket.getdefaulttimeout() 149 try: 150 socket.setdefaulttimeout(timeout) 151 yield 152 finally: 153 socket.setdefaulttimeout(old_timeout) 154 155 156HAVE_SOCKET_CAN = _have_socket_can() 157 158HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp() 159 160HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939() 161 162HAVE_SOCKET_RDS = _have_socket_rds() 163 164HAVE_SOCKET_ALG = _have_socket_alg() 165 166HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr() 167 168HAVE_SOCKET_VSOCK = _have_socket_vsock() 169 170HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE") 171 172HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth() 173 174# Size in bytes of the int type 175SIZEOF_INT = array.array("i").itemsize 176 177class SocketTCPTest(unittest.TestCase): 178 179 def setUp(self): 180 self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 181 self.port = socket_helper.bind_port(self.serv) 182 self.serv.listen() 183 184 def tearDown(self): 185 self.serv.close() 186 self.serv = None 187 188class SocketUDPTest(unittest.TestCase): 189 190 def setUp(self): 191 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 192 self.port = socket_helper.bind_port(self.serv) 193 194 def tearDown(self): 195 self.serv.close() 196 self.serv = None 197 198class SocketUDPLITETest(SocketUDPTest): 199 200 def setUp(self): 201 self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 202 self.port = socket_helper.bind_port(self.serv) 203 204class ThreadSafeCleanupTestCase: 205 """Subclass of unittest.TestCase with thread-safe cleanup methods. 206 207 This subclass protects the addCleanup() and doCleanups() methods 208 with a recursive lock. 209 """ 210 211 def __init__(self, *args, **kwargs): 212 super().__init__(*args, **kwargs) 213 self._cleanup_lock = threading.RLock() 214 215 def addCleanup(self, *args, **kwargs): 216 with self._cleanup_lock: 217 return super().addCleanup(*args, **kwargs) 218 219 def doCleanups(self, *args, **kwargs): 220 with self._cleanup_lock: 221 return super().doCleanups(*args, **kwargs) 222 223class SocketCANTest(unittest.TestCase): 224 225 """To be able to run this test, a `vcan0` CAN interface can be created with 226 the following commands: 227 # modprobe vcan 228 # ip link add dev vcan0 type vcan 229 # ip link set up vcan0 230 """ 231 interface = 'vcan0' 232 bufsize = 128 233 234 """The CAN frame structure is defined in <linux/can.h>: 235 236 struct can_frame { 237 canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */ 238 __u8 can_dlc; /* data length code: 0 .. 8 */ 239 __u8 data[8] __attribute__((aligned(8))); 240 }; 241 """ 242 can_frame_fmt = "=IB3x8s" 243 can_frame_size = struct.calcsize(can_frame_fmt) 244 245 """The Broadcast Management Command frame structure is defined 246 in <linux/can/bcm.h>: 247 248 struct bcm_msg_head { 249 __u32 opcode; 250 __u32 flags; 251 __u32 count; 252 struct timeval ival1, ival2; 253 canid_t can_id; 254 __u32 nframes; 255 struct can_frame frames[0]; 256 } 257 258 `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see 259 `struct can_frame` definition). Must use native not standard types for packing. 260 """ 261 bcm_cmd_msg_fmt = "@3I4l2I" 262 bcm_cmd_msg_fmt += "x" * (struct.calcsize(bcm_cmd_msg_fmt) % 8) 263 264 def setUp(self): 265 self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 266 self.addCleanup(self.s.close) 267 try: 268 self.s.bind((self.interface,)) 269 except OSError: 270 self.skipTest('network interface `%s` does not exist' % 271 self.interface) 272 273 274class SocketRDSTest(unittest.TestCase): 275 276 """To be able to run this test, the `rds` kernel module must be loaded: 277 # modprobe rds 278 """ 279 bufsize = 8192 280 281 def setUp(self): 282 self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 283 self.addCleanup(self.serv.close) 284 try: 285 self.port = socket_helper.bind_port(self.serv) 286 except OSError: 287 self.skipTest('unable to bind RDS socket') 288 289 290class ThreadableTest: 291 """Threadable Test class 292 293 The ThreadableTest class makes it easy to create a threaded 294 client/server pair from an existing unit test. To create a 295 new threaded class from an existing unit test, use multiple 296 inheritance: 297 298 class NewClass (OldClass, ThreadableTest): 299 pass 300 301 This class defines two new fixture functions with obvious 302 purposes for overriding: 303 304 clientSetUp () 305 clientTearDown () 306 307 Any new test functions within the class must then define 308 tests in pairs, where the test name is preceded with a 309 '_' to indicate the client portion of the test. Ex: 310 311 def testFoo(self): 312 # Server portion 313 314 def _testFoo(self): 315 # Client portion 316 317 Any exceptions raised by the clients during their tests 318 are caught and transferred to the main thread to alert 319 the testing framework. 320 321 Note, the server setup function cannot call any blocking 322 functions that rely on the client thread during setup, 323 unless serverExplicitReady() is called just before 324 the blocking call (such as in setting up a client/server 325 connection and performing the accept() in setUp(). 326 """ 327 328 def __init__(self): 329 # Swap the true setup function 330 self.__setUp = self.setUp 331 self.setUp = self._setUp 332 333 def serverExplicitReady(self): 334 """This method allows the server to explicitly indicate that 335 it wants the client thread to proceed. This is useful if the 336 server is about to execute a blocking routine that is 337 dependent upon the client thread during its setup routine.""" 338 self.server_ready.set() 339 340 def _setUp(self): 341 self.enterContext(threading_helper.wait_threads_exit()) 342 343 self.server_ready = threading.Event() 344 self.client_ready = threading.Event() 345 self.done = threading.Event() 346 self.queue = queue.Queue(1) 347 self.server_crashed = False 348 349 def raise_queued_exception(): 350 if self.queue.qsize(): 351 raise self.queue.get() 352 self.addCleanup(raise_queued_exception) 353 354 # Do some munging to start the client test. 355 methodname = self.id() 356 i = methodname.rfind('.') 357 methodname = methodname[i+1:] 358 test_method = getattr(self, '_' + methodname) 359 self.client_thread = thread.start_new_thread( 360 self.clientRun, (test_method,)) 361 362 try: 363 self.__setUp() 364 except: 365 self.server_crashed = True 366 raise 367 finally: 368 self.server_ready.set() 369 self.client_ready.wait() 370 self.addCleanup(self.done.wait) 371 372 def clientRun(self, test_func): 373 self.server_ready.wait() 374 try: 375 self.clientSetUp() 376 except BaseException as e: 377 self.queue.put(e) 378 self.clientTearDown() 379 return 380 finally: 381 self.client_ready.set() 382 if self.server_crashed: 383 self.clientTearDown() 384 return 385 if not hasattr(test_func, '__call__'): 386 raise TypeError("test_func must be a callable function") 387 try: 388 test_func() 389 except BaseException as e: 390 self.queue.put(e) 391 finally: 392 self.clientTearDown() 393 394 def clientSetUp(self): 395 raise NotImplementedError("clientSetUp must be implemented.") 396 397 def clientTearDown(self): 398 self.done.set() 399 thread.exit() 400 401class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): 402 403 def __init__(self, methodName='runTest'): 404 SocketTCPTest.__init__(self, methodName=methodName) 405 ThreadableTest.__init__(self) 406 407 def clientSetUp(self): 408 self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 409 410 def clientTearDown(self): 411 self.cli.close() 412 self.cli = None 413 ThreadableTest.clientTearDown(self) 414 415class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): 416 417 def __init__(self, methodName='runTest'): 418 SocketUDPTest.__init__(self, methodName=methodName) 419 ThreadableTest.__init__(self) 420 421 def clientSetUp(self): 422 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 423 424 def clientTearDown(self): 425 self.cli.close() 426 self.cli = None 427 ThreadableTest.clientTearDown(self) 428 429@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 430 'UDPLITE sockets required for this test.') 431class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest): 432 433 def __init__(self, methodName='runTest'): 434 SocketUDPLITETest.__init__(self, methodName=methodName) 435 ThreadableTest.__init__(self) 436 437 def clientSetUp(self): 438 self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 439 440 def clientTearDown(self): 441 self.cli.close() 442 self.cli = None 443 ThreadableTest.clientTearDown(self) 444 445class ThreadedCANSocketTest(SocketCANTest, ThreadableTest): 446 447 def __init__(self, methodName='runTest'): 448 SocketCANTest.__init__(self, methodName=methodName) 449 ThreadableTest.__init__(self) 450 451 def clientSetUp(self): 452 self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) 453 try: 454 self.cli.bind((self.interface,)) 455 except OSError: 456 # skipTest should not be called here, and will be called in the 457 # server instead 458 pass 459 460 def clientTearDown(self): 461 self.cli.close() 462 self.cli = None 463 ThreadableTest.clientTearDown(self) 464 465class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest): 466 467 def __init__(self, methodName='runTest'): 468 SocketRDSTest.__init__(self, methodName=methodName) 469 ThreadableTest.__init__(self) 470 471 def clientSetUp(self): 472 self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) 473 try: 474 # RDS sockets must be bound explicitly to send or receive data 475 self.cli.bind((HOST, 0)) 476 self.cli_addr = self.cli.getsockname() 477 except OSError: 478 # skipTest should not be called here, and will be called in the 479 # server instead 480 pass 481 482 def clientTearDown(self): 483 self.cli.close() 484 self.cli = None 485 ThreadableTest.clientTearDown(self) 486 487@unittest.skipIf(fcntl is None, "need fcntl") 488@unittest.skipUnless(HAVE_SOCKET_VSOCK, 489 'VSOCK sockets required for this test.') 490@unittest.skipUnless(get_cid() != 2, 491 "This test can only be run on a virtual guest.") 492class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest): 493 494 def __init__(self, methodName='runTest'): 495 unittest.TestCase.__init__(self, methodName=methodName) 496 ThreadableTest.__init__(self) 497 498 def setUp(self): 499 self.serv = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) 500 self.addCleanup(self.serv.close) 501 self.serv.bind((socket.VMADDR_CID_ANY, VSOCKPORT)) 502 self.serv.listen() 503 self.serverExplicitReady() 504 self.conn, self.connaddr = self.serv.accept() 505 self.addCleanup(self.conn.close) 506 507 def clientSetUp(self): 508 time.sleep(0.1) 509 self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) 510 self.addCleanup(self.cli.close) 511 cid = get_cid() 512 self.cli.connect((cid, VSOCKPORT)) 513 514 def testStream(self): 515 msg = self.conn.recv(1024) 516 self.assertEqual(msg, MSG) 517 518 def _testStream(self): 519 self.cli.send(MSG) 520 self.cli.close() 521 522class SocketConnectedTest(ThreadedTCPSocketTest): 523 """Socket tests for client-server connection. 524 525 self.cli_conn is a client socket connected to the server. The 526 setUp() method guarantees that it is connected to the server. 527 """ 528 529 def __init__(self, methodName='runTest'): 530 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 531 532 def setUp(self): 533 ThreadedTCPSocketTest.setUp(self) 534 # Indicate explicitly we're ready for the client thread to 535 # proceed and then perform the blocking call to accept 536 self.serverExplicitReady() 537 conn, addr = self.serv.accept() 538 self.cli_conn = conn 539 540 def tearDown(self): 541 self.cli_conn.close() 542 self.cli_conn = None 543 ThreadedTCPSocketTest.tearDown(self) 544 545 def clientSetUp(self): 546 ThreadedTCPSocketTest.clientSetUp(self) 547 self.cli.connect((HOST, self.port)) 548 self.serv_conn = self.cli 549 550 def clientTearDown(self): 551 self.serv_conn.close() 552 self.serv_conn = None 553 ThreadedTCPSocketTest.clientTearDown(self) 554 555class SocketPairTest(unittest.TestCase, ThreadableTest): 556 557 def __init__(self, methodName='runTest'): 558 unittest.TestCase.__init__(self, methodName=methodName) 559 ThreadableTest.__init__(self) 560 561 def setUp(self): 562 self.serv, self.cli = socket.socketpair() 563 564 def tearDown(self): 565 self.serv.close() 566 self.serv = None 567 568 def clientSetUp(self): 569 pass 570 571 def clientTearDown(self): 572 self.cli.close() 573 self.cli = None 574 ThreadableTest.clientTearDown(self) 575 576 577# The following classes are used by the sendmsg()/recvmsg() tests. 578# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase 579# gives a drop-in replacement for SocketConnectedTest, but different 580# address families can be used, and the attributes serv_addr and 581# cli_addr will be set to the addresses of the endpoints. 582 583class SocketTestBase(unittest.TestCase): 584 """A base class for socket tests. 585 586 Subclasses must provide methods newSocket() to return a new socket 587 and bindSock(sock) to bind it to an unused address. 588 589 Creates a socket self.serv and sets self.serv_addr to its address. 590 """ 591 592 def setUp(self): 593 self.serv = self.newSocket() 594 self.bindServer() 595 596 def bindServer(self): 597 """Bind server socket and set self.serv_addr to its address.""" 598 self.bindSock(self.serv) 599 self.serv_addr = self.serv.getsockname() 600 601 def tearDown(self): 602 self.serv.close() 603 self.serv = None 604 605 606class SocketListeningTestMixin(SocketTestBase): 607 """Mixin to listen on the server socket.""" 608 609 def setUp(self): 610 super().setUp() 611 self.serv.listen() 612 613 614class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase, 615 ThreadableTest): 616 """Mixin to add client socket and allow client/server tests. 617 618 Client socket is self.cli and its address is self.cli_addr. See 619 ThreadableTest for usage information. 620 """ 621 622 def __init__(self, *args, **kwargs): 623 super().__init__(*args, **kwargs) 624 ThreadableTest.__init__(self) 625 626 def clientSetUp(self): 627 self.cli = self.newClientSocket() 628 self.bindClient() 629 630 def newClientSocket(self): 631 """Return a new socket for use as client.""" 632 return self.newSocket() 633 634 def bindClient(self): 635 """Bind client socket and set self.cli_addr to its address.""" 636 self.bindSock(self.cli) 637 self.cli_addr = self.cli.getsockname() 638 639 def clientTearDown(self): 640 self.cli.close() 641 self.cli = None 642 ThreadableTest.clientTearDown(self) 643 644 645class ConnectedStreamTestMixin(SocketListeningTestMixin, 646 ThreadedSocketTestMixin): 647 """Mixin to allow client/server stream tests with connected client. 648 649 Server's socket representing connection to client is self.cli_conn 650 and client's connection to server is self.serv_conn. (Based on 651 SocketConnectedTest.) 652 """ 653 654 def setUp(self): 655 super().setUp() 656 # Indicate explicitly we're ready for the client thread to 657 # proceed and then perform the blocking call to accept 658 self.serverExplicitReady() 659 conn, addr = self.serv.accept() 660 self.cli_conn = conn 661 662 def tearDown(self): 663 self.cli_conn.close() 664 self.cli_conn = None 665 super().tearDown() 666 667 def clientSetUp(self): 668 super().clientSetUp() 669 self.cli.connect(self.serv_addr) 670 self.serv_conn = self.cli 671 672 def clientTearDown(self): 673 try: 674 self.serv_conn.close() 675 self.serv_conn = None 676 except AttributeError: 677 pass 678 super().clientTearDown() 679 680 681class UnixSocketTestBase(SocketTestBase): 682 """Base class for Unix-domain socket tests.""" 683 684 # This class is used for file descriptor passing tests, so we 685 # create the sockets in a private directory so that other users 686 # can't send anything that might be problematic for a privileged 687 # user running the tests. 688 689 def setUp(self): 690 self.dir_path = tempfile.mkdtemp() 691 self.addCleanup(os.rmdir, self.dir_path) 692 super().setUp() 693 694 def bindSock(self, sock): 695 path = tempfile.mktemp(dir=self.dir_path) 696 socket_helper.bind_unix_socket(sock, path) 697 self.addCleanup(os_helper.unlink, path) 698 699class UnixStreamBase(UnixSocketTestBase): 700 """Base class for Unix-domain SOCK_STREAM tests.""" 701 702 def newSocket(self): 703 return socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 704 705 706class InetTestBase(SocketTestBase): 707 """Base class for IPv4 socket tests.""" 708 709 host = HOST 710 711 def setUp(self): 712 super().setUp() 713 self.port = self.serv_addr[1] 714 715 def bindSock(self, sock): 716 socket_helper.bind_port(sock, host=self.host) 717 718class TCPTestBase(InetTestBase): 719 """Base class for TCP-over-IPv4 tests.""" 720 721 def newSocket(self): 722 return socket.socket(socket.AF_INET, socket.SOCK_STREAM) 723 724class UDPTestBase(InetTestBase): 725 """Base class for UDP-over-IPv4 tests.""" 726 727 def newSocket(self): 728 return socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 729 730class UDPLITETestBase(InetTestBase): 731 """Base class for UDPLITE-over-IPv4 tests.""" 732 733 def newSocket(self): 734 return socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 735 736class SCTPStreamBase(InetTestBase): 737 """Base class for SCTP tests in one-to-one (SOCK_STREAM) mode.""" 738 739 def newSocket(self): 740 return socket.socket(socket.AF_INET, socket.SOCK_STREAM, 741 socket.IPPROTO_SCTP) 742 743 744class Inet6TestBase(InetTestBase): 745 """Base class for IPv6 socket tests.""" 746 747 host = socket_helper.HOSTv6 748 749class UDP6TestBase(Inet6TestBase): 750 """Base class for UDP-over-IPv6 tests.""" 751 752 def newSocket(self): 753 return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) 754 755class UDPLITE6TestBase(Inet6TestBase): 756 """Base class for UDPLITE-over-IPv6 tests.""" 757 758 def newSocket(self): 759 return socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE) 760 761 762# Test-skipping decorators for use with ThreadableTest. 763 764def skipWithClientIf(condition, reason): 765 """Skip decorated test if condition is true, add client_skip decorator. 766 767 If the decorated object is not a class, sets its attribute 768 "client_skip" to a decorator which will return an empty function 769 if the test is to be skipped, or the original function if it is 770 not. This can be used to avoid running the client part of a 771 skipped test when using ThreadableTest. 772 """ 773 def client_pass(*args, **kwargs): 774 pass 775 def skipdec(obj): 776 retval = unittest.skip(reason)(obj) 777 if not isinstance(obj, type): 778 retval.client_skip = lambda f: client_pass 779 return retval 780 def noskipdec(obj): 781 if not (isinstance(obj, type) or hasattr(obj, "client_skip")): 782 obj.client_skip = lambda f: f 783 return obj 784 return skipdec if condition else noskipdec 785 786 787def requireAttrs(obj, *attributes): 788 """Skip decorated test if obj is missing any of the given attributes. 789 790 Sets client_skip attribute as skipWithClientIf() does. 791 """ 792 missing = [name for name in attributes if not hasattr(obj, name)] 793 return skipWithClientIf( 794 missing, "don't have " + ", ".join(name for name in missing)) 795 796 797def requireSocket(*args): 798 """Skip decorated test if a socket cannot be created with given arguments. 799 800 When an argument is given as a string, will use the value of that 801 attribute of the socket module, or skip the test if it doesn't 802 exist. Sets client_skip attribute as skipWithClientIf() does. 803 """ 804 err = None 805 missing = [obj for obj in args if 806 isinstance(obj, str) and not hasattr(socket, obj)] 807 if missing: 808 err = "don't have " + ", ".join(name for name in missing) 809 else: 810 callargs = [getattr(socket, obj) if isinstance(obj, str) else obj 811 for obj in args] 812 try: 813 s = socket.socket(*callargs) 814 except OSError as e: 815 # XXX: check errno? 816 err = str(e) 817 else: 818 s.close() 819 return skipWithClientIf( 820 err is not None, 821 "can't create socket({0}): {1}".format( 822 ", ".join(str(o) for o in args), err)) 823 824 825####################################################################### 826## Begin Tests 827 828class GeneralModuleTests(unittest.TestCase): 829 830 def test_SocketType_is_socketobject(self): 831 import _socket 832 self.assertTrue(socket.SocketType is _socket.socket) 833 s = socket.socket() 834 self.assertIsInstance(s, socket.SocketType) 835 s.close() 836 837 def test_repr(self): 838 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 839 with s: 840 self.assertIn('fd=%i' % s.fileno(), repr(s)) 841 self.assertIn('family=%s' % socket.AF_INET, repr(s)) 842 self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s)) 843 self.assertIn('proto=0', repr(s)) 844 self.assertNotIn('raddr', repr(s)) 845 s.bind(('127.0.0.1', 0)) 846 self.assertIn('laddr', repr(s)) 847 self.assertIn(str(s.getsockname()), repr(s)) 848 self.assertIn('[closed]', repr(s)) 849 self.assertNotIn('laddr', repr(s)) 850 851 @unittest.skipUnless(_socket is not None, 'need _socket module') 852 def test_csocket_repr(self): 853 s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) 854 try: 855 expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>' 856 % (s.fileno(), s.family, s.type, s.proto)) 857 self.assertEqual(repr(s), expected) 858 finally: 859 s.close() 860 expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>' 861 % (s.family, s.type, s.proto)) 862 self.assertEqual(repr(s), expected) 863 864 def test_weakref(self): 865 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 866 p = proxy(s) 867 self.assertEqual(p.fileno(), s.fileno()) 868 s = None 869 support.gc_collect() # For PyPy or other GCs. 870 try: 871 p.fileno() 872 except ReferenceError: 873 pass 874 else: 875 self.fail('Socket proxy still exists') 876 877 def testSocketError(self): 878 # Testing socket module exceptions 879 msg = "Error raising socket exception (%s)." 880 with self.assertRaises(OSError, msg=msg % 'OSError'): 881 raise OSError 882 with self.assertRaises(OSError, msg=msg % 'socket.herror'): 883 raise socket.herror 884 with self.assertRaises(OSError, msg=msg % 'socket.gaierror'): 885 raise socket.gaierror 886 887 def testSendtoErrors(self): 888 # Testing that sendto doesn't mask failures. See #10169. 889 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 890 self.addCleanup(s.close) 891 s.bind(('', 0)) 892 sockname = s.getsockname() 893 # 2 args 894 with self.assertRaises(TypeError) as cm: 895 s.sendto('\u2620', sockname) 896 self.assertEqual(str(cm.exception), 897 "a bytes-like object is required, not 'str'") 898 with self.assertRaises(TypeError) as cm: 899 s.sendto(5j, sockname) 900 self.assertEqual(str(cm.exception), 901 "a bytes-like object is required, not 'complex'") 902 with self.assertRaises(TypeError) as cm: 903 s.sendto(b'foo', None) 904 self.assertIn('not NoneType',str(cm.exception)) 905 # 3 args 906 with self.assertRaises(TypeError) as cm: 907 s.sendto('\u2620', 0, sockname) 908 self.assertEqual(str(cm.exception), 909 "a bytes-like object is required, not 'str'") 910 with self.assertRaises(TypeError) as cm: 911 s.sendto(5j, 0, sockname) 912 self.assertEqual(str(cm.exception), 913 "a bytes-like object is required, not 'complex'") 914 with self.assertRaises(TypeError) as cm: 915 s.sendto(b'foo', 0, None) 916 self.assertIn('not NoneType', str(cm.exception)) 917 with self.assertRaises(TypeError) as cm: 918 s.sendto(b'foo', 'bar', sockname) 919 with self.assertRaises(TypeError) as cm: 920 s.sendto(b'foo', None, None) 921 # wrong number of args 922 with self.assertRaises(TypeError) as cm: 923 s.sendto(b'foo') 924 self.assertIn('(1 given)', str(cm.exception)) 925 with self.assertRaises(TypeError) as cm: 926 s.sendto(b'foo', 0, sockname, 4) 927 self.assertIn('(4 given)', str(cm.exception)) 928 929 def testCrucialConstants(self): 930 # Testing for mission critical constants 931 socket.AF_INET 932 if socket.has_ipv6: 933 socket.AF_INET6 934 socket.SOCK_STREAM 935 socket.SOCK_DGRAM 936 socket.SOCK_RAW 937 socket.SOCK_RDM 938 socket.SOCK_SEQPACKET 939 socket.SOL_SOCKET 940 socket.SO_REUSEADDR 941 942 def testCrucialIpProtoConstants(self): 943 socket.IPPROTO_TCP 944 socket.IPPROTO_UDP 945 if socket.has_ipv6: 946 socket.IPPROTO_IPV6 947 948 @unittest.skipUnless(os.name == "nt", "Windows specific") 949 def testWindowsSpecificConstants(self): 950 socket.IPPROTO_ICLFXBM 951 socket.IPPROTO_ST 952 socket.IPPROTO_CBT 953 socket.IPPROTO_IGP 954 socket.IPPROTO_RDP 955 socket.IPPROTO_PGM 956 socket.IPPROTO_L2TP 957 socket.IPPROTO_SCTP 958 959 @unittest.skipIf(support.is_wasi, "WASI is missing these methods") 960 def test_socket_methods(self): 961 # socket methods that depend on a configure HAVE_ check. They should 962 # be present on all platforms except WASI. 963 names = [ 964 "_accept", "bind", "connect", "connect_ex", "getpeername", 965 "getsockname", "listen", "recvfrom", "recvfrom_into", "sendto", 966 "setsockopt", "shutdown" 967 ] 968 for name in names: 969 if not hasattr(socket.socket, name): 970 self.fail(f"socket method {name} is missing") 971 972 @unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test') 973 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 974 def test3542SocketOptions(self): 975 # Ref. issue #35569 and https://tools.ietf.org/html/rfc3542 976 opts = { 977 'IPV6_CHECKSUM', 978 'IPV6_DONTFRAG', 979 'IPV6_DSTOPTS', 980 'IPV6_HOPLIMIT', 981 'IPV6_HOPOPTS', 982 'IPV6_NEXTHOP', 983 'IPV6_PATHMTU', 984 'IPV6_PKTINFO', 985 'IPV6_RECVDSTOPTS', 986 'IPV6_RECVHOPLIMIT', 987 'IPV6_RECVHOPOPTS', 988 'IPV6_RECVPATHMTU', 989 'IPV6_RECVPKTINFO', 990 'IPV6_RECVRTHDR', 991 'IPV6_RECVTCLASS', 992 'IPV6_RTHDR', 993 'IPV6_RTHDRDSTOPTS', 994 'IPV6_RTHDR_TYPE_0', 995 'IPV6_TCLASS', 996 'IPV6_USE_MIN_MTU', 997 } 998 for opt in opts: 999 self.assertTrue( 1000 hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'" 1001 ) 1002 1003 def testHostnameRes(self): 1004 # Testing hostname resolution mechanisms 1005 hostname = socket.gethostname() 1006 try: 1007 ip = socket.gethostbyname(hostname) 1008 except OSError: 1009 # Probably name lookup wasn't set up right; skip this test 1010 self.skipTest('name lookup failure') 1011 self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") 1012 try: 1013 hname, aliases, ipaddrs = socket.gethostbyaddr(ip) 1014 except OSError: 1015 # Probably a similar problem as above; skip this test 1016 self.skipTest('name lookup failure') 1017 all_host_names = [hostname, hname] + aliases 1018 fqhn = socket.getfqdn(ip) 1019 if not fqhn in all_host_names: 1020 self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) 1021 1022 def test_host_resolution(self): 1023 for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']: 1024 self.assertEqual(socket.gethostbyname(addr), addr) 1025 1026 # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have 1027 # a matching name entry (e.g. 'ip6-localhost') 1028 for host in [socket_helper.HOSTv4]: 1029 self.assertIn(host, socket.gethostbyaddr(host)[2]) 1030 1031 def test_host_resolution_bad_address(self): 1032 # These are all malformed IP addresses and expected not to resolve to 1033 # any result. But some ISPs, e.g. AWS and AT&T, may successfully 1034 # resolve these IPs. In particular, AT&T's DNS Error Assist service 1035 # will break this test. See https://bugs.python.org/issue42092 for a 1036 # workaround. 1037 explanation = ( 1038 "resolving an invalid IP address did not raise OSError; " 1039 "can be caused by a broken DNS server" 1040 ) 1041 for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2', 1042 '1:1:1:1:1:1:1:1:1']: 1043 with self.assertRaises(OSError, msg=addr): 1044 socket.gethostbyname(addr) 1045 with self.assertRaises(OSError, msg=explanation): 1046 socket.gethostbyaddr(addr) 1047 1048 @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()") 1049 @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()") 1050 def test_sethostname(self): 1051 oldhn = socket.gethostname() 1052 try: 1053 socket.sethostname('new') 1054 except OSError as e: 1055 if e.errno == errno.EPERM: 1056 self.skipTest("test should be run as root") 1057 else: 1058 raise 1059 try: 1060 # running test as root! 1061 self.assertEqual(socket.gethostname(), 'new') 1062 # Should work with bytes objects too 1063 socket.sethostname(b'bar') 1064 self.assertEqual(socket.gethostname(), 'bar') 1065 finally: 1066 socket.sethostname(oldhn) 1067 1068 @unittest.skipUnless(hasattr(socket, 'if_nameindex'), 1069 'socket.if_nameindex() not available.') 1070 def testInterfaceNameIndex(self): 1071 interfaces = socket.if_nameindex() 1072 for index, name in interfaces: 1073 self.assertIsInstance(index, int) 1074 self.assertIsInstance(name, str) 1075 # interface indices are non-zero integers 1076 self.assertGreater(index, 0) 1077 _index = socket.if_nametoindex(name) 1078 self.assertIsInstance(_index, int) 1079 self.assertEqual(index, _index) 1080 _name = socket.if_indextoname(index) 1081 self.assertIsInstance(_name, str) 1082 self.assertEqual(name, _name) 1083 1084 @unittest.skipUnless(hasattr(socket, 'if_indextoname'), 1085 'socket.if_indextoname() not available.') 1086 def testInvalidInterfaceIndexToName(self): 1087 self.assertRaises(OSError, socket.if_indextoname, 0) 1088 self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') 1089 1090 @unittest.skipUnless(hasattr(socket, 'if_nametoindex'), 1091 'socket.if_nametoindex() not available.') 1092 def testInvalidInterfaceNameToIndex(self): 1093 self.assertRaises(TypeError, socket.if_nametoindex, 0) 1094 self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF') 1095 1096 @unittest.skipUnless(hasattr(sys, 'getrefcount'), 1097 'test needs sys.getrefcount()') 1098 def testRefCountGetNameInfo(self): 1099 # Testing reference count for getnameinfo 1100 try: 1101 # On some versions, this loses a reference 1102 orig = sys.getrefcount(__name__) 1103 socket.getnameinfo(__name__,0) 1104 except TypeError: 1105 if sys.getrefcount(__name__) != orig: 1106 self.fail("socket.getnameinfo loses a reference") 1107 1108 def testInterpreterCrash(self): 1109 # Making sure getnameinfo doesn't crash the interpreter 1110 try: 1111 # On some versions, this crashes the interpreter. 1112 socket.getnameinfo(('x', 0, 0, 0), 0) 1113 except OSError: 1114 pass 1115 1116 def testNtoH(self): 1117 # This just checks that htons etc. are their own inverse, 1118 # when looking at the lower 16 or 32 bits. 1119 sizes = {socket.htonl: 32, socket.ntohl: 32, 1120 socket.htons: 16, socket.ntohs: 16} 1121 for func, size in sizes.items(): 1122 mask = (1<<size) - 1 1123 for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): 1124 self.assertEqual(i & mask, func(func(i&mask)) & mask) 1125 1126 swapped = func(mask) 1127 self.assertEqual(swapped & mask, mask) 1128 self.assertRaises(OverflowError, func, 1<<34) 1129 1130 @support.cpython_only 1131 def testNtoHErrors(self): 1132 import _testcapi 1133 s_good_values = [0, 1, 2, 0xffff] 1134 l_good_values = s_good_values + [0xffffffff] 1135 l_bad_values = [-1, -2, 1<<32, 1<<1000] 1136 s_bad_values = ( 1137 l_bad_values + 1138 [_testcapi.INT_MIN-1, _testcapi.INT_MAX+1] + 1139 [1 << 16, _testcapi.INT_MAX] 1140 ) 1141 for k in s_good_values: 1142 socket.ntohs(k) 1143 socket.htons(k) 1144 for k in l_good_values: 1145 socket.ntohl(k) 1146 socket.htonl(k) 1147 for k in s_bad_values: 1148 self.assertRaises(OverflowError, socket.ntohs, k) 1149 self.assertRaises(OverflowError, socket.htons, k) 1150 for k in l_bad_values: 1151 self.assertRaises(OverflowError, socket.ntohl, k) 1152 self.assertRaises(OverflowError, socket.htonl, k) 1153 1154 def testGetServBy(self): 1155 eq = self.assertEqual 1156 # Find one service that exists, then check all the related interfaces. 1157 # I've ordered this by protocols that have both a tcp and udp 1158 # protocol, at least for modern Linuxes. 1159 if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd')) 1160 or sys.platform in ('linux', 'darwin')): 1161 # avoid the 'echo' service on this platform, as there is an 1162 # assumption breaking non-standard port/protocol entry 1163 services = ('daytime', 'qotd', 'domain') 1164 else: 1165 services = ('echo', 'daytime', 'domain') 1166 for service in services: 1167 try: 1168 port = socket.getservbyname(service, 'tcp') 1169 break 1170 except OSError: 1171 pass 1172 else: 1173 raise OSError 1174 # Try same call with optional protocol omitted 1175 # Issue #26936: Android getservbyname() was broken before API 23. 1176 if (not hasattr(sys, 'getandroidapilevel') or 1177 sys.getandroidapilevel() >= 23): 1178 port2 = socket.getservbyname(service) 1179 eq(port, port2) 1180 # Try udp, but don't barf if it doesn't exist 1181 try: 1182 udpport = socket.getservbyname(service, 'udp') 1183 except OSError: 1184 udpport = None 1185 else: 1186 eq(udpport, port) 1187 # Now make sure the lookup by port returns the same service name 1188 # Issue #26936: Android getservbyport() is broken. 1189 if not support.is_android: 1190 eq(socket.getservbyport(port2), service) 1191 eq(socket.getservbyport(port, 'tcp'), service) 1192 if udpport is not None: 1193 eq(socket.getservbyport(udpport, 'udp'), service) 1194 # Make sure getservbyport does not accept out of range ports. 1195 self.assertRaises(OverflowError, socket.getservbyport, -1) 1196 self.assertRaises(OverflowError, socket.getservbyport, 65536) 1197 1198 def testDefaultTimeout(self): 1199 # Testing default timeout 1200 # The default timeout should initially be None 1201 self.assertEqual(socket.getdefaulttimeout(), None) 1202 with socket.socket() as s: 1203 self.assertEqual(s.gettimeout(), None) 1204 1205 # Set the default timeout to 10, and see if it propagates 1206 with socket_setdefaulttimeout(10): 1207 self.assertEqual(socket.getdefaulttimeout(), 10) 1208 with socket.socket() as sock: 1209 self.assertEqual(sock.gettimeout(), 10) 1210 1211 # Reset the default timeout to None, and see if it propagates 1212 socket.setdefaulttimeout(None) 1213 self.assertEqual(socket.getdefaulttimeout(), None) 1214 with socket.socket() as sock: 1215 self.assertEqual(sock.gettimeout(), None) 1216 1217 # Check that setting it to an invalid value raises ValueError 1218 self.assertRaises(ValueError, socket.setdefaulttimeout, -1) 1219 1220 # Check that setting it to an invalid type raises TypeError 1221 self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") 1222 1223 @unittest.skipUnless(hasattr(socket, 'inet_aton'), 1224 'test needs socket.inet_aton()') 1225 def testIPv4_inet_aton_fourbytes(self): 1226 # Test that issue1008086 and issue767150 are fixed. 1227 # It must return 4 bytes. 1228 self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) 1229 self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) 1230 1231 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1232 'test needs socket.inet_pton()') 1233 def testIPv4toString(self): 1234 from socket import inet_aton as f, inet_pton, AF_INET 1235 g = lambda a: inet_pton(AF_INET, a) 1236 1237 assertInvalid = lambda func,a: self.assertRaises( 1238 (OSError, ValueError), func, a 1239 ) 1240 1241 self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) 1242 self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0')) 1243 self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170')) 1244 self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4')) 1245 self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255')) 1246 # bpo-29972: inet_pton() doesn't fail on AIX 1247 if not AIX: 1248 assertInvalid(f, '0.0.0.') 1249 assertInvalid(f, '300.0.0.0') 1250 assertInvalid(f, 'a.0.0.0') 1251 assertInvalid(f, '1.2.3.4.5') 1252 assertInvalid(f, '::1') 1253 1254 self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0')) 1255 self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0')) 1256 self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170')) 1257 self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255')) 1258 assertInvalid(g, '0.0.0.') 1259 assertInvalid(g, '300.0.0.0') 1260 assertInvalid(g, 'a.0.0.0') 1261 assertInvalid(g, '1.2.3.4.5') 1262 assertInvalid(g, '::1') 1263 1264 @unittest.skipUnless(hasattr(socket, 'inet_pton'), 1265 'test needs socket.inet_pton()') 1266 def testIPv6toString(self): 1267 try: 1268 from socket import inet_pton, AF_INET6, has_ipv6 1269 if not has_ipv6: 1270 self.skipTest('IPv6 not available') 1271 except ImportError: 1272 self.skipTest('could not import needed symbols from socket') 1273 1274 if sys.platform == "win32": 1275 try: 1276 inet_pton(AF_INET6, '::') 1277 except OSError as e: 1278 if e.winerror == 10022: 1279 self.skipTest('IPv6 might not be supported') 1280 1281 f = lambda a: inet_pton(AF_INET6, a) 1282 assertInvalid = lambda a: self.assertRaises( 1283 (OSError, ValueError), f, a 1284 ) 1285 1286 self.assertEqual(b'\x00' * 16, f('::')) 1287 self.assertEqual(b'\x00' * 16, f('0::0')) 1288 self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::')) 1289 self.assertEqual( 1290 b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', 1291 f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') 1292 ) 1293 self.assertEqual( 1294 b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02', 1295 f('ad42:abc::127:0:254:2') 1296 ) 1297 self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::')) 1298 assertInvalid('0x20::') 1299 assertInvalid(':::') 1300 assertInvalid('::0::') 1301 assertInvalid('1::abc::') 1302 assertInvalid('1::abc::def') 1303 assertInvalid('1:2:3:4:5:6') 1304 assertInvalid('1:2:3:4:5:6:') 1305 assertInvalid('1:2:3:4:5:6:7:8:0') 1306 # bpo-29972: inet_pton() doesn't fail on AIX 1307 if not AIX: 1308 assertInvalid('1:2:3:4:5:6:7:8:') 1309 1310 self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40', 1311 f('::254.42.23.64') 1312 ) 1313 self.assertEqual( 1314 b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40', 1315 f('42::a29b:254.42.23.64') 1316 ) 1317 self.assertEqual( 1318 b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40', 1319 f('42:a8b9:0:2:ffff:a29b:254.42.23.64') 1320 ) 1321 assertInvalid('255.254.253.252') 1322 assertInvalid('1::260.2.3.0') 1323 assertInvalid('1::0.be.e.0') 1324 assertInvalid('1:2:3:4:5:6:7:1.2.3.4') 1325 assertInvalid('::1.2.3.4:0') 1326 assertInvalid('0.100.200.0:3:4:5:6:7:8') 1327 1328 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1329 'test needs socket.inet_ntop()') 1330 def testStringToIPv4(self): 1331 from socket import inet_ntoa as f, inet_ntop, AF_INET 1332 g = lambda a: inet_ntop(AF_INET, a) 1333 assertInvalid = lambda func,a: self.assertRaises( 1334 (OSError, ValueError), func, a 1335 ) 1336 1337 self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) 1338 self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) 1339 self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) 1340 self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) 1341 assertInvalid(f, b'\x00' * 3) 1342 assertInvalid(f, b'\x00' * 5) 1343 assertInvalid(f, b'\x00' * 16) 1344 self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55'))) 1345 1346 self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) 1347 self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) 1348 self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) 1349 assertInvalid(g, b'\x00' * 3) 1350 assertInvalid(g, b'\x00' * 5) 1351 assertInvalid(g, b'\x00' * 16) 1352 self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55'))) 1353 1354 @unittest.skipUnless(hasattr(socket, 'inet_ntop'), 1355 'test needs socket.inet_ntop()') 1356 def testStringToIPv6(self): 1357 try: 1358 from socket import inet_ntop, AF_INET6, has_ipv6 1359 if not has_ipv6: 1360 self.skipTest('IPv6 not available') 1361 except ImportError: 1362 self.skipTest('could not import needed symbols from socket') 1363 1364 if sys.platform == "win32": 1365 try: 1366 inet_ntop(AF_INET6, b'\x00' * 16) 1367 except OSError as e: 1368 if e.winerror == 10022: 1369 self.skipTest('IPv6 might not be supported') 1370 1371 f = lambda a: inet_ntop(AF_INET6, a) 1372 assertInvalid = lambda a: self.assertRaises( 1373 (OSError, ValueError), f, a 1374 ) 1375 1376 self.assertEqual('::', f(b'\x00' * 16)) 1377 self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) 1378 self.assertEqual( 1379 'aef:b01:506:1001:ffff:9997:55:170', 1380 f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') 1381 ) 1382 self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01'))) 1383 1384 assertInvalid(b'\x12' * 15) 1385 assertInvalid(b'\x12' * 17) 1386 assertInvalid(b'\x12' * 4) 1387 1388 # XXX The following don't test module-level functionality... 1389 1390 def testSockName(self): 1391 # Testing getsockname() 1392 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1393 self.addCleanup(sock.close) 1394 1395 # Since find_unused_port() is inherently subject to race conditions, we 1396 # call it a couple times if necessary. 1397 for i in itertools.count(): 1398 port = socket_helper.find_unused_port() 1399 try: 1400 sock.bind(("0.0.0.0", port)) 1401 except OSError as e: 1402 if e.errno != errno.EADDRINUSE or i == 5: 1403 raise 1404 else: 1405 break 1406 1407 name = sock.getsockname() 1408 # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate 1409 # it reasonable to get the host's addr in addition to 0.0.0.0. 1410 # At least for eCos. This is required for the S/390 to pass. 1411 try: 1412 my_ip_addr = socket.gethostbyname(socket.gethostname()) 1413 except OSError: 1414 # Probably name lookup wasn't set up right; skip this test 1415 self.skipTest('name lookup failure') 1416 self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) 1417 self.assertEqual(name[1], port) 1418 1419 def testGetSockOpt(self): 1420 # Testing getsockopt() 1421 # We know a socket should start without reuse==0 1422 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1423 self.addCleanup(sock.close) 1424 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1425 self.assertFalse(reuse != 0, "initial mode is reuse") 1426 1427 def testSetSockOpt(self): 1428 # Testing setsockopt() 1429 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1430 self.addCleanup(sock.close) 1431 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1432 reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) 1433 self.assertFalse(reuse == 0, "failed to set reuse mode") 1434 1435 def testSendAfterClose(self): 1436 # testing send() after close() with timeout 1437 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1438 sock.settimeout(1) 1439 self.assertRaises(OSError, sock.send, b"spam") 1440 1441 def testCloseException(self): 1442 sock = socket.socket() 1443 sock.bind((socket._LOCALHOST, 0)) 1444 socket.socket(fileno=sock.fileno()).close() 1445 try: 1446 sock.close() 1447 except OSError as err: 1448 # Winsock apparently raises ENOTSOCK 1449 self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK)) 1450 else: 1451 self.fail("close() should raise EBADF/ENOTSOCK") 1452 1453 def testNewAttributes(self): 1454 # testing .family, .type and .protocol 1455 1456 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1457 self.assertEqual(sock.family, socket.AF_INET) 1458 if hasattr(socket, 'SOCK_CLOEXEC'): 1459 self.assertIn(sock.type, 1460 (socket.SOCK_STREAM | socket.SOCK_CLOEXEC, 1461 socket.SOCK_STREAM)) 1462 else: 1463 self.assertEqual(sock.type, socket.SOCK_STREAM) 1464 self.assertEqual(sock.proto, 0) 1465 1466 def test_getsockaddrarg(self): 1467 sock = socket.socket() 1468 self.addCleanup(sock.close) 1469 port = socket_helper.find_unused_port() 1470 big_port = port + 65536 1471 neg_port = port - 65536 1472 self.assertRaises(OverflowError, sock.bind, (HOST, big_port)) 1473 self.assertRaises(OverflowError, sock.bind, (HOST, neg_port)) 1474 # Since find_unused_port() is inherently subject to race conditions, we 1475 # call it a couple times if necessary. 1476 for i in itertools.count(): 1477 port = socket_helper.find_unused_port() 1478 try: 1479 sock.bind((HOST, port)) 1480 except OSError as e: 1481 if e.errno != errno.EADDRINUSE or i == 5: 1482 raise 1483 else: 1484 break 1485 1486 @unittest.skipUnless(os.name == "nt", "Windows specific") 1487 def test_sock_ioctl(self): 1488 self.assertTrue(hasattr(socket.socket, 'ioctl')) 1489 self.assertTrue(hasattr(socket, 'SIO_RCVALL')) 1490 self.assertTrue(hasattr(socket, 'RCVALL_ON')) 1491 self.assertTrue(hasattr(socket, 'RCVALL_OFF')) 1492 self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS')) 1493 s = socket.socket() 1494 self.addCleanup(s.close) 1495 self.assertRaises(ValueError, s.ioctl, -1, None) 1496 s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100)) 1497 1498 @unittest.skipUnless(os.name == "nt", "Windows specific") 1499 @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'), 1500 'Loopback fast path support required for this test') 1501 def test_sio_loopback_fast_path(self): 1502 s = socket.socket() 1503 self.addCleanup(s.close) 1504 try: 1505 s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True) 1506 except OSError as exc: 1507 WSAEOPNOTSUPP = 10045 1508 if exc.winerror == WSAEOPNOTSUPP: 1509 self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but " 1510 "doesn't implemented in this Windows version") 1511 raise 1512 self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None) 1513 1514 def testGetaddrinfo(self): 1515 try: 1516 socket.getaddrinfo('localhost', 80) 1517 except socket.gaierror as err: 1518 if err.errno == socket.EAI_SERVICE: 1519 # see http://bugs.python.org/issue1282647 1520 self.skipTest("buggy libc version") 1521 raise 1522 # len of every sequence is supposed to be == 5 1523 for info in socket.getaddrinfo(HOST, None): 1524 self.assertEqual(len(info), 5) 1525 # host can be a domain name, a string representation of an 1526 # IPv4/v6 address or None 1527 socket.getaddrinfo('localhost', 80) 1528 socket.getaddrinfo('127.0.0.1', 80) 1529 socket.getaddrinfo(None, 80) 1530 if socket_helper.IPV6_ENABLED: 1531 socket.getaddrinfo('::1', 80) 1532 # port can be a string service name such as "http", a numeric 1533 # port number or None 1534 # Issue #26936: Android getaddrinfo() was broken before API level 23. 1535 if (not hasattr(sys, 'getandroidapilevel') or 1536 sys.getandroidapilevel() >= 23): 1537 socket.getaddrinfo(HOST, "http") 1538 socket.getaddrinfo(HOST, 80) 1539 socket.getaddrinfo(HOST, None) 1540 # test family and socktype filters 1541 infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM) 1542 for family, type, _, _, _ in infos: 1543 self.assertEqual(family, socket.AF_INET) 1544 self.assertEqual(repr(family), '<AddressFamily.AF_INET: %r>' % family.value) 1545 self.assertEqual(str(family), str(family.value)) 1546 self.assertEqual(type, socket.SOCK_STREAM) 1547 self.assertEqual(repr(type), '<SocketKind.SOCK_STREAM: %r>' % type.value) 1548 self.assertEqual(str(type), str(type.value)) 1549 infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1550 for _, socktype, _, _, _ in infos: 1551 self.assertEqual(socktype, socket.SOCK_STREAM) 1552 # test proto and flags arguments 1553 socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1554 socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1555 # a server willing to support both IPv4 and IPv6 will 1556 # usually do this 1557 socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1558 socket.AI_PASSIVE) 1559 # test keyword arguments 1560 a = socket.getaddrinfo(HOST, None) 1561 b = socket.getaddrinfo(host=HOST, port=None) 1562 self.assertEqual(a, b) 1563 a = socket.getaddrinfo(HOST, None, socket.AF_INET) 1564 b = socket.getaddrinfo(HOST, None, family=socket.AF_INET) 1565 self.assertEqual(a, b) 1566 a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) 1567 b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM) 1568 self.assertEqual(a, b) 1569 a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) 1570 b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP) 1571 self.assertEqual(a, b) 1572 a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) 1573 b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE) 1574 self.assertEqual(a, b) 1575 a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, 1576 socket.AI_PASSIVE) 1577 b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC, 1578 type=socket.SOCK_STREAM, proto=0, 1579 flags=socket.AI_PASSIVE) 1580 self.assertEqual(a, b) 1581 # Issue #6697. 1582 self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800') 1583 1584 # Issue 17269: test workaround for OS X platform bug segfault 1585 if hasattr(socket, 'AI_NUMERICSERV'): 1586 try: 1587 # The arguments here are undefined and the call may succeed 1588 # or fail. All we care here is that it doesn't segfault. 1589 socket.getaddrinfo("localhost", None, 0, 0, 0, 1590 socket.AI_NUMERICSERV) 1591 except socket.gaierror: 1592 pass 1593 1594 def test_getnameinfo(self): 1595 # only IP addresses are allowed 1596 self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0) 1597 1598 @unittest.skipUnless(support.is_resource_enabled('network'), 1599 'network is not enabled') 1600 def test_idna(self): 1601 # Check for internet access before running test 1602 # (issue #12804, issue #25138). 1603 with socket_helper.transient_internet('python.org'): 1604 socket.gethostbyname('python.org') 1605 1606 # these should all be successful 1607 domain = 'испытание.pythontest.net' 1608 socket.gethostbyname(domain) 1609 socket.gethostbyname_ex(domain) 1610 socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM) 1611 # this may not work if the forward lookup chooses the IPv6 address, as that doesn't 1612 # have a reverse entry yet 1613 # socket.gethostbyaddr('испытание.python.org') 1614 1615 def check_sendall_interrupted(self, with_timeout): 1616 # socketpair() is not strictly required, but it makes things easier. 1617 if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): 1618 self.skipTest("signal.alarm and socket.socketpair required for this test") 1619 # Our signal handlers clobber the C errno by calling a math function 1620 # with an invalid domain value. 1621 def ok_handler(*args): 1622 self.assertRaises(ValueError, math.acosh, 0) 1623 def raising_handler(*args): 1624 self.assertRaises(ValueError, math.acosh, 0) 1625 1 // 0 1626 c, s = socket.socketpair() 1627 old_alarm = signal.signal(signal.SIGALRM, raising_handler) 1628 try: 1629 if with_timeout: 1630 # Just above the one second minimum for signal.alarm 1631 c.settimeout(1.5) 1632 with self.assertRaises(ZeroDivisionError): 1633 signal.alarm(1) 1634 c.sendall(b"x" * support.SOCK_MAX_SIZE) 1635 if with_timeout: 1636 signal.signal(signal.SIGALRM, ok_handler) 1637 signal.alarm(1) 1638 self.assertRaises(TimeoutError, c.sendall, 1639 b"x" * support.SOCK_MAX_SIZE) 1640 finally: 1641 signal.alarm(0) 1642 signal.signal(signal.SIGALRM, old_alarm) 1643 c.close() 1644 s.close() 1645 1646 def test_sendall_interrupted(self): 1647 self.check_sendall_interrupted(False) 1648 1649 def test_sendall_interrupted_with_timeout(self): 1650 self.check_sendall_interrupted(True) 1651 1652 def test_dealloc_warn(self): 1653 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1654 r = repr(sock) 1655 with self.assertWarns(ResourceWarning) as cm: 1656 sock = None 1657 support.gc_collect() 1658 self.assertIn(r, str(cm.warning.args[0])) 1659 # An open socket file object gets dereferenced after the socket 1660 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1661 f = sock.makefile('rb') 1662 r = repr(sock) 1663 sock = None 1664 support.gc_collect() 1665 with self.assertWarns(ResourceWarning): 1666 f = None 1667 support.gc_collect() 1668 1669 def test_name_closed_socketio(self): 1670 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: 1671 fp = sock.makefile("rb") 1672 fp.close() 1673 self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>") 1674 1675 def test_unusable_closed_socketio(self): 1676 with socket.socket() as sock: 1677 fp = sock.makefile("rb", buffering=0) 1678 self.assertTrue(fp.readable()) 1679 self.assertFalse(fp.writable()) 1680 self.assertFalse(fp.seekable()) 1681 fp.close() 1682 self.assertRaises(ValueError, fp.readable) 1683 self.assertRaises(ValueError, fp.writable) 1684 self.assertRaises(ValueError, fp.seekable) 1685 1686 def test_socket_close(self): 1687 sock = socket.socket() 1688 try: 1689 sock.bind((HOST, 0)) 1690 socket.close(sock.fileno()) 1691 with self.assertRaises(OSError): 1692 sock.listen(1) 1693 finally: 1694 with self.assertRaises(OSError): 1695 # sock.close() fails with EBADF 1696 sock.close() 1697 with self.assertRaises(TypeError): 1698 socket.close(None) 1699 with self.assertRaises(OSError): 1700 socket.close(-1) 1701 1702 def test_makefile_mode(self): 1703 for mode in 'r', 'rb', 'rw', 'w', 'wb': 1704 with self.subTest(mode=mode): 1705 with socket.socket() as sock: 1706 encoding = None if "b" in mode else "utf-8" 1707 with sock.makefile(mode, encoding=encoding) as fp: 1708 self.assertEqual(fp.mode, mode) 1709 1710 def test_makefile_invalid_mode(self): 1711 for mode in 'rt', 'x', '+', 'a': 1712 with self.subTest(mode=mode): 1713 with socket.socket() as sock: 1714 with self.assertRaisesRegex(ValueError, 'invalid mode'): 1715 sock.makefile(mode) 1716 1717 def test_pickle(self): 1718 sock = socket.socket() 1719 with sock: 1720 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1721 self.assertRaises(TypeError, pickle.dumps, sock, protocol) 1722 for protocol in range(pickle.HIGHEST_PROTOCOL + 1): 1723 family = pickle.loads(pickle.dumps(socket.AF_INET, protocol)) 1724 self.assertEqual(family, socket.AF_INET) 1725 type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol)) 1726 self.assertEqual(type, socket.SOCK_STREAM) 1727 1728 def test_listen_backlog(self): 1729 for backlog in 0, -1: 1730 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1731 srv.bind((HOST, 0)) 1732 srv.listen(backlog) 1733 1734 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1735 srv.bind((HOST, 0)) 1736 srv.listen() 1737 1738 @support.cpython_only 1739 def test_listen_backlog_overflow(self): 1740 # Issue 15989 1741 import _testcapi 1742 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv: 1743 srv.bind((HOST, 0)) 1744 self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1) 1745 1746 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1747 def test_flowinfo(self): 1748 self.assertRaises(OverflowError, socket.getnameinfo, 1749 (socket_helper.HOSTv6, 0, 0xffffffff), 0) 1750 with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: 1751 self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10)) 1752 1753 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1754 def test_getaddrinfo_ipv6_basic(self): 1755 ((*_, sockaddr),) = socket.getaddrinfo( 1756 'ff02::1de:c0:face:8D', # Note capital letter `D`. 1757 1234, socket.AF_INET6, 1758 socket.SOCK_DGRAM, 1759 socket.IPPROTO_UDP 1760 ) 1761 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0)) 1762 1763 def test_getfqdn_filter_localhost(self): 1764 self.assertEqual(socket.getfqdn(), socket.getfqdn("0.0.0.0")) 1765 self.assertEqual(socket.getfqdn(), socket.getfqdn("::")) 1766 1767 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1768 @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') 1769 @unittest.skipIf(AIX, 'Symbolic scope id does not work') 1770 @unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()") 1771 def test_getaddrinfo_ipv6_scopeid_symbolic(self): 1772 # Just pick up any network interface (Linux, Mac OS X) 1773 (ifindex, test_interface) = socket.if_nameindex()[0] 1774 ((*_, sockaddr),) = socket.getaddrinfo( 1775 'ff02::1de:c0:face:8D%' + test_interface, 1776 1234, socket.AF_INET6, 1777 socket.SOCK_DGRAM, 1778 socket.IPPROTO_UDP 1779 ) 1780 # Note missing interface name part in IPv6 address 1781 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) 1782 1783 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1784 @unittest.skipUnless( 1785 sys.platform == 'win32', 1786 'Numeric scope id does not work or undocumented') 1787 def test_getaddrinfo_ipv6_scopeid_numeric(self): 1788 # Also works on Linux and Mac OS X, but is not documented (?) 1789 # Windows, Linux and Max OS X allow nonexistent interface numbers here. 1790 ifindex = 42 1791 ((*_, sockaddr),) = socket.getaddrinfo( 1792 'ff02::1de:c0:face:8D%' + str(ifindex), 1793 1234, socket.AF_INET6, 1794 socket.SOCK_DGRAM, 1795 socket.IPPROTO_UDP 1796 ) 1797 # Note missing interface name part in IPv6 address 1798 self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex)) 1799 1800 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1801 @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows') 1802 @unittest.skipIf(AIX, 'Symbolic scope id does not work') 1803 @unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()") 1804 def test_getnameinfo_ipv6_scopeid_symbolic(self): 1805 # Just pick up any network interface. 1806 (ifindex, test_interface) = socket.if_nameindex()[0] 1807 sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`. 1808 nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) 1809 self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234')) 1810 1811 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 1812 @unittest.skipUnless( sys.platform == 'win32', 1813 'Numeric scope id does not work or undocumented') 1814 def test_getnameinfo_ipv6_scopeid_numeric(self): 1815 # Also works on Linux (undocumented), but does not work on Mac OS X 1816 # Windows and Linux allow nonexistent interface numbers here. 1817 ifindex = 42 1818 sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex) # Note capital letter `D`. 1819 nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) 1820 self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234')) 1821 1822 def test_str_for_enums(self): 1823 # Make sure that the AF_* and SOCK_* constants have enum-like string 1824 # reprs. 1825 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: 1826 self.assertEqual(repr(s.family), '<AddressFamily.AF_INET: %r>' % s.family.value) 1827 self.assertEqual(repr(s.type), '<SocketKind.SOCK_STREAM: %r>' % s.type.value) 1828 self.assertEqual(str(s.family), str(s.family.value)) 1829 self.assertEqual(str(s.type), str(s.type.value)) 1830 1831 def test_socket_consistent_sock_type(self): 1832 SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0) 1833 SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0) 1834 sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC 1835 1836 with socket.socket(socket.AF_INET, sock_type) as s: 1837 self.assertEqual(s.type, socket.SOCK_STREAM) 1838 s.settimeout(1) 1839 self.assertEqual(s.type, socket.SOCK_STREAM) 1840 s.settimeout(0) 1841 self.assertEqual(s.type, socket.SOCK_STREAM) 1842 s.setblocking(True) 1843 self.assertEqual(s.type, socket.SOCK_STREAM) 1844 s.setblocking(False) 1845 self.assertEqual(s.type, socket.SOCK_STREAM) 1846 1847 def test_unknown_socket_family_repr(self): 1848 # Test that when created with a family that's not one of the known 1849 # AF_*/SOCK_* constants, socket.family just returns the number. 1850 # 1851 # To do this we fool socket.socket into believing it already has an 1852 # open fd because on this path it doesn't actually verify the family and 1853 # type and populates the socket object. 1854 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1855 fd = sock.detach() 1856 unknown_family = max(socket.AddressFamily.__members__.values()) + 1 1857 1858 unknown_type = max( 1859 kind 1860 for name, kind in socket.SocketKind.__members__.items() 1861 if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'} 1862 ) + 1 1863 1864 with socket.socket( 1865 family=unknown_family, type=unknown_type, proto=23, 1866 fileno=fd) as s: 1867 self.assertEqual(s.family, unknown_family) 1868 self.assertEqual(s.type, unknown_type) 1869 # some OS like macOS ignore proto 1870 self.assertIn(s.proto, {0, 23}) 1871 1872 @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()') 1873 def test__sendfile_use_sendfile(self): 1874 class File: 1875 def __init__(self, fd): 1876 self.fd = fd 1877 1878 def fileno(self): 1879 return self.fd 1880 with socket.socket() as sock: 1881 fd = os.open(os.curdir, os.O_RDONLY) 1882 os.close(fd) 1883 with self.assertRaises(socket._GiveupOnSendfile): 1884 sock._sendfile_use_sendfile(File(fd)) 1885 with self.assertRaises(OverflowError): 1886 sock._sendfile_use_sendfile(File(2**1000)) 1887 with self.assertRaises(TypeError): 1888 sock._sendfile_use_sendfile(File(None)) 1889 1890 def _test_socket_fileno(self, s, family, stype): 1891 self.assertEqual(s.family, family) 1892 self.assertEqual(s.type, stype) 1893 1894 fd = s.fileno() 1895 s2 = socket.socket(fileno=fd) 1896 self.addCleanup(s2.close) 1897 # detach old fd to avoid double close 1898 s.detach() 1899 self.assertEqual(s2.family, family) 1900 self.assertEqual(s2.type, stype) 1901 self.assertEqual(s2.fileno(), fd) 1902 1903 def test_socket_fileno(self): 1904 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1905 self.addCleanup(s.close) 1906 s.bind((socket_helper.HOST, 0)) 1907 self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM) 1908 1909 if hasattr(socket, "SOCK_DGRAM"): 1910 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1911 self.addCleanup(s.close) 1912 s.bind((socket_helper.HOST, 0)) 1913 self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM) 1914 1915 if socket_helper.IPV6_ENABLED: 1916 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 1917 self.addCleanup(s.close) 1918 s.bind((socket_helper.HOSTv6, 0, 0, 0)) 1919 self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM) 1920 1921 if hasattr(socket, "AF_UNIX"): 1922 tmpdir = tempfile.mkdtemp() 1923 self.addCleanup(shutil.rmtree, tmpdir) 1924 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 1925 self.addCleanup(s.close) 1926 try: 1927 s.bind(os.path.join(tmpdir, 'socket')) 1928 except PermissionError: 1929 pass 1930 else: 1931 self._test_socket_fileno(s, socket.AF_UNIX, 1932 socket.SOCK_STREAM) 1933 1934 def test_socket_fileno_rejects_float(self): 1935 with self.assertRaises(TypeError): 1936 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5) 1937 1938 def test_socket_fileno_rejects_other_types(self): 1939 with self.assertRaises(TypeError): 1940 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo") 1941 1942 def test_socket_fileno_rejects_invalid_socket(self): 1943 with self.assertRaisesRegex(ValueError, "negative file descriptor"): 1944 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1) 1945 1946 @unittest.skipIf(os.name == "nt", "Windows disallows -1 only") 1947 def test_socket_fileno_rejects_negative(self): 1948 with self.assertRaisesRegex(ValueError, "negative file descriptor"): 1949 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42) 1950 1951 def test_socket_fileno_requires_valid_fd(self): 1952 WSAENOTSOCK = 10038 1953 with self.assertRaises(OSError) as cm: 1954 socket.socket(fileno=os_helper.make_bad_fd()) 1955 self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK)) 1956 1957 with self.assertRaises(OSError) as cm: 1958 socket.socket( 1959 socket.AF_INET, 1960 socket.SOCK_STREAM, 1961 fileno=os_helper.make_bad_fd()) 1962 self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK)) 1963 1964 def test_socket_fileno_requires_socket_fd(self): 1965 with tempfile.NamedTemporaryFile() as afile: 1966 with self.assertRaises(OSError): 1967 socket.socket(fileno=afile.fileno()) 1968 1969 with self.assertRaises(OSError) as cm: 1970 socket.socket( 1971 socket.AF_INET, 1972 socket.SOCK_STREAM, 1973 fileno=afile.fileno()) 1974 self.assertEqual(cm.exception.errno, errno.ENOTSOCK) 1975 1976 def test_addressfamily_enum(self): 1977 import _socket, enum 1978 CheckedAddressFamily = enum._old_convert_( 1979 enum.IntEnum, 'AddressFamily', 'socket', 1980 lambda C: C.isupper() and C.startswith('AF_'), 1981 source=_socket, 1982 ) 1983 enum._test_simple_enum(CheckedAddressFamily, socket.AddressFamily) 1984 1985 def test_socketkind_enum(self): 1986 import _socket, enum 1987 CheckedSocketKind = enum._old_convert_( 1988 enum.IntEnum, 'SocketKind', 'socket', 1989 lambda C: C.isupper() and C.startswith('SOCK_'), 1990 source=_socket, 1991 ) 1992 enum._test_simple_enum(CheckedSocketKind, socket.SocketKind) 1993 1994 def test_msgflag_enum(self): 1995 import _socket, enum 1996 CheckedMsgFlag = enum._old_convert_( 1997 enum.IntFlag, 'MsgFlag', 'socket', 1998 lambda C: C.isupper() and C.startswith('MSG_'), 1999 source=_socket, 2000 ) 2001 enum._test_simple_enum(CheckedMsgFlag, socket.MsgFlag) 2002 2003 def test_addressinfo_enum(self): 2004 import _socket, enum 2005 CheckedAddressInfo = enum._old_convert_( 2006 enum.IntFlag, 'AddressInfo', 'socket', 2007 lambda C: C.isupper() and C.startswith('AI_'), 2008 source=_socket) 2009 enum._test_simple_enum(CheckedAddressInfo, socket.AddressInfo) 2010 2011 2012@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 2013class BasicCANTest(unittest.TestCase): 2014 2015 def testCrucialConstants(self): 2016 socket.AF_CAN 2017 socket.PF_CAN 2018 socket.CAN_RAW 2019 2020 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2021 'socket.CAN_BCM required for this test.') 2022 def testBCMConstants(self): 2023 socket.CAN_BCM 2024 2025 # opcodes 2026 socket.CAN_BCM_TX_SETUP # create (cyclic) transmission task 2027 socket.CAN_BCM_TX_DELETE # remove (cyclic) transmission task 2028 socket.CAN_BCM_TX_READ # read properties of (cyclic) transmission task 2029 socket.CAN_BCM_TX_SEND # send one CAN frame 2030 socket.CAN_BCM_RX_SETUP # create RX content filter subscription 2031 socket.CAN_BCM_RX_DELETE # remove RX content filter subscription 2032 socket.CAN_BCM_RX_READ # read properties of RX content filter subscription 2033 socket.CAN_BCM_TX_STATUS # reply to TX_READ request 2034 socket.CAN_BCM_TX_EXPIRED # notification on performed transmissions (count=0) 2035 socket.CAN_BCM_RX_STATUS # reply to RX_READ request 2036 socket.CAN_BCM_RX_TIMEOUT # cyclic message is absent 2037 socket.CAN_BCM_RX_CHANGED # updated CAN frame (detected content change) 2038 2039 # flags 2040 socket.CAN_BCM_SETTIMER 2041 socket.CAN_BCM_STARTTIMER 2042 socket.CAN_BCM_TX_COUNTEVT 2043 socket.CAN_BCM_TX_ANNOUNCE 2044 socket.CAN_BCM_TX_CP_CAN_ID 2045 socket.CAN_BCM_RX_FILTER_ID 2046 socket.CAN_BCM_RX_CHECK_DLC 2047 socket.CAN_BCM_RX_NO_AUTOTIMER 2048 socket.CAN_BCM_RX_ANNOUNCE_RESUME 2049 socket.CAN_BCM_TX_RESET_MULTI_IDX 2050 socket.CAN_BCM_RX_RTR_FRAME 2051 2052 def testCreateSocket(self): 2053 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2054 pass 2055 2056 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2057 'socket.CAN_BCM required for this test.') 2058 def testCreateBCMSocket(self): 2059 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) as s: 2060 pass 2061 2062 def testBindAny(self): 2063 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2064 address = ('', ) 2065 s.bind(address) 2066 self.assertEqual(s.getsockname(), address) 2067 2068 def testTooLongInterfaceName(self): 2069 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2070 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2071 self.assertRaisesRegex(OSError, 'interface name too long', 2072 s.bind, ('x' * 1024,)) 2073 2074 @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"), 2075 'socket.CAN_RAW_LOOPBACK required for this test.') 2076 def testLoopback(self): 2077 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2078 for loopback in (0, 1): 2079 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, 2080 loopback) 2081 self.assertEqual(loopback, 2082 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK)) 2083 2084 @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"), 2085 'socket.CAN_RAW_FILTER required for this test.') 2086 def testFilter(self): 2087 can_id, can_mask = 0x200, 0x700 2088 can_filter = struct.pack("=II", can_id, can_mask) 2089 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2090 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) 2091 self.assertEqual(can_filter, 2092 s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8)) 2093 s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, bytearray(can_filter)) 2094 2095 2096@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.') 2097class CANTest(ThreadedCANSocketTest): 2098 2099 def __init__(self, methodName='runTest'): 2100 ThreadedCANSocketTest.__init__(self, methodName=methodName) 2101 2102 @classmethod 2103 def build_can_frame(cls, can_id, data): 2104 """Build a CAN frame.""" 2105 can_dlc = len(data) 2106 data = data.ljust(8, b'\x00') 2107 return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data) 2108 2109 @classmethod 2110 def dissect_can_frame(cls, frame): 2111 """Dissect a CAN frame.""" 2112 can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame) 2113 return (can_id, can_dlc, data[:can_dlc]) 2114 2115 def testSendFrame(self): 2116 cf, addr = self.s.recvfrom(self.bufsize) 2117 self.assertEqual(self.cf, cf) 2118 self.assertEqual(addr[0], self.interface) 2119 2120 def _testSendFrame(self): 2121 self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05') 2122 self.cli.send(self.cf) 2123 2124 def testSendMaxFrame(self): 2125 cf, addr = self.s.recvfrom(self.bufsize) 2126 self.assertEqual(self.cf, cf) 2127 2128 def _testSendMaxFrame(self): 2129 self.cf = self.build_can_frame(0x00, b'\x07' * 8) 2130 self.cli.send(self.cf) 2131 2132 def testSendMultiFrames(self): 2133 cf, addr = self.s.recvfrom(self.bufsize) 2134 self.assertEqual(self.cf1, cf) 2135 2136 cf, addr = self.s.recvfrom(self.bufsize) 2137 self.assertEqual(self.cf2, cf) 2138 2139 def _testSendMultiFrames(self): 2140 self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11') 2141 self.cli.send(self.cf1) 2142 2143 self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33') 2144 self.cli.send(self.cf2) 2145 2146 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2147 'socket.CAN_BCM required for this test.') 2148 def _testBCM(self): 2149 cf, addr = self.cli.recvfrom(self.bufsize) 2150 self.assertEqual(self.cf, cf) 2151 can_id, can_dlc, data = self.dissect_can_frame(cf) 2152 self.assertEqual(self.can_id, can_id) 2153 self.assertEqual(self.data, data) 2154 2155 @unittest.skipUnless(hasattr(socket, "CAN_BCM"), 2156 'socket.CAN_BCM required for this test.') 2157 def testBCM(self): 2158 bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM) 2159 self.addCleanup(bcm.close) 2160 bcm.connect((self.interface,)) 2161 self.can_id = 0x123 2162 self.data = bytes([0xc0, 0xff, 0xee]) 2163 self.cf = self.build_can_frame(self.can_id, self.data) 2164 opcode = socket.CAN_BCM_TX_SEND 2165 flags = 0 2166 count = 0 2167 ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0 2168 bcm_can_id = 0x0222 2169 nframes = 1 2170 assert len(self.cf) == 16 2171 header = struct.pack(self.bcm_cmd_msg_fmt, 2172 opcode, 2173 flags, 2174 count, 2175 ival1_seconds, 2176 ival1_usec, 2177 ival2_seconds, 2178 ival2_usec, 2179 bcm_can_id, 2180 nframes, 2181 ) 2182 header_plus_frame = header + self.cf 2183 bytes_sent = bcm.send(header_plus_frame) 2184 self.assertEqual(bytes_sent, len(header_plus_frame)) 2185 2186 2187@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.') 2188class ISOTPTest(unittest.TestCase): 2189 2190 def __init__(self, *args, **kwargs): 2191 super().__init__(*args, **kwargs) 2192 self.interface = "vcan0" 2193 2194 def testCrucialConstants(self): 2195 socket.AF_CAN 2196 socket.PF_CAN 2197 socket.CAN_ISOTP 2198 socket.SOCK_DGRAM 2199 2200 def testCreateSocket(self): 2201 with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: 2202 pass 2203 2204 @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"), 2205 'socket.CAN_ISOTP required for this test.') 2206 def testCreateISOTPSocket(self): 2207 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2208 pass 2209 2210 def testTooLongInterfaceName(self): 2211 # most systems limit IFNAMSIZ to 16, take 1024 to be sure 2212 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2213 with self.assertRaisesRegex(OSError, 'interface name too long'): 2214 s.bind(('x' * 1024, 1, 2)) 2215 2216 def testBind(self): 2217 try: 2218 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s: 2219 addr = self.interface, 0x123, 0x456 2220 s.bind(addr) 2221 self.assertEqual(s.getsockname(), addr) 2222 except OSError as e: 2223 if e.errno == errno.ENODEV: 2224 self.skipTest('network interface `%s` does not exist' % 2225 self.interface) 2226 else: 2227 raise 2228 2229 2230@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.') 2231class J1939Test(unittest.TestCase): 2232 2233 def __init__(self, *args, **kwargs): 2234 super().__init__(*args, **kwargs) 2235 self.interface = "vcan0" 2236 2237 @unittest.skipUnless(hasattr(socket, "CAN_J1939"), 2238 'socket.CAN_J1939 required for this test.') 2239 def testJ1939Constants(self): 2240 socket.CAN_J1939 2241 2242 socket.J1939_MAX_UNICAST_ADDR 2243 socket.J1939_IDLE_ADDR 2244 socket.J1939_NO_ADDR 2245 socket.J1939_NO_NAME 2246 socket.J1939_PGN_REQUEST 2247 socket.J1939_PGN_ADDRESS_CLAIMED 2248 socket.J1939_PGN_ADDRESS_COMMANDED 2249 socket.J1939_PGN_PDU1_MAX 2250 socket.J1939_PGN_MAX 2251 socket.J1939_NO_PGN 2252 2253 # J1939 socket options 2254 socket.SO_J1939_FILTER 2255 socket.SO_J1939_PROMISC 2256 socket.SO_J1939_SEND_PRIO 2257 socket.SO_J1939_ERRQUEUE 2258 2259 socket.SCM_J1939_DEST_ADDR 2260 socket.SCM_J1939_DEST_NAME 2261 socket.SCM_J1939_PRIO 2262 socket.SCM_J1939_ERRQUEUE 2263 2264 socket.J1939_NLA_PAD 2265 socket.J1939_NLA_BYTES_ACKED 2266 2267 socket.J1939_EE_INFO_NONE 2268 socket.J1939_EE_INFO_TX_ABORT 2269 2270 socket.J1939_FILTER_MAX 2271 2272 @unittest.skipUnless(hasattr(socket, "CAN_J1939"), 2273 'socket.CAN_J1939 required for this test.') 2274 def testCreateJ1939Socket(self): 2275 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: 2276 pass 2277 2278 def testBind(self): 2279 try: 2280 with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s: 2281 addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR 2282 s.bind(addr) 2283 self.assertEqual(s.getsockname(), addr) 2284 except OSError as e: 2285 if e.errno == errno.ENODEV: 2286 self.skipTest('network interface `%s` does not exist' % 2287 self.interface) 2288 else: 2289 raise 2290 2291 2292@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2293class BasicRDSTest(unittest.TestCase): 2294 2295 def testCrucialConstants(self): 2296 socket.AF_RDS 2297 socket.PF_RDS 2298 2299 def testCreateSocket(self): 2300 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2301 pass 2302 2303 def testSocketBufferSize(self): 2304 bufsize = 16384 2305 with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s: 2306 s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize) 2307 s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize) 2308 2309 2310@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.') 2311class RDSTest(ThreadedRDSSocketTest): 2312 2313 def __init__(self, methodName='runTest'): 2314 ThreadedRDSSocketTest.__init__(self, methodName=methodName) 2315 2316 def setUp(self): 2317 super().setUp() 2318 self.evt = threading.Event() 2319 2320 def testSendAndRecv(self): 2321 data, addr = self.serv.recvfrom(self.bufsize) 2322 self.assertEqual(self.data, data) 2323 self.assertEqual(self.cli_addr, addr) 2324 2325 def _testSendAndRecv(self): 2326 self.data = b'spam' 2327 self.cli.sendto(self.data, 0, (HOST, self.port)) 2328 2329 def testPeek(self): 2330 data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK) 2331 self.assertEqual(self.data, data) 2332 data, addr = self.serv.recvfrom(self.bufsize) 2333 self.assertEqual(self.data, data) 2334 2335 def _testPeek(self): 2336 self.data = b'spam' 2337 self.cli.sendto(self.data, 0, (HOST, self.port)) 2338 2339 @requireAttrs(socket.socket, 'recvmsg') 2340 def testSendAndRecvMsg(self): 2341 data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize) 2342 self.assertEqual(self.data, data) 2343 2344 @requireAttrs(socket.socket, 'sendmsg') 2345 def _testSendAndRecvMsg(self): 2346 self.data = b'hello ' * 10 2347 self.cli.sendmsg([self.data], (), 0, (HOST, self.port)) 2348 2349 def testSendAndRecvMulti(self): 2350 data, addr = self.serv.recvfrom(self.bufsize) 2351 self.assertEqual(self.data1, data) 2352 2353 data, addr = self.serv.recvfrom(self.bufsize) 2354 self.assertEqual(self.data2, data) 2355 2356 def _testSendAndRecvMulti(self): 2357 self.data1 = b'bacon' 2358 self.cli.sendto(self.data1, 0, (HOST, self.port)) 2359 2360 self.data2 = b'egg' 2361 self.cli.sendto(self.data2, 0, (HOST, self.port)) 2362 2363 def testSelect(self): 2364 r, w, x = select.select([self.serv], [], [], 3.0) 2365 self.assertIn(self.serv, r) 2366 data, addr = self.serv.recvfrom(self.bufsize) 2367 self.assertEqual(self.data, data) 2368 2369 def _testSelect(self): 2370 self.data = b'select' 2371 self.cli.sendto(self.data, 0, (HOST, self.port)) 2372 2373@unittest.skipUnless(HAVE_SOCKET_QIPCRTR, 2374 'QIPCRTR sockets required for this test.') 2375class BasicQIPCRTRTest(unittest.TestCase): 2376 2377 def testCrucialConstants(self): 2378 socket.AF_QIPCRTR 2379 2380 def testCreateSocket(self): 2381 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2382 pass 2383 2384 def testUnbound(self): 2385 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2386 self.assertEqual(s.getsockname()[1], 0) 2387 2388 def testBindSock(self): 2389 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2390 socket_helper.bind_port(s, host=s.getsockname()[0]) 2391 self.assertNotEqual(s.getsockname()[1], 0) 2392 2393 def testInvalidBindSock(self): 2394 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2395 self.assertRaises(OSError, socket_helper.bind_port, s, host=-2) 2396 2397 def testAutoBindSock(self): 2398 with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as s: 2399 s.connect((123, 123)) 2400 self.assertNotEqual(s.getsockname()[1], 0) 2401 2402@unittest.skipIf(fcntl is None, "need fcntl") 2403@unittest.skipUnless(HAVE_SOCKET_VSOCK, 2404 'VSOCK sockets required for this test.') 2405class BasicVSOCKTest(unittest.TestCase): 2406 2407 def testCrucialConstants(self): 2408 socket.AF_VSOCK 2409 2410 def testVSOCKConstants(self): 2411 socket.SO_VM_SOCKETS_BUFFER_SIZE 2412 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE 2413 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE 2414 socket.VMADDR_CID_ANY 2415 socket.VMADDR_PORT_ANY 2416 socket.VMADDR_CID_HOST 2417 socket.VM_SOCKETS_INVALID_VERSION 2418 socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID 2419 2420 def testCreateSocket(self): 2421 with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: 2422 pass 2423 2424 def testSocketBufferSize(self): 2425 with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as s: 2426 orig_max = s.getsockopt(socket.AF_VSOCK, 2427 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE) 2428 orig = s.getsockopt(socket.AF_VSOCK, 2429 socket.SO_VM_SOCKETS_BUFFER_SIZE) 2430 orig_min = s.getsockopt(socket.AF_VSOCK, 2431 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE) 2432 2433 s.setsockopt(socket.AF_VSOCK, 2434 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE, orig_max * 2) 2435 s.setsockopt(socket.AF_VSOCK, 2436 socket.SO_VM_SOCKETS_BUFFER_SIZE, orig * 2) 2437 s.setsockopt(socket.AF_VSOCK, 2438 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE, orig_min * 2) 2439 2440 self.assertEqual(orig_max * 2, 2441 s.getsockopt(socket.AF_VSOCK, 2442 socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE)) 2443 self.assertEqual(orig * 2, 2444 s.getsockopt(socket.AF_VSOCK, 2445 socket.SO_VM_SOCKETS_BUFFER_SIZE)) 2446 self.assertEqual(orig_min * 2, 2447 s.getsockopt(socket.AF_VSOCK, 2448 socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)) 2449 2450 2451@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH, 2452 'Bluetooth sockets required for this test.') 2453class BasicBluetoothTest(unittest.TestCase): 2454 2455 def testBluetoothConstants(self): 2456 socket.BDADDR_ANY 2457 socket.BDADDR_LOCAL 2458 socket.AF_BLUETOOTH 2459 socket.BTPROTO_RFCOMM 2460 2461 if sys.platform != "win32": 2462 socket.BTPROTO_HCI 2463 socket.SOL_HCI 2464 socket.BTPROTO_L2CAP 2465 2466 if not sys.platform.startswith("freebsd"): 2467 socket.BTPROTO_SCO 2468 2469 def testCreateRfcommSocket(self): 2470 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s: 2471 pass 2472 2473 @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets") 2474 def testCreateL2capSocket(self): 2475 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as s: 2476 pass 2477 2478 @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets") 2479 def testCreateHciSocket(self): 2480 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s: 2481 pass 2482 2483 @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"), 2484 "windows and freebsd do not support SCO sockets") 2485 def testCreateScoSocket(self): 2486 with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s: 2487 pass 2488 2489 2490class BasicTCPTest(SocketConnectedTest): 2491 2492 def __init__(self, methodName='runTest'): 2493 SocketConnectedTest.__init__(self, methodName=methodName) 2494 2495 def testRecv(self): 2496 # Testing large receive over TCP 2497 msg = self.cli_conn.recv(1024) 2498 self.assertEqual(msg, MSG) 2499 2500 def _testRecv(self): 2501 self.serv_conn.send(MSG) 2502 2503 def testOverFlowRecv(self): 2504 # Testing receive in chunks over TCP 2505 seg1 = self.cli_conn.recv(len(MSG) - 3) 2506 seg2 = self.cli_conn.recv(1024) 2507 msg = seg1 + seg2 2508 self.assertEqual(msg, MSG) 2509 2510 def _testOverFlowRecv(self): 2511 self.serv_conn.send(MSG) 2512 2513 def testRecvFrom(self): 2514 # Testing large recvfrom() over TCP 2515 msg, addr = self.cli_conn.recvfrom(1024) 2516 self.assertEqual(msg, MSG) 2517 2518 def _testRecvFrom(self): 2519 self.serv_conn.send(MSG) 2520 2521 def testOverFlowRecvFrom(self): 2522 # Testing recvfrom() in chunks over TCP 2523 seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) 2524 seg2, addr = self.cli_conn.recvfrom(1024) 2525 msg = seg1 + seg2 2526 self.assertEqual(msg, MSG) 2527 2528 def _testOverFlowRecvFrom(self): 2529 self.serv_conn.send(MSG) 2530 2531 def testSendAll(self): 2532 # Testing sendall() with a 2048 byte string over TCP 2533 msg = b'' 2534 while 1: 2535 read = self.cli_conn.recv(1024) 2536 if not read: 2537 break 2538 msg += read 2539 self.assertEqual(msg, b'f' * 2048) 2540 2541 def _testSendAll(self): 2542 big_chunk = b'f' * 2048 2543 self.serv_conn.sendall(big_chunk) 2544 2545 def testFromFd(self): 2546 # Testing fromfd() 2547 fd = self.cli_conn.fileno() 2548 sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 2549 self.addCleanup(sock.close) 2550 self.assertIsInstance(sock, socket.socket) 2551 msg = sock.recv(1024) 2552 self.assertEqual(msg, MSG) 2553 2554 def _testFromFd(self): 2555 self.serv_conn.send(MSG) 2556 2557 def testDup(self): 2558 # Testing dup() 2559 sock = self.cli_conn.dup() 2560 self.addCleanup(sock.close) 2561 msg = sock.recv(1024) 2562 self.assertEqual(msg, MSG) 2563 2564 def _testDup(self): 2565 self.serv_conn.send(MSG) 2566 2567 def testShutdown(self): 2568 # Testing shutdown() 2569 msg = self.cli_conn.recv(1024) 2570 self.assertEqual(msg, MSG) 2571 # wait for _testShutdown to finish: on OS X, when the server 2572 # closes the connection the client also becomes disconnected, 2573 # and the client's shutdown call will fail. (Issue #4397.) 2574 self.done.wait() 2575 2576 def _testShutdown(self): 2577 self.serv_conn.send(MSG) 2578 self.serv_conn.shutdown(2) 2579 2580 testShutdown_overflow = support.cpython_only(testShutdown) 2581 2582 @support.cpython_only 2583 def _testShutdown_overflow(self): 2584 import _testcapi 2585 self.serv_conn.send(MSG) 2586 # Issue 15989 2587 self.assertRaises(OverflowError, self.serv_conn.shutdown, 2588 _testcapi.INT_MAX + 1) 2589 self.assertRaises(OverflowError, self.serv_conn.shutdown, 2590 2 + (_testcapi.UINT_MAX + 1)) 2591 self.serv_conn.shutdown(2) 2592 2593 def testDetach(self): 2594 # Testing detach() 2595 fileno = self.cli_conn.fileno() 2596 f = self.cli_conn.detach() 2597 self.assertEqual(f, fileno) 2598 # cli_conn cannot be used anymore... 2599 self.assertTrue(self.cli_conn._closed) 2600 self.assertRaises(OSError, self.cli_conn.recv, 1024) 2601 self.cli_conn.close() 2602 # ...but we can create another socket using the (still open) 2603 # file descriptor 2604 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=f) 2605 self.addCleanup(sock.close) 2606 msg = sock.recv(1024) 2607 self.assertEqual(msg, MSG) 2608 2609 def _testDetach(self): 2610 self.serv_conn.send(MSG) 2611 2612 2613class BasicUDPTest(ThreadedUDPSocketTest): 2614 2615 def __init__(self, methodName='runTest'): 2616 ThreadedUDPSocketTest.__init__(self, methodName=methodName) 2617 2618 def testSendtoAndRecv(self): 2619 # Testing sendto() and Recv() over UDP 2620 msg = self.serv.recv(len(MSG)) 2621 self.assertEqual(msg, MSG) 2622 2623 def _testSendtoAndRecv(self): 2624 self.cli.sendto(MSG, 0, (HOST, self.port)) 2625 2626 def testRecvFrom(self): 2627 # Testing recvfrom() over UDP 2628 msg, addr = self.serv.recvfrom(len(MSG)) 2629 self.assertEqual(msg, MSG) 2630 2631 def _testRecvFrom(self): 2632 self.cli.sendto(MSG, 0, (HOST, self.port)) 2633 2634 def testRecvFromNegative(self): 2635 # Negative lengths passed to recvfrom should give ValueError. 2636 self.assertRaises(ValueError, self.serv.recvfrom, -1) 2637 2638 def _testRecvFromNegative(self): 2639 self.cli.sendto(MSG, 0, (HOST, self.port)) 2640 2641 2642@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 2643 'UDPLITE sockets required for this test.') 2644class BasicUDPLITETest(ThreadedUDPLITESocketTest): 2645 2646 def __init__(self, methodName='runTest'): 2647 ThreadedUDPLITESocketTest.__init__(self, methodName=methodName) 2648 2649 def testSendtoAndRecv(self): 2650 # Testing sendto() and Recv() over UDPLITE 2651 msg = self.serv.recv(len(MSG)) 2652 self.assertEqual(msg, MSG) 2653 2654 def _testSendtoAndRecv(self): 2655 self.cli.sendto(MSG, 0, (HOST, self.port)) 2656 2657 def testRecvFrom(self): 2658 # Testing recvfrom() over UDPLITE 2659 msg, addr = self.serv.recvfrom(len(MSG)) 2660 self.assertEqual(msg, MSG) 2661 2662 def _testRecvFrom(self): 2663 self.cli.sendto(MSG, 0, (HOST, self.port)) 2664 2665 def testRecvFromNegative(self): 2666 # Negative lengths passed to recvfrom should give ValueError. 2667 self.assertRaises(ValueError, self.serv.recvfrom, -1) 2668 2669 def _testRecvFromNegative(self): 2670 self.cli.sendto(MSG, 0, (HOST, self.port)) 2671 2672# Tests for the sendmsg()/recvmsg() interface. Where possible, the 2673# same test code is used with different families and types of socket 2674# (e.g. stream, datagram), and tests using recvmsg() are repeated 2675# using recvmsg_into(). 2676# 2677# The generic test classes such as SendmsgTests and 2678# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be 2679# supplied with sockets cli_sock and serv_sock representing the 2680# client's and the server's end of the connection respectively, and 2681# attributes cli_addr and serv_addr holding their (numeric where 2682# appropriate) addresses. 2683# 2684# The final concrete test classes combine these with subclasses of 2685# SocketTestBase which set up client and server sockets of a specific 2686# type, and with subclasses of SendrecvmsgBase such as 2687# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these 2688# sockets to cli_sock and serv_sock and override the methods and 2689# attributes of SendrecvmsgBase to fill in destination addresses if 2690# needed when sending, check for specific flags in msg_flags, etc. 2691# 2692# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using 2693# recvmsg_into(). 2694 2695# XXX: like the other datagram (UDP) tests in this module, the code 2696# here assumes that datagram delivery on the local machine will be 2697# reliable. 2698 2699class SendrecvmsgBase(ThreadSafeCleanupTestCase): 2700 # Base class for sendmsg()/recvmsg() tests. 2701 2702 # Time in seconds to wait before considering a test failed, or 2703 # None for no timeout. Not all tests actually set a timeout. 2704 fail_timeout = support.LOOPBACK_TIMEOUT 2705 2706 def setUp(self): 2707 self.misc_event = threading.Event() 2708 super().setUp() 2709 2710 def sendToServer(self, msg): 2711 # Send msg to the server. 2712 return self.cli_sock.send(msg) 2713 2714 # Tuple of alternative default arguments for sendmsg() when called 2715 # via sendmsgToServer() (e.g. to include a destination address). 2716 sendmsg_to_server_defaults = () 2717 2718 def sendmsgToServer(self, *args): 2719 # Call sendmsg() on self.cli_sock with the given arguments, 2720 # filling in any arguments which are not supplied with the 2721 # corresponding items of self.sendmsg_to_server_defaults, if 2722 # any. 2723 return self.cli_sock.sendmsg( 2724 *(args + self.sendmsg_to_server_defaults[len(args):])) 2725 2726 def doRecvmsg(self, sock, bufsize, *args): 2727 # Call recvmsg() on sock with given arguments and return its 2728 # result. Should be used for tests which can use either 2729 # recvmsg() or recvmsg_into() - RecvmsgIntoMixin overrides 2730 # this method with one which emulates it using recvmsg_into(), 2731 # thus allowing the same test to be used for both methods. 2732 result = sock.recvmsg(bufsize, *args) 2733 self.registerRecvmsgResult(result) 2734 return result 2735 2736 def registerRecvmsgResult(self, result): 2737 # Called by doRecvmsg() with the return value of recvmsg() or 2738 # recvmsg_into(). Can be overridden to arrange cleanup based 2739 # on the returned ancillary data, for instance. 2740 pass 2741 2742 def checkRecvmsgAddress(self, addr1, addr2): 2743 # Called to compare the received address with the address of 2744 # the peer. 2745 self.assertEqual(addr1, addr2) 2746 2747 # Flags that are normally unset in msg_flags 2748 msg_flags_common_unset = 0 2749 for name in ("MSG_CTRUNC", "MSG_OOB"): 2750 msg_flags_common_unset |= getattr(socket, name, 0) 2751 2752 # Flags that are normally set 2753 msg_flags_common_set = 0 2754 2755 # Flags set when a complete record has been received (e.g. MSG_EOR 2756 # for SCTP) 2757 msg_flags_eor_indicator = 0 2758 2759 # Flags set when a complete record has not been received 2760 # (e.g. MSG_TRUNC for datagram sockets) 2761 msg_flags_non_eor_indicator = 0 2762 2763 def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0): 2764 # Method to check the value of msg_flags returned by recvmsg[_into](). 2765 # 2766 # Checks that all bits in msg_flags_common_set attribute are 2767 # set in "flags" and all bits in msg_flags_common_unset are 2768 # unset. 2769 # 2770 # The "eor" argument specifies whether the flags should 2771 # indicate that a full record (or datagram) has been received. 2772 # If "eor" is None, no checks are done; otherwise, checks 2773 # that: 2774 # 2775 # * if "eor" is true, all bits in msg_flags_eor_indicator are 2776 # set and all bits in msg_flags_non_eor_indicator are unset 2777 # 2778 # * if "eor" is false, all bits in msg_flags_non_eor_indicator 2779 # are set and all bits in msg_flags_eor_indicator are unset 2780 # 2781 # If "checkset" and/or "checkunset" are supplied, they require 2782 # the given bits to be set or unset respectively, overriding 2783 # what the attributes require for those bits. 2784 # 2785 # If any bits are set in "ignore", they will not be checked, 2786 # regardless of the other inputs. 2787 # 2788 # Will raise Exception if the inputs require a bit to be both 2789 # set and unset, and it is not ignored. 2790 2791 defaultset = self.msg_flags_common_set 2792 defaultunset = self.msg_flags_common_unset 2793 2794 if eor: 2795 defaultset |= self.msg_flags_eor_indicator 2796 defaultunset |= self.msg_flags_non_eor_indicator 2797 elif eor is not None: 2798 defaultset |= self.msg_flags_non_eor_indicator 2799 defaultunset |= self.msg_flags_eor_indicator 2800 2801 # Function arguments override defaults 2802 defaultset &= ~checkunset 2803 defaultunset &= ~checkset 2804 2805 # Merge arguments with remaining defaults, and check for conflicts 2806 checkset |= defaultset 2807 checkunset |= defaultunset 2808 inboth = checkset & checkunset & ~ignore 2809 if inboth: 2810 raise Exception("contradictory set, unset requirements for flags " 2811 "{0:#x}".format(inboth)) 2812 2813 # Compare with given msg_flags value 2814 mask = (checkset | checkunset) & ~ignore 2815 self.assertEqual(flags & mask, checkset & mask) 2816 2817 2818class RecvmsgIntoMixin(SendrecvmsgBase): 2819 # Mixin to implement doRecvmsg() using recvmsg_into(). 2820 2821 def doRecvmsg(self, sock, bufsize, *args): 2822 buf = bytearray(bufsize) 2823 result = sock.recvmsg_into([buf], *args) 2824 self.registerRecvmsgResult(result) 2825 self.assertGreaterEqual(result[0], 0) 2826 self.assertLessEqual(result[0], bufsize) 2827 return (bytes(buf[:result[0]]),) + result[1:] 2828 2829 2830class SendrecvmsgDgramFlagsBase(SendrecvmsgBase): 2831 # Defines flags to be checked in msg_flags for datagram sockets. 2832 2833 @property 2834 def msg_flags_non_eor_indicator(self): 2835 return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC 2836 2837 2838class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase): 2839 # Defines flags to be checked in msg_flags for SCTP sockets. 2840 2841 @property 2842 def msg_flags_eor_indicator(self): 2843 return super().msg_flags_eor_indicator | socket.MSG_EOR 2844 2845 2846class SendrecvmsgConnectionlessBase(SendrecvmsgBase): 2847 # Base class for tests on connectionless-mode sockets. Users must 2848 # supply sockets on attributes cli and serv to be mapped to 2849 # cli_sock and serv_sock respectively. 2850 2851 @property 2852 def serv_sock(self): 2853 return self.serv 2854 2855 @property 2856 def cli_sock(self): 2857 return self.cli 2858 2859 @property 2860 def sendmsg_to_server_defaults(self): 2861 return ([], [], 0, self.serv_addr) 2862 2863 def sendToServer(self, msg): 2864 return self.cli_sock.sendto(msg, self.serv_addr) 2865 2866 2867class SendrecvmsgConnectedBase(SendrecvmsgBase): 2868 # Base class for tests on connected sockets. Users must supply 2869 # sockets on attributes serv_conn and cli_conn (representing the 2870 # connections *to* the server and the client), to be mapped to 2871 # cli_sock and serv_sock respectively. 2872 2873 @property 2874 def serv_sock(self): 2875 return self.cli_conn 2876 2877 @property 2878 def cli_sock(self): 2879 return self.serv_conn 2880 2881 def checkRecvmsgAddress(self, addr1, addr2): 2882 # Address is currently "unspecified" for a connected socket, 2883 # so we don't examine it 2884 pass 2885 2886 2887class SendrecvmsgServerTimeoutBase(SendrecvmsgBase): 2888 # Base class to set a timeout on server's socket. 2889 2890 def setUp(self): 2891 super().setUp() 2892 self.serv_sock.settimeout(self.fail_timeout) 2893 2894 2895class SendmsgTests(SendrecvmsgServerTimeoutBase): 2896 # Tests for sendmsg() which can use any socket type and do not 2897 # involve recvmsg() or recvmsg_into(). 2898 2899 def testSendmsg(self): 2900 # Send a simple message with sendmsg(). 2901 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2902 2903 def _testSendmsg(self): 2904 self.assertEqual(self.sendmsgToServer([MSG]), len(MSG)) 2905 2906 def testSendmsgDataGenerator(self): 2907 # Send from buffer obtained from a generator (not a sequence). 2908 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2909 2910 def _testSendmsgDataGenerator(self): 2911 self.assertEqual(self.sendmsgToServer((o for o in [MSG])), 2912 len(MSG)) 2913 2914 def testSendmsgAncillaryGenerator(self): 2915 # Gather (empty) ancillary data from a generator. 2916 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2917 2918 def _testSendmsgAncillaryGenerator(self): 2919 self.assertEqual(self.sendmsgToServer([MSG], (o for o in [])), 2920 len(MSG)) 2921 2922 def testSendmsgArray(self): 2923 # Send data from an array instead of the usual bytes object. 2924 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2925 2926 def _testSendmsgArray(self): 2927 self.assertEqual(self.sendmsgToServer([array.array("B", MSG)]), 2928 len(MSG)) 2929 2930 def testSendmsgGather(self): 2931 # Send message data from more than one buffer (gather write). 2932 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 2933 2934 def _testSendmsgGather(self): 2935 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 2936 2937 def testSendmsgBadArgs(self): 2938 # Check that sendmsg() rejects invalid arguments. 2939 self.assertEqual(self.serv_sock.recv(1000), b"done") 2940 2941 def _testSendmsgBadArgs(self): 2942 self.assertRaises(TypeError, self.cli_sock.sendmsg) 2943 self.assertRaises(TypeError, self.sendmsgToServer, 2944 b"not in an iterable") 2945 self.assertRaises(TypeError, self.sendmsgToServer, 2946 object()) 2947 self.assertRaises(TypeError, self.sendmsgToServer, 2948 [object()]) 2949 self.assertRaises(TypeError, self.sendmsgToServer, 2950 [MSG, object()]) 2951 self.assertRaises(TypeError, self.sendmsgToServer, 2952 [MSG], object()) 2953 self.assertRaises(TypeError, self.sendmsgToServer, 2954 [MSG], [], object()) 2955 self.assertRaises(TypeError, self.sendmsgToServer, 2956 [MSG], [], 0, object()) 2957 self.sendToServer(b"done") 2958 2959 def testSendmsgBadCmsg(self): 2960 # Check that invalid ancillary data items are rejected. 2961 self.assertEqual(self.serv_sock.recv(1000), b"done") 2962 2963 def _testSendmsgBadCmsg(self): 2964 self.assertRaises(TypeError, self.sendmsgToServer, 2965 [MSG], [object()]) 2966 self.assertRaises(TypeError, self.sendmsgToServer, 2967 [MSG], [(object(), 0, b"data")]) 2968 self.assertRaises(TypeError, self.sendmsgToServer, 2969 [MSG], [(0, object(), b"data")]) 2970 self.assertRaises(TypeError, self.sendmsgToServer, 2971 [MSG], [(0, 0, object())]) 2972 self.assertRaises(TypeError, self.sendmsgToServer, 2973 [MSG], [(0, 0)]) 2974 self.assertRaises(TypeError, self.sendmsgToServer, 2975 [MSG], [(0, 0, b"data", 42)]) 2976 self.sendToServer(b"done") 2977 2978 @requireAttrs(socket, "CMSG_SPACE") 2979 def testSendmsgBadMultiCmsg(self): 2980 # Check that invalid ancillary data items are rejected when 2981 # more than one item is present. 2982 self.assertEqual(self.serv_sock.recv(1000), b"done") 2983 2984 @testSendmsgBadMultiCmsg.client_skip 2985 def _testSendmsgBadMultiCmsg(self): 2986 self.assertRaises(TypeError, self.sendmsgToServer, 2987 [MSG], [0, 0, b""]) 2988 self.assertRaises(TypeError, self.sendmsgToServer, 2989 [MSG], [(0, 0, b""), object()]) 2990 self.sendToServer(b"done") 2991 2992 def testSendmsgExcessCmsgReject(self): 2993 # Check that sendmsg() rejects excess ancillary data items 2994 # when the number that can be sent is limited. 2995 self.assertEqual(self.serv_sock.recv(1000), b"done") 2996 2997 def _testSendmsgExcessCmsgReject(self): 2998 if not hasattr(socket, "CMSG_SPACE"): 2999 # Can only send one item 3000 with self.assertRaises(OSError) as cm: 3001 self.sendmsgToServer([MSG], [(0, 0, b""), (0, 0, b"")]) 3002 self.assertIsNone(cm.exception.errno) 3003 self.sendToServer(b"done") 3004 3005 def testSendmsgAfterClose(self): 3006 # Check that sendmsg() fails on a closed socket. 3007 pass 3008 3009 def _testSendmsgAfterClose(self): 3010 self.cli_sock.close() 3011 self.assertRaises(OSError, self.sendmsgToServer, [MSG]) 3012 3013 3014class SendmsgStreamTests(SendmsgTests): 3015 # Tests for sendmsg() which require a stream socket and do not 3016 # involve recvmsg() or recvmsg_into(). 3017 3018 def testSendmsgExplicitNoneAddr(self): 3019 # Check that peer address can be specified as None. 3020 self.assertEqual(self.serv_sock.recv(len(MSG)), MSG) 3021 3022 def _testSendmsgExplicitNoneAddr(self): 3023 self.assertEqual(self.sendmsgToServer([MSG], [], 0, None), len(MSG)) 3024 3025 def testSendmsgTimeout(self): 3026 # Check that timeout works with sendmsg(). 3027 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 3028 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3029 3030 def _testSendmsgTimeout(self): 3031 try: 3032 self.cli_sock.settimeout(0.03) 3033 try: 3034 while True: 3035 self.sendmsgToServer([b"a"*512]) 3036 except TimeoutError: 3037 pass 3038 except OSError as exc: 3039 if exc.errno != errno.ENOMEM: 3040 raise 3041 # bpo-33937 the test randomly fails on Travis CI with 3042 # "OSError: [Errno 12] Cannot allocate memory" 3043 else: 3044 self.fail("TimeoutError not raised") 3045 finally: 3046 self.misc_event.set() 3047 3048 # XXX: would be nice to have more tests for sendmsg flags argument. 3049 3050 # Linux supports MSG_DONTWAIT when sending, but in general, it 3051 # only works when receiving. Could add other platforms if they 3052 # support it too. 3053 @skipWithClientIf(sys.platform not in {"linux"}, 3054 "MSG_DONTWAIT not known to work on this platform when " 3055 "sending") 3056 def testSendmsgDontWait(self): 3057 # Check that MSG_DONTWAIT in flags causes non-blocking behaviour. 3058 self.assertEqual(self.serv_sock.recv(512), b"a"*512) 3059 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3060 3061 @testSendmsgDontWait.client_skip 3062 def _testSendmsgDontWait(self): 3063 try: 3064 with self.assertRaises(OSError) as cm: 3065 while True: 3066 self.sendmsgToServer([b"a"*512], [], socket.MSG_DONTWAIT) 3067 # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI 3068 # with "OSError: [Errno 12] Cannot allocate memory" 3069 self.assertIn(cm.exception.errno, 3070 (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM)) 3071 finally: 3072 self.misc_event.set() 3073 3074 3075class SendmsgConnectionlessTests(SendmsgTests): 3076 # Tests for sendmsg() which require a connectionless-mode 3077 # (e.g. datagram) socket, and do not involve recvmsg() or 3078 # recvmsg_into(). 3079 3080 def testSendmsgNoDestAddr(self): 3081 # Check that sendmsg() fails when no destination address is 3082 # given for unconnected socket. 3083 pass 3084 3085 def _testSendmsgNoDestAddr(self): 3086 self.assertRaises(OSError, self.cli_sock.sendmsg, 3087 [MSG]) 3088 self.assertRaises(OSError, self.cli_sock.sendmsg, 3089 [MSG], [], 0, None) 3090 3091 3092class RecvmsgGenericTests(SendrecvmsgBase): 3093 # Tests for recvmsg() which can also be emulated using 3094 # recvmsg_into(), and can use any socket type. 3095 3096 def testRecvmsg(self): 3097 # Receive a simple message with recvmsg[_into](). 3098 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3099 self.assertEqual(msg, MSG) 3100 self.checkRecvmsgAddress(addr, self.cli_addr) 3101 self.assertEqual(ancdata, []) 3102 self.checkFlags(flags, eor=True) 3103 3104 def _testRecvmsg(self): 3105 self.sendToServer(MSG) 3106 3107 def testRecvmsgExplicitDefaults(self): 3108 # Test recvmsg[_into]() with default arguments provided explicitly. 3109 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3110 len(MSG), 0, 0) 3111 self.assertEqual(msg, MSG) 3112 self.checkRecvmsgAddress(addr, self.cli_addr) 3113 self.assertEqual(ancdata, []) 3114 self.checkFlags(flags, eor=True) 3115 3116 def _testRecvmsgExplicitDefaults(self): 3117 self.sendToServer(MSG) 3118 3119 def testRecvmsgShorter(self): 3120 # Receive a message smaller than buffer. 3121 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3122 len(MSG) + 42) 3123 self.assertEqual(msg, MSG) 3124 self.checkRecvmsgAddress(addr, self.cli_addr) 3125 self.assertEqual(ancdata, []) 3126 self.checkFlags(flags, eor=True) 3127 3128 def _testRecvmsgShorter(self): 3129 self.sendToServer(MSG) 3130 3131 def testRecvmsgTrunc(self): 3132 # Receive part of message, check for truncation indicators. 3133 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3134 len(MSG) - 3) 3135 self.assertEqual(msg, MSG[:-3]) 3136 self.checkRecvmsgAddress(addr, self.cli_addr) 3137 self.assertEqual(ancdata, []) 3138 self.checkFlags(flags, eor=False) 3139 3140 def _testRecvmsgTrunc(self): 3141 self.sendToServer(MSG) 3142 3143 def testRecvmsgShortAncillaryBuf(self): 3144 # Test ancillary data buffer too small to hold any ancillary data. 3145 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3146 len(MSG), 1) 3147 self.assertEqual(msg, MSG) 3148 self.checkRecvmsgAddress(addr, self.cli_addr) 3149 self.assertEqual(ancdata, []) 3150 self.checkFlags(flags, eor=True) 3151 3152 def _testRecvmsgShortAncillaryBuf(self): 3153 self.sendToServer(MSG) 3154 3155 def testRecvmsgLongAncillaryBuf(self): 3156 # Test large ancillary data buffer. 3157 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3158 len(MSG), 10240) 3159 self.assertEqual(msg, MSG) 3160 self.checkRecvmsgAddress(addr, self.cli_addr) 3161 self.assertEqual(ancdata, []) 3162 self.checkFlags(flags, eor=True) 3163 3164 def _testRecvmsgLongAncillaryBuf(self): 3165 self.sendToServer(MSG) 3166 3167 def testRecvmsgAfterClose(self): 3168 # Check that recvmsg[_into]() fails on a closed socket. 3169 self.serv_sock.close() 3170 self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024) 3171 3172 def _testRecvmsgAfterClose(self): 3173 pass 3174 3175 def testRecvmsgTimeout(self): 3176 # Check that timeout works. 3177 try: 3178 self.serv_sock.settimeout(0.03) 3179 self.assertRaises(TimeoutError, 3180 self.doRecvmsg, self.serv_sock, len(MSG)) 3181 finally: 3182 self.misc_event.set() 3183 3184 def _testRecvmsgTimeout(self): 3185 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3186 3187 @requireAttrs(socket, "MSG_PEEK") 3188 def testRecvmsgPeek(self): 3189 # Check that MSG_PEEK in flags enables examination of pending 3190 # data without consuming it. 3191 3192 # Receive part of data with MSG_PEEK. 3193 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3194 len(MSG) - 3, 0, 3195 socket.MSG_PEEK) 3196 self.assertEqual(msg, MSG[:-3]) 3197 self.checkRecvmsgAddress(addr, self.cli_addr) 3198 self.assertEqual(ancdata, []) 3199 # Ignoring MSG_TRUNC here (so this test is the same for stream 3200 # and datagram sockets). Some wording in POSIX seems to 3201 # suggest that it needn't be set when peeking, but that may 3202 # just be a slip. 3203 self.checkFlags(flags, eor=False, 3204 ignore=getattr(socket, "MSG_TRUNC", 0)) 3205 3206 # Receive all data with MSG_PEEK. 3207 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3208 len(MSG), 0, 3209 socket.MSG_PEEK) 3210 self.assertEqual(msg, MSG) 3211 self.checkRecvmsgAddress(addr, self.cli_addr) 3212 self.assertEqual(ancdata, []) 3213 self.checkFlags(flags, eor=True) 3214 3215 # Check that the same data can still be received normally. 3216 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3217 self.assertEqual(msg, MSG) 3218 self.checkRecvmsgAddress(addr, self.cli_addr) 3219 self.assertEqual(ancdata, []) 3220 self.checkFlags(flags, eor=True) 3221 3222 @testRecvmsgPeek.client_skip 3223 def _testRecvmsgPeek(self): 3224 self.sendToServer(MSG) 3225 3226 @requireAttrs(socket.socket, "sendmsg") 3227 def testRecvmsgFromSendmsg(self): 3228 # Test receiving with recvmsg[_into]() when message is sent 3229 # using sendmsg(). 3230 self.serv_sock.settimeout(self.fail_timeout) 3231 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG)) 3232 self.assertEqual(msg, MSG) 3233 self.checkRecvmsgAddress(addr, self.cli_addr) 3234 self.assertEqual(ancdata, []) 3235 self.checkFlags(flags, eor=True) 3236 3237 @testRecvmsgFromSendmsg.client_skip 3238 def _testRecvmsgFromSendmsg(self): 3239 self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG)) 3240 3241 3242class RecvmsgGenericStreamTests(RecvmsgGenericTests): 3243 # Tests which require a stream socket and can use either recvmsg() 3244 # or recvmsg_into(). 3245 3246 def testRecvmsgEOF(self): 3247 # Receive end-of-stream indicator (b"", peer socket closed). 3248 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 3249 self.assertEqual(msg, b"") 3250 self.checkRecvmsgAddress(addr, self.cli_addr) 3251 self.assertEqual(ancdata, []) 3252 self.checkFlags(flags, eor=None) # Might not have end-of-record marker 3253 3254 def _testRecvmsgEOF(self): 3255 self.cli_sock.close() 3256 3257 def testRecvmsgOverflow(self): 3258 # Receive a message in more than one chunk. 3259 seg1, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3260 len(MSG) - 3) 3261 self.checkRecvmsgAddress(addr, self.cli_addr) 3262 self.assertEqual(ancdata, []) 3263 self.checkFlags(flags, eor=False) 3264 3265 seg2, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 1024) 3266 self.checkRecvmsgAddress(addr, self.cli_addr) 3267 self.assertEqual(ancdata, []) 3268 self.checkFlags(flags, eor=True) 3269 3270 msg = seg1 + seg2 3271 self.assertEqual(msg, MSG) 3272 3273 def _testRecvmsgOverflow(self): 3274 self.sendToServer(MSG) 3275 3276 3277class RecvmsgTests(RecvmsgGenericTests): 3278 # Tests for recvmsg() which can use any socket type. 3279 3280 def testRecvmsgBadArgs(self): 3281 # Check that recvmsg() rejects invalid arguments. 3282 self.assertRaises(TypeError, self.serv_sock.recvmsg) 3283 self.assertRaises(ValueError, self.serv_sock.recvmsg, 3284 -1, 0, 0) 3285 self.assertRaises(ValueError, self.serv_sock.recvmsg, 3286 len(MSG), -1, 0) 3287 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3288 [bytearray(10)], 0, 0) 3289 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3290 object(), 0, 0) 3291 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3292 len(MSG), object(), 0) 3293 self.assertRaises(TypeError, self.serv_sock.recvmsg, 3294 len(MSG), 0, object()) 3295 3296 msg, ancdata, flags, addr = self.serv_sock.recvmsg(len(MSG), 0, 0) 3297 self.assertEqual(msg, MSG) 3298 self.checkRecvmsgAddress(addr, self.cli_addr) 3299 self.assertEqual(ancdata, []) 3300 self.checkFlags(flags, eor=True) 3301 3302 def _testRecvmsgBadArgs(self): 3303 self.sendToServer(MSG) 3304 3305 3306class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests): 3307 # Tests for recvmsg_into() which can use any socket type. 3308 3309 def testRecvmsgIntoBadArgs(self): 3310 # Check that recvmsg_into() rejects invalid arguments. 3311 buf = bytearray(len(MSG)) 3312 self.assertRaises(TypeError, self.serv_sock.recvmsg_into) 3313 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3314 len(MSG), 0, 0) 3315 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3316 buf, 0, 0) 3317 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3318 [object()], 0, 0) 3319 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3320 [b"I'm not writable"], 0, 0) 3321 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3322 [buf, object()], 0, 0) 3323 self.assertRaises(ValueError, self.serv_sock.recvmsg_into, 3324 [buf], -1, 0) 3325 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3326 [buf], object(), 0) 3327 self.assertRaises(TypeError, self.serv_sock.recvmsg_into, 3328 [buf], 0, object()) 3329 3330 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf], 0, 0) 3331 self.assertEqual(nbytes, len(MSG)) 3332 self.assertEqual(buf, bytearray(MSG)) 3333 self.checkRecvmsgAddress(addr, self.cli_addr) 3334 self.assertEqual(ancdata, []) 3335 self.checkFlags(flags, eor=True) 3336 3337 def _testRecvmsgIntoBadArgs(self): 3338 self.sendToServer(MSG) 3339 3340 def testRecvmsgIntoGenerator(self): 3341 # Receive into buffer obtained from a generator (not a sequence). 3342 buf = bytearray(len(MSG)) 3343 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 3344 (o for o in [buf])) 3345 self.assertEqual(nbytes, len(MSG)) 3346 self.assertEqual(buf, bytearray(MSG)) 3347 self.checkRecvmsgAddress(addr, self.cli_addr) 3348 self.assertEqual(ancdata, []) 3349 self.checkFlags(flags, eor=True) 3350 3351 def _testRecvmsgIntoGenerator(self): 3352 self.sendToServer(MSG) 3353 3354 def testRecvmsgIntoArray(self): 3355 # Receive into an array rather than the usual bytearray. 3356 buf = array.array("B", [0] * len(MSG)) 3357 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into([buf]) 3358 self.assertEqual(nbytes, len(MSG)) 3359 self.assertEqual(buf.tobytes(), MSG) 3360 self.checkRecvmsgAddress(addr, self.cli_addr) 3361 self.assertEqual(ancdata, []) 3362 self.checkFlags(flags, eor=True) 3363 3364 def _testRecvmsgIntoArray(self): 3365 self.sendToServer(MSG) 3366 3367 def testRecvmsgIntoScatter(self): 3368 # Receive into multiple buffers (scatter write). 3369 b1 = bytearray(b"----") 3370 b2 = bytearray(b"0123456789") 3371 b3 = bytearray(b"--------------") 3372 nbytes, ancdata, flags, addr = self.serv_sock.recvmsg_into( 3373 [b1, memoryview(b2)[2:9], b3]) 3374 self.assertEqual(nbytes, len(b"Mary had a little lamb")) 3375 self.assertEqual(b1, bytearray(b"Mary")) 3376 self.assertEqual(b2, bytearray(b"01 had a 9")) 3377 self.assertEqual(b3, bytearray(b"little lamb---")) 3378 self.checkRecvmsgAddress(addr, self.cli_addr) 3379 self.assertEqual(ancdata, []) 3380 self.checkFlags(flags, eor=True) 3381 3382 def _testRecvmsgIntoScatter(self): 3383 self.sendToServer(b"Mary had a little lamb") 3384 3385 3386class CmsgMacroTests(unittest.TestCase): 3387 # Test the functions CMSG_LEN() and CMSG_SPACE(). Tests 3388 # assumptions used by sendmsg() and recvmsg[_into](), which share 3389 # code with these functions. 3390 3391 # Match the definition in socketmodule.c 3392 try: 3393 import _testcapi 3394 except ImportError: 3395 socklen_t_limit = 0x7fffffff 3396 else: 3397 socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX) 3398 3399 @requireAttrs(socket, "CMSG_LEN") 3400 def testCMSG_LEN(self): 3401 # Test CMSG_LEN() with various valid and invalid values, 3402 # checking the assumptions used by recvmsg() and sendmsg(). 3403 toobig = self.socklen_t_limit - socket.CMSG_LEN(0) + 1 3404 values = list(range(257)) + list(range(toobig - 257, toobig)) 3405 3406 # struct cmsghdr has at least three members, two of which are ints 3407 self.assertGreater(socket.CMSG_LEN(0), array.array("i").itemsize * 2) 3408 for n in values: 3409 ret = socket.CMSG_LEN(n) 3410 # This is how recvmsg() calculates the data size 3411 self.assertEqual(ret - socket.CMSG_LEN(0), n) 3412 self.assertLessEqual(ret, self.socklen_t_limit) 3413 3414 self.assertRaises(OverflowError, socket.CMSG_LEN, -1) 3415 # sendmsg() shares code with these functions, and requires 3416 # that it reject values over the limit. 3417 self.assertRaises(OverflowError, socket.CMSG_LEN, toobig) 3418 self.assertRaises(OverflowError, socket.CMSG_LEN, sys.maxsize) 3419 3420 @requireAttrs(socket, "CMSG_SPACE") 3421 def testCMSG_SPACE(self): 3422 # Test CMSG_SPACE() with various valid and invalid values, 3423 # checking the assumptions used by sendmsg(). 3424 toobig = self.socklen_t_limit - socket.CMSG_SPACE(1) + 1 3425 values = list(range(257)) + list(range(toobig - 257, toobig)) 3426 3427 last = socket.CMSG_SPACE(0) 3428 # struct cmsghdr has at least three members, two of which are ints 3429 self.assertGreater(last, array.array("i").itemsize * 2) 3430 for n in values: 3431 ret = socket.CMSG_SPACE(n) 3432 self.assertGreaterEqual(ret, last) 3433 self.assertGreaterEqual(ret, socket.CMSG_LEN(n)) 3434 self.assertGreaterEqual(ret, n + socket.CMSG_LEN(0)) 3435 self.assertLessEqual(ret, self.socklen_t_limit) 3436 last = ret 3437 3438 self.assertRaises(OverflowError, socket.CMSG_SPACE, -1) 3439 # sendmsg() shares code with these functions, and requires 3440 # that it reject values over the limit. 3441 self.assertRaises(OverflowError, socket.CMSG_SPACE, toobig) 3442 self.assertRaises(OverflowError, socket.CMSG_SPACE, sys.maxsize) 3443 3444 3445class SCMRightsTest(SendrecvmsgServerTimeoutBase): 3446 # Tests for file descriptor passing on Unix-domain sockets. 3447 3448 # Invalid file descriptor value that's unlikely to evaluate to a 3449 # real FD even if one of its bytes is replaced with a different 3450 # value (which shouldn't actually happen). 3451 badfd = -0x5555 3452 3453 def newFDs(self, n): 3454 # Return a list of n file descriptors for newly-created files 3455 # containing their list indices as ASCII numbers. 3456 fds = [] 3457 for i in range(n): 3458 fd, path = tempfile.mkstemp() 3459 self.addCleanup(os.unlink, path) 3460 self.addCleanup(os.close, fd) 3461 os.write(fd, str(i).encode()) 3462 fds.append(fd) 3463 return fds 3464 3465 def checkFDs(self, fds): 3466 # Check that the file descriptors in the given list contain 3467 # their correct list indices as ASCII numbers. 3468 for n, fd in enumerate(fds): 3469 os.lseek(fd, 0, os.SEEK_SET) 3470 self.assertEqual(os.read(fd, 1024), str(n).encode()) 3471 3472 def registerRecvmsgResult(self, result): 3473 self.addCleanup(self.closeRecvmsgFDs, result) 3474 3475 def closeRecvmsgFDs(self, recvmsg_result): 3476 # Close all file descriptors specified in the ancillary data 3477 # of the given return value from recvmsg() or recvmsg_into(). 3478 for cmsg_level, cmsg_type, cmsg_data in recvmsg_result[1]: 3479 if (cmsg_level == socket.SOL_SOCKET and 3480 cmsg_type == socket.SCM_RIGHTS): 3481 fds = array.array("i") 3482 fds.frombytes(cmsg_data[: 3483 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3484 for fd in fds: 3485 os.close(fd) 3486 3487 def createAndSendFDs(self, n): 3488 # Send n new file descriptors created by newFDs() to the 3489 # server, with the constant MSG as the non-ancillary data. 3490 self.assertEqual( 3491 self.sendmsgToServer([MSG], 3492 [(socket.SOL_SOCKET, 3493 socket.SCM_RIGHTS, 3494 array.array("i", self.newFDs(n)))]), 3495 len(MSG)) 3496 3497 def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0): 3498 # Check that constant MSG was received with numfds file 3499 # descriptors in a maximum of maxcmsgs control messages (which 3500 # must contain only complete integers). By default, check 3501 # that MSG_CTRUNC is unset, but ignore any flags in 3502 # ignoreflags. 3503 msg, ancdata, flags, addr = result 3504 self.assertEqual(msg, MSG) 3505 self.checkRecvmsgAddress(addr, self.cli_addr) 3506 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3507 ignore=ignoreflags) 3508 3509 self.assertIsInstance(ancdata, list) 3510 self.assertLessEqual(len(ancdata), maxcmsgs) 3511 fds = array.array("i") 3512 for item in ancdata: 3513 self.assertIsInstance(item, tuple) 3514 cmsg_level, cmsg_type, cmsg_data = item 3515 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3516 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3517 self.assertIsInstance(cmsg_data, bytes) 3518 self.assertEqual(len(cmsg_data) % SIZEOF_INT, 0) 3519 fds.frombytes(cmsg_data) 3520 3521 self.assertEqual(len(fds), numfds) 3522 self.checkFDs(fds) 3523 3524 def testFDPassSimple(self): 3525 # Pass a single FD (array read from bytes object). 3526 self.checkRecvmsgFDs(1, self.doRecvmsg(self.serv_sock, 3527 len(MSG), 10240)) 3528 3529 def _testFDPassSimple(self): 3530 self.assertEqual( 3531 self.sendmsgToServer( 3532 [MSG], 3533 [(socket.SOL_SOCKET, 3534 socket.SCM_RIGHTS, 3535 array.array("i", self.newFDs(1)).tobytes())]), 3536 len(MSG)) 3537 3538 def testMultipleFDPass(self): 3539 # Pass multiple FDs in a single array. 3540 self.checkRecvmsgFDs(4, self.doRecvmsg(self.serv_sock, 3541 len(MSG), 10240)) 3542 3543 def _testMultipleFDPass(self): 3544 self.createAndSendFDs(4) 3545 3546 @requireAttrs(socket, "CMSG_SPACE") 3547 def testFDPassCMSG_SPACE(self): 3548 # Test using CMSG_SPACE() to calculate ancillary buffer size. 3549 self.checkRecvmsgFDs( 3550 4, self.doRecvmsg(self.serv_sock, len(MSG), 3551 socket.CMSG_SPACE(4 * SIZEOF_INT))) 3552 3553 @testFDPassCMSG_SPACE.client_skip 3554 def _testFDPassCMSG_SPACE(self): 3555 self.createAndSendFDs(4) 3556 3557 def testFDPassCMSG_LEN(self): 3558 # Test using CMSG_LEN() to calculate ancillary buffer size. 3559 self.checkRecvmsgFDs(1, 3560 self.doRecvmsg(self.serv_sock, len(MSG), 3561 socket.CMSG_LEN(4 * SIZEOF_INT)), 3562 # RFC 3542 says implementations may set 3563 # MSG_CTRUNC if there isn't enough space 3564 # for trailing padding. 3565 ignoreflags=socket.MSG_CTRUNC) 3566 3567 def _testFDPassCMSG_LEN(self): 3568 self.createAndSendFDs(1) 3569 3570 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3571 @unittest.skipIf(AIX, "skipping, see issue #22397") 3572 @requireAttrs(socket, "CMSG_SPACE") 3573 def testFDPassSeparate(self): 3574 # Pass two FDs in two separate arrays. Arrays may be combined 3575 # into a single control message by the OS. 3576 self.checkRecvmsgFDs(2, 3577 self.doRecvmsg(self.serv_sock, len(MSG), 10240), 3578 maxcmsgs=2) 3579 3580 @testFDPassSeparate.client_skip 3581 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3582 @unittest.skipIf(AIX, "skipping, see issue #22397") 3583 def _testFDPassSeparate(self): 3584 fd0, fd1 = self.newFDs(2) 3585 self.assertEqual( 3586 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 3587 socket.SCM_RIGHTS, 3588 array.array("i", [fd0])), 3589 (socket.SOL_SOCKET, 3590 socket.SCM_RIGHTS, 3591 array.array("i", [fd1]))]), 3592 len(MSG)) 3593 3594 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3595 @unittest.skipIf(AIX, "skipping, see issue #22397") 3596 @requireAttrs(socket, "CMSG_SPACE") 3597 def testFDPassSeparateMinSpace(self): 3598 # Pass two FDs in two separate arrays, receiving them into the 3599 # minimum space for two arrays. 3600 num_fds = 2 3601 self.checkRecvmsgFDs(num_fds, 3602 self.doRecvmsg(self.serv_sock, len(MSG), 3603 socket.CMSG_SPACE(SIZEOF_INT) + 3604 socket.CMSG_LEN(SIZEOF_INT * num_fds)), 3605 maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC) 3606 3607 @testFDPassSeparateMinSpace.client_skip 3608 @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958") 3609 @unittest.skipIf(AIX, "skipping, see issue #22397") 3610 def _testFDPassSeparateMinSpace(self): 3611 fd0, fd1 = self.newFDs(2) 3612 self.assertEqual( 3613 self.sendmsgToServer([MSG], [(socket.SOL_SOCKET, 3614 socket.SCM_RIGHTS, 3615 array.array("i", [fd0])), 3616 (socket.SOL_SOCKET, 3617 socket.SCM_RIGHTS, 3618 array.array("i", [fd1]))]), 3619 len(MSG)) 3620 3621 def sendAncillaryIfPossible(self, msg, ancdata): 3622 # Try to send msg and ancdata to server, but if the system 3623 # call fails, just send msg with no ancillary data. 3624 try: 3625 nbytes = self.sendmsgToServer([msg], ancdata) 3626 except OSError as e: 3627 # Check that it was the system call that failed 3628 self.assertIsInstance(e.errno, int) 3629 nbytes = self.sendmsgToServer([msg]) 3630 self.assertEqual(nbytes, len(msg)) 3631 3632 @unittest.skipIf(sys.platform == "darwin", "see issue #24725") 3633 def testFDPassEmpty(self): 3634 # Try to pass an empty FD array. Can receive either no array 3635 # or an empty array. 3636 self.checkRecvmsgFDs(0, self.doRecvmsg(self.serv_sock, 3637 len(MSG), 10240), 3638 ignoreflags=socket.MSG_CTRUNC) 3639 3640 def _testFDPassEmpty(self): 3641 self.sendAncillaryIfPossible(MSG, [(socket.SOL_SOCKET, 3642 socket.SCM_RIGHTS, 3643 b"")]) 3644 3645 def testFDPassPartialInt(self): 3646 # Try to pass a truncated FD array. 3647 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3648 len(MSG), 10240) 3649 self.assertEqual(msg, MSG) 3650 self.checkRecvmsgAddress(addr, self.cli_addr) 3651 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 3652 self.assertLessEqual(len(ancdata), 1) 3653 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3654 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3655 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3656 self.assertLess(len(cmsg_data), SIZEOF_INT) 3657 3658 def _testFDPassPartialInt(self): 3659 self.sendAncillaryIfPossible( 3660 MSG, 3661 [(socket.SOL_SOCKET, 3662 socket.SCM_RIGHTS, 3663 array.array("i", [self.badfd]).tobytes()[:-1])]) 3664 3665 @requireAttrs(socket, "CMSG_SPACE") 3666 def testFDPassPartialIntInMiddle(self): 3667 # Try to pass two FD arrays, the first of which is truncated. 3668 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3669 len(MSG), 10240) 3670 self.assertEqual(msg, MSG) 3671 self.checkRecvmsgAddress(addr, self.cli_addr) 3672 self.checkFlags(flags, eor=True, ignore=socket.MSG_CTRUNC) 3673 self.assertLessEqual(len(ancdata), 2) 3674 fds = array.array("i") 3675 # Arrays may have been combined in a single control message 3676 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3677 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3678 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3679 fds.frombytes(cmsg_data[: 3680 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3681 self.assertLessEqual(len(fds), 2) 3682 self.checkFDs(fds) 3683 3684 @testFDPassPartialIntInMiddle.client_skip 3685 def _testFDPassPartialIntInMiddle(self): 3686 fd0, fd1 = self.newFDs(2) 3687 self.sendAncillaryIfPossible( 3688 MSG, 3689 [(socket.SOL_SOCKET, 3690 socket.SCM_RIGHTS, 3691 array.array("i", [fd0, self.badfd]).tobytes()[:-1]), 3692 (socket.SOL_SOCKET, 3693 socket.SCM_RIGHTS, 3694 array.array("i", [fd1]))]) 3695 3696 def checkTruncatedHeader(self, result, ignoreflags=0): 3697 # Check that no ancillary data items are returned when data is 3698 # truncated inside the cmsghdr structure. 3699 msg, ancdata, flags, addr = result 3700 self.assertEqual(msg, MSG) 3701 self.checkRecvmsgAddress(addr, self.cli_addr) 3702 self.assertEqual(ancdata, []) 3703 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 3704 ignore=ignoreflags) 3705 3706 def testCmsgTruncNoBufSize(self): 3707 # Check that no ancillary data is received when no buffer size 3708 # is specified. 3709 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG)), 3710 # BSD seems to set MSG_CTRUNC only 3711 # if an item has been partially 3712 # received. 3713 ignoreflags=socket.MSG_CTRUNC) 3714 3715 def _testCmsgTruncNoBufSize(self): 3716 self.createAndSendFDs(1) 3717 3718 def testCmsgTrunc0(self): 3719 # Check that no ancillary data is received when buffer size is 0. 3720 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 0), 3721 ignoreflags=socket.MSG_CTRUNC) 3722 3723 def _testCmsgTrunc0(self): 3724 self.createAndSendFDs(1) 3725 3726 # Check that no ancillary data is returned for various non-zero 3727 # (but still too small) buffer sizes. 3728 3729 def testCmsgTrunc1(self): 3730 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 1)) 3731 3732 def _testCmsgTrunc1(self): 3733 self.createAndSendFDs(1) 3734 3735 def testCmsgTrunc2Int(self): 3736 # The cmsghdr structure has at least three members, two of 3737 # which are ints, so we still shouldn't see any ancillary 3738 # data. 3739 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3740 SIZEOF_INT * 2)) 3741 3742 def _testCmsgTrunc2Int(self): 3743 self.createAndSendFDs(1) 3744 3745 def testCmsgTruncLen0Minus1(self): 3746 self.checkTruncatedHeader(self.doRecvmsg(self.serv_sock, len(MSG), 3747 socket.CMSG_LEN(0) - 1)) 3748 3749 def _testCmsgTruncLen0Minus1(self): 3750 self.createAndSendFDs(1) 3751 3752 # The following tests try to truncate the control message in the 3753 # middle of the FD array. 3754 3755 def checkTruncatedArray(self, ancbuf, maxdata, mindata=0): 3756 # Check that file descriptor data is truncated to between 3757 # mindata and maxdata bytes when received with buffer size 3758 # ancbuf, and that any complete file descriptor numbers are 3759 # valid. 3760 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3761 len(MSG), ancbuf) 3762 self.assertEqual(msg, MSG) 3763 self.checkRecvmsgAddress(addr, self.cli_addr) 3764 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 3765 3766 if mindata == 0 and ancdata == []: 3767 return 3768 self.assertEqual(len(ancdata), 1) 3769 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3770 self.assertEqual(cmsg_level, socket.SOL_SOCKET) 3771 self.assertEqual(cmsg_type, socket.SCM_RIGHTS) 3772 self.assertGreaterEqual(len(cmsg_data), mindata) 3773 self.assertLessEqual(len(cmsg_data), maxdata) 3774 fds = array.array("i") 3775 fds.frombytes(cmsg_data[: 3776 len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) 3777 self.checkFDs(fds) 3778 3779 def testCmsgTruncLen0(self): 3780 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0), maxdata=0) 3781 3782 def _testCmsgTruncLen0(self): 3783 self.createAndSendFDs(1) 3784 3785 def testCmsgTruncLen0Plus1(self): 3786 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(0) + 1, maxdata=1) 3787 3788 def _testCmsgTruncLen0Plus1(self): 3789 self.createAndSendFDs(2) 3790 3791 def testCmsgTruncLen1(self): 3792 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(SIZEOF_INT), 3793 maxdata=SIZEOF_INT) 3794 3795 def _testCmsgTruncLen1(self): 3796 self.createAndSendFDs(2) 3797 3798 def testCmsgTruncLen2Minus1(self): 3799 self.checkTruncatedArray(ancbuf=socket.CMSG_LEN(2 * SIZEOF_INT) - 1, 3800 maxdata=(2 * SIZEOF_INT) - 1) 3801 3802 def _testCmsgTruncLen2Minus1(self): 3803 self.createAndSendFDs(2) 3804 3805 3806class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase): 3807 # Test sendmsg() and recvmsg[_into]() using the ancillary data 3808 # features of the RFC 3542 Advanced Sockets API for IPv6. 3809 # Currently we can only handle certain data items (e.g. traffic 3810 # class, hop limit, MTU discovery and fragmentation settings) 3811 # without resorting to unportable means such as the struct module, 3812 # but the tests here are aimed at testing the ancillary data 3813 # handling in sendmsg() and recvmsg() rather than the IPv6 API 3814 # itself. 3815 3816 # Test value to use when setting hop limit of packet 3817 hop_limit = 2 3818 3819 # Test value to use when setting traffic class of packet. 3820 # -1 means "use kernel default". 3821 traffic_class = -1 3822 3823 def ancillaryMapping(self, ancdata): 3824 # Given ancillary data list ancdata, return a mapping from 3825 # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data. 3826 # Check that no (level, type) pair appears more than once. 3827 d = {} 3828 for cmsg_level, cmsg_type, cmsg_data in ancdata: 3829 self.assertNotIn((cmsg_level, cmsg_type), d) 3830 d[(cmsg_level, cmsg_type)] = cmsg_data 3831 return d 3832 3833 def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0): 3834 # Receive hop limit into ancbufsize bytes of ancillary data 3835 # space. Check that data is MSG, ancillary data is not 3836 # truncated (but ignore any flags in ignoreflags), and hop 3837 # limit is between 0 and maxhop inclusive. 3838 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3839 socket.IPV6_RECVHOPLIMIT, 1) 3840 self.misc_event.set() 3841 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3842 len(MSG), ancbufsize) 3843 3844 self.assertEqual(msg, MSG) 3845 self.checkRecvmsgAddress(addr, self.cli_addr) 3846 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3847 ignore=ignoreflags) 3848 3849 self.assertEqual(len(ancdata), 1) 3850 self.assertIsInstance(ancdata[0], tuple) 3851 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 3852 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 3853 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 3854 self.assertIsInstance(cmsg_data, bytes) 3855 self.assertEqual(len(cmsg_data), SIZEOF_INT) 3856 a = array.array("i") 3857 a.frombytes(cmsg_data) 3858 self.assertGreaterEqual(a[0], 0) 3859 self.assertLessEqual(a[0], maxhop) 3860 3861 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3862 def testRecvHopLimit(self): 3863 # Test receiving the packet hop limit as ancillary data. 3864 self.checkHopLimit(ancbufsize=10240) 3865 3866 @testRecvHopLimit.client_skip 3867 def _testRecvHopLimit(self): 3868 # Need to wait until server has asked to receive ancillary 3869 # data, as implementations are not required to buffer it 3870 # otherwise. 3871 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3872 self.sendToServer(MSG) 3873 3874 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3875 def testRecvHopLimitCMSG_SPACE(self): 3876 # Test receiving hop limit, using CMSG_SPACE to calculate buffer size. 3877 self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT)) 3878 3879 @testRecvHopLimitCMSG_SPACE.client_skip 3880 def _testRecvHopLimitCMSG_SPACE(self): 3881 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3882 self.sendToServer(MSG) 3883 3884 # Could test receiving into buffer sized using CMSG_LEN, but RFC 3885 # 3542 says portable applications must provide space for trailing 3886 # padding. Implementations may set MSG_CTRUNC if there isn't 3887 # enough space for the padding. 3888 3889 @requireAttrs(socket.socket, "sendmsg") 3890 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 3891 def testSetHopLimit(self): 3892 # Test setting hop limit on outgoing packet and receiving it 3893 # at the other end. 3894 self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit) 3895 3896 @testSetHopLimit.client_skip 3897 def _testSetHopLimit(self): 3898 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3899 self.assertEqual( 3900 self.sendmsgToServer([MSG], 3901 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3902 array.array("i", [self.hop_limit]))]), 3903 len(MSG)) 3904 3905 def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255, 3906 ignoreflags=0): 3907 # Receive traffic class and hop limit into ancbufsize bytes of 3908 # ancillary data space. Check that data is MSG, ancillary 3909 # data is not truncated (but ignore any flags in ignoreflags), 3910 # and traffic class and hop limit are in range (hop limit no 3911 # more than maxhop). 3912 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3913 socket.IPV6_RECVHOPLIMIT, 1) 3914 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 3915 socket.IPV6_RECVTCLASS, 1) 3916 self.misc_event.set() 3917 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 3918 len(MSG), ancbufsize) 3919 3920 self.assertEqual(msg, MSG) 3921 self.checkRecvmsgAddress(addr, self.cli_addr) 3922 self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC, 3923 ignore=ignoreflags) 3924 self.assertEqual(len(ancdata), 2) 3925 ancmap = self.ancillaryMapping(ancdata) 3926 3927 tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)] 3928 self.assertEqual(len(tcdata), SIZEOF_INT) 3929 a = array.array("i") 3930 a.frombytes(tcdata) 3931 self.assertGreaterEqual(a[0], 0) 3932 self.assertLessEqual(a[0], 255) 3933 3934 hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)] 3935 self.assertEqual(len(hldata), SIZEOF_INT) 3936 a = array.array("i") 3937 a.frombytes(hldata) 3938 self.assertGreaterEqual(a[0], 0) 3939 self.assertLessEqual(a[0], maxhop) 3940 3941 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3942 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3943 def testRecvTrafficClassAndHopLimit(self): 3944 # Test receiving traffic class and hop limit as ancillary data. 3945 self.checkTrafficClassAndHopLimit(ancbufsize=10240) 3946 3947 @testRecvTrafficClassAndHopLimit.client_skip 3948 def _testRecvTrafficClassAndHopLimit(self): 3949 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3950 self.sendToServer(MSG) 3951 3952 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3953 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3954 def testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3955 # Test receiving traffic class and hop limit, using 3956 # CMSG_SPACE() to calculate buffer size. 3957 self.checkTrafficClassAndHopLimit( 3958 ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2) 3959 3960 @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip 3961 def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self): 3962 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3963 self.sendToServer(MSG) 3964 3965 @requireAttrs(socket.socket, "sendmsg") 3966 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3967 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3968 def testSetTrafficClassAndHopLimit(self): 3969 # Test setting traffic class and hop limit on outgoing packet, 3970 # and receiving them at the other end. 3971 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3972 maxhop=self.hop_limit) 3973 3974 @testSetTrafficClassAndHopLimit.client_skip 3975 def _testSetTrafficClassAndHopLimit(self): 3976 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3977 self.assertEqual( 3978 self.sendmsgToServer([MSG], 3979 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 3980 array.array("i", [self.traffic_class])), 3981 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 3982 array.array("i", [self.hop_limit]))]), 3983 len(MSG)) 3984 3985 @requireAttrs(socket.socket, "sendmsg") 3986 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 3987 "IPV6_RECVTCLASS", "IPV6_TCLASS") 3988 def testOddCmsgSize(self): 3989 # Try to send ancillary data with first item one byte too 3990 # long. Fall back to sending with correct size if this fails, 3991 # and check that second item was handled correctly. 3992 self.checkTrafficClassAndHopLimit(ancbufsize=10240, 3993 maxhop=self.hop_limit) 3994 3995 @testOddCmsgSize.client_skip 3996 def _testOddCmsgSize(self): 3997 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 3998 try: 3999 nbytes = self.sendmsgToServer( 4000 [MSG], 4001 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 4002 array.array("i", [self.traffic_class]).tobytes() + b"\x00"), 4003 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 4004 array.array("i", [self.hop_limit]))]) 4005 except OSError as e: 4006 self.assertIsInstance(e.errno, int) 4007 nbytes = self.sendmsgToServer( 4008 [MSG], 4009 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS, 4010 array.array("i", [self.traffic_class])), 4011 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT, 4012 array.array("i", [self.hop_limit]))]) 4013 self.assertEqual(nbytes, len(MSG)) 4014 4015 # Tests for proper handling of truncated ancillary data 4016 4017 def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0): 4018 # Receive hop limit into ancbufsize bytes of ancillary data 4019 # space, which should be too small to contain the ancillary 4020 # data header (if ancbufsize is None, pass no second argument 4021 # to recvmsg()). Check that data is MSG, MSG_CTRUNC is set 4022 # (unless included in ignoreflags), and no ancillary data is 4023 # returned. 4024 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4025 socket.IPV6_RECVHOPLIMIT, 1) 4026 self.misc_event.set() 4027 args = () if ancbufsize is None else (ancbufsize,) 4028 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 4029 len(MSG), *args) 4030 4031 self.assertEqual(msg, MSG) 4032 self.checkRecvmsgAddress(addr, self.cli_addr) 4033 self.assertEqual(ancdata, []) 4034 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 4035 ignore=ignoreflags) 4036 4037 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4038 def testCmsgTruncNoBufSize(self): 4039 # Check that no ancillary data is received when no ancillary 4040 # buffer size is provided. 4041 self.checkHopLimitTruncatedHeader(ancbufsize=None, 4042 # BSD seems to set 4043 # MSG_CTRUNC only if an item 4044 # has been partially 4045 # received. 4046 ignoreflags=socket.MSG_CTRUNC) 4047 4048 @testCmsgTruncNoBufSize.client_skip 4049 def _testCmsgTruncNoBufSize(self): 4050 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4051 self.sendToServer(MSG) 4052 4053 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4054 def testSingleCmsgTrunc0(self): 4055 # Check that no ancillary data is received when ancillary 4056 # buffer size is zero. 4057 self.checkHopLimitTruncatedHeader(ancbufsize=0, 4058 ignoreflags=socket.MSG_CTRUNC) 4059 4060 @testSingleCmsgTrunc0.client_skip 4061 def _testSingleCmsgTrunc0(self): 4062 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4063 self.sendToServer(MSG) 4064 4065 # Check that no ancillary data is returned for various non-zero 4066 # (but still too small) buffer sizes. 4067 4068 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4069 def testSingleCmsgTrunc1(self): 4070 self.checkHopLimitTruncatedHeader(ancbufsize=1) 4071 4072 @testSingleCmsgTrunc1.client_skip 4073 def _testSingleCmsgTrunc1(self): 4074 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4075 self.sendToServer(MSG) 4076 4077 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4078 def testSingleCmsgTrunc2Int(self): 4079 self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT) 4080 4081 @testSingleCmsgTrunc2Int.client_skip 4082 def _testSingleCmsgTrunc2Int(self): 4083 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4084 self.sendToServer(MSG) 4085 4086 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4087 def testSingleCmsgTruncLen0Minus1(self): 4088 self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1) 4089 4090 @testSingleCmsgTruncLen0Minus1.client_skip 4091 def _testSingleCmsgTruncLen0Minus1(self): 4092 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4093 self.sendToServer(MSG) 4094 4095 @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT") 4096 def testSingleCmsgTruncInData(self): 4097 # Test truncation of a control message inside its associated 4098 # data. The message may be returned with its data truncated, 4099 # or not returned at all. 4100 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4101 socket.IPV6_RECVHOPLIMIT, 1) 4102 self.misc_event.set() 4103 msg, ancdata, flags, addr = self.doRecvmsg( 4104 self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1) 4105 4106 self.assertEqual(msg, MSG) 4107 self.checkRecvmsgAddress(addr, self.cli_addr) 4108 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 4109 4110 self.assertLessEqual(len(ancdata), 1) 4111 if ancdata: 4112 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 4113 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4114 self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT) 4115 self.assertLess(len(cmsg_data), SIZEOF_INT) 4116 4117 @testSingleCmsgTruncInData.client_skip 4118 def _testSingleCmsgTruncInData(self): 4119 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4120 self.sendToServer(MSG) 4121 4122 def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0): 4123 # Receive traffic class and hop limit into ancbufsize bytes of 4124 # ancillary data space, which should be large enough to 4125 # contain the first item, but too small to contain the header 4126 # of the second. Check that data is MSG, MSG_CTRUNC is set 4127 # (unless included in ignoreflags), and only one ancillary 4128 # data item is returned. 4129 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4130 socket.IPV6_RECVHOPLIMIT, 1) 4131 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4132 socket.IPV6_RECVTCLASS, 1) 4133 self.misc_event.set() 4134 msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, 4135 len(MSG), ancbufsize) 4136 4137 self.assertEqual(msg, MSG) 4138 self.checkRecvmsgAddress(addr, self.cli_addr) 4139 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC, 4140 ignore=ignoreflags) 4141 4142 self.assertEqual(len(ancdata), 1) 4143 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 4144 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4145 self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}) 4146 self.assertEqual(len(cmsg_data), SIZEOF_INT) 4147 a = array.array("i") 4148 a.frombytes(cmsg_data) 4149 self.assertGreaterEqual(a[0], 0) 4150 self.assertLessEqual(a[0], 255) 4151 4152 # Try the above test with various buffer sizes. 4153 4154 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4155 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4156 def testSecondCmsgTrunc0(self): 4157 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT), 4158 ignoreflags=socket.MSG_CTRUNC) 4159 4160 @testSecondCmsgTrunc0.client_skip 4161 def _testSecondCmsgTrunc0(self): 4162 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4163 self.sendToServer(MSG) 4164 4165 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4166 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4167 def testSecondCmsgTrunc1(self): 4168 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1) 4169 4170 @testSecondCmsgTrunc1.client_skip 4171 def _testSecondCmsgTrunc1(self): 4172 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4173 self.sendToServer(MSG) 4174 4175 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4176 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4177 def testSecondCmsgTrunc2Int(self): 4178 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 4179 2 * SIZEOF_INT) 4180 4181 @testSecondCmsgTrunc2Int.client_skip 4182 def _testSecondCmsgTrunc2Int(self): 4183 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4184 self.sendToServer(MSG) 4185 4186 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4187 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4188 def testSecondCmsgTruncLen0Minus1(self): 4189 self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 4190 socket.CMSG_LEN(0) - 1) 4191 4192 @testSecondCmsgTruncLen0Minus1.client_skip 4193 def _testSecondCmsgTruncLen0Minus1(self): 4194 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4195 self.sendToServer(MSG) 4196 4197 @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT", 4198 "IPV6_RECVTCLASS", "IPV6_TCLASS") 4199 def testSecondCmsgTruncInData(self): 4200 # Test truncation of the second of two control messages inside 4201 # its associated data. 4202 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4203 socket.IPV6_RECVHOPLIMIT, 1) 4204 self.serv_sock.setsockopt(socket.IPPROTO_IPV6, 4205 socket.IPV6_RECVTCLASS, 1) 4206 self.misc_event.set() 4207 msg, ancdata, flags, addr = self.doRecvmsg( 4208 self.serv_sock, len(MSG), 4209 socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1) 4210 4211 self.assertEqual(msg, MSG) 4212 self.checkRecvmsgAddress(addr, self.cli_addr) 4213 self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC) 4214 4215 cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT} 4216 4217 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 4218 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4219 cmsg_types.remove(cmsg_type) 4220 self.assertEqual(len(cmsg_data), SIZEOF_INT) 4221 a = array.array("i") 4222 a.frombytes(cmsg_data) 4223 self.assertGreaterEqual(a[0], 0) 4224 self.assertLessEqual(a[0], 255) 4225 4226 if ancdata: 4227 cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0) 4228 self.assertEqual(cmsg_level, socket.IPPROTO_IPV6) 4229 cmsg_types.remove(cmsg_type) 4230 self.assertLess(len(cmsg_data), SIZEOF_INT) 4231 4232 self.assertEqual(ancdata, []) 4233 4234 @testSecondCmsgTruncInData.client_skip 4235 def _testSecondCmsgTruncInData(self): 4236 self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout)) 4237 self.sendToServer(MSG) 4238 4239 4240# Derive concrete test classes for different socket types. 4241 4242class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase, 4243 SendrecvmsgConnectionlessBase, 4244 ThreadedSocketTestMixin, UDPTestBase): 4245 pass 4246 4247@requireAttrs(socket.socket, "sendmsg") 4248class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase): 4249 pass 4250 4251@requireAttrs(socket.socket, "recvmsg") 4252class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase): 4253 pass 4254 4255@requireAttrs(socket.socket, "recvmsg_into") 4256class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase): 4257 pass 4258 4259 4260class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase, 4261 SendrecvmsgConnectionlessBase, 4262 ThreadedSocketTestMixin, UDP6TestBase): 4263 4264 def checkRecvmsgAddress(self, addr1, addr2): 4265 # Called to compare the received address with the address of 4266 # the peer, ignoring scope ID 4267 self.assertEqual(addr1[:-1], addr2[:-1]) 4268 4269@requireAttrs(socket.socket, "sendmsg") 4270@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4271@requireSocket("AF_INET6", "SOCK_DGRAM") 4272class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase): 4273 pass 4274 4275@requireAttrs(socket.socket, "recvmsg") 4276@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4277@requireSocket("AF_INET6", "SOCK_DGRAM") 4278class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase): 4279 pass 4280 4281@requireAttrs(socket.socket, "recvmsg_into") 4282@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4283@requireSocket("AF_INET6", "SOCK_DGRAM") 4284class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase): 4285 pass 4286 4287@requireAttrs(socket.socket, "recvmsg") 4288@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4289@requireAttrs(socket, "IPPROTO_IPV6") 4290@requireSocket("AF_INET6", "SOCK_DGRAM") 4291class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest, 4292 SendrecvmsgUDP6TestBase): 4293 pass 4294 4295@requireAttrs(socket.socket, "recvmsg_into") 4296@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4297@requireAttrs(socket, "IPPROTO_IPV6") 4298@requireSocket("AF_INET6", "SOCK_DGRAM") 4299class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin, 4300 RFC3542AncillaryTest, 4301 SendrecvmsgUDP6TestBase): 4302 pass 4303 4304 4305@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4306 'UDPLITE sockets required for this test.') 4307class SendrecvmsgUDPLITETestBase(SendrecvmsgDgramFlagsBase, 4308 SendrecvmsgConnectionlessBase, 4309 ThreadedSocketTestMixin, UDPLITETestBase): 4310 pass 4311 4312@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4313 'UDPLITE sockets required for this test.') 4314@requireAttrs(socket.socket, "sendmsg") 4315class SendmsgUDPLITETest(SendmsgConnectionlessTests, SendrecvmsgUDPLITETestBase): 4316 pass 4317 4318@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4319 'UDPLITE sockets required for this test.') 4320@requireAttrs(socket.socket, "recvmsg") 4321class RecvmsgUDPLITETest(RecvmsgTests, SendrecvmsgUDPLITETestBase): 4322 pass 4323 4324@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4325 'UDPLITE sockets required for this test.') 4326@requireAttrs(socket.socket, "recvmsg_into") 4327class RecvmsgIntoUDPLITETest(RecvmsgIntoTests, SendrecvmsgUDPLITETestBase): 4328 pass 4329 4330 4331@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4332 'UDPLITE sockets required for this test.') 4333class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase, 4334 SendrecvmsgConnectionlessBase, 4335 ThreadedSocketTestMixin, UDPLITE6TestBase): 4336 4337 def checkRecvmsgAddress(self, addr1, addr2): 4338 # Called to compare the received address with the address of 4339 # the peer, ignoring scope ID 4340 self.assertEqual(addr1[:-1], addr2[:-1]) 4341 4342@requireAttrs(socket.socket, "sendmsg") 4343@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4344@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4345 'UDPLITE sockets required for this test.') 4346@requireSocket("AF_INET6", "SOCK_DGRAM") 4347class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBase): 4348 pass 4349 4350@requireAttrs(socket.socket, "recvmsg") 4351@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4352@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4353 'UDPLITE sockets required for this test.') 4354@requireSocket("AF_INET6", "SOCK_DGRAM") 4355class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase): 4356 pass 4357 4358@requireAttrs(socket.socket, "recvmsg_into") 4359@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4360@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4361 'UDPLITE sockets required for this test.') 4362@requireSocket("AF_INET6", "SOCK_DGRAM") 4363class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase): 4364 pass 4365 4366@requireAttrs(socket.socket, "recvmsg") 4367@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4368@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4369 'UDPLITE sockets required for this test.') 4370@requireAttrs(socket, "IPPROTO_IPV6") 4371@requireSocket("AF_INET6", "SOCK_DGRAM") 4372class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest, 4373 SendrecvmsgUDPLITE6TestBase): 4374 pass 4375 4376@requireAttrs(socket.socket, "recvmsg_into") 4377@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.') 4378@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 4379 'UDPLITE sockets required for this test.') 4380@requireAttrs(socket, "IPPROTO_IPV6") 4381@requireSocket("AF_INET6", "SOCK_DGRAM") 4382class RecvmsgIntoRFC3542AncillaryUDPLITE6Test(RecvmsgIntoMixin, 4383 RFC3542AncillaryTest, 4384 SendrecvmsgUDPLITE6TestBase): 4385 pass 4386 4387 4388class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase, 4389 ConnectedStreamTestMixin, TCPTestBase): 4390 pass 4391 4392@requireAttrs(socket.socket, "sendmsg") 4393class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase): 4394 pass 4395 4396@requireAttrs(socket.socket, "recvmsg") 4397class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests, 4398 SendrecvmsgTCPTestBase): 4399 pass 4400 4401@requireAttrs(socket.socket, "recvmsg_into") 4402class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4403 SendrecvmsgTCPTestBase): 4404 pass 4405 4406 4407class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase, 4408 SendrecvmsgConnectedBase, 4409 ConnectedStreamTestMixin, SCTPStreamBase): 4410 pass 4411 4412@requireAttrs(socket.socket, "sendmsg") 4413@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4414@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4415class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase): 4416 pass 4417 4418@requireAttrs(socket.socket, "recvmsg") 4419@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4420@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4421class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 4422 SendrecvmsgSCTPStreamTestBase): 4423 4424 def testRecvmsgEOF(self): 4425 try: 4426 super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF() 4427 except OSError as e: 4428 if e.errno != errno.ENOTCONN: 4429 raise 4430 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 4431 4432@requireAttrs(socket.socket, "recvmsg_into") 4433@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX") 4434@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP") 4435class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4436 SendrecvmsgSCTPStreamTestBase): 4437 4438 def testRecvmsgEOF(self): 4439 try: 4440 super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF() 4441 except OSError as e: 4442 if e.errno != errno.ENOTCONN: 4443 raise 4444 self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876") 4445 4446 4447class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase, 4448 ConnectedStreamTestMixin, UnixStreamBase): 4449 pass 4450 4451@requireAttrs(socket.socket, "sendmsg") 4452@requireAttrs(socket, "AF_UNIX") 4453class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase): 4454 pass 4455 4456@requireAttrs(socket.socket, "recvmsg") 4457@requireAttrs(socket, "AF_UNIX") 4458class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests, 4459 SendrecvmsgUnixStreamTestBase): 4460 pass 4461 4462@requireAttrs(socket.socket, "recvmsg_into") 4463@requireAttrs(socket, "AF_UNIX") 4464class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests, 4465 SendrecvmsgUnixStreamTestBase): 4466 pass 4467 4468@requireAttrs(socket.socket, "sendmsg", "recvmsg") 4469@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 4470class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase): 4471 pass 4472 4473@requireAttrs(socket.socket, "sendmsg", "recvmsg_into") 4474@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS") 4475class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest, 4476 SendrecvmsgUnixStreamTestBase): 4477 pass 4478 4479 4480# Test interrupting the interruptible send/receive methods with a 4481# signal when a timeout is set. These tests avoid having multiple 4482# threads alive during the test so that the OS cannot deliver the 4483# signal to the wrong one. 4484 4485class InterruptedTimeoutBase: 4486 # Base class for interrupted send/receive tests. Installs an 4487 # empty handler for SIGALRM and removes it on teardown, along with 4488 # any scheduled alarms. 4489 4490 def setUp(self): 4491 super().setUp() 4492 orig_alrm_handler = signal.signal(signal.SIGALRM, 4493 lambda signum, frame: 1 / 0) 4494 self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler) 4495 4496 # Timeout for socket operations 4497 timeout = support.LOOPBACK_TIMEOUT 4498 4499 # Provide setAlarm() method to schedule delivery of SIGALRM after 4500 # given number of seconds, or cancel it if zero, and an 4501 # appropriate time value to use. Use setitimer() if available. 4502 if hasattr(signal, "setitimer"): 4503 alarm_time = 0.05 4504 4505 def setAlarm(self, seconds): 4506 signal.setitimer(signal.ITIMER_REAL, seconds) 4507 else: 4508 # Old systems may deliver the alarm up to one second early 4509 alarm_time = 2 4510 4511 def setAlarm(self, seconds): 4512 signal.alarm(seconds) 4513 4514 4515# Require siginterrupt() in order to ensure that system calls are 4516# interrupted by default. 4517@requireAttrs(signal, "siginterrupt") 4518@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), 4519 "Don't have signal.alarm or signal.setitimer") 4520class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase): 4521 # Test interrupting the recv*() methods with signals when a 4522 # timeout is set. 4523 4524 def setUp(self): 4525 super().setUp() 4526 self.serv.settimeout(self.timeout) 4527 4528 def checkInterruptedRecv(self, func, *args, **kwargs): 4529 # Check that func(*args, **kwargs) raises 4530 # errno of EINTR when interrupted by a signal. 4531 try: 4532 self.setAlarm(self.alarm_time) 4533 with self.assertRaises(ZeroDivisionError) as cm: 4534 func(*args, **kwargs) 4535 finally: 4536 self.setAlarm(0) 4537 4538 def testInterruptedRecvTimeout(self): 4539 self.checkInterruptedRecv(self.serv.recv, 1024) 4540 4541 def testInterruptedRecvIntoTimeout(self): 4542 self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024)) 4543 4544 def testInterruptedRecvfromTimeout(self): 4545 self.checkInterruptedRecv(self.serv.recvfrom, 1024) 4546 4547 def testInterruptedRecvfromIntoTimeout(self): 4548 self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024)) 4549 4550 @requireAttrs(socket.socket, "recvmsg") 4551 def testInterruptedRecvmsgTimeout(self): 4552 self.checkInterruptedRecv(self.serv.recvmsg, 1024) 4553 4554 @requireAttrs(socket.socket, "recvmsg_into") 4555 def testInterruptedRecvmsgIntoTimeout(self): 4556 self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)]) 4557 4558 4559# Require siginterrupt() in order to ensure that system calls are 4560# interrupted by default. 4561@requireAttrs(signal, "siginterrupt") 4562@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"), 4563 "Don't have signal.alarm or signal.setitimer") 4564class InterruptedSendTimeoutTest(InterruptedTimeoutBase, 4565 ThreadSafeCleanupTestCase, 4566 SocketListeningTestMixin, TCPTestBase): 4567 # Test interrupting the interruptible send*() methods with signals 4568 # when a timeout is set. 4569 4570 def setUp(self): 4571 super().setUp() 4572 self.serv_conn = self.newSocket() 4573 self.addCleanup(self.serv_conn.close) 4574 # Use a thread to complete the connection, but wait for it to 4575 # terminate before running the test, so that there is only one 4576 # thread to accept the signal. 4577 cli_thread = threading.Thread(target=self.doConnect) 4578 cli_thread.start() 4579 self.cli_conn, addr = self.serv.accept() 4580 self.addCleanup(self.cli_conn.close) 4581 cli_thread.join() 4582 self.serv_conn.settimeout(self.timeout) 4583 4584 def doConnect(self): 4585 self.serv_conn.connect(self.serv_addr) 4586 4587 def checkInterruptedSend(self, func, *args, **kwargs): 4588 # Check that func(*args, **kwargs), run in a loop, raises 4589 # OSError with an errno of EINTR when interrupted by a 4590 # signal. 4591 try: 4592 with self.assertRaises(ZeroDivisionError) as cm: 4593 while True: 4594 self.setAlarm(self.alarm_time) 4595 func(*args, **kwargs) 4596 finally: 4597 self.setAlarm(0) 4598 4599 # Issue #12958: The following tests have problems on OS X prior to 10.7 4600 @support.requires_mac_ver(10, 7) 4601 def testInterruptedSendTimeout(self): 4602 self.checkInterruptedSend(self.serv_conn.send, b"a"*512) 4603 4604 @support.requires_mac_ver(10, 7) 4605 def testInterruptedSendtoTimeout(self): 4606 # Passing an actual address here as Python's wrapper for 4607 # sendto() doesn't allow passing a zero-length one; POSIX 4608 # requires that the address is ignored since the socket is 4609 # connection-mode, however. 4610 self.checkInterruptedSend(self.serv_conn.sendto, b"a"*512, 4611 self.serv_addr) 4612 4613 @support.requires_mac_ver(10, 7) 4614 @requireAttrs(socket.socket, "sendmsg") 4615 def testInterruptedSendmsgTimeout(self): 4616 self.checkInterruptedSend(self.serv_conn.sendmsg, [b"a"*512]) 4617 4618 4619class TCPCloserTest(ThreadedTCPSocketTest): 4620 4621 def testClose(self): 4622 conn, addr = self.serv.accept() 4623 conn.close() 4624 4625 sd = self.cli 4626 read, write, err = select.select([sd], [], [], 1.0) 4627 self.assertEqual(read, [sd]) 4628 self.assertEqual(sd.recv(1), b'') 4629 4630 # Calling close() many times should be safe. 4631 conn.close() 4632 conn.close() 4633 4634 def _testClose(self): 4635 self.cli.connect((HOST, self.port)) 4636 time.sleep(1.0) 4637 4638 4639class BasicSocketPairTest(SocketPairTest): 4640 4641 def __init__(self, methodName='runTest'): 4642 SocketPairTest.__init__(self, methodName=methodName) 4643 4644 def _check_defaults(self, sock): 4645 self.assertIsInstance(sock, socket.socket) 4646 if hasattr(socket, 'AF_UNIX'): 4647 self.assertEqual(sock.family, socket.AF_UNIX) 4648 else: 4649 self.assertEqual(sock.family, socket.AF_INET) 4650 self.assertEqual(sock.type, socket.SOCK_STREAM) 4651 self.assertEqual(sock.proto, 0) 4652 4653 def _testDefaults(self): 4654 self._check_defaults(self.cli) 4655 4656 def testDefaults(self): 4657 self._check_defaults(self.serv) 4658 4659 def testRecv(self): 4660 msg = self.serv.recv(1024) 4661 self.assertEqual(msg, MSG) 4662 4663 def _testRecv(self): 4664 self.cli.send(MSG) 4665 4666 def testSend(self): 4667 self.serv.send(MSG) 4668 4669 def _testSend(self): 4670 msg = self.cli.recv(1024) 4671 self.assertEqual(msg, MSG) 4672 4673 4674class NonBlockingTCPTests(ThreadedTCPSocketTest): 4675 4676 def __init__(self, methodName='runTest'): 4677 self.event = threading.Event() 4678 ThreadedTCPSocketTest.__init__(self, methodName=methodName) 4679 4680 def assert_sock_timeout(self, sock, timeout): 4681 self.assertEqual(self.serv.gettimeout(), timeout) 4682 4683 blocking = (timeout != 0.0) 4684 self.assertEqual(sock.getblocking(), blocking) 4685 4686 if fcntl is not None: 4687 # When a Python socket has a non-zero timeout, it's switched 4688 # internally to a non-blocking mode. Later, sock.sendall(), 4689 # sock.recv(), and other socket operations use a select() call and 4690 # handle EWOULDBLOCK/EGAIN on all socket operations. That's how 4691 # timeouts are enforced. 4692 fd_blocking = (timeout is None) 4693 4694 flag = fcntl.fcntl(sock, fcntl.F_GETFL, os.O_NONBLOCK) 4695 self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking) 4696 4697 def testSetBlocking(self): 4698 # Test setblocking() and settimeout() methods 4699 self.serv.setblocking(True) 4700 self.assert_sock_timeout(self.serv, None) 4701 4702 self.serv.setblocking(False) 4703 self.assert_sock_timeout(self.serv, 0.0) 4704 4705 self.serv.settimeout(None) 4706 self.assert_sock_timeout(self.serv, None) 4707 4708 self.serv.settimeout(0) 4709 self.assert_sock_timeout(self.serv, 0) 4710 4711 self.serv.settimeout(10) 4712 self.assert_sock_timeout(self.serv, 10) 4713 4714 self.serv.settimeout(0) 4715 self.assert_sock_timeout(self.serv, 0) 4716 4717 def _testSetBlocking(self): 4718 pass 4719 4720 @support.cpython_only 4721 def testSetBlocking_overflow(self): 4722 # Issue 15989 4723 import _testcapi 4724 if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX: 4725 self.skipTest('needs UINT_MAX < ULONG_MAX') 4726 4727 self.serv.setblocking(False) 4728 self.assertEqual(self.serv.gettimeout(), 0.0) 4729 4730 self.serv.setblocking(_testcapi.UINT_MAX + 1) 4731 self.assertIsNone(self.serv.gettimeout()) 4732 4733 _testSetBlocking_overflow = support.cpython_only(_testSetBlocking) 4734 4735 @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 4736 'test needs socket.SOCK_NONBLOCK') 4737 @support.requires_linux_version(2, 6, 28) 4738 def testInitNonBlocking(self): 4739 # create a socket with SOCK_NONBLOCK 4740 self.serv.close() 4741 self.serv = socket.socket(socket.AF_INET, 4742 socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 4743 self.assert_sock_timeout(self.serv, 0) 4744 4745 def _testInitNonBlocking(self): 4746 pass 4747 4748 def testInheritFlagsBlocking(self): 4749 # bpo-7995: accept() on a listening socket with a timeout and the 4750 # default timeout is None, the resulting socket must be blocking. 4751 with socket_setdefaulttimeout(None): 4752 self.serv.settimeout(10) 4753 conn, addr = self.serv.accept() 4754 self.addCleanup(conn.close) 4755 self.assertIsNone(conn.gettimeout()) 4756 4757 def _testInheritFlagsBlocking(self): 4758 self.cli.connect((HOST, self.port)) 4759 4760 def testInheritFlagsTimeout(self): 4761 # bpo-7995: accept() on a listening socket with a timeout and the 4762 # default timeout is None, the resulting socket must inherit 4763 # the default timeout. 4764 default_timeout = 20.0 4765 with socket_setdefaulttimeout(default_timeout): 4766 self.serv.settimeout(10) 4767 conn, addr = self.serv.accept() 4768 self.addCleanup(conn.close) 4769 self.assertEqual(conn.gettimeout(), default_timeout) 4770 4771 def _testInheritFlagsTimeout(self): 4772 self.cli.connect((HOST, self.port)) 4773 4774 def testAccept(self): 4775 # Testing non-blocking accept 4776 self.serv.setblocking(False) 4777 4778 # connect() didn't start: non-blocking accept() fails 4779 start_time = time.monotonic() 4780 with self.assertRaises(BlockingIOError): 4781 conn, addr = self.serv.accept() 4782 dt = time.monotonic() - start_time 4783 self.assertLess(dt, 1.0) 4784 4785 self.event.set() 4786 4787 read, write, err = select.select([self.serv], [], [], support.LONG_TIMEOUT) 4788 if self.serv not in read: 4789 self.fail("Error trying to do accept after select.") 4790 4791 # connect() completed: non-blocking accept() doesn't block 4792 conn, addr = self.serv.accept() 4793 self.addCleanup(conn.close) 4794 self.assertIsNone(conn.gettimeout()) 4795 4796 def _testAccept(self): 4797 # don't connect before event is set to check 4798 # that non-blocking accept() raises BlockingIOError 4799 self.event.wait() 4800 4801 self.cli.connect((HOST, self.port)) 4802 4803 def testRecv(self): 4804 # Testing non-blocking recv 4805 conn, addr = self.serv.accept() 4806 self.addCleanup(conn.close) 4807 conn.setblocking(False) 4808 4809 # the server didn't send data yet: non-blocking recv() fails 4810 with self.assertRaises(BlockingIOError): 4811 msg = conn.recv(len(MSG)) 4812 4813 self.event.set() 4814 4815 read, write, err = select.select([conn], [], [], support.LONG_TIMEOUT) 4816 if conn not in read: 4817 self.fail("Error during select call to non-blocking socket.") 4818 4819 # the server sent data yet: non-blocking recv() doesn't block 4820 msg = conn.recv(len(MSG)) 4821 self.assertEqual(msg, MSG) 4822 4823 def _testRecv(self): 4824 self.cli.connect((HOST, self.port)) 4825 4826 # don't send anything before event is set to check 4827 # that non-blocking recv() raises BlockingIOError 4828 self.event.wait() 4829 4830 # send data: recv() will no longer block 4831 self.cli.sendall(MSG) 4832 4833 4834class FileObjectClassTestCase(SocketConnectedTest): 4835 """Unit tests for the object returned by socket.makefile() 4836 4837 self.read_file is the io object returned by makefile() on 4838 the client connection. You can read from this file to 4839 get output from the server. 4840 4841 self.write_file is the io object returned by makefile() on the 4842 server connection. You can write to this file to send output 4843 to the client. 4844 """ 4845 4846 bufsize = -1 # Use default buffer size 4847 encoding = 'utf-8' 4848 errors = 'strict' 4849 newline = None 4850 4851 read_mode = 'rb' 4852 read_msg = MSG 4853 write_mode = 'wb' 4854 write_msg = MSG 4855 4856 def __init__(self, methodName='runTest'): 4857 SocketConnectedTest.__init__(self, methodName=methodName) 4858 4859 def setUp(self): 4860 self.evt1, self.evt2, self.serv_finished, self.cli_finished = [ 4861 threading.Event() for i in range(4)] 4862 SocketConnectedTest.setUp(self) 4863 self.read_file = self.cli_conn.makefile( 4864 self.read_mode, self.bufsize, 4865 encoding = self.encoding, 4866 errors = self.errors, 4867 newline = self.newline) 4868 4869 def tearDown(self): 4870 self.serv_finished.set() 4871 self.read_file.close() 4872 self.assertTrue(self.read_file.closed) 4873 self.read_file = None 4874 SocketConnectedTest.tearDown(self) 4875 4876 def clientSetUp(self): 4877 SocketConnectedTest.clientSetUp(self) 4878 self.write_file = self.serv_conn.makefile( 4879 self.write_mode, self.bufsize, 4880 encoding = self.encoding, 4881 errors = self.errors, 4882 newline = self.newline) 4883 4884 def clientTearDown(self): 4885 self.cli_finished.set() 4886 self.write_file.close() 4887 self.assertTrue(self.write_file.closed) 4888 self.write_file = None 4889 SocketConnectedTest.clientTearDown(self) 4890 4891 def testReadAfterTimeout(self): 4892 # Issue #7322: A file object must disallow further reads 4893 # after a timeout has occurred. 4894 self.cli_conn.settimeout(1) 4895 self.read_file.read(3) 4896 # First read raises a timeout 4897 self.assertRaises(TimeoutError, self.read_file.read, 1) 4898 # Second read is disallowed 4899 with self.assertRaises(OSError) as ctx: 4900 self.read_file.read(1) 4901 self.assertIn("cannot read from timed out object", str(ctx.exception)) 4902 4903 def _testReadAfterTimeout(self): 4904 self.write_file.write(self.write_msg[0:3]) 4905 self.write_file.flush() 4906 self.serv_finished.wait() 4907 4908 def testSmallRead(self): 4909 # Performing small file read test 4910 first_seg = self.read_file.read(len(self.read_msg)-3) 4911 second_seg = self.read_file.read(3) 4912 msg = first_seg + second_seg 4913 self.assertEqual(msg, self.read_msg) 4914 4915 def _testSmallRead(self): 4916 self.write_file.write(self.write_msg) 4917 self.write_file.flush() 4918 4919 def testFullRead(self): 4920 # read until EOF 4921 msg = self.read_file.read() 4922 self.assertEqual(msg, self.read_msg) 4923 4924 def _testFullRead(self): 4925 self.write_file.write(self.write_msg) 4926 self.write_file.close() 4927 4928 def testUnbufferedRead(self): 4929 # Performing unbuffered file read test 4930 buf = type(self.read_msg)() 4931 while 1: 4932 char = self.read_file.read(1) 4933 if not char: 4934 break 4935 buf += char 4936 self.assertEqual(buf, self.read_msg) 4937 4938 def _testUnbufferedRead(self): 4939 self.write_file.write(self.write_msg) 4940 self.write_file.flush() 4941 4942 def testReadline(self): 4943 # Performing file readline test 4944 line = self.read_file.readline() 4945 self.assertEqual(line, self.read_msg) 4946 4947 def _testReadline(self): 4948 self.write_file.write(self.write_msg) 4949 self.write_file.flush() 4950 4951 def testCloseAfterMakefile(self): 4952 # The file returned by makefile should keep the socket open. 4953 self.cli_conn.close() 4954 # read until EOF 4955 msg = self.read_file.read() 4956 self.assertEqual(msg, self.read_msg) 4957 4958 def _testCloseAfterMakefile(self): 4959 self.write_file.write(self.write_msg) 4960 self.write_file.flush() 4961 4962 def testMakefileAfterMakefileClose(self): 4963 self.read_file.close() 4964 msg = self.cli_conn.recv(len(MSG)) 4965 if isinstance(self.read_msg, str): 4966 msg = msg.decode() 4967 self.assertEqual(msg, self.read_msg) 4968 4969 def _testMakefileAfterMakefileClose(self): 4970 self.write_file.write(self.write_msg) 4971 self.write_file.flush() 4972 4973 def testClosedAttr(self): 4974 self.assertTrue(not self.read_file.closed) 4975 4976 def _testClosedAttr(self): 4977 self.assertTrue(not self.write_file.closed) 4978 4979 def testAttributes(self): 4980 self.assertEqual(self.read_file.mode, self.read_mode) 4981 self.assertEqual(self.read_file.name, self.cli_conn.fileno()) 4982 4983 def _testAttributes(self): 4984 self.assertEqual(self.write_file.mode, self.write_mode) 4985 self.assertEqual(self.write_file.name, self.serv_conn.fileno()) 4986 4987 def testRealClose(self): 4988 self.read_file.close() 4989 self.assertRaises(ValueError, self.read_file.fileno) 4990 self.cli_conn.close() 4991 self.assertRaises(OSError, self.cli_conn.getsockname) 4992 4993 def _testRealClose(self): 4994 pass 4995 4996 4997class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): 4998 4999 """Repeat the tests from FileObjectClassTestCase with bufsize==0. 5000 5001 In this case (and in this case only), it should be possible to 5002 create a file object, read a line from it, create another file 5003 object, read another line from it, without loss of data in the 5004 first file object's buffer. Note that http.client relies on this 5005 when reading multiple requests from the same socket.""" 5006 5007 bufsize = 0 # Use unbuffered mode 5008 5009 def testUnbufferedReadline(self): 5010 # Read a line, create a new file object, read another line with it 5011 line = self.read_file.readline() # first line 5012 self.assertEqual(line, b"A. " + self.write_msg) # first line 5013 self.read_file = self.cli_conn.makefile('rb', 0) 5014 line = self.read_file.readline() # second line 5015 self.assertEqual(line, b"B. " + self.write_msg) # second line 5016 5017 def _testUnbufferedReadline(self): 5018 self.write_file.write(b"A. " + self.write_msg) 5019 self.write_file.write(b"B. " + self.write_msg) 5020 self.write_file.flush() 5021 5022 def testMakefileClose(self): 5023 # The file returned by makefile should keep the socket open... 5024 self.cli_conn.close() 5025 msg = self.cli_conn.recv(1024) 5026 self.assertEqual(msg, self.read_msg) 5027 # ...until the file is itself closed 5028 self.read_file.close() 5029 self.assertRaises(OSError, self.cli_conn.recv, 1024) 5030 5031 def _testMakefileClose(self): 5032 self.write_file.write(self.write_msg) 5033 self.write_file.flush() 5034 5035 def testMakefileCloseSocketDestroy(self): 5036 refcount_before = sys.getrefcount(self.cli_conn) 5037 self.read_file.close() 5038 refcount_after = sys.getrefcount(self.cli_conn) 5039 self.assertEqual(refcount_before - 1, refcount_after) 5040 5041 def _testMakefileCloseSocketDestroy(self): 5042 pass 5043 5044 # Non-blocking ops 5045 # NOTE: to set `read_file` as non-blocking, we must call 5046 # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp). 5047 5048 def testSmallReadNonBlocking(self): 5049 self.cli_conn.setblocking(False) 5050 self.assertEqual(self.read_file.readinto(bytearray(10)), None) 5051 self.assertEqual(self.read_file.read(len(self.read_msg) - 3), None) 5052 self.evt1.set() 5053 self.evt2.wait(1.0) 5054 first_seg = self.read_file.read(len(self.read_msg) - 3) 5055 if first_seg is None: 5056 # Data not arrived (can happen under Windows), wait a bit 5057 time.sleep(0.5) 5058 first_seg = self.read_file.read(len(self.read_msg) - 3) 5059 buf = bytearray(10) 5060 n = self.read_file.readinto(buf) 5061 self.assertEqual(n, 3) 5062 msg = first_seg + buf[:n] 5063 self.assertEqual(msg, self.read_msg) 5064 self.assertEqual(self.read_file.readinto(bytearray(16)), None) 5065 self.assertEqual(self.read_file.read(1), None) 5066 5067 def _testSmallReadNonBlocking(self): 5068 self.evt1.wait(1.0) 5069 self.write_file.write(self.write_msg) 5070 self.write_file.flush() 5071 self.evt2.set() 5072 # Avoid closing the socket before the server test has finished, 5073 # otherwise system recv() will return 0 instead of EWOULDBLOCK. 5074 self.serv_finished.wait(5.0) 5075 5076 def testWriteNonBlocking(self): 5077 self.cli_finished.wait(5.0) 5078 # The client thread can't skip directly - the SkipTest exception 5079 # would appear as a failure. 5080 if self.serv_skipped: 5081 self.skipTest(self.serv_skipped) 5082 5083 def _testWriteNonBlocking(self): 5084 self.serv_skipped = None 5085 self.serv_conn.setblocking(False) 5086 # Try to saturate the socket buffer pipe with repeated large writes. 5087 BIG = b"x" * support.SOCK_MAX_SIZE 5088 LIMIT = 10 5089 # The first write() succeeds since a chunk of data can be buffered 5090 n = self.write_file.write(BIG) 5091 self.assertGreater(n, 0) 5092 for i in range(LIMIT): 5093 n = self.write_file.write(BIG) 5094 if n is None: 5095 # Succeeded 5096 break 5097 self.assertGreater(n, 0) 5098 else: 5099 # Let us know that this test didn't manage to establish 5100 # the expected conditions. This is not a failure in itself but, 5101 # if it happens repeatedly, the test should be fixed. 5102 self.serv_skipped = "failed to saturate the socket buffer" 5103 5104 5105class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase): 5106 5107 bufsize = 1 # Default-buffered for reading; line-buffered for writing 5108 5109 5110class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase): 5111 5112 bufsize = 2 # Exercise the buffering code 5113 5114 5115class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase): 5116 """Tests for socket.makefile() in text mode (rather than binary)""" 5117 5118 read_mode = 'r' 5119 read_msg = MSG.decode('utf-8') 5120 write_mode = 'wb' 5121 write_msg = MSG 5122 newline = '' 5123 5124 5125class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase): 5126 """Tests for socket.makefile() in text mode (rather than binary)""" 5127 5128 read_mode = 'rb' 5129 read_msg = MSG 5130 write_mode = 'w' 5131 write_msg = MSG.decode('utf-8') 5132 newline = '' 5133 5134 5135class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase): 5136 """Tests for socket.makefile() in text mode (rather than binary)""" 5137 5138 read_mode = 'r' 5139 read_msg = MSG.decode('utf-8') 5140 write_mode = 'w' 5141 write_msg = MSG.decode('utf-8') 5142 newline = '' 5143 5144 5145class NetworkConnectionTest(object): 5146 """Prove network connection.""" 5147 5148 def clientSetUp(self): 5149 # We're inherited below by BasicTCPTest2, which also inherits 5150 # BasicTCPTest, which defines self.port referenced below. 5151 self.cli = socket.create_connection((HOST, self.port)) 5152 self.serv_conn = self.cli 5153 5154class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest): 5155 """Tests that NetworkConnection does not break existing TCP functionality. 5156 """ 5157 5158class NetworkConnectionNoServer(unittest.TestCase): 5159 5160 class MockSocket(socket.socket): 5161 def connect(self, *args): 5162 raise TimeoutError('timed out') 5163 5164 @contextlib.contextmanager 5165 def mocked_socket_module(self): 5166 """Return a socket which times out on connect""" 5167 old_socket = socket.socket 5168 socket.socket = self.MockSocket 5169 try: 5170 yield 5171 finally: 5172 socket.socket = old_socket 5173 5174 def test_connect(self): 5175 port = socket_helper.find_unused_port() 5176 cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 5177 self.addCleanup(cli.close) 5178 with self.assertRaises(OSError) as cm: 5179 cli.connect((HOST, port)) 5180 self.assertEqual(cm.exception.errno, errno.ECONNREFUSED) 5181 5182 def test_create_connection(self): 5183 # Issue #9792: errors raised by create_connection() should have 5184 # a proper errno attribute. 5185 port = socket_helper.find_unused_port() 5186 with self.assertRaises(OSError) as cm: 5187 socket.create_connection((HOST, port)) 5188 5189 # Issue #16257: create_connection() calls getaddrinfo() against 5190 # 'localhost'. This may result in an IPV6 addr being returned 5191 # as well as an IPV4 one: 5192 # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM) 5193 # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)), 5194 # (26, 2, 0, '', ('::1', 41230, 0, 0))] 5195 # 5196 # create_connection() enumerates through all the addresses returned 5197 # and if it doesn't successfully bind to any of them, it propagates 5198 # the last exception it encountered. 5199 # 5200 # On Solaris, ENETUNREACH is returned in this circumstance instead 5201 # of ECONNREFUSED. So, if that errno exists, add it to our list of 5202 # expected errnos. 5203 expected_errnos = socket_helper.get_socket_conn_refused_errs() 5204 self.assertIn(cm.exception.errno, expected_errnos) 5205 5206 def test_create_connection_all_errors(self): 5207 port = socket_helper.find_unused_port() 5208 try: 5209 socket.create_connection((HOST, port), all_errors=True) 5210 except ExceptionGroup as e: 5211 eg = e 5212 else: 5213 self.fail('expected connection to fail') 5214 5215 self.assertIsInstance(eg, ExceptionGroup) 5216 for e in eg.exceptions: 5217 self.assertIsInstance(e, OSError) 5218 5219 addresses = socket.getaddrinfo( 5220 'localhost', port, 0, socket.SOCK_STREAM) 5221 # assert that we got an exception for each address 5222 self.assertEqual(len(addresses), len(eg.exceptions)) 5223 5224 def test_create_connection_timeout(self): 5225 # Issue #9792: create_connection() should not recast timeout errors 5226 # as generic socket errors. 5227 with self.mocked_socket_module(): 5228 try: 5229 socket.create_connection((HOST, 1234)) 5230 except TimeoutError: 5231 pass 5232 except OSError as exc: 5233 if socket_helper.IPV6_ENABLED or exc.errno != errno.EAFNOSUPPORT: 5234 raise 5235 else: 5236 self.fail('TimeoutError not raised') 5237 5238 5239class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): 5240 5241 def __init__(self, methodName='runTest'): 5242 SocketTCPTest.__init__(self, methodName=methodName) 5243 ThreadableTest.__init__(self) 5244 5245 def clientSetUp(self): 5246 self.source_port = socket_helper.find_unused_port() 5247 5248 def clientTearDown(self): 5249 self.cli.close() 5250 self.cli = None 5251 ThreadableTest.clientTearDown(self) 5252 5253 def _justAccept(self): 5254 conn, addr = self.serv.accept() 5255 conn.close() 5256 5257 testFamily = _justAccept 5258 def _testFamily(self): 5259 self.cli = socket.create_connection((HOST, self.port), 5260 timeout=support.LOOPBACK_TIMEOUT) 5261 self.addCleanup(self.cli.close) 5262 self.assertEqual(self.cli.family, 2) 5263 5264 testSourceAddress = _justAccept 5265 def _testSourceAddress(self): 5266 self.cli = socket.create_connection((HOST, self.port), 5267 timeout=support.LOOPBACK_TIMEOUT, 5268 source_address=('', self.source_port)) 5269 self.addCleanup(self.cli.close) 5270 self.assertEqual(self.cli.getsockname()[1], self.source_port) 5271 # The port number being used is sufficient to show that the bind() 5272 # call happened. 5273 5274 testTimeoutDefault = _justAccept 5275 def _testTimeoutDefault(self): 5276 # passing no explicit timeout uses socket's global default 5277 self.assertTrue(socket.getdefaulttimeout() is None) 5278 socket.setdefaulttimeout(42) 5279 try: 5280 self.cli = socket.create_connection((HOST, self.port)) 5281 self.addCleanup(self.cli.close) 5282 finally: 5283 socket.setdefaulttimeout(None) 5284 self.assertEqual(self.cli.gettimeout(), 42) 5285 5286 testTimeoutNone = _justAccept 5287 def _testTimeoutNone(self): 5288 # None timeout means the same as sock.settimeout(None) 5289 self.assertTrue(socket.getdefaulttimeout() is None) 5290 socket.setdefaulttimeout(30) 5291 try: 5292 self.cli = socket.create_connection((HOST, self.port), timeout=None) 5293 self.addCleanup(self.cli.close) 5294 finally: 5295 socket.setdefaulttimeout(None) 5296 self.assertEqual(self.cli.gettimeout(), None) 5297 5298 testTimeoutValueNamed = _justAccept 5299 def _testTimeoutValueNamed(self): 5300 self.cli = socket.create_connection((HOST, self.port), timeout=30) 5301 self.assertEqual(self.cli.gettimeout(), 30) 5302 5303 testTimeoutValueNonamed = _justAccept 5304 def _testTimeoutValueNonamed(self): 5305 self.cli = socket.create_connection((HOST, self.port), 30) 5306 self.addCleanup(self.cli.close) 5307 self.assertEqual(self.cli.gettimeout(), 30) 5308 5309 5310class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): 5311 5312 def __init__(self, methodName='runTest'): 5313 SocketTCPTest.__init__(self, methodName=methodName) 5314 ThreadableTest.__init__(self) 5315 5316 def clientSetUp(self): 5317 pass 5318 5319 def clientTearDown(self): 5320 self.cli.close() 5321 self.cli = None 5322 ThreadableTest.clientTearDown(self) 5323 5324 def testInsideTimeout(self): 5325 conn, addr = self.serv.accept() 5326 self.addCleanup(conn.close) 5327 time.sleep(3) 5328 conn.send(b"done!") 5329 testOutsideTimeout = testInsideTimeout 5330 5331 def _testInsideTimeout(self): 5332 self.cli = sock = socket.create_connection((HOST, self.port)) 5333 data = sock.recv(5) 5334 self.assertEqual(data, b"done!") 5335 5336 def _testOutsideTimeout(self): 5337 self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) 5338 self.assertRaises(TimeoutError, lambda: sock.recv(5)) 5339 5340 5341class TCPTimeoutTest(SocketTCPTest): 5342 5343 def testTCPTimeout(self): 5344 def raise_timeout(*args, **kwargs): 5345 self.serv.settimeout(1.0) 5346 self.serv.accept() 5347 self.assertRaises(TimeoutError, raise_timeout, 5348 "Error generating a timeout exception (TCP)") 5349 5350 def testTimeoutZero(self): 5351 ok = False 5352 try: 5353 self.serv.settimeout(0.0) 5354 foo = self.serv.accept() 5355 except TimeoutError: 5356 self.fail("caught timeout instead of error (TCP)") 5357 except OSError: 5358 ok = True 5359 except: 5360 self.fail("caught unexpected exception (TCP)") 5361 if not ok: 5362 self.fail("accept() returned success when we did not expect it") 5363 5364 @unittest.skipUnless(hasattr(signal, 'alarm'), 5365 'test needs signal.alarm()') 5366 def testInterruptedTimeout(self): 5367 # XXX I don't know how to do this test on MSWindows or any other 5368 # platform that doesn't support signal.alarm() or os.kill(), though 5369 # the bug should have existed on all platforms. 5370 self.serv.settimeout(5.0) # must be longer than alarm 5371 class Alarm(Exception): 5372 pass 5373 def alarm_handler(signal, frame): 5374 raise Alarm 5375 old_alarm = signal.signal(signal.SIGALRM, alarm_handler) 5376 try: 5377 try: 5378 signal.alarm(2) # POSIX allows alarm to be up to 1 second early 5379 foo = self.serv.accept() 5380 except TimeoutError: 5381 self.fail("caught timeout instead of Alarm") 5382 except Alarm: 5383 pass 5384 except: 5385 self.fail("caught other exception instead of Alarm:" 5386 " %s(%s):\n%s" % 5387 (sys.exc_info()[:2] + (traceback.format_exc(),))) 5388 else: 5389 self.fail("nothing caught") 5390 finally: 5391 signal.alarm(0) # shut off alarm 5392 except Alarm: 5393 self.fail("got Alarm in wrong place") 5394 finally: 5395 # no alarm can be pending. Safe to restore old handler. 5396 signal.signal(signal.SIGALRM, old_alarm) 5397 5398class UDPTimeoutTest(SocketUDPTest): 5399 5400 def testUDPTimeout(self): 5401 def raise_timeout(*args, **kwargs): 5402 self.serv.settimeout(1.0) 5403 self.serv.recv(1024) 5404 self.assertRaises(TimeoutError, raise_timeout, 5405 "Error generating a timeout exception (UDP)") 5406 5407 def testTimeoutZero(self): 5408 ok = False 5409 try: 5410 self.serv.settimeout(0.0) 5411 foo = self.serv.recv(1024) 5412 except TimeoutError: 5413 self.fail("caught timeout instead of error (UDP)") 5414 except OSError: 5415 ok = True 5416 except: 5417 self.fail("caught unexpected exception (UDP)") 5418 if not ok: 5419 self.fail("recv() returned success when we did not expect it") 5420 5421@unittest.skipUnless(HAVE_SOCKET_UDPLITE, 5422 'UDPLITE sockets required for this test.') 5423class UDPLITETimeoutTest(SocketUDPLITETest): 5424 5425 def testUDPLITETimeout(self): 5426 def raise_timeout(*args, **kwargs): 5427 self.serv.settimeout(1.0) 5428 self.serv.recv(1024) 5429 self.assertRaises(TimeoutError, raise_timeout, 5430 "Error generating a timeout exception (UDPLITE)") 5431 5432 def testTimeoutZero(self): 5433 ok = False 5434 try: 5435 self.serv.settimeout(0.0) 5436 foo = self.serv.recv(1024) 5437 except TimeoutError: 5438 self.fail("caught timeout instead of error (UDPLITE)") 5439 except OSError: 5440 ok = True 5441 except: 5442 self.fail("caught unexpected exception (UDPLITE)") 5443 if not ok: 5444 self.fail("recv() returned success when we did not expect it") 5445 5446class TestExceptions(unittest.TestCase): 5447 5448 def testExceptionTree(self): 5449 self.assertTrue(issubclass(OSError, Exception)) 5450 self.assertTrue(issubclass(socket.herror, OSError)) 5451 self.assertTrue(issubclass(socket.gaierror, OSError)) 5452 self.assertTrue(issubclass(socket.timeout, OSError)) 5453 self.assertIs(socket.error, OSError) 5454 self.assertIs(socket.timeout, TimeoutError) 5455 5456 def test_setblocking_invalidfd(self): 5457 # Regression test for issue #28471 5458 5459 sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 5460 sock = socket.socket( 5461 socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno()) 5462 sock0.close() 5463 self.addCleanup(sock.detach) 5464 5465 with self.assertRaises(OSError): 5466 sock.setblocking(False) 5467 5468 5469@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test') 5470class TestLinuxAbstractNamespace(unittest.TestCase): 5471 5472 UNIX_PATH_MAX = 108 5473 5474 def testLinuxAbstractNamespace(self): 5475 address = b"\x00python-test-hello\x00\xff" 5476 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: 5477 s1.bind(address) 5478 s1.listen() 5479 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: 5480 s2.connect(s1.getsockname()) 5481 with s1.accept()[0] as s3: 5482 self.assertEqual(s1.getsockname(), address) 5483 self.assertEqual(s2.getpeername(), address) 5484 5485 def testMaxName(self): 5486 address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) 5487 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5488 s.bind(address) 5489 self.assertEqual(s.getsockname(), address) 5490 5491 def testNameOverflow(self): 5492 address = "\x00" + "h" * self.UNIX_PATH_MAX 5493 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5494 self.assertRaises(OSError, s.bind, address) 5495 5496 def testStrName(self): 5497 # Check that an abstract name can be passed as a string. 5498 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5499 try: 5500 s.bind("\x00python\x00test\x00") 5501 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5502 finally: 5503 s.close() 5504 5505 def testBytearrayName(self): 5506 # Check that an abstract name can be passed as a bytearray. 5507 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: 5508 s.bind(bytearray(b"\x00python\x00test\x00")) 5509 self.assertEqual(s.getsockname(), b"\x00python\x00test\x00") 5510 5511 def testAutobind(self): 5512 # Check that binding to an empty string binds to an available address 5513 # in the abstract namespace as specified in unix(7) "Autobind feature". 5514 abstract_address = b"^\0[0-9a-f]{5}" 5515 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s1: 5516 s1.bind("") 5517 self.assertRegex(s1.getsockname(), abstract_address) 5518 # Each socket is bound to a different abstract address. 5519 with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s2: 5520 s2.bind("") 5521 self.assertRegex(s2.getsockname(), abstract_address) 5522 self.assertNotEqual(s1.getsockname(), s2.getsockname()) 5523 5524 5525@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX') 5526class TestUnixDomain(unittest.TestCase): 5527 5528 def setUp(self): 5529 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 5530 5531 def tearDown(self): 5532 self.sock.close() 5533 5534 def encoded(self, path): 5535 # Return the given path encoded in the file system encoding, 5536 # or skip the test if this is not possible. 5537 try: 5538 return os.fsencode(path) 5539 except UnicodeEncodeError: 5540 self.skipTest( 5541 "Pathname {0!a} cannot be represented in file " 5542 "system encoding {1!r}".format( 5543 path, sys.getfilesystemencoding())) 5544 5545 def bind(self, sock, path): 5546 # Bind the socket 5547 try: 5548 socket_helper.bind_unix_socket(sock, path) 5549 except OSError as e: 5550 if str(e) == "AF_UNIX path too long": 5551 self.skipTest( 5552 "Pathname {0!a} is too long to serve as an AF_UNIX path" 5553 .format(path)) 5554 else: 5555 raise 5556 5557 def testUnbound(self): 5558 # Issue #30205 (note getsockname() can return None on OS X) 5559 self.assertIn(self.sock.getsockname(), ('', None)) 5560 5561 def testStrAddr(self): 5562 # Test binding to and retrieving a normal string pathname. 5563 path = os.path.abspath(os_helper.TESTFN) 5564 self.bind(self.sock, path) 5565 self.addCleanup(os_helper.unlink, path) 5566 self.assertEqual(self.sock.getsockname(), path) 5567 5568 def testBytesAddr(self): 5569 # Test binding to a bytes pathname. 5570 path = os.path.abspath(os_helper.TESTFN) 5571 self.bind(self.sock, self.encoded(path)) 5572 self.addCleanup(os_helper.unlink, path) 5573 self.assertEqual(self.sock.getsockname(), path) 5574 5575 def testSurrogateescapeBind(self): 5576 # Test binding to a valid non-ASCII pathname, with the 5577 # non-ASCII bytes supplied using surrogateescape encoding. 5578 path = os.path.abspath(os_helper.TESTFN_UNICODE) 5579 b = self.encoded(path) 5580 self.bind(self.sock, b.decode("ascii", "surrogateescape")) 5581 self.addCleanup(os_helper.unlink, path) 5582 self.assertEqual(self.sock.getsockname(), path) 5583 5584 def testUnencodableAddr(self): 5585 # Test binding to a pathname that cannot be encoded in the 5586 # file system encoding. 5587 if os_helper.TESTFN_UNENCODABLE is None: 5588 self.skipTest("No unencodable filename available") 5589 path = os.path.abspath(os_helper.TESTFN_UNENCODABLE) 5590 self.bind(self.sock, path) 5591 self.addCleanup(os_helper.unlink, path) 5592 self.assertEqual(self.sock.getsockname(), path) 5593 5594 @unittest.skipIf(sys.platform == 'linux', 'Linux specific test') 5595 def testEmptyAddress(self): 5596 # Test that binding empty address fails. 5597 self.assertRaises(OSError, self.sock.bind, "") 5598 5599 5600class BufferIOTest(SocketConnectedTest): 5601 """ 5602 Test the buffer versions of socket.recv() and socket.send(). 5603 """ 5604 def __init__(self, methodName='runTest'): 5605 SocketConnectedTest.__init__(self, methodName=methodName) 5606 5607 def testRecvIntoArray(self): 5608 buf = array.array("B", [0] * len(MSG)) 5609 nbytes = self.cli_conn.recv_into(buf) 5610 self.assertEqual(nbytes, len(MSG)) 5611 buf = buf.tobytes() 5612 msg = buf[:len(MSG)] 5613 self.assertEqual(msg, MSG) 5614 5615 def _testRecvIntoArray(self): 5616 buf = bytes(MSG) 5617 self.serv_conn.send(buf) 5618 5619 def testRecvIntoBytearray(self): 5620 buf = bytearray(1024) 5621 nbytes = self.cli_conn.recv_into(buf) 5622 self.assertEqual(nbytes, len(MSG)) 5623 msg = buf[:len(MSG)] 5624 self.assertEqual(msg, MSG) 5625 5626 _testRecvIntoBytearray = _testRecvIntoArray 5627 5628 def testRecvIntoMemoryview(self): 5629 buf = bytearray(1024) 5630 nbytes = self.cli_conn.recv_into(memoryview(buf)) 5631 self.assertEqual(nbytes, len(MSG)) 5632 msg = buf[:len(MSG)] 5633 self.assertEqual(msg, MSG) 5634 5635 _testRecvIntoMemoryview = _testRecvIntoArray 5636 5637 def testRecvFromIntoArray(self): 5638 buf = array.array("B", [0] * len(MSG)) 5639 nbytes, addr = self.cli_conn.recvfrom_into(buf) 5640 self.assertEqual(nbytes, len(MSG)) 5641 buf = buf.tobytes() 5642 msg = buf[:len(MSG)] 5643 self.assertEqual(msg, MSG) 5644 5645 def _testRecvFromIntoArray(self): 5646 buf = bytes(MSG) 5647 self.serv_conn.send(buf) 5648 5649 def testRecvFromIntoBytearray(self): 5650 buf = bytearray(1024) 5651 nbytes, addr = self.cli_conn.recvfrom_into(buf) 5652 self.assertEqual(nbytes, len(MSG)) 5653 msg = buf[:len(MSG)] 5654 self.assertEqual(msg, MSG) 5655 5656 _testRecvFromIntoBytearray = _testRecvFromIntoArray 5657 5658 def testRecvFromIntoMemoryview(self): 5659 buf = bytearray(1024) 5660 nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf)) 5661 self.assertEqual(nbytes, len(MSG)) 5662 msg = buf[:len(MSG)] 5663 self.assertEqual(msg, MSG) 5664 5665 _testRecvFromIntoMemoryview = _testRecvFromIntoArray 5666 5667 def testRecvFromIntoSmallBuffer(self): 5668 # See issue #20246. 5669 buf = bytearray(8) 5670 self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024) 5671 5672 def _testRecvFromIntoSmallBuffer(self): 5673 self.serv_conn.send(MSG) 5674 5675 def testRecvFromIntoEmptyBuffer(self): 5676 buf = bytearray() 5677 self.cli_conn.recvfrom_into(buf) 5678 self.cli_conn.recvfrom_into(buf, 0) 5679 5680 _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray 5681 5682 5683TIPC_STYPE = 2000 5684TIPC_LOWER = 200 5685TIPC_UPPER = 210 5686 5687def isTipcAvailable(): 5688 """Check if the TIPC module is loaded 5689 5690 The TIPC module is not loaded automatically on Ubuntu and probably 5691 other Linux distros. 5692 """ 5693 if not hasattr(socket, "AF_TIPC"): 5694 return False 5695 try: 5696 f = open("/proc/modules", encoding="utf-8") 5697 except (FileNotFoundError, IsADirectoryError, PermissionError): 5698 # It's ok if the file does not exist, is a directory or if we 5699 # have not the permission to read it. 5700 return False 5701 with f: 5702 for line in f: 5703 if line.startswith("tipc "): 5704 return True 5705 return False 5706 5707@unittest.skipUnless(isTipcAvailable(), 5708 "TIPC module is not loaded, please 'sudo modprobe tipc'") 5709class TIPCTest(unittest.TestCase): 5710 def testRDM(self): 5711 srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 5712 cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) 5713 self.addCleanup(srv.close) 5714 self.addCleanup(cli.close) 5715 5716 srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 5717 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 5718 TIPC_LOWER, TIPC_UPPER) 5719 srv.bind(srvaddr) 5720 5721 sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 5722 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) 5723 cli.sendto(MSG, sendaddr) 5724 5725 msg, recvaddr = srv.recvfrom(1024) 5726 5727 self.assertEqual(cli.getsockname(), recvaddr) 5728 self.assertEqual(msg, MSG) 5729 5730 5731@unittest.skipUnless(isTipcAvailable(), 5732 "TIPC module is not loaded, please 'sudo modprobe tipc'") 5733class TIPCThreadableTest(unittest.TestCase, ThreadableTest): 5734 def __init__(self, methodName = 'runTest'): 5735 unittest.TestCase.__init__(self, methodName = methodName) 5736 ThreadableTest.__init__(self) 5737 5738 def setUp(self): 5739 self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 5740 self.addCleanup(self.srv.close) 5741 self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 5742 srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, 5743 TIPC_LOWER, TIPC_UPPER) 5744 self.srv.bind(srvaddr) 5745 self.srv.listen() 5746 self.serverExplicitReady() 5747 self.conn, self.connaddr = self.srv.accept() 5748 self.addCleanup(self.conn.close) 5749 5750 def clientSetUp(self): 5751 # There is a hittable race between serverExplicitReady() and the 5752 # accept() call; sleep a little while to avoid it, otherwise 5753 # we could get an exception 5754 time.sleep(0.1) 5755 self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) 5756 self.addCleanup(self.cli.close) 5757 addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, 5758 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) 5759 self.cli.connect(addr) 5760 self.cliaddr = self.cli.getsockname() 5761 5762 def testStream(self): 5763 msg = self.conn.recv(1024) 5764 self.assertEqual(msg, MSG) 5765 self.assertEqual(self.cliaddr, self.connaddr) 5766 5767 def _testStream(self): 5768 self.cli.send(MSG) 5769 self.cli.close() 5770 5771 5772class ContextManagersTest(ThreadedTCPSocketTest): 5773 5774 def _testSocketClass(self): 5775 # base test 5776 with socket.socket() as sock: 5777 self.assertFalse(sock._closed) 5778 self.assertTrue(sock._closed) 5779 # close inside with block 5780 with socket.socket() as sock: 5781 sock.close() 5782 self.assertTrue(sock._closed) 5783 # exception inside with block 5784 with socket.socket() as sock: 5785 self.assertRaises(OSError, sock.sendall, b'foo') 5786 self.assertTrue(sock._closed) 5787 5788 def testCreateConnectionBase(self): 5789 conn, addr = self.serv.accept() 5790 self.addCleanup(conn.close) 5791 data = conn.recv(1024) 5792 conn.sendall(data) 5793 5794 def _testCreateConnectionBase(self): 5795 address = self.serv.getsockname() 5796 with socket.create_connection(address) as sock: 5797 self.assertFalse(sock._closed) 5798 sock.sendall(b'foo') 5799 self.assertEqual(sock.recv(1024), b'foo') 5800 self.assertTrue(sock._closed) 5801 5802 def testCreateConnectionClose(self): 5803 conn, addr = self.serv.accept() 5804 self.addCleanup(conn.close) 5805 data = conn.recv(1024) 5806 conn.sendall(data) 5807 5808 def _testCreateConnectionClose(self): 5809 address = self.serv.getsockname() 5810 with socket.create_connection(address) as sock: 5811 sock.close() 5812 self.assertTrue(sock._closed) 5813 self.assertRaises(OSError, sock.sendall, b'foo') 5814 5815 5816class InheritanceTest(unittest.TestCase): 5817 @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"), 5818 "SOCK_CLOEXEC not defined") 5819 @support.requires_linux_version(2, 6, 28) 5820 def test_SOCK_CLOEXEC(self): 5821 with socket.socket(socket.AF_INET, 5822 socket.SOCK_STREAM | socket.SOCK_CLOEXEC) as s: 5823 self.assertEqual(s.type, socket.SOCK_STREAM) 5824 self.assertFalse(s.get_inheritable()) 5825 5826 def test_default_inheritable(self): 5827 sock = socket.socket() 5828 with sock: 5829 self.assertEqual(sock.get_inheritable(), False) 5830 5831 def test_dup(self): 5832 sock = socket.socket() 5833 with sock: 5834 newsock = sock.dup() 5835 sock.close() 5836 with newsock: 5837 self.assertEqual(newsock.get_inheritable(), False) 5838 5839 def test_set_inheritable(self): 5840 sock = socket.socket() 5841 with sock: 5842 sock.set_inheritable(True) 5843 self.assertEqual(sock.get_inheritable(), True) 5844 5845 sock.set_inheritable(False) 5846 self.assertEqual(sock.get_inheritable(), False) 5847 5848 @unittest.skipIf(fcntl is None, "need fcntl") 5849 def test_get_inheritable_cloexec(self): 5850 sock = socket.socket() 5851 with sock: 5852 fd = sock.fileno() 5853 self.assertEqual(sock.get_inheritable(), False) 5854 5855 # clear FD_CLOEXEC flag 5856 flags = fcntl.fcntl(fd, fcntl.F_GETFD) 5857 flags &= ~fcntl.FD_CLOEXEC 5858 fcntl.fcntl(fd, fcntl.F_SETFD, flags) 5859 5860 self.assertEqual(sock.get_inheritable(), True) 5861 5862 @unittest.skipIf(fcntl is None, "need fcntl") 5863 def test_set_inheritable_cloexec(self): 5864 sock = socket.socket() 5865 with sock: 5866 fd = sock.fileno() 5867 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 5868 fcntl.FD_CLOEXEC) 5869 5870 sock.set_inheritable(True) 5871 self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC, 5872 0) 5873 5874 5875 def test_socketpair(self): 5876 s1, s2 = socket.socketpair() 5877 self.addCleanup(s1.close) 5878 self.addCleanup(s2.close) 5879 self.assertEqual(s1.get_inheritable(), False) 5880 self.assertEqual(s2.get_inheritable(), False) 5881 5882 5883@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"), 5884 "SOCK_NONBLOCK not defined") 5885class NonblockConstantTest(unittest.TestCase): 5886 def checkNonblock(self, s, nonblock=True, timeout=0.0): 5887 if nonblock: 5888 self.assertEqual(s.type, socket.SOCK_STREAM) 5889 self.assertEqual(s.gettimeout(), timeout) 5890 self.assertTrue( 5891 fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) 5892 if timeout == 0: 5893 # timeout == 0: means that getblocking() must be False. 5894 self.assertFalse(s.getblocking()) 5895 else: 5896 # If timeout > 0, the socket will be in a "blocking" mode 5897 # from the standpoint of the Python API. For Python socket 5898 # object, "blocking" means that operations like 'sock.recv()' 5899 # will block. Internally, file descriptors for 5900 # "blocking" Python sockets *with timeouts* are in a 5901 # *non-blocking* mode, and 'sock.recv()' uses 'select()' 5902 # and handles EWOULDBLOCK/EAGAIN to enforce the timeout. 5903 self.assertTrue(s.getblocking()) 5904 else: 5905 self.assertEqual(s.type, socket.SOCK_STREAM) 5906 self.assertEqual(s.gettimeout(), None) 5907 self.assertFalse( 5908 fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK) 5909 self.assertTrue(s.getblocking()) 5910 5911 @support.requires_linux_version(2, 6, 28) 5912 def test_SOCK_NONBLOCK(self): 5913 # a lot of it seems silly and redundant, but I wanted to test that 5914 # changing back and forth worked ok 5915 with socket.socket(socket.AF_INET, 5916 socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s: 5917 self.checkNonblock(s) 5918 s.setblocking(True) 5919 self.checkNonblock(s, nonblock=False) 5920 s.setblocking(False) 5921 self.checkNonblock(s) 5922 s.settimeout(None) 5923 self.checkNonblock(s, nonblock=False) 5924 s.settimeout(2.0) 5925 self.checkNonblock(s, timeout=2.0) 5926 s.setblocking(True) 5927 self.checkNonblock(s, nonblock=False) 5928 # defaulttimeout 5929 t = socket.getdefaulttimeout() 5930 socket.setdefaulttimeout(0.0) 5931 with socket.socket() as s: 5932 self.checkNonblock(s) 5933 socket.setdefaulttimeout(None) 5934 with socket.socket() as s: 5935 self.checkNonblock(s, False) 5936 socket.setdefaulttimeout(2.0) 5937 with socket.socket() as s: 5938 self.checkNonblock(s, timeout=2.0) 5939 socket.setdefaulttimeout(None) 5940 with socket.socket() as s: 5941 self.checkNonblock(s, False) 5942 socket.setdefaulttimeout(t) 5943 5944 5945@unittest.skipUnless(os.name == "nt", "Windows specific") 5946@unittest.skipUnless(multiprocessing, "need multiprocessing") 5947class TestSocketSharing(SocketTCPTest): 5948 # This must be classmethod and not staticmethod or multiprocessing 5949 # won't be able to bootstrap it. 5950 @classmethod 5951 def remoteProcessServer(cls, q): 5952 # Recreate socket from shared data 5953 sdata = q.get() 5954 message = q.get() 5955 5956 s = socket.fromshare(sdata) 5957 s2, c = s.accept() 5958 5959 # Send the message 5960 s2.sendall(message) 5961 s2.close() 5962 s.close() 5963 5964 def testShare(self): 5965 # Transfer the listening server socket to another process 5966 # and service it from there. 5967 5968 # Create process: 5969 q = multiprocessing.Queue() 5970 p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,)) 5971 p.start() 5972 5973 # Get the shared socket data 5974 data = self.serv.share(p.pid) 5975 5976 # Pass the shared socket to the other process 5977 addr = self.serv.getsockname() 5978 self.serv.close() 5979 q.put(data) 5980 5981 # The data that the server will send us 5982 message = b"slapmahfro" 5983 q.put(message) 5984 5985 # Connect 5986 s = socket.create_connection(addr) 5987 # listen for the data 5988 m = [] 5989 while True: 5990 data = s.recv(100) 5991 if not data: 5992 break 5993 m.append(data) 5994 s.close() 5995 received = b"".join(m) 5996 self.assertEqual(received, message) 5997 p.join() 5998 5999 def testShareLength(self): 6000 data = self.serv.share(os.getpid()) 6001 self.assertRaises(ValueError, socket.fromshare, data[:-1]) 6002 self.assertRaises(ValueError, socket.fromshare, data+b"foo") 6003 6004 def compareSockets(self, org, other): 6005 # socket sharing is expected to work only for blocking socket 6006 # since the internal python timeout value isn't transferred. 6007 self.assertEqual(org.gettimeout(), None) 6008 self.assertEqual(org.gettimeout(), other.gettimeout()) 6009 6010 self.assertEqual(org.family, other.family) 6011 self.assertEqual(org.type, other.type) 6012 # If the user specified "0" for proto, then 6013 # internally windows will have picked the correct value. 6014 # Python introspection on the socket however will still return 6015 # 0. For the shared socket, the python value is recreated 6016 # from the actual value, so it may not compare correctly. 6017 if org.proto != 0: 6018 self.assertEqual(org.proto, other.proto) 6019 6020 def testShareLocal(self): 6021 data = self.serv.share(os.getpid()) 6022 s = socket.fromshare(data) 6023 try: 6024 self.compareSockets(self.serv, s) 6025 finally: 6026 s.close() 6027 6028 def testTypes(self): 6029 families = [socket.AF_INET, socket.AF_INET6] 6030 types = [socket.SOCK_STREAM, socket.SOCK_DGRAM] 6031 for f in families: 6032 for t in types: 6033 try: 6034 source = socket.socket(f, t) 6035 except OSError: 6036 continue # This combination is not supported 6037 try: 6038 data = source.share(os.getpid()) 6039 shared = socket.fromshare(data) 6040 try: 6041 self.compareSockets(source, shared) 6042 finally: 6043 shared.close() 6044 finally: 6045 source.close() 6046 6047 6048class SendfileUsingSendTest(ThreadedTCPSocketTest): 6049 """ 6050 Test the send() implementation of socket.sendfile(). 6051 """ 6052 6053 FILESIZE = (10 * 1024 * 1024) # 10 MiB 6054 BUFSIZE = 8192 6055 FILEDATA = b"" 6056 TIMEOUT = support.LOOPBACK_TIMEOUT 6057 6058 @classmethod 6059 def setUpClass(cls): 6060 def chunks(total, step): 6061 assert total >= step 6062 while total > step: 6063 yield step 6064 total -= step 6065 if total: 6066 yield total 6067 6068 chunk = b"".join([random.choice(string.ascii_letters).encode() 6069 for i in range(cls.BUFSIZE)]) 6070 with open(os_helper.TESTFN, 'wb') as f: 6071 for csize in chunks(cls.FILESIZE, cls.BUFSIZE): 6072 f.write(chunk) 6073 with open(os_helper.TESTFN, 'rb') as f: 6074 cls.FILEDATA = f.read() 6075 assert len(cls.FILEDATA) == cls.FILESIZE 6076 6077 @classmethod 6078 def tearDownClass(cls): 6079 os_helper.unlink(os_helper.TESTFN) 6080 6081 def accept_conn(self): 6082 self.serv.settimeout(support.LONG_TIMEOUT) 6083 conn, addr = self.serv.accept() 6084 conn.settimeout(self.TIMEOUT) 6085 self.addCleanup(conn.close) 6086 return conn 6087 6088 def recv_data(self, conn): 6089 received = [] 6090 while True: 6091 chunk = conn.recv(self.BUFSIZE) 6092 if not chunk: 6093 break 6094 received.append(chunk) 6095 return b''.join(received) 6096 6097 def meth_from_sock(self, sock): 6098 # Depending on the mixin class being run return either send() 6099 # or sendfile() method implementation. 6100 return getattr(sock, "_sendfile_use_send") 6101 6102 # regular file 6103 6104 def _testRegularFile(self): 6105 address = self.serv.getsockname() 6106 file = open(os_helper.TESTFN, 'rb') 6107 with socket.create_connection(address) as sock, file as file: 6108 meth = self.meth_from_sock(sock) 6109 sent = meth(file) 6110 self.assertEqual(sent, self.FILESIZE) 6111 self.assertEqual(file.tell(), self.FILESIZE) 6112 6113 def testRegularFile(self): 6114 conn = self.accept_conn() 6115 data = self.recv_data(conn) 6116 self.assertEqual(len(data), self.FILESIZE) 6117 self.assertEqual(data, self.FILEDATA) 6118 6119 # non regular file 6120 6121 def _testNonRegularFile(self): 6122 address = self.serv.getsockname() 6123 file = io.BytesIO(self.FILEDATA) 6124 with socket.create_connection(address) as sock, file as file: 6125 sent = sock.sendfile(file) 6126 self.assertEqual(sent, self.FILESIZE) 6127 self.assertEqual(file.tell(), self.FILESIZE) 6128 self.assertRaises(socket._GiveupOnSendfile, 6129 sock._sendfile_use_sendfile, file) 6130 6131 def testNonRegularFile(self): 6132 conn = self.accept_conn() 6133 data = self.recv_data(conn) 6134 self.assertEqual(len(data), self.FILESIZE) 6135 self.assertEqual(data, self.FILEDATA) 6136 6137 # empty file 6138 6139 def _testEmptyFileSend(self): 6140 address = self.serv.getsockname() 6141 filename = os_helper.TESTFN + "2" 6142 with open(filename, 'wb'): 6143 self.addCleanup(os_helper.unlink, filename) 6144 file = open(filename, 'rb') 6145 with socket.create_connection(address) as sock, file as file: 6146 meth = self.meth_from_sock(sock) 6147 sent = meth(file) 6148 self.assertEqual(sent, 0) 6149 self.assertEqual(file.tell(), 0) 6150 6151 def testEmptyFileSend(self): 6152 conn = self.accept_conn() 6153 data = self.recv_data(conn) 6154 self.assertEqual(data, b"") 6155 6156 # offset 6157 6158 def _testOffset(self): 6159 address = self.serv.getsockname() 6160 file = open(os_helper.TESTFN, 'rb') 6161 with socket.create_connection(address) as sock, file as file: 6162 meth = self.meth_from_sock(sock) 6163 sent = meth(file, offset=5000) 6164 self.assertEqual(sent, self.FILESIZE - 5000) 6165 self.assertEqual(file.tell(), self.FILESIZE) 6166 6167 def testOffset(self): 6168 conn = self.accept_conn() 6169 data = self.recv_data(conn) 6170 self.assertEqual(len(data), self.FILESIZE - 5000) 6171 self.assertEqual(data, self.FILEDATA[5000:]) 6172 6173 # count 6174 6175 def _testCount(self): 6176 address = self.serv.getsockname() 6177 file = open(os_helper.TESTFN, 'rb') 6178 sock = socket.create_connection(address, 6179 timeout=support.LOOPBACK_TIMEOUT) 6180 with sock, file: 6181 count = 5000007 6182 meth = self.meth_from_sock(sock) 6183 sent = meth(file, count=count) 6184 self.assertEqual(sent, count) 6185 self.assertEqual(file.tell(), count) 6186 6187 def testCount(self): 6188 count = 5000007 6189 conn = self.accept_conn() 6190 data = self.recv_data(conn) 6191 self.assertEqual(len(data), count) 6192 self.assertEqual(data, self.FILEDATA[:count]) 6193 6194 # count small 6195 6196 def _testCountSmall(self): 6197 address = self.serv.getsockname() 6198 file = open(os_helper.TESTFN, 'rb') 6199 sock = socket.create_connection(address, 6200 timeout=support.LOOPBACK_TIMEOUT) 6201 with sock, file: 6202 count = 1 6203 meth = self.meth_from_sock(sock) 6204 sent = meth(file, count=count) 6205 self.assertEqual(sent, count) 6206 self.assertEqual(file.tell(), count) 6207 6208 def testCountSmall(self): 6209 count = 1 6210 conn = self.accept_conn() 6211 data = self.recv_data(conn) 6212 self.assertEqual(len(data), count) 6213 self.assertEqual(data, self.FILEDATA[:count]) 6214 6215 # count + offset 6216 6217 def _testCountWithOffset(self): 6218 address = self.serv.getsockname() 6219 file = open(os_helper.TESTFN, 'rb') 6220 with socket.create_connection(address, timeout=2) as sock, file as file: 6221 count = 100007 6222 meth = self.meth_from_sock(sock) 6223 sent = meth(file, offset=2007, count=count) 6224 self.assertEqual(sent, count) 6225 self.assertEqual(file.tell(), count + 2007) 6226 6227 def testCountWithOffset(self): 6228 count = 100007 6229 conn = self.accept_conn() 6230 data = self.recv_data(conn) 6231 self.assertEqual(len(data), count) 6232 self.assertEqual(data, self.FILEDATA[2007:count+2007]) 6233 6234 # non blocking sockets are not supposed to work 6235 6236 def _testNonBlocking(self): 6237 address = self.serv.getsockname() 6238 file = open(os_helper.TESTFN, 'rb') 6239 with socket.create_connection(address) as sock, file as file: 6240 sock.setblocking(False) 6241 meth = self.meth_from_sock(sock) 6242 self.assertRaises(ValueError, meth, file) 6243 self.assertRaises(ValueError, sock.sendfile, file) 6244 6245 def testNonBlocking(self): 6246 conn = self.accept_conn() 6247 if conn.recv(8192): 6248 self.fail('was not supposed to receive any data') 6249 6250 # timeout (non-triggered) 6251 6252 def _testWithTimeout(self): 6253 address = self.serv.getsockname() 6254 file = open(os_helper.TESTFN, 'rb') 6255 sock = socket.create_connection(address, 6256 timeout=support.LOOPBACK_TIMEOUT) 6257 with sock, file: 6258 meth = self.meth_from_sock(sock) 6259 sent = meth(file) 6260 self.assertEqual(sent, self.FILESIZE) 6261 6262 def testWithTimeout(self): 6263 conn = self.accept_conn() 6264 data = self.recv_data(conn) 6265 self.assertEqual(len(data), self.FILESIZE) 6266 self.assertEqual(data, self.FILEDATA) 6267 6268 # timeout (triggered) 6269 6270 def _testWithTimeoutTriggeredSend(self): 6271 address = self.serv.getsockname() 6272 with open(os_helper.TESTFN, 'rb') as file: 6273 with socket.create_connection(address) as sock: 6274 sock.settimeout(0.01) 6275 meth = self.meth_from_sock(sock) 6276 self.assertRaises(TimeoutError, meth, file) 6277 6278 def testWithTimeoutTriggeredSend(self): 6279 conn = self.accept_conn() 6280 conn.recv(88192) 6281 # bpo-45212: the wait here needs to be longer than the client-side timeout (0.01s) 6282 time.sleep(1) 6283 6284 # errors 6285 6286 def _test_errors(self): 6287 pass 6288 6289 def test_errors(self): 6290 with open(os_helper.TESTFN, 'rb') as file: 6291 with socket.socket(type=socket.SOCK_DGRAM) as s: 6292 meth = self.meth_from_sock(s) 6293 self.assertRaisesRegex( 6294 ValueError, "SOCK_STREAM", meth, file) 6295 with open(os_helper.TESTFN, encoding="utf-8") as file: 6296 with socket.socket() as s: 6297 meth = self.meth_from_sock(s) 6298 self.assertRaisesRegex( 6299 ValueError, "binary mode", meth, file) 6300 with open(os_helper.TESTFN, 'rb') as file: 6301 with socket.socket() as s: 6302 meth = self.meth_from_sock(s) 6303 self.assertRaisesRegex(TypeError, "positive integer", 6304 meth, file, count='2') 6305 self.assertRaisesRegex(TypeError, "positive integer", 6306 meth, file, count=0.1) 6307 self.assertRaisesRegex(ValueError, "positive integer", 6308 meth, file, count=0) 6309 self.assertRaisesRegex(ValueError, "positive integer", 6310 meth, file, count=-1) 6311 6312 6313@unittest.skipUnless(hasattr(os, "sendfile"), 6314 'os.sendfile() required for this test.') 6315class SendfileUsingSendfileTest(SendfileUsingSendTest): 6316 """ 6317 Test the sendfile() implementation of socket.sendfile(). 6318 """ 6319 def meth_from_sock(self, sock): 6320 return getattr(sock, "_sendfile_use_sendfile") 6321 6322 6323@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required') 6324class LinuxKernelCryptoAPI(unittest.TestCase): 6325 # tests for AF_ALG 6326 def create_alg(self, typ, name): 6327 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6328 try: 6329 sock.bind((typ, name)) 6330 except FileNotFoundError as e: 6331 # type / algorithm is not available 6332 sock.close() 6333 raise unittest.SkipTest(str(e), typ, name) 6334 else: 6335 return sock 6336 6337 # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY, 6338 # at least on ppc64le architecture 6339 @support.requires_linux_version(4, 5) 6340 def test_sha256(self): 6341 expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396" 6342 "177a9cb410ff61f20015ad") 6343 with self.create_alg('hash', 'sha256') as algo: 6344 op, _ = algo.accept() 6345 with op: 6346 op.sendall(b"abc") 6347 self.assertEqual(op.recv(512), expected) 6348 6349 op, _ = algo.accept() 6350 with op: 6351 op.send(b'a', socket.MSG_MORE) 6352 op.send(b'b', socket.MSG_MORE) 6353 op.send(b'c', socket.MSG_MORE) 6354 op.send(b'') 6355 self.assertEqual(op.recv(512), expected) 6356 6357 def test_hmac_sha1(self): 6358 expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79") 6359 with self.create_alg('hash', 'hmac(sha1)') as algo: 6360 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe") 6361 op, _ = algo.accept() 6362 with op: 6363 op.sendall(b"what do ya want for nothing?") 6364 self.assertEqual(op.recv(512), expected) 6365 6366 # Although it should work with 3.19 and newer the test blocks on 6367 # Ubuntu 15.10 with Kernel 4.2.0-19. 6368 @support.requires_linux_version(4, 3) 6369 def test_aes_cbc(self): 6370 key = bytes.fromhex('06a9214036b8a15b512e03d534120006') 6371 iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41') 6372 msg = b"Single block msg" 6373 ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a') 6374 msglen = len(msg) 6375 with self.create_alg('skcipher', 'cbc(aes)') as algo: 6376 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 6377 op, _ = algo.accept() 6378 with op: 6379 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 6380 flags=socket.MSG_MORE) 6381 op.sendall(msg) 6382 self.assertEqual(op.recv(msglen), ciphertext) 6383 6384 op, _ = algo.accept() 6385 with op: 6386 op.sendmsg_afalg([ciphertext], 6387 op=socket.ALG_OP_DECRYPT, iv=iv) 6388 self.assertEqual(op.recv(msglen), msg) 6389 6390 # long message 6391 multiplier = 1024 6392 longmsg = [msg] * multiplier 6393 op, _ = algo.accept() 6394 with op: 6395 op.sendmsg_afalg(longmsg, 6396 op=socket.ALG_OP_ENCRYPT, iv=iv) 6397 enc = op.recv(msglen * multiplier) 6398 self.assertEqual(len(enc), msglen * multiplier) 6399 self.assertEqual(enc[:msglen], ciphertext) 6400 6401 op, _ = algo.accept() 6402 with op: 6403 op.sendmsg_afalg([enc], 6404 op=socket.ALG_OP_DECRYPT, iv=iv) 6405 dec = op.recv(msglen * multiplier) 6406 self.assertEqual(len(dec), msglen * multiplier) 6407 self.assertEqual(dec, msg * multiplier) 6408 6409 @support.requires_linux_version(4, 9) # see issue29324 6410 def test_aead_aes_gcm(self): 6411 key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c') 6412 iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2') 6413 plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069') 6414 assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f') 6415 expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354') 6416 expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd') 6417 6418 taglen = len(expected_tag) 6419 assoclen = len(assoc) 6420 6421 with self.create_alg('aead', 'gcm(aes)') as algo: 6422 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key) 6423 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE, 6424 None, taglen) 6425 6426 # send assoc, plain and tag buffer in separate steps 6427 op, _ = algo.accept() 6428 with op: 6429 op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv, 6430 assoclen=assoclen, flags=socket.MSG_MORE) 6431 op.sendall(assoc, socket.MSG_MORE) 6432 op.sendall(plain) 6433 res = op.recv(assoclen + len(plain) + taglen) 6434 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6435 self.assertEqual(expected_tag, res[-taglen:]) 6436 6437 # now with msg 6438 op, _ = algo.accept() 6439 with op: 6440 msg = assoc + plain 6441 op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv, 6442 assoclen=assoclen) 6443 res = op.recv(assoclen + len(plain) + taglen) 6444 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6445 self.assertEqual(expected_tag, res[-taglen:]) 6446 6447 # create anc data manually 6448 pack_uint32 = struct.Struct('I').pack 6449 op, _ = algo.accept() 6450 with op: 6451 msg = assoc + plain 6452 op.sendmsg( 6453 [msg], 6454 ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)], 6455 [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv], 6456 [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)], 6457 ) 6458 ) 6459 res = op.recv(len(msg) + taglen) 6460 self.assertEqual(expected_ct, res[assoclen:-taglen]) 6461 self.assertEqual(expected_tag, res[-taglen:]) 6462 6463 # decrypt and verify 6464 op, _ = algo.accept() 6465 with op: 6466 msg = assoc + expected_ct + expected_tag 6467 op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv, 6468 assoclen=assoclen) 6469 res = op.recv(len(msg) - taglen) 6470 self.assertEqual(plain, res[assoclen:]) 6471 6472 @support.requires_linux_version(4, 3) # see test_aes_cbc 6473 def test_drbg_pr_sha256(self): 6474 # deterministic random bit generator, prediction resistance, sha256 6475 with self.create_alg('rng', 'drbg_pr_sha256') as algo: 6476 extra_seed = os.urandom(32) 6477 algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed) 6478 op, _ = algo.accept() 6479 with op: 6480 rn = op.recv(32) 6481 self.assertEqual(len(rn), 32) 6482 6483 def test_sendmsg_afalg_args(self): 6484 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6485 with sock: 6486 with self.assertRaises(TypeError): 6487 sock.sendmsg_afalg() 6488 6489 with self.assertRaises(TypeError): 6490 sock.sendmsg_afalg(op=None) 6491 6492 with self.assertRaises(TypeError): 6493 sock.sendmsg_afalg(1) 6494 6495 with self.assertRaises(TypeError): 6496 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None) 6497 6498 with self.assertRaises(TypeError): 6499 sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1) 6500 6501 def test_length_restriction(self): 6502 # bpo-35050, off-by-one error in length check 6503 sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0) 6504 self.addCleanup(sock.close) 6505 6506 # salg_type[14] 6507 with self.assertRaises(FileNotFoundError): 6508 sock.bind(("t" * 13, "name")) 6509 with self.assertRaisesRegex(ValueError, "type too long"): 6510 sock.bind(("t" * 14, "name")) 6511 6512 # salg_name[64] 6513 with self.assertRaises(FileNotFoundError): 6514 sock.bind(("type", "n" * 63)) 6515 with self.assertRaisesRegex(ValueError, "name too long"): 6516 sock.bind(("type", "n" * 64)) 6517 6518 6519@unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test') 6520class TestMacOSTCPFlags(unittest.TestCase): 6521 def test_tcp_keepalive(self): 6522 self.assertTrue(socket.TCP_KEEPALIVE) 6523 6524 6525@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") 6526class TestMSWindowsTCPFlags(unittest.TestCase): 6527 knownTCPFlags = { 6528 # available since long time ago 6529 'TCP_MAXSEG', 6530 'TCP_NODELAY', 6531 # available starting with Windows 10 1607 6532 'TCP_FASTOPEN', 6533 # available starting with Windows 10 1703 6534 'TCP_KEEPCNT', 6535 # available starting with Windows 10 1709 6536 'TCP_KEEPIDLE', 6537 'TCP_KEEPINTVL' 6538 } 6539 6540 def test_new_tcp_flags(self): 6541 provided = [s for s in dir(socket) if s.startswith('TCP')] 6542 unknown = [s for s in provided if s not in self.knownTCPFlags] 6543 6544 self.assertEqual([], unknown, 6545 "New TCP flags were discovered. See bpo-32394 for more information") 6546 6547 6548class CreateServerTest(unittest.TestCase): 6549 6550 def test_address(self): 6551 port = socket_helper.find_unused_port() 6552 with socket.create_server(("127.0.0.1", port)) as sock: 6553 self.assertEqual(sock.getsockname()[0], "127.0.0.1") 6554 self.assertEqual(sock.getsockname()[1], port) 6555 if socket_helper.IPV6_ENABLED: 6556 with socket.create_server(("::1", port), 6557 family=socket.AF_INET6) as sock: 6558 self.assertEqual(sock.getsockname()[0], "::1") 6559 self.assertEqual(sock.getsockname()[1], port) 6560 6561 def test_family_and_type(self): 6562 with socket.create_server(("127.0.0.1", 0)) as sock: 6563 self.assertEqual(sock.family, socket.AF_INET) 6564 self.assertEqual(sock.type, socket.SOCK_STREAM) 6565 if socket_helper.IPV6_ENABLED: 6566 with socket.create_server(("::1", 0), family=socket.AF_INET6) as s: 6567 self.assertEqual(s.family, socket.AF_INET6) 6568 self.assertEqual(sock.type, socket.SOCK_STREAM) 6569 6570 def test_reuse_port(self): 6571 if not hasattr(socket, "SO_REUSEPORT"): 6572 with self.assertRaises(ValueError): 6573 socket.create_server(("localhost", 0), reuse_port=True) 6574 else: 6575 with socket.create_server(("localhost", 0)) as sock: 6576 opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) 6577 self.assertEqual(opt, 0) 6578 with socket.create_server(("localhost", 0), reuse_port=True) as sock: 6579 opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) 6580 self.assertNotEqual(opt, 0) 6581 6582 @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or 6583 not hasattr(_socket, 'IPV6_V6ONLY'), 6584 "IPV6_V6ONLY option not supported") 6585 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6586 def test_ipv6_only_default(self): 6587 with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock: 6588 assert sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY) 6589 6590 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6591 "dualstack_ipv6 not supported") 6592 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6593 def test_dualstack_ipv6_family(self): 6594 with socket.create_server(("::1", 0), family=socket.AF_INET6, 6595 dualstack_ipv6=True) as sock: 6596 self.assertEqual(sock.family, socket.AF_INET6) 6597 6598 6599class CreateServerFunctionalTest(unittest.TestCase): 6600 timeout = support.LOOPBACK_TIMEOUT 6601 6602 def echo_server(self, sock): 6603 def run(sock): 6604 with sock: 6605 conn, _ = sock.accept() 6606 with conn: 6607 event.wait(self.timeout) 6608 msg = conn.recv(1024) 6609 if not msg: 6610 return 6611 conn.sendall(msg) 6612 6613 event = threading.Event() 6614 sock.settimeout(self.timeout) 6615 thread = threading.Thread(target=run, args=(sock, )) 6616 thread.start() 6617 self.addCleanup(thread.join, self.timeout) 6618 event.set() 6619 6620 def echo_client(self, addr, family): 6621 with socket.socket(family=family) as sock: 6622 sock.settimeout(self.timeout) 6623 sock.connect(addr) 6624 sock.sendall(b'foo') 6625 self.assertEqual(sock.recv(1024), b'foo') 6626 6627 def test_tcp4(self): 6628 port = socket_helper.find_unused_port() 6629 with socket.create_server(("", port)) as sock: 6630 self.echo_server(sock) 6631 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6632 6633 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6634 def test_tcp6(self): 6635 port = socket_helper.find_unused_port() 6636 with socket.create_server(("", port), 6637 family=socket.AF_INET6) as sock: 6638 self.echo_server(sock) 6639 self.echo_client(("::1", port), socket.AF_INET6) 6640 6641 # --- dual stack tests 6642 6643 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6644 "dualstack_ipv6 not supported") 6645 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6646 def test_dual_stack_client_v4(self): 6647 port = socket_helper.find_unused_port() 6648 with socket.create_server(("", port), family=socket.AF_INET6, 6649 dualstack_ipv6=True) as sock: 6650 self.echo_server(sock) 6651 self.echo_client(("127.0.0.1", port), socket.AF_INET) 6652 6653 @unittest.skipIf(not socket.has_dualstack_ipv6(), 6654 "dualstack_ipv6 not supported") 6655 @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test') 6656 def test_dual_stack_client_v6(self): 6657 port = socket_helper.find_unused_port() 6658 with socket.create_server(("", port), family=socket.AF_INET6, 6659 dualstack_ipv6=True) as sock: 6660 self.echo_server(sock) 6661 self.echo_client(("::1", port), socket.AF_INET6) 6662 6663@requireAttrs(socket, "send_fds") 6664@requireAttrs(socket, "recv_fds") 6665@requireAttrs(socket, "AF_UNIX") 6666class SendRecvFdsTests(unittest.TestCase): 6667 def testSendAndRecvFds(self): 6668 def close_pipes(pipes): 6669 for fd1, fd2 in pipes: 6670 os.close(fd1) 6671 os.close(fd2) 6672 6673 def close_fds(fds): 6674 for fd in fds: 6675 os.close(fd) 6676 6677 # send 10 file descriptors 6678 pipes = [os.pipe() for _ in range(10)] 6679 self.addCleanup(close_pipes, pipes) 6680 fds = [rfd for rfd, wfd in pipes] 6681 6682 # use a UNIX socket pair to exchange file descriptors locally 6683 sock1, sock2 = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) 6684 with sock1, sock2: 6685 socket.send_fds(sock1, [MSG], fds) 6686 # request more data and file descriptors than expected 6687 msg, fds2, flags, addr = socket.recv_fds(sock2, len(MSG) * 2, len(fds) * 2) 6688 self.addCleanup(close_fds, fds2) 6689 6690 self.assertEqual(msg, MSG) 6691 self.assertEqual(len(fds2), len(fds)) 6692 self.assertEqual(flags, 0) 6693 # don't test addr 6694 6695 # test that file descriptors are connected 6696 for index, fds in enumerate(pipes): 6697 rfd, wfd = fds 6698 os.write(wfd, str(index).encode()) 6699 6700 for index, rfd in enumerate(fds2): 6701 data = os.read(rfd, 100) 6702 self.assertEqual(data, str(index).encode()) 6703 6704 6705def setUpModule(): 6706 thread_info = threading_helper.threading_setup() 6707 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 6708 6709 6710if __name__ == "__main__": 6711 unittest.main() 6712