1import unittest
2from test import support
3from test.support import os_helper
4from test.support import socket_helper
5from test.support import threading_helper
6
7import errno
8import io
9import itertools
10import socket
11import select
12import tempfile
13import time
14import traceback
15import queue
16import sys
17import os
18import platform
19import array
20import contextlib
21from weakref import proxy
22import signal
23import math
24import pickle
25import struct
26import random
27import shutil
28import string
29import _thread as thread
30import threading
31try:
32    import multiprocessing
33except ImportError:
34    multiprocessing = False
35try:
36    import fcntl
37except ImportError:
38    fcntl = None
39
# Module-level requirement check performed at import time.
# NOTE(review): presumably skips the whole module when sockets are not
# usable on this platform -- confirm against test.support.
support.requires_working_socket(module=True)

# Host name/address shared by the fixtures below (e.g. connect() targets).
HOST = socket_helper.HOST
# test unicode string and carriage return
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf-8')

# Port used by the AF_VSOCK stream test for both bind() and connect().
VSOCKPORT = 1234
# True when platform.system() reports AIX.
AIX = platform.system() == "AIX"
48
49try:
50    import _socket
51except ImportError:
52    _socket = None
53
def get_cid():
    """Return the local VSOCK context ID, or None if it cannot be read.

    None is returned when fcntl is unavailable, when the socket module
    lacks IOCTL_VM_SOCKETS_GET_LOCAL_CID, or when opening/querying
    /dev/vsock fails with OSError.
    """
    if fcntl is None or not hasattr(socket, 'IOCTL_VM_SOCKETS_GET_LOCAL_CID'):
        return None
    try:
        with open("/dev/vsock", "rb") as device:
            # The 4-character argument is the buffer the ioctl fills in.
            raw = fcntl.ioctl(device, socket.IOCTL_VM_SOCKETS_GET_LOCAL_CID,
                              "    ")
    except OSError:
        return None
    (cid,) = struct.unpack("I", raw)
    return cid
66
def _have_socket_can():
    """Return True if a raw CAN socket can be created on this host."""
    try:
        probe = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    except (AttributeError, OSError):
        # Missing constants or a refused socket() both mean "unsupported".
        return False
    probe.close()
    return True
76
def _have_socket_can_isotp():
    """Return True if a CAN ISOTP socket can be created on this host."""
    try:
        probe = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM,
                              socket.CAN_ISOTP)
    except (AttributeError, OSError):
        # Missing constants or a refused socket() both mean "unsupported".
        return False
    probe.close()
    return True
86
def _have_socket_can_j1939():
    """Return True if a CAN J1939 socket can be created on this host."""
    try:
        probe = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM,
                              socket.CAN_J1939)
    except (AttributeError, OSError):
        # Missing constants or a refused socket() both mean "unsupported".
        return False
    probe.close()
    return True
96
def _have_socket_rds():
    """Return True if an RDS socket can be created on this host."""
    try:
        probe = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        # Missing constants or a refused socket() both mean "unsupported".
        return False
    probe.close()
    return True
106
def _have_socket_alg():
    """Return True if an AF_ALG socket can be created on this host."""
    try:
        probe = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
    except (AttributeError, OSError):
        # Missing constants or a refused socket() both mean "unsupported".
        return False
    probe.close()
    return True
116
def _have_socket_qipcrtr():
    """Return True if an AF_QIPCRTR socket can be created on this host."""
    try:
        probe = socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM, 0)
    except (AttributeError, OSError):
        # Missing constants or a refused socket() both mean "unsupported".
        return False
    probe.close()
    return True
126
def _have_socket_vsock():
    """Return True if AF_VSOCK looks usable, i.e. get_cid() yields a CID."""
    return get_cid() is not None
131
132
def _have_socket_bluetooth():
    """Return True if an AF_BLUETOOTH socket can be created on this host."""
    try:
        # RFCOMM is supported by all platforms with bluetooth support. Windows
        # does not support omitting the protocol.
        probe = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM,
                              socket.BTPROTO_RFCOMM)
    except (AttributeError, OSError):
        return False
    probe.close()
    return True
144
145
@contextlib.contextmanager
def socket_setdefaulttimeout(timeout):
    """Context manager that temporarily sets the socket default timeout.

    The previous default is restored on exit, whether the body completes
    normally or raises (and also if applying the new timeout fails).
    """
    with contextlib.ExitStack() as restore:
        # Register the restore first so it runs on every exit path.
        restore.callback(socket.setdefaulttimeout, socket.getdefaulttimeout())
        socket.setdefaulttimeout(timeout)
        yield
154
155
# Feature flags, each computed once at import time by running the
# matching probe defined above.  Skip decorators in this file consult them
# (e.g. HAVE_SOCKET_UDPLITE and HAVE_SOCKET_VSOCK).
HAVE_SOCKET_CAN = _have_socket_can()

HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()

HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939()

HAVE_SOCKET_RDS = _have_socket_rds()

HAVE_SOCKET_ALG = _have_socket_alg()

HAVE_SOCKET_QIPCRTR = _have_socket_qipcrtr()

HAVE_SOCKET_VSOCK = _have_socket_vsock()

# UDPLITE is detected by the presence of the constant only; no socket is
# created for this check.
HAVE_SOCKET_UDPLITE = hasattr(socket, "IPPROTO_UDPLITE")

HAVE_SOCKET_BLUETOOTH = _have_socket_bluetooth()

# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
176
class SocketTCPTest(unittest.TestCase):
    """Fixture providing a listening IPv4 TCP server socket.

    setUp() stores the socket in self.serv and the value returned by
    socket_helper.bind_port() in self.port; tearDown() closes the socket.
    """

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.port = socket_helper.bind_port(self.serv)
            self.serv.listen()
        except BaseException:
            # unittest does not run tearDown() when setUp() fails, so the
            # socket must be closed here to avoid leaking it.
            self.serv.close()
            raise

    def tearDown(self):
        self.serv.close()
        self.serv = None
187
class SocketUDPTest(unittest.TestCase):
    """Fixture providing a bound IPv4 UDP server socket.

    setUp() stores the socket in self.serv and the value returned by
    socket_helper.bind_port() in self.port; tearDown() closes the socket.
    """

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.port = socket_helper.bind_port(self.serv)
        except BaseException:
            # unittest does not run tearDown() when setUp() fails, so the
            # socket must be closed here to avoid leaking it.
            self.serv.close()
            raise

    def tearDown(self):
        self.serv.close()
        self.serv = None
197
class SocketUDPLITETest(SocketUDPTest):
    """Variant of SocketUDPTest whose server socket uses IPPROTO_UDPLITE."""

    def setUp(self):
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
        try:
            self.port = socket_helper.bind_port(self.serv)
        except BaseException:
            # unittest does not run tearDown() when setUp() fails, so the
            # socket must be closed here to avoid leaking it.
            self.serv.close()
            raise
203
class ThreadSafeCleanupTestCase:
    """Mixin making unittest cleanup registration and execution thread-safe.

    The class does not derive from unittest.TestCase itself; it is meant to
    be listed ahead of a TestCase-derived class so that addCleanup() and
    doCleanups() delegate to the next class in the MRO while holding a
    recursive lock.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cleanup_lock = threading.RLock()

    def addCleanup(self, *args, **kwargs):
        lock = self._cleanup_lock
        lock.acquire()
        try:
            return super().addCleanup(*args, **kwargs)
        finally:
            lock.release()

    def doCleanups(self, *args, **kwargs):
        lock = self._cleanup_lock
        lock.acquire()
        try:
            return super().doCleanups(*args, **kwargs)
        finally:
            lock.release()
222
class SocketCANTest(unittest.TestCase):

    """To be able to run this test, a `vcan0` CAN interface can be created with
    the following commands:
    # modprobe vcan
    # ip link add dev vcan0 type vcan
    # ip link set up vcan0
    """
    interface = 'vcan0'
    bufsize = 128

    # The CAN frame structure is defined in <linux/can.h>:
    #
    # struct can_frame {
    #     canid_t can_id;  /* 32 bit CAN_ID + EFF/RTR/ERR flags */
    #     __u8    can_dlc; /* data length code: 0 .. 8 */
    #     __u8    data[8] __attribute__((aligned(8)));
    # };
    can_frame_fmt = "=IB3x8s"
    can_frame_size = struct.calcsize(can_frame_fmt)

    # The Broadcast Management Command frame structure is defined
    # in <linux/can/bcm.h>:
    #
    # struct bcm_msg_head {
    #     __u32 opcode;
    #     __u32 flags;
    #     __u32 count;
    #     struct timeval ival1, ival2;
    #     canid_t can_id;
    #     __u32 nframes;
    #     struct can_frame frames[0];
    # }
    #
    # `bcm_msg_head` must be 8 bytes aligned because of the `frames` member (see
    # `struct can_frame` definition). Must use native not standard types for packing.
    bcm_cmd_msg_fmt = "@3I4l2I"
    # Pad up to the next multiple of 8.  (-size) % 8 is the number of bytes
    # still missing; plain size % 8 is only correct for remainders 0 and 4.
    bcm_cmd_msg_fmt += "x" * (-struct.calcsize(bcm_cmd_msg_fmt) % 8)

    def setUp(self):
        self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        self.addCleanup(self.s.close)
        try:
            self.s.bind((self.interface,))
        except OSError:
            # Binding fails when the interface is absent; skip, don't fail.
            self.skipTest('network interface `%s` does not exist' %
                           self.interface)
272
273
class SocketRDSTest(unittest.TestCase):

    """To be able to run this test, the `rds` kernel module must be loaded:
    # modprobe rds
    """
    bufsize = 8192

    def setUp(self):
        server = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        self.serv = server
        self.addCleanup(server.close)
        try:
            port = socket_helper.bind_port(server)
        except OSError:
            # skipTest() raises, so self.port is only set on success.
            self.skipTest('unable to bind RDS socket')
        self.port = port
288
289
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Swap the true setup function: keep the original bound setUp
        # under a name-mangled attribute and install _setUp as the
        # instance's setUp so that _setUp() runs first.
        self.__setUp = self.setUp
        self.setUp = self._setUp

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, then run the real setUp().

        Creates the synchronisation events and the exception queue, looks
        up the '_'-prefixed client method matching the current test, and
        blocks until the client thread has finished clientSetUp().
        """
        # NOTE(review): presumably waits for threads started by the test
        # to exit when the test finishes -- confirm in threading_helper.
        self.enterContext(threading_helper.wait_threads_exit())

        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        # Holds at most one exception handed over by the client thread.
        self.queue = queue.Queue(1)
        self.server_crashed = False

        def raise_queued_exception():
            # Re-raise, in the main thread, an exception queued by the
            # client thread (if any).
            if self.queue.qsize():
                raise self.queue.get()
        self.addCleanup(raise_queued_exception)

        # Do some munging to start the client test.
        # self.id() is a dotted name; the part after the last '.' is the
        # test method name, and the client half is that name with a '_'
        # prefix.
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i+1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        try:
            self.__setUp()
        except:
            # Tell the client thread not to run its half of the test.
            self.server_crashed = True
            raise
        finally:
            # Always release the client thread, even if setUp() failed,
            # so it does not block forever in clientRun().
            self.server_ready.set()
        self.client_ready.wait()
        # Registered after raise_queued_exception, so (cleanups run in
        # reverse order) the test waits for the client thread to finish
        # before any queued exception is re-raised.
        self.addCleanup(self.done.wait)

    def clientRun(self, test_func):
        """Body of the client thread: set up, run test_func, tear down.

        Exceptions from clientSetUp() or test_func() are put on
        self.queue instead of propagating in this thread.
        """
        self.server_ready.wait()
        try:
            self.clientSetUp()
        except BaseException as e:
            self.queue.put(e)
            self.clientTearDown()
            return
        finally:
            # Unblock _setUp() whether or not clientSetUp() succeeded.
            self.client_ready.set()
        if self.server_crashed:
            # The server-side setUp() failed; skip the client test.
            self.clientTearDown()
            return
        if not hasattr(test_func, '__call__'):
            raise TypeError("test_func must be a callable function")
        try:
            test_func()
        except BaseException as e:
            self.queue.put(e)
        finally:
            self.clientTearDown()

    def clientSetUp(self):
        """Client-side fixture hook; subclasses must override it."""
        raise NotImplementedError("clientSetUp must be implemented.")

    def clientTearDown(self):
        """Signal that the client is finished and end the client thread."""
        # Set the event first: the main thread's cleanup waits on it.
        self.done.set()
        thread.exit()
400
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP fixture with a client thread.

    The server socket comes from SocketTCPTest; the client thread creates
    an unconnected IPv4 TCP socket in self.cli.
    """

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        # clientRun() also calls this after a failed clientSetUp(), when
        # self.cli may never have been assigned.  An AttributeError here
        # would skip ThreadableTest.clientTearDown(), leaving self.done
        # unset and the main thread blocked in its done.wait() cleanup.
        cli = getattr(self, 'cli', None)
        if cli is not None:
            cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
414
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP fixture with a client thread.

    The server socket comes from SocketUDPTest; the client thread creates
    an IPv4 UDP socket in self.cli.
    """

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    def clientTearDown(self):
        # clientRun() also calls this after a failed clientSetUp(), when
        # self.cli may never have been assigned.  An AttributeError here
        # would skip ThreadableTest.clientTearDown(), leaving self.done
        # unset and the main thread blocked in its done.wait() cleanup.
        cli = getattr(self, 'cli', None)
        if cli is not None:
            cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
428
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
class ThreadedUDPLITESocketTest(SocketUDPLITETest, ThreadableTest):
    """UDPLITE fixture with a client thread.

    The server socket comes from SocketUDPLITETest; the client thread
    creates an IPv4 UDPLITE socket in self.cli.
    """

    def __init__(self, methodName='runTest'):
        SocketUDPLITETest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)

    def clientTearDown(self):
        # clientRun() also calls this after a failed clientSetUp(), when
        # self.cli may never have been assigned.  An AttributeError here
        # would skip ThreadableTest.clientTearDown(), leaving self.done
        # unset and the main thread blocked in its done.wait() cleanup.
        cli = getattr(self, 'cli', None)
        if cli is not None:
            cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
444
class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
    """CAN fixture with a client thread.

    The server socket comes from SocketCANTest; the client thread creates
    its own raw CAN socket in self.cli and tries to bind it to the same
    interface.
    """

    def __init__(self, methodName='runTest'):
        SocketCANTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        try:
            self.cli.bind((self.interface,))
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        # clientRun() also calls this after a failed clientSetUp(), when
        # self.cli may never have been assigned.  An AttributeError here
        # would skip ThreadableTest.clientTearDown(), leaving self.done
        # unset and the main thread blocked in its done.wait() cleanup.
        cli = getattr(self, 'cli', None)
        if cli is not None:
            cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
464
class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
    """RDS fixture with a client thread.

    The server socket comes from SocketRDSTest; the client thread creates
    its own RDS socket in self.cli and records its bound address in
    self.cli_addr when binding succeeds.
    """

    def __init__(self, methodName='runTest'):
        SocketRDSTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        try:
            # RDS sockets must be bound explicitly to send or receive data
            self.cli.bind((HOST, 0))
            self.cli_addr = self.cli.getsockname()
        except OSError:
            # skipTest should not be called here, and will be called in the
            # server instead
            pass

    def clientTearDown(self):
        # clientRun() also calls this after a failed clientSetUp(), when
        # self.cli may never have been assigned.  An AttributeError here
        # would skip ThreadableTest.clientTearDown(), leaving self.done
        # unset and the main thread blocked in its done.wait() cleanup.
        cli = getattr(self, 'cli', None)
        if cli is not None:
            cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
486
@unittest.skipIf(fcntl is None, "need fcntl")
@unittest.skipUnless(HAVE_SOCKET_VSOCK,
          'VSOCK sockets required for this test.')
@unittest.skipUnless(get_cid() != 2,
          "This test can only be run on a virtual guest.")
class ThreadedVSOCKSocketStreamTest(unittest.TestCase, ThreadableTest):
    """Stream test over AF_VSOCK between a server and a client thread.

    The server listens on VSOCKPORT for any CID and accepts one
    connection; the client connects to the local CID on the same port.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        listener = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
        self.serv = listener
        self.addCleanup(listener.close)
        listener.bind((socket.VMADDR_CID_ANY, VSOCKPORT))
        listener.listen()
        # Release the client thread before blocking in accept().
        self.serverExplicitReady()
        self.conn, self.connaddr = listener.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM)
        self.addCleanup(self.cli.close)
        self.cli.connect((get_cid(), VSOCKPORT))

    def testStream(self):
        self.assertEqual(self.conn.recv(1024), MSG)

    def _testStream(self):
        self.cli.send(MSG)
        self.cli.close()
521
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Socket tests for client-server connection.

    self.cli_conn is the server-side socket returned by accept() and
    self.serv_conn is the client's socket connected to the server.  The
    setUp() method guarantees that the connection is established.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        conn, addr = self.serv.accept()
        self.cli_conn = conn

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        self.cli.connect((HOST, self.port))
        self.serv_conn = self.cli

    def clientTearDown(self):
        # serv_conn is only assigned after connect() succeeds.  When
        # clientSetUp() failed, clientRun() still calls this method; an
        # AttributeError here would skip the base teardown and leave
        # self.done unset, blocking the main thread in done.wait().
        # ConnectedStreamTestMixin guards the same situation.
        serv_conn = getattr(self, 'serv_conn', None)
        if serv_conn is not None:
            serv_conn.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
554
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Fixture built on socket.socketpair().

    setUp() creates both ends: self.serv for the main thread and self.cli
    for the client thread, so clientSetUp() has nothing to do.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        self.serv, self.cli = socket.socketpair()

    def tearDown(self):
        self.serv.close()
        self.serv = None

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        # self.cli is created by the server-side setUp().  If that crashed,
        # clientRun() still calls this method; tolerate the missing
        # attribute so ThreadableTest.clientTearDown() always runs.
        cli = getattr(self, 'cli', None)
        if cli is not None:
            cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
575
576
577# The following classes are used by the sendmsg()/recvmsg() tests.
578# Combining, for instance, ConnectedStreamTestMixin and TCPTestBase
579# gives a drop-in replacement for SocketConnectedTest, but different
580# address families can be used, and the attributes serv_addr and
581# cli_addr will be set to the addresses of the endpoints.
582
class SocketTestBase(unittest.TestCase):
    """A base class for socket tests.

    Subclasses must provide methods newSocket() to return a new socket
    and bindSock(sock) to bind it to an unused address.

    Creates a socket self.serv and sets self.serv_addr to its address.
    """

    def setUp(self):
        self.serv = self.newSocket()
        try:
            self.bindServer()
        except BaseException:
            # unittest does not run tearDown() when setUp() fails, so the
            # socket must be closed here to avoid leaking it.
            self.serv.close()
            raise

    def bindServer(self):
        """Bind server socket and set self.serv_addr to its address."""
        self.bindSock(self.serv)
        self.serv_addr = self.serv.getsockname()

    def tearDown(self):
        self.serv.close()
        self.serv = None
604
605
class SocketListeningTestMixin(SocketTestBase):
    """Mixin that puts the bound server socket into listening mode."""

    def setUp(self):
        super().setUp()
        listener = self.serv
        listener.listen()
612
613
class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
                              ThreadableTest):
    """Mixin to add client socket and allow client/server tests.

    Client socket is self.cli and its address is self.cli_addr.  See
    ThreadableTest for usage information.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.cli = self.newClientSocket()
        self.bindClient()

    def newClientSocket(self):
        """Return a new socket for use as client."""
        return self.newSocket()

    def bindClient(self):
        """Bind client socket and set self.cli_addr to its address."""
        self.bindSock(self.cli)
        self.cli_addr = self.cli.getsockname()

    def clientTearDown(self):
        # clientRun() also calls this after a failed clientSetUp(), when
        # self.cli may never have been assigned.  An AttributeError here
        # would skip ThreadableTest.clientTearDown(), leaving self.done
        # unset and the main thread blocked in its done.wait() cleanup.
        cli = getattr(self, 'cli', None)
        if cli is not None:
            cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
643
644
class ConnectedStreamTestMixin(SocketListeningTestMixin,
                               ThreadedSocketTestMixin):
    """Mixin for stream tests in which the client is already connected.

    self.cli_conn is the server's socket for the accepted connection and
    self.serv_conn is the client's connected socket.  (Based on
    SocketConnectedTest.)
    """

    def setUp(self):
        super().setUp()
        # Indicate explicitly we're ready for the client thread to
        # proceed and then perform the blocking call to accept
        self.serverExplicitReady()
        self.cli_conn, _peer_addr = self.serv.accept()

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        super().tearDown()

    def clientSetUp(self):
        super().clientSetUp()
        self.cli.connect(self.serv_addr)
        self.serv_conn = self.cli

    def clientTearDown(self):
        # serv_conn does not exist when clientSetUp() failed before the
        # connection was made; ignore that and continue tearing down.
        with contextlib.suppress(AttributeError):
            self.serv_conn.close()
            self.serv_conn = None
        super().clientTearDown()
679
680
class UnixSocketTestBase(SocketTestBase):
    """Base class for Unix-domain socket tests."""

    # This class is used for file descriptor passing tests, so we
    # create the sockets in a private directory so that other users
    # can't send anything that might be problematic for a privileged
    # user running the tests.

    def setUp(self):
        private_dir = tempfile.mkdtemp()
        self.dir_path = private_dir
        self.addCleanup(os.rmdir, private_dir)
        super().setUp()

    def bindSock(self, sock):
        # Registered after the rmdir cleanup, so the socket file is
        # unlinked before the directory is removed.
        sock_path = tempfile.mktemp(dir=self.dir_path)
        socket_helper.bind_unix_socket(sock, sock_path)
        self.addCleanup(os_helper.unlink, sock_path)
698
class UnixStreamBase(UnixSocketTestBase):
    """Base class for tests using AF_UNIX sockets of type SOCK_STREAM."""

    def newSocket(self):
        return socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
704
705
class InetTestBase(SocketTestBase):
    """Base class for IPv4 socket tests.

    After setUp(), self.port holds the second item of self.serv_addr.
    """

    host = HOST

    def setUp(self):
        super().setUp()
        server_address = self.serv_addr
        self.port = server_address[1]

    def bindSock(self, sock):
        socket_helper.bind_port(sock, host=self.host)
717
class TCPTestBase(InetTestBase):
    """Base class for tests using TCP over IPv4."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
723
class UDPTestBase(InetTestBase):
    """Base class for tests using UDP over IPv4."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
729
class UDPLITETestBase(InetTestBase):
    """Base class for tests using UDPLITE over IPv4."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM,
                             proto=socket.IPPROTO_UDPLITE)
735
class SCTPStreamBase(InetTestBase):
    """Base class for SCTP tests using one-to-one (SOCK_STREAM) sockets."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
                             proto=socket.IPPROTO_SCTP)
742
743
class Inet6TestBase(InetTestBase):
    """Base class for IPv6 socket tests; binds to socket_helper.HOSTv6."""

    host = socket_helper.HOSTv6
748
class UDP6TestBase(Inet6TestBase):
    """Base class for tests using UDP over IPv6."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM)
754
class UDPLITE6TestBase(Inet6TestBase):
    """Base class for tests using UDPLITE over IPv6."""

    def newSocket(self):
        return socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM,
                             proto=socket.IPPROTO_UDPLITE)
760
761
762# Test-skipping decorators for use with ThreadableTest.
763
def skipWithClientIf(condition, reason):
    """Return a decorator that skips its target when condition is true.

    For anything that is not a class, the decorated object also gets a
    "client_skip" attribute: a decorator for the client half of a
    ThreadableTest pair.  It replaces the client function with a no-op
    when the test is skipped and returns it unchanged otherwise, so the
    client part of a skipped test does nothing.  An existing
    "client_skip" attribute is left alone when the test is not skipped.
    """
    def _noop_client(*args, **kwargs):
        pass

    def _drop_client(func):
        return _noop_client

    def _keep_client(func):
        return func

    def _skipping(obj):
        skipped = unittest.skip(reason)(obj)
        if not isinstance(obj, type):
            skipped.client_skip = _drop_client
        return skipped

    def _passthrough(obj):
        if isinstance(obj, type) or hasattr(obj, "client_skip"):
            return obj
        obj.client_skip = _keep_client
        return obj

    if condition:
        return _skipping
    return _passthrough
785
786
def requireAttrs(obj, *attributes):
    """Return a decorator skipping its target if obj lacks any attribute.

    The skip reason lists the missing names.  The client_skip attribute
    is set the same way skipWithClientIf() sets it.
    """
    missing = [attr for attr in attributes if not hasattr(obj, attr)]
    reason = "don't have " + ", ".join(missing)
    return skipWithClientIf(missing, reason)
795
796
def requireSocket(*args):
    """Return a decorator skipping its target if socket(*args) cannot work.

    A string argument names an attribute of the socket module: its value
    is used in the call, and the test is skipped if the attribute does
    not exist.  Otherwise a socket is created with the resolved arguments
    and closed again; an OSError from the creation causes a skip.  The
    client_skip attribute is set the same way skipWithClientIf() sets it.
    """
    absent = [arg for arg in args
              if isinstance(arg, str) and not hasattr(socket, arg)]
    if absent:
        err = "don't have " + ", ".join(absent)
    else:
        resolved = [getattr(socket, arg) if isinstance(arg, str) else arg
                    for arg in args]
        try:
            probe = socket.socket(*resolved)
        except OSError as exc:
            # XXX: check errno?
            err = str(exc)
        else:
            err = None
            probe.close()
    described_args = ", ".join(str(arg) for arg in args)
    return skipWithClientIf(
        err is not None,
        "can't create socket({0}): {1}".format(described_args, err))
823
824
825#######################################################################
826## Begin Tests
827
828class GeneralModuleTests(unittest.TestCase):
829
830    def test_SocketType_is_socketobject(self):
831        import _socket
832        self.assertTrue(socket.SocketType is _socket.socket)
833        s = socket.socket()
834        self.assertIsInstance(s, socket.SocketType)
835        s.close()
836
837    def test_repr(self):
838        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
839        with s:
840            self.assertIn('fd=%i' % s.fileno(), repr(s))
841            self.assertIn('family=%s' % socket.AF_INET, repr(s))
842            self.assertIn('type=%s' % socket.SOCK_STREAM, repr(s))
843            self.assertIn('proto=0', repr(s))
844            self.assertNotIn('raddr', repr(s))
845            s.bind(('127.0.0.1', 0))
846            self.assertIn('laddr', repr(s))
847            self.assertIn(str(s.getsockname()), repr(s))
848        self.assertIn('[closed]', repr(s))
849        self.assertNotIn('laddr', repr(s))
850
851    @unittest.skipUnless(_socket is not None, 'need _socket module')
852    def test_csocket_repr(self):
853        s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
854        try:
855            expected = ('<socket object, fd=%s, family=%s, type=%s, proto=%s>'
856                        % (s.fileno(), s.family, s.type, s.proto))
857            self.assertEqual(repr(s), expected)
858        finally:
859            s.close()
860        expected = ('<socket object, fd=-1, family=%s, type=%s, proto=%s>'
861                    % (s.family, s.type, s.proto))
862        self.assertEqual(repr(s), expected)
863
864    def test_weakref(self):
865        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
866            p = proxy(s)
867            self.assertEqual(p.fileno(), s.fileno())
868        s = None
869        support.gc_collect()  # For PyPy or other GCs.
870        try:
871            p.fileno()
872        except ReferenceError:
873            pass
874        else:
875            self.fail('Socket proxy still exists')
876
877    def testSocketError(self):
878        # Testing socket module exceptions
879        msg = "Error raising socket exception (%s)."
880        with self.assertRaises(OSError, msg=msg % 'OSError'):
881            raise OSError
882        with self.assertRaises(OSError, msg=msg % 'socket.herror'):
883            raise socket.herror
884        with self.assertRaises(OSError, msg=msg % 'socket.gaierror'):
885            raise socket.gaierror
886
887    def testSendtoErrors(self):
888        # Testing that sendto doesn't mask failures. See #10169.
889        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
890        self.addCleanup(s.close)
891        s.bind(('', 0))
892        sockname = s.getsockname()
893        # 2 args
894        with self.assertRaises(TypeError) as cm:
895            s.sendto('\u2620', sockname)
896        self.assertEqual(str(cm.exception),
897                         "a bytes-like object is required, not 'str'")
898        with self.assertRaises(TypeError) as cm:
899            s.sendto(5j, sockname)
900        self.assertEqual(str(cm.exception),
901                         "a bytes-like object is required, not 'complex'")
902        with self.assertRaises(TypeError) as cm:
903            s.sendto(b'foo', None)
904        self.assertIn('not NoneType',str(cm.exception))
905        # 3 args
906        with self.assertRaises(TypeError) as cm:
907            s.sendto('\u2620', 0, sockname)
908        self.assertEqual(str(cm.exception),
909                         "a bytes-like object is required, not 'str'")
910        with self.assertRaises(TypeError) as cm:
911            s.sendto(5j, 0, sockname)
912        self.assertEqual(str(cm.exception),
913                         "a bytes-like object is required, not 'complex'")
914        with self.assertRaises(TypeError) as cm:
915            s.sendto(b'foo', 0, None)
916        self.assertIn('not NoneType', str(cm.exception))
917        with self.assertRaises(TypeError) as cm:
918            s.sendto(b'foo', 'bar', sockname)
919        with self.assertRaises(TypeError) as cm:
920            s.sendto(b'foo', None, None)
921        # wrong number of args
922        with self.assertRaises(TypeError) as cm:
923            s.sendto(b'foo')
924        self.assertIn('(1 given)', str(cm.exception))
925        with self.assertRaises(TypeError) as cm:
926            s.sendto(b'foo', 0, sockname, 4)
927        self.assertIn('(4 given)', str(cm.exception))
928
929    def testCrucialConstants(self):
930        # Testing for mission critical constants
931        socket.AF_INET
932        if socket.has_ipv6:
933            socket.AF_INET6
934        socket.SOCK_STREAM
935        socket.SOCK_DGRAM
936        socket.SOCK_RAW
937        socket.SOCK_RDM
938        socket.SOCK_SEQPACKET
939        socket.SOL_SOCKET
940        socket.SO_REUSEADDR
941
942    def testCrucialIpProtoConstants(self):
943        socket.IPPROTO_TCP
944        socket.IPPROTO_UDP
945        if socket.has_ipv6:
946            socket.IPPROTO_IPV6
947
948    @unittest.skipUnless(os.name == "nt", "Windows specific")
949    def testWindowsSpecificConstants(self):
950        socket.IPPROTO_ICLFXBM
951        socket.IPPROTO_ST
952        socket.IPPROTO_CBT
953        socket.IPPROTO_IGP
954        socket.IPPROTO_RDP
955        socket.IPPROTO_PGM
956        socket.IPPROTO_L2TP
957        socket.IPPROTO_SCTP
958
959    @unittest.skipIf(support.is_wasi, "WASI is missing these methods")
960    def test_socket_methods(self):
961        # socket methods that depend on a configure HAVE_ check. They should
962        # be present on all platforms except WASI.
963        names = [
964            "_accept", "bind", "connect", "connect_ex", "getpeername",
965            "getsockname", "listen", "recvfrom", "recvfrom_into", "sendto",
966            "setsockopt", "shutdown"
967        ]
968        for name in names:
969            if not hasattr(socket.socket, name):
970                self.fail(f"socket method {name} is missing")
971
972    @unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
973    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
974    def test3542SocketOptions(self):
975        # Ref. issue #35569 and https://tools.ietf.org/html/rfc3542
976        opts = {
977            'IPV6_CHECKSUM',
978            'IPV6_DONTFRAG',
979            'IPV6_DSTOPTS',
980            'IPV6_HOPLIMIT',
981            'IPV6_HOPOPTS',
982            'IPV6_NEXTHOP',
983            'IPV6_PATHMTU',
984            'IPV6_PKTINFO',
985            'IPV6_RECVDSTOPTS',
986            'IPV6_RECVHOPLIMIT',
987            'IPV6_RECVHOPOPTS',
988            'IPV6_RECVPATHMTU',
989            'IPV6_RECVPKTINFO',
990            'IPV6_RECVRTHDR',
991            'IPV6_RECVTCLASS',
992            'IPV6_RTHDR',
993            'IPV6_RTHDRDSTOPTS',
994            'IPV6_RTHDR_TYPE_0',
995            'IPV6_TCLASS',
996            'IPV6_USE_MIN_MTU',
997        }
998        for opt in opts:
999            self.assertTrue(
1000                hasattr(socket, opt), f"Missing RFC3542 socket option '{opt}'"
1001            )
1002
1003    def testHostnameRes(self):
1004        # Testing hostname resolution mechanisms
1005        hostname = socket.gethostname()
1006        try:
1007            ip = socket.gethostbyname(hostname)
1008        except OSError:
1009            # Probably name lookup wasn't set up right; skip this test
1010            self.skipTest('name lookup failure')
1011        self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
1012        try:
1013            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
1014        except OSError:
1015            # Probably a similar problem as above; skip this test
1016            self.skipTest('name lookup failure')
1017        all_host_names = [hostname, hname] + aliases
1018        fqhn = socket.getfqdn(ip)
1019        if not fqhn in all_host_names:
1020            self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
1021
1022    def test_host_resolution(self):
1023        for addr in [socket_helper.HOSTv4, '10.0.0.1', '255.255.255.255']:
1024            self.assertEqual(socket.gethostbyname(addr), addr)
1025
1026        # we don't test socket_helper.HOSTv6 because there's a chance it doesn't have
1027        # a matching name entry (e.g. 'ip6-localhost')
1028        for host in [socket_helper.HOSTv4]:
1029            self.assertIn(host, socket.gethostbyaddr(host)[2])
1030
1031    def test_host_resolution_bad_address(self):
1032        # These are all malformed IP addresses and expected not to resolve to
1033        # any result.  But some ISPs, e.g. AWS and AT&T, may successfully
1034        # resolve these IPs. In particular, AT&T's DNS Error Assist service
1035        # will break this test.  See https://bugs.python.org/issue42092 for a
1036        # workaround.
1037        explanation = (
1038            "resolving an invalid IP address did not raise OSError; "
1039            "can be caused by a broken DNS server"
1040        )
1041        for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
1042                     '1:1:1:1:1:1:1:1:1']:
1043            with self.assertRaises(OSError, msg=addr):
1044                socket.gethostbyname(addr)
1045            with self.assertRaises(OSError, msg=explanation):
1046                socket.gethostbyaddr(addr)
1047
1048    @unittest.skipUnless(hasattr(socket, 'sethostname'), "test needs socket.sethostname()")
1049    @unittest.skipUnless(hasattr(socket, 'gethostname'), "test needs socket.gethostname()")
1050    def test_sethostname(self):
1051        oldhn = socket.gethostname()
1052        try:
1053            socket.sethostname('new')
1054        except OSError as e:
1055            if e.errno == errno.EPERM:
1056                self.skipTest("test should be run as root")
1057            else:
1058                raise
1059        try:
1060            # running test as root!
1061            self.assertEqual(socket.gethostname(), 'new')
1062            # Should work with bytes objects too
1063            socket.sethostname(b'bar')
1064            self.assertEqual(socket.gethostname(), 'bar')
1065        finally:
1066            socket.sethostname(oldhn)
1067
1068    @unittest.skipUnless(hasattr(socket, 'if_nameindex'),
1069                         'socket.if_nameindex() not available.')
1070    def testInterfaceNameIndex(self):
1071        interfaces = socket.if_nameindex()
1072        for index, name in interfaces:
1073            self.assertIsInstance(index, int)
1074            self.assertIsInstance(name, str)
1075            # interface indices are non-zero integers
1076            self.assertGreater(index, 0)
1077            _index = socket.if_nametoindex(name)
1078            self.assertIsInstance(_index, int)
1079            self.assertEqual(index, _index)
1080            _name = socket.if_indextoname(index)
1081            self.assertIsInstance(_name, str)
1082            self.assertEqual(name, _name)
1083
1084    @unittest.skipUnless(hasattr(socket, 'if_indextoname'),
1085                         'socket.if_indextoname() not available.')
1086    def testInvalidInterfaceIndexToName(self):
1087        self.assertRaises(OSError, socket.if_indextoname, 0)
1088        self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
1089
1090    @unittest.skipUnless(hasattr(socket, 'if_nametoindex'),
1091                         'socket.if_nametoindex() not available.')
1092    def testInvalidInterfaceNameToIndex(self):
1093        self.assertRaises(TypeError, socket.if_nametoindex, 0)
1094        self.assertRaises(OSError, socket.if_nametoindex, '_DEADBEEF')
1095
    @unittest.skipUnless(hasattr(sys, 'getrefcount'),
                         'test needs sys.getrefcount()')
    def testRefCountGetNameInfo(self):
        # Testing reference count for getnameinfo
        # The module-name string is passed as getnameinfo()'s first argument.
        # The refcount comparison only runs when that call raises TypeError;
        # if it returns normally or raises something else, nothing is
        # asserted here.
        try:
            # On some versions, this loses a reference
            orig = sys.getrefcount(__name__)
            socket.getnameinfo(__name__,0)
        except TypeError:
            # The failed call must leave the argument's refcount unchanged.
            if sys.getrefcount(__name__) != orig:
                self.fail("socket.getnameinfo loses a reference")
1107
1108    def testInterpreterCrash(self):
1109        # Making sure getnameinfo doesn't crash the interpreter
1110        try:
1111            # On some versions, this crashes the interpreter.
1112            socket.getnameinfo(('x', 0, 0, 0), 0)
1113        except OSError:
1114            pass
1115
1116    def testNtoH(self):
1117        # This just checks that htons etc. are their own inverse,
1118        # when looking at the lower 16 or 32 bits.
1119        sizes = {socket.htonl: 32, socket.ntohl: 32,
1120                 socket.htons: 16, socket.ntohs: 16}
1121        for func, size in sizes.items():
1122            mask = (1<<size) - 1
1123            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
1124                self.assertEqual(i & mask, func(func(i&mask)) & mask)
1125
1126            swapped = func(mask)
1127            self.assertEqual(swapped & mask, mask)
1128            self.assertRaises(OverflowError, func, 1<<34)
1129
1130    @support.cpython_only
1131    def testNtoHErrors(self):
1132        import _testcapi
1133        s_good_values = [0, 1, 2, 0xffff]
1134        l_good_values = s_good_values + [0xffffffff]
1135        l_bad_values = [-1, -2, 1<<32, 1<<1000]
1136        s_bad_values = (
1137            l_bad_values +
1138            [_testcapi.INT_MIN-1, _testcapi.INT_MAX+1] +
1139            [1 << 16, _testcapi.INT_MAX]
1140        )
1141        for k in s_good_values:
1142            socket.ntohs(k)
1143            socket.htons(k)
1144        for k in l_good_values:
1145            socket.ntohl(k)
1146            socket.htonl(k)
1147        for k in s_bad_values:
1148            self.assertRaises(OverflowError, socket.ntohs, k)
1149            self.assertRaises(OverflowError, socket.htons, k)
1150        for k in l_bad_values:
1151            self.assertRaises(OverflowError, socket.ntohl, k)
1152            self.assertRaises(OverflowError, socket.htonl, k)
1153
1154    def testGetServBy(self):
1155        eq = self.assertEqual
1156        # Find one service that exists, then check all the related interfaces.
1157        # I've ordered this by protocols that have both a tcp and udp
1158        # protocol, at least for modern Linuxes.
1159        if (sys.platform.startswith(('freebsd', 'netbsd', 'gnukfreebsd'))
1160            or sys.platform in ('linux', 'darwin')):
1161            # avoid the 'echo' service on this platform, as there is an
1162            # assumption breaking non-standard port/protocol entry
1163            services = ('daytime', 'qotd', 'domain')
1164        else:
1165            services = ('echo', 'daytime', 'domain')
1166        for service in services:
1167            try:
1168                port = socket.getservbyname(service, 'tcp')
1169                break
1170            except OSError:
1171                pass
1172        else:
1173            raise OSError
1174        # Try same call with optional protocol omitted
1175        # Issue #26936: Android getservbyname() was broken before API 23.
1176        if (not hasattr(sys, 'getandroidapilevel') or
1177                sys.getandroidapilevel() >= 23):
1178            port2 = socket.getservbyname(service)
1179            eq(port, port2)
1180        # Try udp, but don't barf if it doesn't exist
1181        try:
1182            udpport = socket.getservbyname(service, 'udp')
1183        except OSError:
1184            udpport = None
1185        else:
1186            eq(udpport, port)
1187        # Now make sure the lookup by port returns the same service name
1188        # Issue #26936: Android getservbyport() is broken.
1189        if not support.is_android:
1190            eq(socket.getservbyport(port2), service)
1191        eq(socket.getservbyport(port, 'tcp'), service)
1192        if udpport is not None:
1193            eq(socket.getservbyport(udpport, 'udp'), service)
1194        # Make sure getservbyport does not accept out of range ports.
1195        self.assertRaises(OverflowError, socket.getservbyport, -1)
1196        self.assertRaises(OverflowError, socket.getservbyport, 65536)
1197
1198    def testDefaultTimeout(self):
1199        # Testing default timeout
1200        # The default timeout should initially be None
1201        self.assertEqual(socket.getdefaulttimeout(), None)
1202        with socket.socket() as s:
1203            self.assertEqual(s.gettimeout(), None)
1204
1205        # Set the default timeout to 10, and see if it propagates
1206        with socket_setdefaulttimeout(10):
1207            self.assertEqual(socket.getdefaulttimeout(), 10)
1208            with socket.socket() as sock:
1209                self.assertEqual(sock.gettimeout(), 10)
1210
1211            # Reset the default timeout to None, and see if it propagates
1212            socket.setdefaulttimeout(None)
1213            self.assertEqual(socket.getdefaulttimeout(), None)
1214            with socket.socket() as sock:
1215                self.assertEqual(sock.gettimeout(), None)
1216
1217        # Check that setting it to an invalid value raises ValueError
1218        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
1219
1220        # Check that setting it to an invalid type raises TypeError
1221        self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
1222
1223    @unittest.skipUnless(hasattr(socket, 'inet_aton'),
1224                         'test needs socket.inet_aton()')
1225    def testIPv4_inet_aton_fourbytes(self):
1226        # Test that issue1008086 and issue767150 are fixed.
1227        # It must return 4 bytes.
1228        self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
1229        self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
1230
1231    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1232                         'test needs socket.inet_pton()')
1233    def testIPv4toString(self):
1234        from socket import inet_aton as f, inet_pton, AF_INET
1235        g = lambda a: inet_pton(AF_INET, a)
1236
1237        assertInvalid = lambda func,a: self.assertRaises(
1238            (OSError, ValueError), func, a
1239        )
1240
1241        self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0'))
1242        self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0'))
1243        self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170'))
1244        self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4'))
1245        self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255'))
1246        # bpo-29972: inet_pton() doesn't fail on AIX
1247        if not AIX:
1248            assertInvalid(f, '0.0.0.')
1249        assertInvalid(f, '300.0.0.0')
1250        assertInvalid(f, 'a.0.0.0')
1251        assertInvalid(f, '1.2.3.4.5')
1252        assertInvalid(f, '::1')
1253
1254        self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0'))
1255        self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0'))
1256        self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170'))
1257        self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255'))
1258        assertInvalid(g, '0.0.0.')
1259        assertInvalid(g, '300.0.0.0')
1260        assertInvalid(g, 'a.0.0.0')
1261        assertInvalid(g, '1.2.3.4.5')
1262        assertInvalid(g, '::1')
1263
1264    @unittest.skipUnless(hasattr(socket, 'inet_pton'),
1265                         'test needs socket.inet_pton()')
1266    def testIPv6toString(self):
1267        try:
1268            from socket import inet_pton, AF_INET6, has_ipv6
1269            if not has_ipv6:
1270                self.skipTest('IPv6 not available')
1271        except ImportError:
1272            self.skipTest('could not import needed symbols from socket')
1273
1274        if sys.platform == "win32":
1275            try:
1276                inet_pton(AF_INET6, '::')
1277            except OSError as e:
1278                if e.winerror == 10022:
1279                    self.skipTest('IPv6 might not be supported')
1280
1281        f = lambda a: inet_pton(AF_INET6, a)
1282        assertInvalid = lambda a: self.assertRaises(
1283            (OSError, ValueError), f, a
1284        )
1285
1286        self.assertEqual(b'\x00' * 16, f('::'))
1287        self.assertEqual(b'\x00' * 16, f('0::0'))
1288        self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::'))
1289        self.assertEqual(
1290            b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
1291            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
1292        )
1293        self.assertEqual(
1294            b'\xad\x42\x0a\xbc' + b'\x00' * 4 + b'\x01\x27\x00\x00\x02\x54\x00\x02',
1295            f('ad42:abc::127:0:254:2')
1296        )
1297        self.assertEqual(b'\x00\x12\x00\x0a' + b'\x00' * 12, f('12:a::'))
1298        assertInvalid('0x20::')
1299        assertInvalid(':::')
1300        assertInvalid('::0::')
1301        assertInvalid('1::abc::')
1302        assertInvalid('1::abc::def')
1303        assertInvalid('1:2:3:4:5:6')
1304        assertInvalid('1:2:3:4:5:6:')
1305        assertInvalid('1:2:3:4:5:6:7:8:0')
1306        # bpo-29972: inet_pton() doesn't fail on AIX
1307        if not AIX:
1308            assertInvalid('1:2:3:4:5:6:7:8:')
1309
1310        self.assertEqual(b'\x00' * 12 + b'\xfe\x2a\x17\x40',
1311            f('::254.42.23.64')
1312        )
1313        self.assertEqual(
1314            b'\x00\x42' + b'\x00' * 8 + b'\xa2\x9b\xfe\x2a\x17\x40',
1315            f('42::a29b:254.42.23.64')
1316        )
1317        self.assertEqual(
1318            b'\x00\x42\xa8\xb9\x00\x00\x00\x02\xff\xff\xa2\x9b\xfe\x2a\x17\x40',
1319            f('42:a8b9:0:2:ffff:a29b:254.42.23.64')
1320        )
1321        assertInvalid('255.254.253.252')
1322        assertInvalid('1::260.2.3.0')
1323        assertInvalid('1::0.be.e.0')
1324        assertInvalid('1:2:3:4:5:6:7:1.2.3.4')
1325        assertInvalid('::1.2.3.4:0')
1326        assertInvalid('0.100.200.0:3:4:5:6:7:8')
1327
1328    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1329                         'test needs socket.inet_ntop()')
1330    def testStringToIPv4(self):
1331        from socket import inet_ntoa as f, inet_ntop, AF_INET
1332        g = lambda a: inet_ntop(AF_INET, a)
1333        assertInvalid = lambda func,a: self.assertRaises(
1334            (OSError, ValueError), func, a
1335        )
1336
1337        self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00'))
1338        self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55'))
1339        self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff'))
1340        self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04'))
1341        assertInvalid(f, b'\x00' * 3)
1342        assertInvalid(f, b'\x00' * 5)
1343        assertInvalid(f, b'\x00' * 16)
1344        self.assertEqual('170.85.170.85', f(bytearray(b'\xaa\x55\xaa\x55')))
1345
1346        self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00'))
1347        self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55'))
1348        self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff'))
1349        assertInvalid(g, b'\x00' * 3)
1350        assertInvalid(g, b'\x00' * 5)
1351        assertInvalid(g, b'\x00' * 16)
1352        self.assertEqual('170.85.170.85', g(bytearray(b'\xaa\x55\xaa\x55')))
1353
1354    @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
1355                         'test needs socket.inet_ntop()')
1356    def testStringToIPv6(self):
1357        try:
1358            from socket import inet_ntop, AF_INET6, has_ipv6
1359            if not has_ipv6:
1360                self.skipTest('IPv6 not available')
1361        except ImportError:
1362            self.skipTest('could not import needed symbols from socket')
1363
1364        if sys.platform == "win32":
1365            try:
1366                inet_ntop(AF_INET6, b'\x00' * 16)
1367            except OSError as e:
1368                if e.winerror == 10022:
1369                    self.skipTest('IPv6 might not be supported')
1370
1371        f = lambda a: inet_ntop(AF_INET6, a)
1372        assertInvalid = lambda a: self.assertRaises(
1373            (OSError, ValueError), f, a
1374        )
1375
1376        self.assertEqual('::', f(b'\x00' * 16))
1377        self.assertEqual('::1', f(b'\x00' * 15 + b'\x01'))
1378        self.assertEqual(
1379            'aef:b01:506:1001:ffff:9997:55:170',
1380            f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
1381        )
1382        self.assertEqual('::1', f(bytearray(b'\x00' * 15 + b'\x01')))
1383
1384        assertInvalid(b'\x12' * 15)
1385        assertInvalid(b'\x12' * 17)
1386        assertInvalid(b'\x12' * 4)
1387
1388    # XXX The following don't test module-level functionality...
1389
1390    def testSockName(self):
1391        # Testing getsockname()
1392        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1393        self.addCleanup(sock.close)
1394
1395        # Since find_unused_port() is inherently subject to race conditions, we
1396        # call it a couple times if necessary.
1397        for i in itertools.count():
1398            port = socket_helper.find_unused_port()
1399            try:
1400                sock.bind(("0.0.0.0", port))
1401            except OSError as e:
1402                if e.errno != errno.EADDRINUSE or i == 5:
1403                    raise
1404            else:
1405                break
1406
1407        name = sock.getsockname()
1408        # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
1409        # it reasonable to get the host's addr in addition to 0.0.0.0.
1410        # At least for eCos.  This is required for the S/390 to pass.
1411        try:
1412            my_ip_addr = socket.gethostbyname(socket.gethostname())
1413        except OSError:
1414            # Probably name lookup wasn't set up right; skip this test
1415            self.skipTest('name lookup failure')
1416        self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
1417        self.assertEqual(name[1], port)
1418
1419    def testGetSockOpt(self):
1420        # Testing getsockopt()
1421        # We know a socket should start without reuse==0
1422        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1423        self.addCleanup(sock.close)
1424        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1425        self.assertFalse(reuse != 0, "initial mode is reuse")
1426
1427    def testSetSockOpt(self):
1428        # Testing setsockopt()
1429        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1430        self.addCleanup(sock.close)
1431        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
1432        reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
1433        self.assertFalse(reuse == 0, "failed to set reuse mode")
1434
1435    def testSendAfterClose(self):
1436        # testing send() after close() with timeout
1437        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1438            sock.settimeout(1)
1439        self.assertRaises(OSError, sock.send, b"spam")
1440
1441    def testCloseException(self):
1442        sock = socket.socket()
1443        sock.bind((socket._LOCALHOST, 0))
1444        socket.socket(fileno=sock.fileno()).close()
1445        try:
1446            sock.close()
1447        except OSError as err:
1448            # Winsock apparently raises ENOTSOCK
1449            self.assertIn(err.errno, (errno.EBADF, errno.ENOTSOCK))
1450        else:
1451            self.fail("close() should raise EBADF/ENOTSOCK")
1452
1453    def testNewAttributes(self):
1454        # testing .family, .type and .protocol
1455
1456        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1457            self.assertEqual(sock.family, socket.AF_INET)
1458            if hasattr(socket, 'SOCK_CLOEXEC'):
1459                self.assertIn(sock.type,
1460                              (socket.SOCK_STREAM | socket.SOCK_CLOEXEC,
1461                               socket.SOCK_STREAM))
1462            else:
1463                self.assertEqual(sock.type, socket.SOCK_STREAM)
1464            self.assertEqual(sock.proto, 0)
1465
1466    def test_getsockaddrarg(self):
1467        sock = socket.socket()
1468        self.addCleanup(sock.close)
1469        port = socket_helper.find_unused_port()
1470        big_port = port + 65536
1471        neg_port = port - 65536
1472        self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
1473        self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
1474        # Since find_unused_port() is inherently subject to race conditions, we
1475        # call it a couple times if necessary.
1476        for i in itertools.count():
1477            port = socket_helper.find_unused_port()
1478            try:
1479                sock.bind((HOST, port))
1480            except OSError as e:
1481                if e.errno != errno.EADDRINUSE or i == 5:
1482                    raise
1483            else:
1484                break
1485
1486    @unittest.skipUnless(os.name == "nt", "Windows specific")
1487    def test_sock_ioctl(self):
1488        self.assertTrue(hasattr(socket.socket, 'ioctl'))
1489        self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
1490        self.assertTrue(hasattr(socket, 'RCVALL_ON'))
1491        self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
1492        self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
1493        s = socket.socket()
1494        self.addCleanup(s.close)
1495        self.assertRaises(ValueError, s.ioctl, -1, None)
1496        s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
1497
1498    @unittest.skipUnless(os.name == "nt", "Windows specific")
1499    @unittest.skipUnless(hasattr(socket, 'SIO_LOOPBACK_FAST_PATH'),
1500                         'Loopback fast path support required for this test')
1501    def test_sio_loopback_fast_path(self):
1502        s = socket.socket()
1503        self.addCleanup(s.close)
1504        try:
1505            s.ioctl(socket.SIO_LOOPBACK_FAST_PATH, True)
1506        except OSError as exc:
1507            WSAEOPNOTSUPP = 10045
1508            if exc.winerror == WSAEOPNOTSUPP:
1509                self.skipTest("SIO_LOOPBACK_FAST_PATH is defined but "
1510                              "doesn't implemented in this Windows version")
1511            raise
1512        self.assertRaises(TypeError, s.ioctl, socket.SIO_LOOPBACK_FAST_PATH, None)
1513
1514    def testGetaddrinfo(self):
1515        try:
1516            socket.getaddrinfo('localhost', 80)
1517        except socket.gaierror as err:
1518            if err.errno == socket.EAI_SERVICE:
1519                # see http://bugs.python.org/issue1282647
1520                self.skipTest("buggy libc version")
1521            raise
1522        # len of every sequence is supposed to be == 5
1523        for info in socket.getaddrinfo(HOST, None):
1524            self.assertEqual(len(info), 5)
1525        # host can be a domain name, a string representation of an
1526        # IPv4/v6 address or None
1527        socket.getaddrinfo('localhost', 80)
1528        socket.getaddrinfo('127.0.0.1', 80)
1529        socket.getaddrinfo(None, 80)
1530        if socket_helper.IPV6_ENABLED:
1531            socket.getaddrinfo('::1', 80)
1532        # port can be a string service name such as "http", a numeric
1533        # port number or None
1534        # Issue #26936: Android getaddrinfo() was broken before API level 23.
1535        if (not hasattr(sys, 'getandroidapilevel') or
1536                sys.getandroidapilevel() >= 23):
1537            socket.getaddrinfo(HOST, "http")
1538        socket.getaddrinfo(HOST, 80)
1539        socket.getaddrinfo(HOST, None)
1540        # test family and socktype filters
1541        infos = socket.getaddrinfo(HOST, 80, socket.AF_INET, socket.SOCK_STREAM)
1542        for family, type, _, _, _ in infos:
1543            self.assertEqual(family, socket.AF_INET)
1544            self.assertEqual(repr(family), '<AddressFamily.AF_INET: %r>' % family.value)
1545            self.assertEqual(str(family), str(family.value))
1546            self.assertEqual(type, socket.SOCK_STREAM)
1547            self.assertEqual(repr(type), '<SocketKind.SOCK_STREAM: %r>' % type.value)
1548            self.assertEqual(str(type), str(type.value))
1549        infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1550        for _, socktype, _, _, _ in infos:
1551            self.assertEqual(socktype, socket.SOCK_STREAM)
1552        # test proto and flags arguments
1553        socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1554        socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1555        # a server willing to support both IPv4 and IPv6 will
1556        # usually do this
1557        socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1558                           socket.AI_PASSIVE)
1559        # test keyword arguments
1560        a = socket.getaddrinfo(HOST, None)
1561        b = socket.getaddrinfo(host=HOST, port=None)
1562        self.assertEqual(a, b)
1563        a = socket.getaddrinfo(HOST, None, socket.AF_INET)
1564        b = socket.getaddrinfo(HOST, None, family=socket.AF_INET)
1565        self.assertEqual(a, b)
1566        a = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
1567        b = socket.getaddrinfo(HOST, None, type=socket.SOCK_STREAM)
1568        self.assertEqual(a, b)
1569        a = socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
1570        b = socket.getaddrinfo(HOST, None, proto=socket.SOL_TCP)
1571        self.assertEqual(a, b)
1572        a = socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
1573        b = socket.getaddrinfo(HOST, None, flags=socket.AI_PASSIVE)
1574        self.assertEqual(a, b)
1575        a = socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
1576                               socket.AI_PASSIVE)
1577        b = socket.getaddrinfo(host=None, port=0, family=socket.AF_UNSPEC,
1578                               type=socket.SOCK_STREAM, proto=0,
1579                               flags=socket.AI_PASSIVE)
1580        self.assertEqual(a, b)
1581        # Issue #6697.
1582        self.assertRaises(UnicodeEncodeError, socket.getaddrinfo, 'localhost', '\uD800')
1583
1584        # Issue 17269: test workaround for OS X platform bug segfault
1585        if hasattr(socket, 'AI_NUMERICSERV'):
1586            try:
1587                # The arguments here are undefined and the call may succeed
1588                # or fail.  All we care here is that it doesn't segfault.
1589                socket.getaddrinfo("localhost", None, 0, 0, 0,
1590                                   socket.AI_NUMERICSERV)
1591            except socket.gaierror:
1592                pass
1593
1594    def test_getnameinfo(self):
1595        # only IP addresses are allowed
1596        self.assertRaises(OSError, socket.getnameinfo, ('mail.python.org',0), 0)
1597
1598    @unittest.skipUnless(support.is_resource_enabled('network'),
1599                         'network is not enabled')
1600    def test_idna(self):
1601        # Check for internet access before running test
1602        # (issue #12804, issue #25138).
1603        with socket_helper.transient_internet('python.org'):
1604            socket.gethostbyname('python.org')
1605
1606        # these should all be successful
1607        domain = 'испытание.pythontest.net'
1608        socket.gethostbyname(domain)
1609        socket.gethostbyname_ex(domain)
1610        socket.getaddrinfo(domain,0,socket.AF_UNSPEC,socket.SOCK_STREAM)
1611        # this may not work if the forward lookup chooses the IPv6 address, as that doesn't
1612        # have a reverse entry yet
1613        # socket.gethostbyaddr('испытание.python.org')
1614
    def check_sendall_interrupted(self, with_timeout):
        # Shared body of the sendall()-interrupted-by-a-signal tests.
        #
        # A SIGALRM is delivered while sendall() is pushing a large payload
        # into a socketpair whose other end is never read from.  When
        # with_timeout is true the sending socket also gets a timeout, and a
        # second phase checks that a handler that returns normally leads to
        # TimeoutError.
        #
        # socketpair() is not strictly required, but it makes things easier.
        if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
            self.skipTest("signal.alarm and socket.socketpair required for this test")
        # Our signal handlers clobber the C errno by calling a math function
        # with an invalid domain value.
        def ok_handler(*args):
            self.assertRaises(ValueError, math.acosh, 0)
        def raising_handler(*args):
            self.assertRaises(ValueError, math.acosh, 0)
            # Deliberately raise ZeroDivisionError out of the handler.
            1 // 0
        c, s = socket.socketpair()
        # Remember the previous SIGALRM handler so it can be restored.
        old_alarm = signal.signal(signal.SIGALRM, raising_handler)
        try:
            if with_timeout:
                # Just above the one second minimum for signal.alarm
                c.settimeout(1.5)
            # Phase 1: the exception raised by the handler must propagate
            # out of sendall().
            with self.assertRaises(ZeroDivisionError):
                signal.alarm(1)
                c.sendall(b"x" * support.SOCK_MAX_SIZE)
            if with_timeout:
                # Phase 2: with a handler that returns normally, sendall()
                # is expected to raise TimeoutError.
                signal.signal(signal.SIGALRM, ok_handler)
                signal.alarm(1)
                self.assertRaises(TimeoutError, c.sendall,
                                  b"x" * support.SOCK_MAX_SIZE)
        finally:
            # Cancel any pending alarm before restoring the old handler,
            # then release both ends of the pair.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_alarm)
            c.close()
            s.close()
1645
1646    def test_sendall_interrupted(self):
1647        self.check_sendall_interrupted(False)
1648
1649    def test_sendall_interrupted_with_timeout(self):
1650        self.check_sendall_interrupted(True)
1651
    def test_dealloc_warn(self):
        # Dropping the last reference to an unclosed socket must emit a
        # ResourceWarning whose message contains the socket's repr.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        r = repr(sock)
        with self.assertWarns(ResourceWarning) as cm:
            sock = None
            support.gc_collect()
        self.assertIn(r, str(cm.warning.args[0]))
        # An open socket file object gets dereferenced after the socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        f = sock.makefile('rb')
        # NOTE(review): this second repr is not used again in this test.
        r = repr(sock)
        # Release the socket first, while the file object is still referenced;
        # no warning is asserted at this point.
        sock = None
        support.gc_collect()
        # The ResourceWarning is expected once the file object is released too.
        with self.assertWarns(ResourceWarning):
            f = None
            support.gc_collect()
1668
1669    def test_name_closed_socketio(self):
1670        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
1671            fp = sock.makefile("rb")
1672            fp.close()
1673            self.assertEqual(repr(fp), "<_io.BufferedReader name=-1>")
1674
1675    def test_unusable_closed_socketio(self):
1676        with socket.socket() as sock:
1677            fp = sock.makefile("rb", buffering=0)
1678            self.assertTrue(fp.readable())
1679            self.assertFalse(fp.writable())
1680            self.assertFalse(fp.seekable())
1681            fp.close()
1682            self.assertRaises(ValueError, fp.readable)
1683            self.assertRaises(ValueError, fp.writable)
1684            self.assertRaises(ValueError, fp.seekable)
1685
1686    def test_socket_close(self):
1687        sock = socket.socket()
1688        try:
1689            sock.bind((HOST, 0))
1690            socket.close(sock.fileno())
1691            with self.assertRaises(OSError):
1692                sock.listen(1)
1693        finally:
1694            with self.assertRaises(OSError):
1695                # sock.close() fails with EBADF
1696                sock.close()
1697        with self.assertRaises(TypeError):
1698            socket.close(None)
1699        with self.assertRaises(OSError):
1700            socket.close(-1)
1701
1702    def test_makefile_mode(self):
1703        for mode in 'r', 'rb', 'rw', 'w', 'wb':
1704            with self.subTest(mode=mode):
1705                with socket.socket() as sock:
1706                    encoding = None if "b" in mode else "utf-8"
1707                    with sock.makefile(mode, encoding=encoding) as fp:
1708                        self.assertEqual(fp.mode, mode)
1709
1710    def test_makefile_invalid_mode(self):
1711        for mode in 'rt', 'x', '+', 'a':
1712            with self.subTest(mode=mode):
1713                with socket.socket() as sock:
1714                    with self.assertRaisesRegex(ValueError, 'invalid mode'):
1715                        sock.makefile(mode)
1716
1717    def test_pickle(self):
1718        sock = socket.socket()
1719        with sock:
1720            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1721                self.assertRaises(TypeError, pickle.dumps, sock, protocol)
1722        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
1723            family = pickle.loads(pickle.dumps(socket.AF_INET, protocol))
1724            self.assertEqual(family, socket.AF_INET)
1725            type = pickle.loads(pickle.dumps(socket.SOCK_STREAM, protocol))
1726            self.assertEqual(type, socket.SOCK_STREAM)
1727
1728    def test_listen_backlog(self):
1729        for backlog in 0, -1:
1730            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1731                srv.bind((HOST, 0))
1732                srv.listen(backlog)
1733
1734        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1735            srv.bind((HOST, 0))
1736            srv.listen()
1737
1738    @support.cpython_only
1739    def test_listen_backlog_overflow(self):
1740        # Issue 15989
1741        import _testcapi
1742        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
1743            srv.bind((HOST, 0))
1744            self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
1745
1746    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1747    def test_flowinfo(self):
1748        self.assertRaises(OverflowError, socket.getnameinfo,
1749                          (socket_helper.HOSTv6, 0, 0xffffffff), 0)
1750        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
1751            self.assertRaises(OverflowError, s.bind, (socket_helper.HOSTv6, 0, -10))
1752
1753    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1754    def test_getaddrinfo_ipv6_basic(self):
1755        ((*_, sockaddr),) = socket.getaddrinfo(
1756            'ff02::1de:c0:face:8D',  # Note capital letter `D`.
1757            1234, socket.AF_INET6,
1758            socket.SOCK_DGRAM,
1759            socket.IPPROTO_UDP
1760        )
1761        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, 0))
1762
1763    def test_getfqdn_filter_localhost(self):
1764        self.assertEqual(socket.getfqdn(), socket.getfqdn("0.0.0.0"))
1765        self.assertEqual(socket.getfqdn(), socket.getfqdn("::"))
1766
1767    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1768    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1769    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1770    @unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
1771    def test_getaddrinfo_ipv6_scopeid_symbolic(self):
1772        # Just pick up any network interface (Linux, Mac OS X)
1773        (ifindex, test_interface) = socket.if_nameindex()[0]
1774        ((*_, sockaddr),) = socket.getaddrinfo(
1775            'ff02::1de:c0:face:8D%' + test_interface,
1776            1234, socket.AF_INET6,
1777            socket.SOCK_DGRAM,
1778            socket.IPPROTO_UDP
1779        )
1780        # Note missing interface name part in IPv6 address
1781        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1782
1783    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1784    @unittest.skipUnless(
1785        sys.platform == 'win32',
1786        'Numeric scope id does not work or undocumented')
1787    def test_getaddrinfo_ipv6_scopeid_numeric(self):
1788        # Also works on Linux and Mac OS X, but is not documented (?)
1789        # Windows, Linux and Max OS X allow nonexistent interface numbers here.
1790        ifindex = 42
1791        ((*_, sockaddr),) = socket.getaddrinfo(
1792            'ff02::1de:c0:face:8D%' + str(ifindex),
1793            1234, socket.AF_INET6,
1794            socket.SOCK_DGRAM,
1795            socket.IPPROTO_UDP
1796        )
1797        # Note missing interface name part in IPv6 address
1798        self.assertEqual(sockaddr, ('ff02::1de:c0:face:8d', 1234, 0, ifindex))
1799
1800    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1801    @unittest.skipIf(sys.platform == 'win32', 'does not work on Windows')
1802    @unittest.skipIf(AIX, 'Symbolic scope id does not work')
1803    @unittest.skipUnless(hasattr(socket, 'if_nameindex'), "test needs socket.if_nameindex()")
1804    def test_getnameinfo_ipv6_scopeid_symbolic(self):
1805        # Just pick up any network interface.
1806        (ifindex, test_interface) = socket.if_nameindex()[0]
1807        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1808        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1809        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + test_interface, '1234'))
1810
1811    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
1812    @unittest.skipUnless( sys.platform == 'win32',
1813        'Numeric scope id does not work or undocumented')
1814    def test_getnameinfo_ipv6_scopeid_numeric(self):
1815        # Also works on Linux (undocumented), but does not work on Mac OS X
1816        # Windows and Linux allow nonexistent interface numbers here.
1817        ifindex = 42
1818        sockaddr = ('ff02::1de:c0:face:8D', 1234, 0, ifindex)  # Note capital letter `D`.
1819        nameinfo = socket.getnameinfo(sockaddr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
1820        self.assertEqual(nameinfo, ('ff02::1de:c0:face:8d%' + str(ifindex), '1234'))
1821
1822    def test_str_for_enums(self):
1823        # Make sure that the AF_* and SOCK_* constants have enum-like string
1824        # reprs.
1825        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
1826            self.assertEqual(repr(s.family), '<AddressFamily.AF_INET: %r>' % s.family.value)
1827            self.assertEqual(repr(s.type), '<SocketKind.SOCK_STREAM: %r>' % s.type.value)
1828            self.assertEqual(str(s.family), str(s.family.value))
1829            self.assertEqual(str(s.type), str(s.type.value))
1830
1831    def test_socket_consistent_sock_type(self):
1832        SOCK_NONBLOCK = getattr(socket, 'SOCK_NONBLOCK', 0)
1833        SOCK_CLOEXEC = getattr(socket, 'SOCK_CLOEXEC', 0)
1834        sock_type = socket.SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC
1835
1836        with socket.socket(socket.AF_INET, sock_type) as s:
1837            self.assertEqual(s.type, socket.SOCK_STREAM)
1838            s.settimeout(1)
1839            self.assertEqual(s.type, socket.SOCK_STREAM)
1840            s.settimeout(0)
1841            self.assertEqual(s.type, socket.SOCK_STREAM)
1842            s.setblocking(True)
1843            self.assertEqual(s.type, socket.SOCK_STREAM)
1844            s.setblocking(False)
1845            self.assertEqual(s.type, socket.SOCK_STREAM)
1846
1847    def test_unknown_socket_family_repr(self):
1848        # Test that when created with a family that's not one of the known
1849        # AF_*/SOCK_* constants, socket.family just returns the number.
1850        #
1851        # To do this we fool socket.socket into believing it already has an
1852        # open fd because on this path it doesn't actually verify the family and
1853        # type and populates the socket object.
1854        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1855        fd = sock.detach()
1856        unknown_family = max(socket.AddressFamily.__members__.values()) + 1
1857
1858        unknown_type = max(
1859            kind
1860            for name, kind in socket.SocketKind.__members__.items()
1861            if name not in {'SOCK_NONBLOCK', 'SOCK_CLOEXEC'}
1862        ) + 1
1863
1864        with socket.socket(
1865                family=unknown_family, type=unknown_type, proto=23,
1866                fileno=fd) as s:
1867            self.assertEqual(s.family, unknown_family)
1868            self.assertEqual(s.type, unknown_type)
1869            # some OS like macOS ignore proto
1870            self.assertIn(s.proto, {0, 23})
1871
1872    @unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
1873    def test__sendfile_use_sendfile(self):
1874        class File:
1875            def __init__(self, fd):
1876                self.fd = fd
1877
1878            def fileno(self):
1879                return self.fd
1880        with socket.socket() as sock:
1881            fd = os.open(os.curdir, os.O_RDONLY)
1882            os.close(fd)
1883            with self.assertRaises(socket._GiveupOnSendfile):
1884                sock._sendfile_use_sendfile(File(fd))
1885            with self.assertRaises(OverflowError):
1886                sock._sendfile_use_sendfile(File(2**1000))
1887            with self.assertRaises(TypeError):
1888                sock._sendfile_use_sendfile(File(None))
1889
1890    def _test_socket_fileno(self, s, family, stype):
1891        self.assertEqual(s.family, family)
1892        self.assertEqual(s.type, stype)
1893
1894        fd = s.fileno()
1895        s2 = socket.socket(fileno=fd)
1896        self.addCleanup(s2.close)
1897        # detach old fd to avoid double close
1898        s.detach()
1899        self.assertEqual(s2.family, family)
1900        self.assertEqual(s2.type, stype)
1901        self.assertEqual(s2.fileno(), fd)
1902
1903    def test_socket_fileno(self):
1904        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1905        self.addCleanup(s.close)
1906        s.bind((socket_helper.HOST, 0))
1907        self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
1908
1909        if hasattr(socket, "SOCK_DGRAM"):
1910            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
1911            self.addCleanup(s.close)
1912            s.bind((socket_helper.HOST, 0))
1913            self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
1914
1915        if socket_helper.IPV6_ENABLED:
1916            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
1917            self.addCleanup(s.close)
1918            s.bind((socket_helper.HOSTv6, 0, 0, 0))
1919            self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
1920
1921        if hasattr(socket, "AF_UNIX"):
1922            tmpdir = tempfile.mkdtemp()
1923            self.addCleanup(shutil.rmtree, tmpdir)
1924            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
1925            self.addCleanup(s.close)
1926            try:
1927                s.bind(os.path.join(tmpdir, 'socket'))
1928            except PermissionError:
1929                pass
1930            else:
1931                self._test_socket_fileno(s, socket.AF_UNIX,
1932                                         socket.SOCK_STREAM)
1933
1934    def test_socket_fileno_rejects_float(self):
1935        with self.assertRaises(TypeError):
1936            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=42.5)
1937
1938    def test_socket_fileno_rejects_other_types(self):
1939        with self.assertRaises(TypeError):
1940            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno="foo")
1941
1942    def test_socket_fileno_rejects_invalid_socket(self):
1943        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1944            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-1)
1945
1946    @unittest.skipIf(os.name == "nt", "Windows disallows -1 only")
1947    def test_socket_fileno_rejects_negative(self):
1948        with self.assertRaisesRegex(ValueError, "negative file descriptor"):
1949            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=-42)
1950
1951    def test_socket_fileno_requires_valid_fd(self):
1952        WSAENOTSOCK = 10038
1953        with self.assertRaises(OSError) as cm:
1954            socket.socket(fileno=os_helper.make_bad_fd())
1955        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1956
1957        with self.assertRaises(OSError) as cm:
1958            socket.socket(
1959                socket.AF_INET,
1960                socket.SOCK_STREAM,
1961                fileno=os_helper.make_bad_fd())
1962        self.assertIn(cm.exception.errno, (errno.EBADF, WSAENOTSOCK))
1963
1964    def test_socket_fileno_requires_socket_fd(self):
1965        with tempfile.NamedTemporaryFile() as afile:
1966            with self.assertRaises(OSError):
1967                socket.socket(fileno=afile.fileno())
1968
1969            with self.assertRaises(OSError) as cm:
1970                socket.socket(
1971                    socket.AF_INET,
1972                    socket.SOCK_STREAM,
1973                    fileno=afile.fileno())
1974            self.assertEqual(cm.exception.errno, errno.ENOTSOCK)
1975
1976    def test_addressfamily_enum(self):
1977        import _socket, enum
1978        CheckedAddressFamily = enum._old_convert_(
1979                enum.IntEnum, 'AddressFamily', 'socket',
1980                lambda C: C.isupper() and C.startswith('AF_'),
1981                source=_socket,
1982                )
1983        enum._test_simple_enum(CheckedAddressFamily, socket.AddressFamily)
1984
1985    def test_socketkind_enum(self):
1986        import _socket, enum
1987        CheckedSocketKind = enum._old_convert_(
1988                enum.IntEnum, 'SocketKind', 'socket',
1989                lambda C: C.isupper() and C.startswith('SOCK_'),
1990                source=_socket,
1991                )
1992        enum._test_simple_enum(CheckedSocketKind, socket.SocketKind)
1993
1994    def test_msgflag_enum(self):
1995        import _socket, enum
1996        CheckedMsgFlag = enum._old_convert_(
1997                enum.IntFlag, 'MsgFlag', 'socket',
1998                lambda C: C.isupper() and C.startswith('MSG_'),
1999                source=_socket,
2000                )
2001        enum._test_simple_enum(CheckedMsgFlag, socket.MsgFlag)
2002
2003    def test_addressinfo_enum(self):
2004        import _socket, enum
2005        CheckedAddressInfo = enum._old_convert_(
2006                enum.IntFlag, 'AddressInfo', 'socket',
2007                lambda C: C.isupper() and C.startswith('AI_'),
2008                source=_socket)
2009        enum._test_simple_enum(CheckedAddressInfo, socket.AddressInfo)
2010
2011
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class BasicCANTest(unittest.TestCase):
    # Single-socket checks: CAN constants exist, RAW/BCM sockets can be
    # created, and bind()/setsockopt() behave as expected.

    def testCrucialConstants(self):
        # Looking up a missing constant raises AttributeError.
        for name in ('AF_CAN', 'PF_CAN', 'CAN_RAW'):
            getattr(socket, name)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testBCMConstants(self):
        socket.CAN_BCM

        opcodes = (
            'CAN_BCM_TX_SETUP',     # create (cyclic) transmission task
            'CAN_BCM_TX_DELETE',    # remove (cyclic) transmission task
            'CAN_BCM_TX_READ',      # read properties of (cyclic) transmission task
            'CAN_BCM_TX_SEND',      # send one CAN frame
            'CAN_BCM_RX_SETUP',     # create RX content filter subscription
            'CAN_BCM_RX_DELETE',    # remove RX content filter subscription
            'CAN_BCM_RX_READ',      # read properties of RX content filter subscription
            'CAN_BCM_TX_STATUS',    # reply to TX_READ request
            'CAN_BCM_TX_EXPIRED',   # notification on performed transmissions (count=0)
            'CAN_BCM_RX_STATUS',    # reply to RX_READ request
            'CAN_BCM_RX_TIMEOUT',   # cyclic message is absent
            'CAN_BCM_RX_CHANGED',   # updated CAN frame (detected content change)
        )
        flags = (
            'CAN_BCM_SETTIMER',
            'CAN_BCM_STARTTIMER',
            'CAN_BCM_TX_COUNTEVT',
            'CAN_BCM_TX_ANNOUNCE',
            'CAN_BCM_TX_CP_CAN_ID',
            'CAN_BCM_RX_FILTER_ID',
            'CAN_BCM_RX_CHECK_DLC',
            'CAN_BCM_RX_NO_AUTOTIMER',
            'CAN_BCM_RX_ANNOUNCE_RESUME',
            'CAN_BCM_TX_RESET_MULTI_IDX',
            'CAN_BCM_RX_RTR_FRAME',
        )
        # Each name must resolve on the socket module, opcodes first.
        for name in opcodes + flags:
            getattr(socket, name)

    def testCreateSocket(self):
        # Creating (and closing) a raw CAN socket must simply work.
        raw = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        with raw:
            pass

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testCreateBCMSocket(self):
        # Same as above for a broadcast-manager socket.
        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
        with bcm:
            pass

    def testBindAny(self):
        # Binding to the empty interface name must be reported back unchanged.
        any_interface = ('', )
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            s.bind(any_interface)
            self.assertEqual(s.getsockname(), any_interface)

    def testTooLongInterfaceName(self):
        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
        oversized = ('x' * 1024,)
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            with self.assertRaisesRegex(OSError, 'interface name too long'):
                s.bind(oversized)

    @unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
                         'socket.CAN_RAW_LOOPBACK required for this test.')
    def testLoopback(self):
        # The loopback option must read back as whatever was last set.
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            level, option = socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK
            for loopback in (0, 1):
                s.setsockopt(level, option, loopback)
                self.assertEqual(loopback, s.getsockopt(level, option))

    @unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
                         'socket.CAN_RAW_FILTER required for this test.')
    def testFilter(self):
        can_id, can_mask = 0x200, 0x700
        # One filter entry: two unsigned 32-bit integers in native byte order.
        can_filter = struct.pack("=II", can_id, can_mask)
        with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
            level, option = socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER
            s.setsockopt(level, option, can_filter)
            stored = s.getsockopt(level, option, 8)
            self.assertEqual(can_filter, stored)
            # A bytearray must be accepted as the option value too.
            s.setsockopt(level, option, bytearray(can_filter))
2094
2095
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class CANTest(ThreadedCANSocketTest):
    # Frame exchange tests over CAN sockets.
    #
    # Most tests come as a pair: testX receives on self.s and checks the data,
    # while _testX builds the frame(s) and sends them through self.cli.
    # NOTE(review): the pairing is presumably driven by the threaded base
    # class, which is defined outside this section -- confirm there.

    def __init__(self, methodName='runTest'):
        ThreadedCANSocketTest.__init__(self, methodName=methodName)

    @classmethod
    def build_can_frame(cls, can_id, data):
        """Build a CAN frame.

        *data* is padded with NUL bytes to 8 bytes; its original length is
        stored as the frame's DLC.  The layout is given by cls.can_frame_fmt.
        """
        can_dlc = len(data)
        data = data.ljust(8, b'\x00')
        return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)

    @classmethod
    def dissect_can_frame(cls, frame):
        """Dissect a CAN frame.

        Return ``(can_id, can_dlc, data)`` where *data* is cut down to the
        first can_dlc bytes, i.e. without the padding.
        """
        can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
        return (can_id, can_dlc, data[:can_dlc])

    def testSendFrame(self):
        # The received frame must equal the sent one and the source address
        # must name the test interface.
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)
        self.assertEqual(addr[0], self.interface)

    def _testSendFrame(self):
        self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
        self.cli.send(self.cf)

    def testSendMaxFrame(self):
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)

    def _testSendMaxFrame(self):
        # A full 8-byte payload, so build_can_frame() adds no padding.
        self.cf = self.build_can_frame(0x00, b'\x07' * 8)
        self.cli.send(self.cf)

    def testSendMultiFrames(self):
        # Two frames must arrive in the order in which they were sent.
        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf1, cf)

        cf, addr = self.s.recvfrom(self.bufsize)
        self.assertEqual(self.cf2, cf)

    def _testSendMultiFrames(self):
        self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
        self.cli.send(self.cf1)

        self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
        self.cli.send(self.cf2)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def _testBCM(self):
        # Here self.cli is the receiving side: it must see the frame that
        # testBCM() sends through its broadcast-manager socket.
        cf, addr = self.cli.recvfrom(self.bufsize)
        self.assertEqual(self.cf, cf)
        can_id, can_dlc, data = self.dissect_can_frame(cf)
        self.assertEqual(self.can_id, can_id)
        self.assertEqual(self.data, data)

    @unittest.skipUnless(hasattr(socket, "CAN_BCM"),
                         'socket.CAN_BCM required for this test.')
    def testBCM(self):
        bcm = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_BCM)
        self.addCleanup(bcm.close)
        bcm.connect((self.interface,))
        self.can_id = 0x123
        self.data = bytes([0xc0, 0xff, 0xee])
        self.cf = self.build_can_frame(self.can_id, self.data)
        opcode = socket.CAN_BCM_TX_SEND
        flags = 0
        count = 0
        ival1_seconds = ival1_usec = ival2_seconds = ival2_usec = 0
        bcm_can_id = 0x0222
        nframes = 1
        # Use a real unittest assertion rather than a bare ``assert``: the
        # latter is stripped under ``python -O`` and reports no values.
        self.assertEqual(len(self.cf), 16)
        header = struct.pack(self.bcm_cmd_msg_fmt,
                    opcode,
                    flags,
                    count,
                    ival1_seconds,
                    ival1_usec,
                    ival2_seconds,
                    ival2_usec,
                    bcm_can_id,
                    nframes,
                    )
        # A BCM message is the command header followed by the frame; send()
        # must accept all of it in one go.
        header_plus_frame = header + self.cf
        bytes_sent = bcm.send(header_plus_frame)
        self.assertEqual(bytes_sent, len(header_plus_frame))
2185
2186
@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
class ISOTPTest(unittest.TestCase):
    # Constant, creation and bind checks for CAN ISO-TP sockets.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Interface used by testBind().
        self.interface = "vcan0"

    def testCrucialConstants(self):
        # Looking up a missing constant raises AttributeError.
        for name in ('AF_CAN', 'PF_CAN', 'CAN_ISOTP', 'SOCK_DGRAM'):
            getattr(socket, name)

    def testCreateSocket(self):
        # Creating (and closing) a raw CAN socket must simply work.
        raw = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
        with raw:
            pass

    @unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
                         'socket.CAN_ISOTP required for this test.')
    def testCreateISOTPSocket(self):
        # Same as above for an ISO-TP socket.
        isotp = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
        with isotp:
            pass

    def testTooLongInterfaceName(self):
        # most systems limit IFNAMSIZ to 16, take 1024 to be sure
        oversized = ('x' * 1024, 1, 2)
        with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
            self.assertRaisesRegex(OSError, 'interface name too long',
                                   s.bind, oversized)

    def testBind(self):
        # The bound address must be reported back unchanged; a missing
        # interface (ENODEV) skips the test.
        try:
            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
                addr = self.interface, 0x123, 0x456
                s.bind(addr)
                self.assertEqual(s.getsockname(), addr)
        except OSError as e:
            if e.errno != errno.ENODEV:
                raise
            self.skipTest('network interface `%s` does not exist' %
                          self.interface)
2228
2229
@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.')
class J1939Test(unittest.TestCase):
    # Constant, creation and bind checks for CAN J1939 sockets.

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Interface used by testBind().
        self.interface = "vcan0"

    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
                         'socket.CAN_J1939 required for this test.')
    def testJ1939Constants(self):
        socket.CAN_J1939

        names = (
            'J1939_MAX_UNICAST_ADDR',
            'J1939_IDLE_ADDR',
            'J1939_NO_ADDR',
            'J1939_NO_NAME',
            'J1939_PGN_REQUEST',
            'J1939_PGN_ADDRESS_CLAIMED',
            'J1939_PGN_ADDRESS_COMMANDED',
            'J1939_PGN_PDU1_MAX',
            'J1939_PGN_MAX',
            'J1939_NO_PGN',

            # J1939 socket options
            'SO_J1939_FILTER',
            'SO_J1939_PROMISC',
            'SO_J1939_SEND_PRIO',
            'SO_J1939_ERRQUEUE',

            'SCM_J1939_DEST_ADDR',
            'SCM_J1939_DEST_NAME',
            'SCM_J1939_PRIO',
            'SCM_J1939_ERRQUEUE',

            'J1939_NLA_PAD',
            'J1939_NLA_BYTES_ACKED',

            'J1939_EE_INFO_NONE',
            'J1939_EE_INFO_TX_ABORT',

            'J1939_FILTER_MAX',
        )
        # Each name must resolve on the socket module.
        for name in names:
            getattr(socket, name)

    @unittest.skipUnless(hasattr(socket, "CAN_J1939"),
                         'socket.CAN_J1939 required for this test.')
    def testCreateJ1939Socket(self):
        # Creating (and closing) a J1939 socket must simply work.
        j1939 = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939)
        with j1939:
            pass

    def testBind(self):
        # The bound address must be reported back unchanged; a missing
        # interface (ENODEV) skips the test.
        try:
            with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
                addr = (self.interface, socket.J1939_NO_NAME,
                        socket.J1939_NO_PGN, socket.J1939_NO_ADDR)
                s.bind(addr)
                self.assertEqual(s.getsockname(), addr)
        except OSError as e:
            if e.errno != errno.ENODEV:
                raise
            self.skipTest('network interface `%s` does not exist' %
                          self.interface)
2290
2291
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):
    # Single-socket checks for RDS: constants, creation, buffer size options.

    def testCrucialConstants(self):
        # Looking up a missing constant raises AttributeError.
        for name in ('AF_RDS', 'PF_RDS'):
            getattr(socket, name)

    def testCreateSocket(self):
        # Creating (and closing) an RDS socket must simply work.
        rds = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
        with rds:
            pass

    def testSocketBufferSize(self):
        # Setting both buffer sizes must be accepted.
        bufsize = 16384
        with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
            for option in (socket.SO_RCVBUF, socket.SO_SNDBUF):
                s.setsockopt(socket.SOL_SOCKET, option, bufsize)
2308
2309
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class RDSTest(ThreadedRDSSocketTest):
    # Datagram exchange tests over RDS sockets.
    #
    # Tests come in pairs: testX receives on self.serv and checks the data,
    # while _testX sends it from self.cli to (HOST, self.port).
    # NOTE(review): the pairing is presumably driven by the threaded base
    # class, which is defined outside this section -- confirm there.

    def __init__(self, methodName='runTest'):
        ThreadedRDSSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        super().setUp()
        self.evt = threading.Event()

    def testSendAndRecv(self):
        # Both the payload and the sender address must match.
        received, sender = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, received)
        self.assertEqual(self.cli_addr, sender)

    def _testSendAndRecv(self):
        self.data = b'spam'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    def testPeek(self):
        # Read with MSG_PEEK first, then without it: both reads must return
        # the same datagram.
        for flags in ((socket.MSG_PEEK,), ()):
            received, sender = self.serv.recvfrom(self.bufsize, *flags)
            self.assertEqual(self.data, received)

    def _testPeek(self):
        self.data = b'spam'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)

    @requireAttrs(socket.socket, 'recvmsg')
    def testSendAndRecvMsg(self):
        # Only the payload of the recvmsg() result is checked.
        received, ancdata, msg_flags, sender = self.serv.recvmsg(self.bufsize)
        self.assertEqual(self.data, received)

    @requireAttrs(socket.socket, 'sendmsg')
    def _testSendAndRecvMsg(self):
        self.data = b'hello ' * 10
        destination = (HOST, self.port)
        self.cli.sendmsg([self.data], (), 0, destination)

    def testSendAndRecvMulti(self):
        # Two datagrams must arrive in the order in which they were sent.
        received, sender = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data1, received)

        received, sender = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data2, received)

    def _testSendAndRecvMulti(self):
        destination = (HOST, self.port)

        self.data1 = b'bacon'
        self.cli.sendto(self.data1, 0, destination)

        self.data2 = b'egg'
        self.cli.sendto(self.data2, 0, destination)

    def testSelect(self):
        # select() must report the server socket as readable within the
        # 3 second timeout before the datagram is read.
        readable, _writable, _exceptional = select.select(
            [self.serv], [], [], 3.0)
        self.assertIn(self.serv, readable)
        received, sender = self.serv.recvfrom(self.bufsize)
        self.assertEqual(self.data, received)

    def _testSelect(self):
        self.data = b'select'
        destination = (HOST, self.port)
        self.cli.sendto(self.data, 0, destination)
2372
@unittest.skipUnless(HAVE_SOCKET_QIPCRTR,
                     'QIPCRTR sockets required for this test.')
class BasicQIPCRTRTest(unittest.TestCase):
    # Smoke tests for AF_QIPCRTR datagram sockets: constant presence,
    # creation, and the port reported by getsockname() before and after
    # binding/connecting.

    def testCrucialConstants(self):
        # The attribute access is the whole check: a missing constant
        # raises AttributeError.
        socket.AF_QIPCRTR

    def testCreateSocket(self):
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM):
            pass

    def testUnbound(self):
        # A fresh socket reports port 0.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as sock:
            port = sock.getsockname()[1]
            self.assertEqual(port, 0)

    def testBindSock(self):
        # Binding through the helper must yield a non-zero port.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as sock:
            socket_helper.bind_port(sock, host=sock.getsockname()[0])
            port = sock.getsockname()[1]
            self.assertNotEqual(port, 0)

    def testInvalidBindSock(self):
        # Binding with host=-2 is expected to fail with OSError.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as sock:
            with self.assertRaises(OSError):
                socket_helper.bind_port(sock, host=-2)

    def testAutoBindSock(self):
        # connect() on an unbound socket must leave it with a port.
        with socket.socket(socket.AF_QIPCRTR, socket.SOCK_DGRAM) as sock:
            sock.connect((123, 123))
            port = sock.getsockname()[1]
            self.assertNotEqual(port, 0)
2401
@unittest.skipIf(fcntl is None, "need fcntl")
@unittest.skipUnless(HAVE_SOCKET_VSOCK,
                     'VSOCK sockets required for this test.')
class BasicVSOCKTest(unittest.TestCase):
    # Smoke tests for AF_VSOCK: constant presence, socket creation and
    # round-tripping of the buffer-size socket options.

    def testCrucialConstants(self):
        # A missing constant raises AttributeError.
        socket.AF_VSOCK

    def testVSOCKConstants(self):
        # Every listed name must exist on the socket module; getattr()
        # raises AttributeError for the first one that does not.
        for name in ("SO_VM_SOCKETS_BUFFER_SIZE",
                     "SO_VM_SOCKETS_BUFFER_MIN_SIZE",
                     "SO_VM_SOCKETS_BUFFER_MAX_SIZE",
                     "VMADDR_CID_ANY",
                     "VMADDR_PORT_ANY",
                     "VMADDR_CID_HOST",
                     "VM_SOCKETS_INVALID_VERSION",
                     "IOCTL_VM_SOCKETS_GET_LOCAL_CID"):
            getattr(socket, name)

    def testCreateSocket(self):
        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM):
            pass

    def testSocketBufferSize(self):
        # Double each buffer-size option and verify that the new value
        # is read back.  The options are handled in the order max, size,
        # min for reading, writing and checking alike.
        with socket.socket(socket.AF_VSOCK, socket.SOCK_STREAM) as sock:
            options = (socket.SO_VM_SOCKETS_BUFFER_MAX_SIZE,
                       socket.SO_VM_SOCKETS_BUFFER_SIZE,
                       socket.SO_VM_SOCKETS_BUFFER_MIN_SIZE)
            originals = [sock.getsockopt(socket.AF_VSOCK, opt)
                         for opt in options]

            for opt, value in zip(options, originals):
                sock.setsockopt(socket.AF_VSOCK, opt, value * 2)

            for opt, value in zip(options, originals):
                self.assertEqual(value * 2,
                                 sock.getsockopt(socket.AF_VSOCK, opt))
2449
2450
@unittest.skipUnless(HAVE_SOCKET_BLUETOOTH,
                     'Bluetooth sockets required for this test.')
class BasicBluetoothTest(unittest.TestCase):
    # Smoke tests for AF_BLUETOOTH: platform-dependent constants and
    # creation of one socket per protocol.

    def testBluetoothConstants(self):
        # Constants checked on every platform.
        socket.BDADDR_ANY
        socket.BDADDR_LOCAL
        socket.AF_BLUETOOTH
        socket.BTPROTO_RFCOMM

        # The remaining constants are not checked on Windows.
        if sys.platform == "win32":
            return
        socket.BTPROTO_HCI
        socket.SOL_HCI
        socket.BTPROTO_L2CAP

        # BTPROTO_SCO is additionally not checked on FreeBSD.
        if sys.platform.startswith("freebsd"):
            return
        socket.BTPROTO_SCO

    def testCreateRfcommSocket(self):
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM,
                           socket.BTPROTO_RFCOMM):
            pass

    @unittest.skipIf(sys.platform == "win32", "windows does not support L2CAP sockets")
    def testCreateL2capSocket(self):
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET,
                           socket.BTPROTO_L2CAP):
            pass

    @unittest.skipIf(sys.platform == "win32", "windows does not support HCI sockets")
    def testCreateHciSocket(self):
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW,
                           socket.BTPROTO_HCI):
            pass

    @unittest.skipIf(sys.platform == "win32" or sys.platform.startswith("freebsd"),
                     "windows and freebsd do not support SCO sockets")
    def testCreateScoSocket(self):
        with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET,
                           socket.BTPROTO_SCO):
            pass
2488
2489
class BasicTCPTest(SocketConnectedTest):
    # Basic data-transfer tests over a connected TCP pair.  Each testXxx
    # method reads from self.cli_conn; the matching _testXxx method
    # writes to self.serv_conn.

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        # Testing large receive over TCP
        received = self.cli_conn.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        # Testing receive in chunks over TCP
        head = self.cli_conn.recv(len(MSG) - 3)
        tail = self.cli_conn.recv(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        # Testing large recvfrom() over TCP
        received, _addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        # Testing recvfrom() in chunks over TCP
        head, _addr = self.cli_conn.recvfrom(len(MSG)-3)
        tail, _addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(head + tail, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        # Testing sendall() with a 2048 byte string over TCP
        # Keep reading 1024-byte chunks until recv() returns b'' and
        # compare the concatenation with what was sent.
        chunks = iter(lambda: self.cli_conn.recv(1024), b'')
        received = b''.join(chunks)
        self.assertEqual(received, b'f' * 2048)

    def _testSendAll(self):
        self.serv_conn.sendall(b'f' * 2048)

    def testFromFd(self):
        # Testing fromfd()
        from_fd = socket.fromfd(self.cli_conn.fileno(),
                                socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(from_fd.close)
        self.assertIsInstance(from_fd, socket.socket)
        received = from_fd.recv(1024)
        self.assertEqual(received, MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testDup(self):
        # Testing dup()
        duplicate = self.cli_conn.dup()
        self.addCleanup(duplicate.close)
        received = duplicate.recv(1024)
        self.assertEqual(received, MSG)

    def _testDup(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        # Testing shutdown()
        received = self.cli_conn.recv(1024)
        self.assertEqual(received, MSG)
        # wait for _testShutdown to finish: on OS X, when the server
        # closes the connection the client also becomes disconnected,
        # and the client's shutdown call will fail. (Issue #4397.)
        self.done.wait()

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)

    # Same receiving side as testShutdown, paired with the overflow
    # variant of the sending side below.
    testShutdown_overflow = support.cpython_only(testShutdown)

    @support.cpython_only
    def _testShutdown_overflow(self):
        import _testcapi
        self.serv_conn.send(MSG)
        # Issue 15989
        # Values that do not fit in a C int must be rejected before the
        # real shutdown(2) is performed.
        too_big = (_testcapi.INT_MAX + 1, 2 + (_testcapi.UINT_MAX + 1))
        for how in too_big:
            with self.assertRaises(OverflowError):
                self.serv_conn.shutdown(how)
        self.serv_conn.shutdown(2)

    def testDetach(self):
        # Testing detach()
        expected_fd = self.cli_conn.fileno()
        detached_fd = self.cli_conn.detach()
        self.assertEqual(detached_fd, expected_fd)
        # cli_conn cannot be used anymore...
        self.assertTrue(self.cli_conn._closed)
        with self.assertRaises(OSError):
            self.cli_conn.recv(1024)
        self.cli_conn.close()
        # ...but we can create another socket using the (still open)
        # file descriptor
        revived = socket.socket(socket.AF_INET, socket.SOCK_STREAM,
                                fileno=detached_fd)
        self.addCleanup(revived.close)
        received = revived.recv(1024)
        self.assertEqual(received, MSG)

    def _testDetach(self):
        self.serv_conn.send(MSG)
2611
2612
class BasicUDPTest(ThreadedUDPSocketTest):
    # Basic datagram tests over UDP.  Each testXxx method receives on
    # self.serv; the matching _testXxx method sends MSG from self.cli.

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDP
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        server_addr = (HOST, self.port)
        self.cli.sendto(MSG, 0, server_addr)

    def testRecvFrom(self):
        # Testing recvfrom() over UDP
        received, _addr = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        server_addr = (HOST, self.port)
        self.cli.sendto(MSG, 0, server_addr)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        with self.assertRaises(ValueError):
            self.serv.recvfrom(-1)

    def _testRecvFromNegative(self):
        server_addr = (HOST, self.port)
        self.cli.sendto(MSG, 0, server_addr)
2640
2641
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
                     'UDPLITE sockets required for this test.')
class BasicUDPLITETest(ThreadedUDPLITESocketTest):
    # Basic datagram tests over UDPLITE.  Each testXxx method receives
    # on self.serv; the matching _testXxx method sends MSG from self.cli.

    def __init__(self, methodName='runTest'):
        ThreadedUDPLITESocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        # Testing sendto() and Recv() over UDPLITE
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        server_addr = (HOST, self.port)
        self.cli.sendto(MSG, 0, server_addr)

    def testRecvFrom(self):
        # Testing recvfrom() over UDPLITE
        received, _addr = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        server_addr = (HOST, self.port)
        self.cli.sendto(MSG, 0, server_addr)

    def testRecvFromNegative(self):
        # Negative lengths passed to recvfrom should give ValueError.
        with self.assertRaises(ValueError):
            self.serv.recvfrom(-1)

    def _testRecvFromNegative(self):
        server_addr = (HOST, self.port)
        self.cli.sendto(MSG, 0, server_addr)
2671
2672# Tests for the sendmsg()/recvmsg() interface.  Where possible, the
2673# same test code is used with different families and types of socket
2674# (e.g. stream, datagram), and tests using recvmsg() are repeated
2675# using recvmsg_into().
2676#
2677# The generic test classes such as SendmsgTests and
2678# RecvmsgGenericTests inherit from SendrecvmsgBase and expect to be
2679# supplied with sockets cli_sock and serv_sock representing the
2680# client's and the server's end of the connection respectively, and
2681# attributes cli_addr and serv_addr holding their (numeric where
2682# appropriate) addresses.
2683#
2684# The final concrete test classes combine these with subclasses of
2685# SocketTestBase which set up client and server sockets of a specific
2686# type, and with subclasses of SendrecvmsgBase such as
2687# SendrecvmsgDgramBase and SendrecvmsgConnectedBase which map these
2688# sockets to cli_sock and serv_sock and override the methods and
2689# attributes of SendrecvmsgBase to fill in destination addresses if
2690# needed when sending, check for specific flags in msg_flags, etc.
2691#
2692# RecvmsgIntoMixin provides a version of doRecvmsg() implemented using
2693# recvmsg_into().
2694
2695# XXX: like the other datagram (UDP) tests in this module, the code
2696# here assumes that datagram delivery on the local machine will be
2697# reliable.
2698
class SendrecvmsgBase(ThreadSafeCleanupTestCase):
    # Base class for sendmsg()/recvmsg() tests.  It reads the sockets
    # from self.cli_sock and self.serv_sock, which are not defined here
    # and must be supplied by subclasses or mixins.

    # Time in seconds to wait before considering a test failed, or
    # None for no timeout.  Not all tests actually set a timeout.
    fail_timeout = support.LOOPBACK_TIMEOUT

    def setUp(self):
        # The event is created before the inherited setUp() runs.
        self.misc_event = threading.Event()
        super().setUp()

    def sendToServer(self, msg):
        # Plain send() of msg on the client socket; returns its result.
        return self.cli_sock.send(msg)

    # Tuple of alternative default arguments for sendmsg() when called
    # via sendmsgToServer() (e.g. to include a destination address).
    sendmsg_to_server_defaults = ()

    def sendmsgToServer(self, *args):
        # Invoke sendmsg() on self.cli_sock.  Positional arguments the
        # caller did not supply are taken from the same positions of
        # self.sendmsg_to_server_defaults, if that tuple is long enough.
        sendmsg = self.cli_sock.sendmsg
        defaults = self.sendmsg_to_server_defaults
        return sendmsg(*(args + defaults[len(args):]))

    def doRecvmsg(self, sock, bufsize, *args):
        # Receive with recvmsg() on sock and hand the raw result to
        # registerRecvmsgResult() before returning it.  Tests that
        # should work with either recvmsg() or recvmsg_into() go through
        # this method, so that a subclass can substitute an
        # implementation based on recvmsg_into().
        reply = sock.recvmsg(bufsize, *args)
        self.registerRecvmsgResult(reply)
        return reply

    def registerRecvmsgResult(self, result):
        # Hook invoked by doRecvmsg() with the value returned by
        # recvmsg() or recvmsg_into().  The default does nothing;
        # override it to, for instance, arrange cleanup based on the
        # ancillary data received.
        pass

    def checkRecvmsgAddress(self, addr1, addr2):
        # Compare the address returned by the receive call with the
        # peer's address; the default requires equality.
        self.assertEqual(addr1, addr2)

    # Flags that are normally unset in msg_flags.  Names missing from
    # the socket module contribute 0.  (The loop variable "name" is left
    # behind as a class attribute.)
    msg_flags_common_unset = 0
    for name in ("MSG_CTRUNC", "MSG_OOB"):
        msg_flags_common_unset |= getattr(socket, name, 0)

    # Flags that are normally set
    msg_flags_common_set = 0

    # Flags set when a complete record has been received (e.g. MSG_EOR
    # for SCTP)
    msg_flags_eor_indicator = 0

    # Flags set when a complete record has not been received
    # (e.g. MSG_TRUNC for datagram sockets)
    msg_flags_non_eor_indicator = 0

    def checkFlags(self, flags, eor=None, checkset=0, checkunset=0, ignore=0):
        # Verify the msg_flags value returned by recvmsg[_into]().
        #
        # By default every bit of msg_flags_common_set must be set in
        # "flags" and every bit of msg_flags_common_unset must be clear.
        #
        # "eor" says whether a full record (or datagram) should have
        # been received.  None disables this part of the check;
        # otherwise:
        #
        #  * a true value requires the msg_flags_eor_indicator bits to
        #    be set and the msg_flags_non_eor_indicator bits to be clear
        #
        #  * a false value requires the opposite
        #
        # "checkset" and "checkunset" name bits that must be set or
        # clear respectively; for those bits they override what the
        # class attributes ask for.
        #
        # Bits in "ignore" are never checked, whatever else is given.
        #
        # Raises Exception if some non-ignored bit ends up required to
        # be both set and clear.

        must_set = self.msg_flags_common_set
        must_clear = self.msg_flags_common_unset

        if eor is not None:
            if eor:
                must_set |= self.msg_flags_eor_indicator
                must_clear |= self.msg_flags_non_eor_indicator
            else:
                must_set |= self.msg_flags_non_eor_indicator
                must_clear |= self.msg_flags_eor_indicator

        # Explicit arguments win over the class-level defaults.
        must_set &= ~checkunset
        must_clear &= ~checkset

        # Combine both sources and reject contradictory requirements.
        want_set = checkset | must_set
        want_clear = checkunset | must_clear
        conflict = want_set & want_clear & ~ignore
        if conflict:
            raise Exception("contradictory set, unset requirements for flags "
                            "{0:#x}".format(conflict))

        # Only the bits somebody asked about (and did not ignore) are
        # compared against the received flags.
        relevant = (want_set | want_clear) & ~ignore
        self.assertEqual(flags & relevant, want_set & relevant)
2816
2817
class RecvmsgIntoMixin(SendrecvmsgBase):
    # Mixin to implement doRecvmsg() using recvmsg_into().

    def doRecvmsg(self, sock, bufsize, *args):
        # Receive into a single scratch buffer of bufsize bytes, then
        # return a tuple whose first item is the received bytes (instead
        # of the byte count) followed by the remaining result items.
        scratch = bytearray(bufsize)
        reply = sock.recvmsg_into([scratch], *args)
        self.registerRecvmsgResult(reply)
        nbytes = reply[0]
        # The reported count must fit inside the buffer that was offered.
        self.assertGreaterEqual(nbytes, 0)
        self.assertLessEqual(nbytes, bufsize)
        received = bytes(scratch[:nbytes])
        return (received,) + reply[1:]
2828
2829
class SendrecvmsgDgramFlagsBase(SendrecvmsgBase):
    # Defines flags to be checked in msg_flags for datagram sockets.

    @property
    def msg_flags_non_eor_indicator(self):
        # Add MSG_TRUNC to whatever the next class in the MRO reports as
        # its "incomplete record" flags.
        return super().msg_flags_non_eor_indicator | socket.MSG_TRUNC
2836
2837
class SendrecvmsgSCTPFlagsBase(SendrecvmsgBase):
    # Defines flags to be checked in msg_flags for SCTP sockets.

    @property
    def msg_flags_eor_indicator(self):
        # Add MSG_EOR to whatever the next class in the MRO reports as
        # its "complete record" flags.
        return super().msg_flags_eor_indicator | socket.MSG_EOR
2844
2845
class SendrecvmsgConnectionlessBase(SendrecvmsgBase):
    # Base class for tests on connectionless-mode sockets.  The sockets
    # are expected on the attributes cli and serv and are exposed as
    # cli_sock and serv_sock respectively.  self.serv_addr is used as
    # the destination for every send.

    @property
    def serv_sock(self):
        # Server end: the socket stored on self.serv.
        return self.serv

    @property
    def cli_sock(self):
        # Client end: the socket stored on self.cli.
        return self.cli

    @property
    def sendmsg_to_server_defaults(self):
        # Default sendmsg() arguments: no buffers, no ancillary data,
        # flags 0, and the server's address as destination.
        destination = self.serv_addr
        return ([], [], 0, destination)

    def sendToServer(self, msg):
        # An unconnected socket needs an explicit destination, so use
        # sendto() with the server's address.
        sock = self.cli_sock
        return sock.sendto(msg, self.serv_addr)
2865
2866
class SendrecvmsgConnectedBase(SendrecvmsgBase):
    # Base class for tests on connected sockets.  Users must supply
    # sockets on attributes serv_conn and cli_conn (representing the
    # connections *to* the server and the client), to be mapped to
    # cli_sock and serv_sock respectively.

    @property
    def serv_sock(self):
        # The server-side endpoint is the connection *to* the client.
        return self.cli_conn

    @property
    def cli_sock(self):
        # The client-side endpoint is the connection *to* the server.
        return self.serv_conn

    def checkRecvmsgAddress(self, addr1, addr2):
        # Address is currently "unspecified" for a connected socket,
        # so we don't examine it
        pass
2885
2886
class SendrecvmsgServerTimeoutBase(SendrecvmsgBase):
    # Base class to set a timeout on server's socket.

    def setUp(self):
        # Run the inherited setUp() first, then apply fail_timeout to
        # the server socket.
        # NOTE(review): presumably serv_sock only exists once the
        # inherited setUp() has run - confirm against the base classes.
        super().setUp()
        self.serv_sock.settimeout(self.fail_timeout)
2893
2894
class SendmsgTests(SendrecvmsgServerTimeoutBase):
    # Tests for sendmsg() which can use any socket type and do not
    # involve recvmsg() or recvmsg_into().
    #
    # Each testXxx method receives on self.serv_sock; the matching
    # _testXxx method sends through sendmsgToServer()/sendToServer().
    # Sending sides that only check for rejected arguments finish by
    # sending b"done", which the receiving side waits for.

    def testSendmsg(self):
        # Send a simple message with sendmsg().
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsg(self):
        sent = self.sendmsgToServer([MSG])
        self.assertEqual(sent, len(MSG))

    def testSendmsgDataGenerator(self):
        # Send from buffer obtained from a generator (not a sequence).
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgDataGenerator(self):
        buffers = (o for o in [MSG])
        sent = self.sendmsgToServer(buffers)
        self.assertEqual(sent, len(MSG))

    def testSendmsgAncillaryGenerator(self):
        # Gather (empty) ancillary data from a generator.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgAncillaryGenerator(self):
        ancillary = (o for o in [])
        sent = self.sendmsgToServer([MSG], ancillary)
        self.assertEqual(sent, len(MSG))

    def testSendmsgArray(self):
        # Send data from an array instead of the usual bytes object.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgArray(self):
        sent = self.sendmsgToServer([array.array("B", MSG)])
        self.assertEqual(sent, len(MSG))

    def testSendmsgGather(self):
        # Send message data from more than one buffer (gather write).
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgGather(self):
        sent = self.sendmsgToServer([MSG[:3], MSG[3:]])
        self.assertEqual(sent, len(MSG))

    def testSendmsgBadArgs(self):
        # Check that sendmsg() rejects invalid arguments.
        received = self.serv_sock.recv(1000)
        self.assertEqual(received, b"done")

    def _testSendmsgBadArgs(self):
        def rejects(*bad_args):
            # sendmsgToServer() must raise TypeError for these arguments.
            self.assertRaises(TypeError, self.sendmsgToServer, *bad_args)

        # No arguments at all, bypassing the defaults-filling wrapper.
        self.assertRaises(TypeError, self.cli_sock.sendmsg)
        rejects(b"not in an iterable")
        rejects(object())
        rejects([object()])
        rejects([MSG, object()])
        rejects([MSG], object())
        rejects([MSG], [], object())
        rejects([MSG], [], 0, object())
        self.sendToServer(b"done")

    def testSendmsgBadCmsg(self):
        # Check that invalid ancillary data items are rejected.
        received = self.serv_sock.recv(1000)
        self.assertEqual(received, b"done")

    def _testSendmsgBadCmsg(self):
        def rejects(bad_ancdata):
            # The given ancillary data list must raise TypeError.
            self.assertRaises(TypeError, self.sendmsgToServer,
                              [MSG], bad_ancdata)

        rejects([object()])
        rejects([(object(), 0, b"data")])
        rejects([(0, object(), b"data")])
        rejects([(0, 0, object())])
        rejects([(0, 0)])
        rejects([(0, 0, b"data", 42)])
        self.sendToServer(b"done")

    @requireAttrs(socket, "CMSG_SPACE")
    def testSendmsgBadMultiCmsg(self):
        # Check that invalid ancillary data items are rejected when
        # more than one item is present.
        received = self.serv_sock.recv(1000)
        self.assertEqual(received, b"done")

    @testSendmsgBadMultiCmsg.client_skip
    def _testSendmsgBadMultiCmsg(self):
        def rejects(bad_ancdata):
            # The given ancillary data list must raise TypeError.
            self.assertRaises(TypeError, self.sendmsgToServer,
                              [MSG], bad_ancdata)

        rejects([0, 0, b""])
        rejects([(0, 0, b""), object()])
        self.sendToServer(b"done")

    def testSendmsgExcessCmsgReject(self):
        # Check that sendmsg() rejects excess ancillary data items
        # when the number that can be sent is limited.
        received = self.serv_sock.recv(1000)
        self.assertEqual(received, b"done")

    def _testSendmsgExcessCmsgReject(self):
        if not hasattr(socket, "CMSG_SPACE"):
            # Can only send one item
            two_items = [(0, 0, b""), (0, 0, b"")]
            with self.assertRaises(OSError) as caught:
                self.sendmsgToServer([MSG], two_items)
            self.assertIsNone(caught.exception.errno)
        self.sendToServer(b"done")

    def testSendmsgAfterClose(self):
        # Check that sendmsg() fails on a closed socket.
        pass

    def _testSendmsgAfterClose(self):
        self.cli_sock.close()
        with self.assertRaises(OSError):
            self.sendmsgToServer([MSG])
3012
3013
class SendmsgStreamTests(SendmsgTests):
    # Tests for sendmsg() which require a stream socket and do not
    # involve recvmsg() or recvmsg_into().

    def testSendmsgExplicitNoneAddr(self):
        # Check that peer address can be specified as None.
        received = self.serv_sock.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendmsgExplicitNoneAddr(self):
        sent = self.sendmsgToServer([MSG], [], 0, None)
        self.assertEqual(sent, len(MSG))

    def testSendmsgTimeout(self):
        # Check that timeout works with sendmsg().
        # Read one chunk, then wait for the sending side to signal that
        # it is finished.
        first_chunk = self.serv_sock.recv(512)
        self.assertEqual(first_chunk, b"a"*512)
        sender_finished = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(sender_finished)

    def _testSendmsgTimeout(self):
        # Keep sending with a short timeout until a send fails.  The
        # event is always set on the way out so that the receiving side
        # is not left waiting.
        try:
            self.cli_sock.settimeout(0.03)
            chunk = b"a"*512
            try:
                while True:
                    self.sendmsgToServer([chunk])
            except TimeoutError:
                # Expected outcome.
                pass
            except OSError as err:
                # bpo-33937 the test randomly fails on Travis CI with
                # "OSError: [Errno 12] Cannot allocate memory"
                if err.errno != errno.ENOMEM:
                    raise
            else:
                self.fail("TimeoutError not raised")
        finally:
            self.misc_event.set()

    # XXX: would be nice to have more tests for sendmsg flags argument.

    # Linux supports MSG_DONTWAIT when sending, but in general, it
    # only works when receiving.  Could add other platforms if they
    # support it too.
    @skipWithClientIf(sys.platform not in {"linux"},
                      "MSG_DONTWAIT not known to work on this platform when "
                      "sending")
    def testSendmsgDontWait(self):
        # Check that MSG_DONTWAIT in flags causes non-blocking behaviour.
        first_chunk = self.serv_sock.recv(512)
        self.assertEqual(first_chunk, b"a"*512)
        sender_finished = self.misc_event.wait(timeout=self.fail_timeout)
        self.assertTrue(sender_finished)

    @testSendmsgDontWait.client_skip
    def _testSendmsgDontWait(self):
        # Keep sending with MSG_DONTWAIT until a send raises OSError,
        # then check the errno.  The event is always set on the way out.
        try:
            chunk = b"a"*512
            with self.assertRaises(OSError) as caught:
                while True:
                    self.sendmsgToServer([chunk], [], socket.MSG_DONTWAIT)
            # bpo-33937: catch also ENOMEM, the test randomly fails on Travis CI
            # with "OSError: [Errno 12] Cannot allocate memory"
            acceptable = (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOMEM)
            self.assertIn(caught.exception.errno, acceptable)
        finally:
            self.misc_event.set()
3073
3074
class SendmsgConnectionlessTests(SendmsgTests):
    # Tests for sendmsg() which require a connectionless-mode
    # (e.g. datagram) socket, and do not involve recvmsg() or
    # recvmsg_into().

    def testSendmsgNoDestAddr(self):
        # Check that sendmsg() fails when no destination address is
        # given for unconnected socket.
        pass

    def _testSendmsgNoDestAddr(self):
        # Call sendmsg() on the socket directly so that no default
        # destination is filled in: once with the address omitted and
        # once with an explicit None.
        with self.assertRaises(OSError):
            self.cli_sock.sendmsg([MSG])
        with self.assertRaises(OSError):
            self.cli_sock.sendmsg([MSG], [], 0, None)
3090
3091
3092class RecvmsgGenericTests(SendrecvmsgBase):
3093    # Tests for recvmsg() which can also be emulated using
3094    # recvmsg_into(), and can use any socket type.
3095
3096    def testRecvmsg(self):
3097        # Receive a simple message with recvmsg[_into]().
3098        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3099        self.assertEqual(msg, MSG)
3100        self.checkRecvmsgAddress(addr, self.cli_addr)
3101        self.assertEqual(ancdata, [])
3102        self.checkFlags(flags, eor=True)
3103
3104    def _testRecvmsg(self):
3105        self.sendToServer(MSG)
3106
3107    def testRecvmsgExplicitDefaults(self):
3108        # Test recvmsg[_into]() with default arguments provided explicitly.
3109        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3110                                                   len(MSG), 0, 0)
3111        self.assertEqual(msg, MSG)
3112        self.checkRecvmsgAddress(addr, self.cli_addr)
3113        self.assertEqual(ancdata, [])
3114        self.checkFlags(flags, eor=True)
3115
3116    def _testRecvmsgExplicitDefaults(self):
3117        self.sendToServer(MSG)
3118
3119    def testRecvmsgShorter(self):
3120        # Receive a message smaller than buffer.
3121        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3122                                                   len(MSG) + 42)
3123        self.assertEqual(msg, MSG)
3124        self.checkRecvmsgAddress(addr, self.cli_addr)
3125        self.assertEqual(ancdata, [])
3126        self.checkFlags(flags, eor=True)
3127
3128    def _testRecvmsgShorter(self):
3129        self.sendToServer(MSG)
3130
3131    def testRecvmsgTrunc(self):
3132        # Receive part of message, check for truncation indicators.
3133        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3134                                                   len(MSG) - 3)
3135        self.assertEqual(msg, MSG[:-3])
3136        self.checkRecvmsgAddress(addr, self.cli_addr)
3137        self.assertEqual(ancdata, [])
3138        self.checkFlags(flags, eor=False)
3139
3140    def _testRecvmsgTrunc(self):
3141        self.sendToServer(MSG)
3142
3143    def testRecvmsgShortAncillaryBuf(self):
3144        # Test ancillary data buffer too small to hold any ancillary data.
3145        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3146                                                   len(MSG), 1)
3147        self.assertEqual(msg, MSG)
3148        self.checkRecvmsgAddress(addr, self.cli_addr)
3149        self.assertEqual(ancdata, [])
3150        self.checkFlags(flags, eor=True)
3151
3152    def _testRecvmsgShortAncillaryBuf(self):
3153        self.sendToServer(MSG)
3154
3155    def testRecvmsgLongAncillaryBuf(self):
3156        # Test large ancillary data buffer.
3157        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3158                                                   len(MSG), 10240)
3159        self.assertEqual(msg, MSG)
3160        self.checkRecvmsgAddress(addr, self.cli_addr)
3161        self.assertEqual(ancdata, [])
3162        self.checkFlags(flags, eor=True)
3163
3164    def _testRecvmsgLongAncillaryBuf(self):
3165        self.sendToServer(MSG)
3166
3167    def testRecvmsgAfterClose(self):
3168        # Check that recvmsg[_into]() fails on a closed socket.
3169        self.serv_sock.close()
3170        self.assertRaises(OSError, self.doRecvmsg, self.serv_sock, 1024)
3171
3172    def _testRecvmsgAfterClose(self):
3173        pass
3174
3175    def testRecvmsgTimeout(self):
3176        # Check that timeout works.
3177        try:
3178            self.serv_sock.settimeout(0.03)
3179            self.assertRaises(TimeoutError,
3180                              self.doRecvmsg, self.serv_sock, len(MSG))
3181        finally:
3182            self.misc_event.set()
3183
3184    def _testRecvmsgTimeout(self):
3185        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
3186
3187    @requireAttrs(socket, "MSG_PEEK")
3188    def testRecvmsgPeek(self):
3189        # Check that MSG_PEEK in flags enables examination of pending
3190        # data without consuming it.
3191
3192        # Receive part of data with MSG_PEEK.
3193        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3194                                                   len(MSG) - 3, 0,
3195                                                   socket.MSG_PEEK)
3196        self.assertEqual(msg, MSG[:-3])
3197        self.checkRecvmsgAddress(addr, self.cli_addr)
3198        self.assertEqual(ancdata, [])
3199        # Ignoring MSG_TRUNC here (so this test is the same for stream
3200        # and datagram sockets).  Some wording in POSIX seems to
3201        # suggest that it needn't be set when peeking, but that may
3202        # just be a slip.
3203        self.checkFlags(flags, eor=False,
3204                        ignore=getattr(socket, "MSG_TRUNC", 0))
3205
3206        # Receive all data with MSG_PEEK.
3207        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
3208                                                   len(MSG), 0,
3209                                                   socket.MSG_PEEK)
3210        self.assertEqual(msg, MSG)
3211        self.checkRecvmsgAddress(addr, self.cli_addr)
3212        self.assertEqual(ancdata, [])
3213        self.checkFlags(flags, eor=True)
3214
3215        # Check that the same data can still be received normally.
3216        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3217        self.assertEqual(msg, MSG)
3218        self.checkRecvmsgAddress(addr, self.cli_addr)
3219        self.assertEqual(ancdata, [])
3220        self.checkFlags(flags, eor=True)
3221
3222    @testRecvmsgPeek.client_skip
3223    def _testRecvmsgPeek(self):
3224        self.sendToServer(MSG)
3225
3226    @requireAttrs(socket.socket, "sendmsg")
3227    def testRecvmsgFromSendmsg(self):
3228        # Test receiving with recvmsg[_into]() when message is sent
3229        # using sendmsg().
3230        self.serv_sock.settimeout(self.fail_timeout)
3231        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock, len(MSG))
3232        self.assertEqual(msg, MSG)
3233        self.checkRecvmsgAddress(addr, self.cli_addr)
3234        self.assertEqual(ancdata, [])
3235        self.checkFlags(flags, eor=True)
3236
3237    @testRecvmsgFromSendmsg.client_skip
3238    def _testRecvmsgFromSendmsg(self):
3239        self.assertEqual(self.sendmsgToServer([MSG[:3], MSG[3:]]), len(MSG))
3240
3241
class RecvmsgGenericStreamTests(RecvmsgGenericTests):
    # Stream-socket-only tests that work with either recvmsg() or
    # recvmsg_into().

    def testRecvmsgEOF(self):
        # After the peer socket is closed, a receive returns b""
        # (the end-of-stream indicator).
        result = self.doRecvmsg(self.serv_sock, 1024)
        data, anc, rflags, peer = result
        self.assertEqual(data, b"")
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        # eor=None: there might not be an end-of-record marker.
        self.checkFlags(rflags, eor=None)

    def _testRecvmsgEOF(self):
        # Close the client socket without sending anything.
        client = self.cli_sock
        client.close()

    def testRecvmsgOverflow(self):
        # Read one message in two pieces: first all but the last
        # three bytes, then whatever is left.
        sock = self.serv_sock

        first, anc, rflags, peer = self.doRecvmsg(sock, len(MSG) - 3)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(rflags, eor=False)

        second, anc, rflags, peer = self.doRecvmsg(sock, 1024)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(rflags, eor=True)

        # The two pieces joined together must be the original MSG.
        self.assertEqual(first + second, MSG)

    def _testRecvmsgOverflow(self):
        # Counterpart of testRecvmsgOverflow(): send MSG in one go.
        payload = MSG
        self.sendToServer(payload)
3275
3276
class RecvmsgTests(RecvmsgGenericTests):
    # recvmsg()-specific tests that work with any socket type.

    def testRecvmsgBadArgs(self):
        # Invalid arguments must be rejected by recvmsg(); a valid
        # call afterwards must still receive MSG.
        recvmsg = self.serv_sock.recvmsg

        # No arguments at all.
        with self.assertRaises(TypeError):
            recvmsg()
        # Negative buffer size, then negative ancillary buffer size.
        with self.assertRaises(ValueError):
            recvmsg(-1, 0, 0)
        with self.assertRaises(ValueError):
            recvmsg(len(MSG), -1, 0)
        # Non-integer object in each of the three positions.
        with self.assertRaises(TypeError):
            recvmsg([bytearray(10)], 0, 0)
        with self.assertRaises(TypeError):
            recvmsg(object(), 0, 0)
        with self.assertRaises(TypeError):
            recvmsg(len(MSG), object(), 0)
        with self.assertRaises(TypeError):
            recvmsg(len(MSG), 0, object())

        data, anc, rflags, peer = recvmsg(len(MSG), 0, 0)
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(rflags, eor=True)

    def _testRecvmsgBadArgs(self):
        # Counterpart of testRecvmsgBadArgs(): send MSG.
        payload = MSG
        self.sendToServer(payload)
3304
3305
class RecvmsgIntoTests(RecvmsgIntoMixin, RecvmsgGenericTests):
    # recvmsg_into()-specific tests that work with any socket type.

    def testRecvmsgIntoBadArgs(self):
        # Invalid arguments must be rejected by recvmsg_into(); a
        # valid call afterwards must still receive MSG.
        buf = bytearray(len(MSG))
        recvmsg_into = self.serv_sock.recvmsg_into

        # No arguments at all.
        with self.assertRaises(TypeError):
            recvmsg_into()
        # An integer, then a bare bytearray, as the buffers argument.
        with self.assertRaises(TypeError):
            recvmsg_into(len(MSG), 0, 0)
        with self.assertRaises(TypeError):
            recvmsg_into(buf, 0, 0)
        # Buffer lists containing an unusable item.
        with self.assertRaises(TypeError):
            recvmsg_into([object()], 0, 0)
        with self.assertRaises(TypeError):
            recvmsg_into([b"I'm not writable"], 0, 0)
        with self.assertRaises(TypeError):
            recvmsg_into([buf, object()], 0, 0)
        # Negative ancillary buffer size.
        with self.assertRaises(ValueError):
            recvmsg_into([buf], -1, 0)
        # Non-integer object in the second and third positions.
        with self.assertRaises(TypeError):
            recvmsg_into([buf], object(), 0)
        with self.assertRaises(TypeError):
            recvmsg_into([buf], 0, object())

        count, anc, rflags, peer = recvmsg_into([buf], 0, 0)
        self.assertEqual(count, len(MSG))
        self.assertEqual(buf, bytearray(MSG))
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(rflags, eor=True)

    def _testRecvmsgIntoBadArgs(self):
        # Counterpart of testRecvmsgIntoBadArgs(): send MSG.
        payload = MSG
        self.sendToServer(payload)

    def testRecvmsgIntoGenerator(self):
        # The buffers argument may be a generator rather than a
        # sequence.
        buf = bytearray(len(MSG))
        buffers = (item for item in [buf])
        result = self.serv_sock.recvmsg_into(buffers)
        count, anc, rflags, peer = result
        self.assertEqual(count, len(MSG))
        self.assertEqual(buf, bytearray(MSG))
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(rflags, eor=True)

    def _testRecvmsgIntoGenerator(self):
        # Counterpart of testRecvmsgIntoGenerator(): send MSG.
        payload = MSG
        self.sendToServer(payload)

    def testRecvmsgIntoArray(self):
        # The target buffer may be an array.array of unsigned bytes
        # instead of the usual bytearray.
        buf = array.array("B", bytes(len(MSG)))
        result = self.serv_sock.recvmsg_into([buf])
        count, anc, rflags, peer = result
        self.assertEqual(count, len(MSG))
        self.assertEqual(buf.tobytes(), MSG)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(rflags, eor=True)

    def _testRecvmsgIntoArray(self):
        # Counterpart of testRecvmsgIntoArray(): send MSG.
        payload = MSG
        self.sendToServer(payload)

    def testRecvmsgIntoScatter(self):
        # Scatter one message over three buffers; the middle one is
        # a memoryview slice covering indices 2..8 of its bytearray.
        head = bytearray(b"----")
        middle = bytearray(b"0123456789")
        tail = bytearray(b"--------------")
        window = memoryview(middle)[2:9]
        result = self.serv_sock.recvmsg_into([head, window, tail])
        count, anc, rflags, peer = result
        self.assertEqual(count, len(b"Mary had a little lamb"))
        self.assertEqual(head, bytearray(b"Mary"))
        # Bytes outside the slice ("01" and "9") stay untouched.
        self.assertEqual(middle, bytearray(b"01 had a 9"))
        # The unused end of the last buffer keeps its dashes.
        self.assertEqual(tail, bytearray(b"little lamb---"))
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(rflags, eor=True)

    def _testRecvmsgIntoScatter(self):
        # Counterpart of testRecvmsgIntoScatter().
        payload = b"Mary had a little lamb"
        self.sendToServer(payload)
3384
3385
class CmsgMacroTests(unittest.TestCase):
    # Exercise socket.CMSG_LEN() and socket.CMSG_SPACE().  sendmsg()
    # and recvmsg[_into]() share code with these functions, so the
    # assumptions they rely on are checked here.

    # Match the definition in socketmodule.c
    try:
        import _testcapi
    except ImportError:
        socklen_t_limit = 0x7fffffff
    else:
        socklen_t_limit = min(0x7fffffff, _testcapi.INT_MAX)

    @requireAttrs(socket, "CMSG_LEN")
    def testCMSG_LEN(self):
        # Try CMSG_LEN() on valid and invalid values, checking the
        # assumptions used by recvmsg() and sendmsg().
        limit = self.socklen_t_limit
        toobig = limit - socket.CMSG_LEN(0) + 1
        # The 257 smallest lengths plus the 257 largest valid ones.
        values = [*range(257), *range(toobig - 257, toobig)]

        # struct cmsghdr has at least three members, two of which are ints
        two_ints = array.array("i").itemsize * 2
        self.assertGreater(socket.CMSG_LEN(0), two_ints)
        for length in values:
            total = socket.CMSG_LEN(length)
            # This is how recvmsg() calculates the data size
            self.assertEqual(total - socket.CMSG_LEN(0), length)
            self.assertLessEqual(total, limit)

        with self.assertRaises(OverflowError):
            socket.CMSG_LEN(-1)
        # sendmsg() shares code with these functions, and requires
        # that it reject values over the limit.
        with self.assertRaises(OverflowError):
            socket.CMSG_LEN(toobig)
        with self.assertRaises(OverflowError):
            socket.CMSG_LEN(sys.maxsize)

    @requireAttrs(socket, "CMSG_SPACE")
    def testCMSG_SPACE(self):
        # Try CMSG_SPACE() on valid and invalid values, checking the
        # assumptions used by sendmsg().
        limit = self.socklen_t_limit
        toobig = limit - socket.CMSG_SPACE(1) + 1
        # The 257 smallest lengths plus the 257 largest valid ones.
        values = [*range(257), *range(toobig - 257, toobig)]

        previous = socket.CMSG_SPACE(0)
        # struct cmsghdr has at least three members, two of which are ints
        two_ints = array.array("i").itemsize * 2
        self.assertGreater(previous, two_ints)
        for length in values:
            space = socket.CMSG_SPACE(length)
            # The result never decreases as the length grows, and is
            # never smaller than CMSG_LEN() of the same length.
            self.assertGreaterEqual(space, previous)
            self.assertGreaterEqual(space, socket.CMSG_LEN(length))
            self.assertGreaterEqual(space, length + socket.CMSG_LEN(0))
            self.assertLessEqual(space, limit)
            previous = space

        with self.assertRaises(OverflowError):
            socket.CMSG_SPACE(-1)
        # sendmsg() shares code with these functions, and requires
        # that it reject values over the limit.
        with self.assertRaises(OverflowError):
            socket.CMSG_SPACE(toobig)
        with self.assertRaises(OverflowError):
            socket.CMSG_SPACE(sys.maxsize)
3443
3444
class SCMRightsTest(SendrecvmsgServerTimeoutBase):
    # File descriptor passing (SCM_RIGHTS ancillary data) on
    # Unix-domain sockets.

    # Deliberately invalid descriptor value: unlikely to evaluate to
    # a real FD even if one of its bytes is replaced with a different
    # value (which shouldn't actually happen).
    badfd = -0x5555

    def newFDs(self, n):
        # Create n temporary files, write each one's list index into
        # it as an ASCII number, and return the open descriptors in
        # order.  Closing and unlinking are registered as cleanups.
        descriptors = []
        for index in range(n):
            fd, path = tempfile.mkstemp()
            self.addCleanup(os.unlink, path)
            self.addCleanup(os.close, fd)
            os.write(fd, str(index).encode())
            descriptors.append(fd)
        return descriptors

    def checkFDs(self, fds):
        # Rewind every descriptor in fds and check that the file
        # holds the descriptor's own list index as an ASCII number.
        for index, fd in enumerate(fds):
            os.lseek(fd, 0, os.SEEK_SET)
            expected = str(index).encode()
            self.assertEqual(os.read(fd, 1024), expected)

    def registerRecvmsgResult(self, result):
        # Schedule closeRecvmsgFDs(result) as a test cleanup.
        self.addCleanup(self.closeRecvmsgFDs, result)

    def closeRecvmsgFDs(self, recvmsg_result):
        # recvmsg_result is a return value of recvmsg() or
        # recvmsg_into(); close every complete descriptor found in
        # the SCM_RIGHTS items of its ancillary data.
        for level, ctype, payload in recvmsg_result[1]:
            if level != socket.SOL_SOCKET or ctype != socket.SCM_RIGHTS:
                continue
            received = array.array("i")
            # Drop trailing bytes that do not make up a whole int.
            usable = len(payload) - (len(payload) % received.itemsize)
            received.frombytes(payload[:usable])
            for fd in received:
                os.close(fd)

    def createAndSendFDs(self, n):
        # Send the constant MSG to the server together with n fresh
        # descriptors from newFDs() in a single SCM_RIGHTS item.
        ancillary = [(socket.SOL_SOCKET,
                      socket.SCM_RIGHTS,
                      array.array("i", self.newFDs(n)))]
        sent = self.sendmsgToServer([MSG], ancillary)
        self.assertEqual(sent, len(MSG))

    def checkRecvmsgFDs(self, numfds, result, maxcmsgs=1, ignoreflags=0):
        # result is a recvmsg()-style 4-tuple.  Check that it holds
        # the constant MSG and numfds descriptors carried in at most
        # maxcmsgs control messages, each containing only complete
        # integers.  MSG_CTRUNC is checked to be unset by default;
        # flags listed in ignoreflags are ignored.
        data, anc, rflags, peer = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.checkFlags(rflags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertIsInstance(anc, list)
        self.assertLessEqual(len(anc), maxcmsgs)
        received = array.array("i")
        for entry in anc:
            self.assertIsInstance(entry, tuple)
            level, ctype, payload = entry
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(ctype, socket.SCM_RIGHTS)
            self.assertIsInstance(payload, bytes)
            self.assertEqual(len(payload) % SIZEOF_INT, 0)
            received.frombytes(payload)

        self.assertEqual(len(received), numfds)
        self.checkFDs(received)

    def testFDPassSimple(self):
        # One descriptor; the sender supplies the array as a bytes
        # object.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(1, result)

    def _testFDPassSimple(self):
        fd_bytes = array.array("i", self.newFDs(1)).tobytes()
        ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_bytes)]
        sent = self.sendmsgToServer([MSG], ancillary)
        self.assertEqual(sent, len(MSG))

    def testMultipleFDPass(self):
        # Four descriptors in a single array.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(4, result)

    def _testMultipleFDPass(self):
        count = 4
        self.createAndSendFDs(count)

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassCMSG_SPACE(self):
        # Size the ancillary buffer with CMSG_SPACE() for four ints.
        ancbufsize = socket.CMSG_SPACE(4 * SIZEOF_INT)
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkRecvmsgFDs(4, result)

    @testFDPassCMSG_SPACE.client_skip
    def _testFDPassCMSG_SPACE(self):
        count = 4
        self.createAndSendFDs(count)

    def testFDPassCMSG_LEN(self):
        # Size the ancillary buffer with CMSG_LEN() instead.
        ancbufsize = socket.CMSG_LEN(4 * SIZEOF_INT)
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        # RFC 3542 says implementations may set MSG_CTRUNC if there
        # isn't enough space for trailing padding.
        self.checkRecvmsgFDs(1, result, ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassCMSG_LEN(self):
        count = 1
        self.createAndSendFDs(count)

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(AIX, "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparate(self):
        # Two descriptors sent in two separate arrays.  The OS may
        # combine the arrays into a single control message.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(2, result, maxcmsgs=2)

    @testFDPassSeparate.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(AIX, "skipping, see issue #22397")
    def _testFDPassSeparate(self):
        fd0, fd1 = self.newFDs(2)
        ancillary = [
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd0])),
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd1])),
        ]
        sent = self.sendmsgToServer([MSG], ancillary)
        self.assertEqual(sent, len(MSG))

    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(AIX, "skipping, see issue #22397")
    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassSeparateMinSpace(self):
        # Two descriptors in two separate arrays, received into the
        # minimum space for two arrays.
        num_fds = 2
        ancbufsize = (socket.CMSG_SPACE(SIZEOF_INT) +
                      socket.CMSG_LEN(SIZEOF_INT * num_fds))
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkRecvmsgFDs(num_fds, result,
                             maxcmsgs=2, ignoreflags=socket.MSG_CTRUNC)

    @testFDPassSeparateMinSpace.client_skip
    @unittest.skipIf(sys.platform == "darwin", "skipping, see issue #12958")
    @unittest.skipIf(AIX, "skipping, see issue #22397")
    def _testFDPassSeparateMinSpace(self):
        fd0, fd1 = self.newFDs(2)
        ancillary = [
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd0])),
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd1])),
        ]
        sent = self.sendmsgToServer([MSG], ancillary)
        self.assertEqual(sent, len(MSG))

    def sendAncillaryIfPossible(self, msg, ancdata):
        # Send msg with ancdata to the server; if that raises
        # OSError, send msg alone with no ancillary data.
        try:
            sent = self.sendmsgToServer([msg], ancdata)
        except OSError as exc:
            # Check that it was the system call that failed
            self.assertIsInstance(exc.errno, int)
            sent = self.sendmsgToServer([msg])
        self.assertEqual(sent, len(msg))

    @unittest.skipIf(sys.platform == "darwin", "see issue #24725")
    def testFDPassEmpty(self):
        # The sender tries to pass an empty FD array; receiving
        # either no array or an empty array is acceptable.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        self.checkRecvmsgFDs(0, result, ignoreflags=socket.MSG_CTRUNC)

    def _testFDPassEmpty(self):
        ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, b"")]
        self.sendAncillaryIfPossible(MSG, ancillary)

    def testFDPassPartialInt(self):
        # The sender tries to pass an FD array cut short by one byte.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        data, anc, rflags, peer = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.checkFlags(rflags, eor=True, ignore=socket.MSG_CTRUNC)
        self.assertLessEqual(len(anc), 1)
        for level, ctype, payload in anc:
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(ctype, socket.SCM_RIGHTS)
            self.assertLess(len(payload), SIZEOF_INT)

    def _testFDPassPartialInt(self):
        # All but the last byte of an array holding only badfd.
        truncated = array.array("i", [self.badfd]).tobytes()[:-1]
        ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, truncated)]
        self.sendAncillaryIfPossible(MSG, ancillary)

    @requireAttrs(socket, "CMSG_SPACE")
    def testFDPassPartialIntInMiddle(self):
        # The sender tries to pass two FD arrays, the first of which
        # is truncated.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 10240)
        data, anc, rflags, peer = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.checkFlags(rflags, eor=True, ignore=socket.MSG_CTRUNC)
        self.assertLessEqual(len(anc), 2)
        received = array.array("i")
        # Arrays may have been combined in a single control message
        for level, ctype, payload in anc:
            self.assertEqual(level, socket.SOL_SOCKET)
            self.assertEqual(ctype, socket.SCM_RIGHTS)
            # Keep only the bytes that make up whole ints.
            usable = len(payload) - (len(payload) % received.itemsize)
            received.frombytes(payload[:usable])
        self.assertLessEqual(len(received), 2)
        self.checkFDs(received)

    @testFDPassPartialIntInMiddle.client_skip
    def _testFDPassPartialIntInMiddle(self):
        fd0, fd1 = self.newFDs(2)
        # First item: fd0 followed by badfd, minus the final byte.
        truncated = array.array("i", [fd0, self.badfd]).tobytes()[:-1]
        ancillary = [
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, truncated),
            (socket.SOL_SOCKET, socket.SCM_RIGHTS, array.array("i", [fd1])),
        ]
        self.sendAncillaryIfPossible(MSG, ancillary)

    def checkTruncatedHeader(self, result, ignoreflags=0):
        # For a receive whose ancillary data was truncated inside
        # the cmsghdr structure: MSG must be intact, no ancillary
        # items may be returned, and MSG_CTRUNC is checked to be set
        # (flags in ignoreflags are ignored).
        data, anc, rflags, peer = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.assertEqual(anc, [])
        self.checkFlags(rflags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

    def testCmsgTruncNoBufSize(self):
        # No ancillary buffer size given: no ancillary data may be
        # received.
        result = self.doRecvmsg(self.serv_sock, len(MSG))
        # BSD seems to set MSG_CTRUNC only if an item has been
        # partially received.
        self.checkTruncatedHeader(result, ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTruncNoBufSize(self):
        count = 1
        self.createAndSendFDs(count)

    def testCmsgTrunc0(self):
        # Ancillary buffer size 0: no ancillary data may be received.
        result = self.doRecvmsg(self.serv_sock, len(MSG), 0)
        self.checkTruncatedHeader(result, ignoreflags=socket.MSG_CTRUNC)

    def _testCmsgTrunc0(self):
        count = 1
        self.createAndSendFDs(count)

    # Check that no ancillary data is returned for various non-zero
    # (but still too small) buffer sizes.

    def testCmsgTrunc1(self):
        result = self.doRecvmsg(self.serv_sock, len(MSG), 1)
        self.checkTruncatedHeader(result)

    def _testCmsgTrunc1(self):
        count = 1
        self.createAndSendFDs(count)

    def testCmsgTrunc2Int(self):
        # The cmsghdr structure has at least three members, two of
        # which are ints, so we still shouldn't see any ancillary
        # data.
        ancbufsize = SIZEOF_INT * 2
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkTruncatedHeader(result)

    def _testCmsgTrunc2Int(self):
        count = 1
        self.createAndSendFDs(count)

    def testCmsgTruncLen0Minus1(self):
        ancbufsize = socket.CMSG_LEN(0) - 1
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbufsize)
        self.checkTruncatedHeader(result)

    def _testCmsgTruncLen0Minus1(self):
        count = 1
        self.createAndSendFDs(count)

    # The following tests try to truncate the control message in the
    # middle of the FD array.

    def checkTruncatedArray(self, ancbuf, maxdata, mindata=0):
        # Receive with an ancillary buffer of ancbuf bytes and check
        # that the descriptor data was truncated to between mindata
        # and maxdata bytes, and that any complete file descriptor
        # numbers are valid.
        result = self.doRecvmsg(self.serv_sock, len(MSG), ancbuf)
        data, anc, rflags, peer = result
        self.assertEqual(data, MSG)
        self.checkRecvmsgAddress(peer, self.cli_addr)
        self.checkFlags(rflags, eor=True, checkset=socket.MSG_CTRUNC)

        # Receiving nothing at all is fine when no minimum is set.
        if mindata == 0 and anc == []:
            return
        self.assertEqual(len(anc), 1)
        level, ctype, payload = anc[0]
        self.assertEqual(level, socket.SOL_SOCKET)
        self.assertEqual(ctype, socket.SCM_RIGHTS)
        self.assertGreaterEqual(len(payload), mindata)
        self.assertLessEqual(len(payload), maxdata)
        received = array.array("i")
        # Keep only the bytes that make up whole ints.
        usable = len(payload) - (len(payload) % received.itemsize)
        received.frombytes(payload[:usable])
        self.checkFDs(received)

    def testCmsgTruncLen0(self):
        ancbufsize = socket.CMSG_LEN(0)
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=0)

    def _testCmsgTruncLen0(self):
        count = 1
        self.createAndSendFDs(count)

    def testCmsgTruncLen0Plus1(self):
        ancbufsize = socket.CMSG_LEN(0) + 1
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=1)

    def _testCmsgTruncLen0Plus1(self):
        count = 2
        self.createAndSendFDs(count)

    def testCmsgTruncLen1(self):
        ancbufsize = socket.CMSG_LEN(SIZEOF_INT)
        self.checkTruncatedArray(ancbuf=ancbufsize, maxdata=SIZEOF_INT)

    def _testCmsgTruncLen1(self):
        count = 2
        self.createAndSendFDs(count)

    def testCmsgTruncLen2Minus1(self):
        ancbufsize = socket.CMSG_LEN(2 * SIZEOF_INT) - 1
        self.checkTruncatedArray(ancbuf=ancbufsize,
                                 maxdata=(2 * SIZEOF_INT) - 1)

    def _testCmsgTruncLen2Minus1(self):
        count = 2
        self.createAndSendFDs(count)
3804
3805
class RFC3542AncillaryTest(SendrecvmsgServerTimeoutBase):
    # Test sendmsg() and recvmsg[_into]() using the ancillary data
    # features of the RFC 3542 Advanced Sockets API for IPv6.
    # Currently we can only handle certain data items (e.g. traffic
    # class, hop limit, MTU discovery and fragmentation settings)
    # without resorting to unportable means such as the struct module,
    # but the tests here are aimed at testing the ancillary data
    # handling in sendmsg() and recvmsg() rather than the IPv6 API
    # itself.
    #
    # Every testX() method has a matching _testX() method that does
    # the sending.  The receiving side enables the IPV6_RECV* options
    # it needs and then sets misc_event; the sending side waits on
    # that event first, so the options are in effect before the
    # datagram is sent.

    # Test value to use when setting hop limit of packet
    hop_limit = 2

    # Test value to use when setting traffic class of packet.
    # -1 means "use kernel default".
    traffic_class = -1

    def ancillaryMapping(self, ancdata):
        # Given ancillary data list ancdata, return a mapping from
        # pairs (cmsg_level, cmsg_type) to corresponding cmsg_data.
        # Check that no (level, type) pair appears more than once.
        d = {}
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            self.assertNotIn((cmsg_level, cmsg_type), d)
            d[(cmsg_level, cmsg_type)] = cmsg_data
        return d

    def checkHopLimit(self, ancbufsize, maxhop=255, ignoreflags=0):
        # Receive hop limit into ancbufsize bytes of ancillary data
        # space.  Check that data is MSG, ancillary data is not
        # truncated (but ignore any flags in ignoreflags), and hop
        # limit is between 0 and maxhop inclusive.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        # Let the sending side know the option is now enabled.
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertEqual(len(ancdata), 1)
        self.assertIsInstance(ancdata[0], tuple)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
        self.assertIsInstance(cmsg_data, bytes)
        self.assertEqual(len(cmsg_data), SIZEOF_INT)
        # Decode the item as a single native int.
        a = array.array("i")
        a.frombytes(cmsg_data)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], maxhop)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testRecvHopLimit(self):
        # Test receiving the packet hop limit as ancillary data.
        self.checkHopLimit(ancbufsize=10240)

    @testRecvHopLimit.client_skip
    def _testRecvHopLimit(self):
        # Need to wait until server has asked to receive ancillary
        # data, as implementations are not required to buffer it
        # otherwise.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testRecvHopLimitCMSG_SPACE(self):
        # Test receiving hop limit, using CMSG_SPACE to calculate buffer size.
        self.checkHopLimit(ancbufsize=socket.CMSG_SPACE(SIZEOF_INT))

    @testRecvHopLimitCMSG_SPACE.client_skip
    def _testRecvHopLimitCMSG_SPACE(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    # Could test receiving into buffer sized using CMSG_LEN, but RFC
    # 3542 says portable applications must provide space for trailing
    # padding.  Implementations may set MSG_CTRUNC if there isn't
    # enough space for the padding.

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSetHopLimit(self):
        # Test setting hop limit on outgoing packet and receiving it
        # at the other end.
        self.checkHopLimit(ancbufsize=10240, maxhop=self.hop_limit)

    @testSetHopLimit.client_skip
    def _testSetHopLimit(self):
        # Send MSG with the hop limit supplied as an ancillary item.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.assertEqual(
            self.sendmsgToServer([MSG],
                                 [(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                                   array.array("i", [self.hop_limit]))]),
            len(MSG))

    def checkTrafficClassAndHopLimit(self, ancbufsize, maxhop=255,
                                     ignoreflags=0):
        # Receive traffic class and hop limit into ancbufsize bytes of
        # ancillary data space.  Check that data is MSG, ancillary
        # data is not truncated (but ignore any flags in ignoreflags),
        # and traffic class and hop limit are in range (hop limit no
        # more than maxhop).
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        # Let the sending side know both options are now enabled.
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkunset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)
        self.assertEqual(len(ancdata), 2)
        # The two items are looked up by (level, type), so their order
        # in ancdata does not matter.
        ancmap = self.ancillaryMapping(ancdata)

        tcdata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_TCLASS)]
        self.assertEqual(len(tcdata), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(tcdata)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], 255)

        hldata = ancmap[(socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT)]
        self.assertEqual(len(hldata), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(hldata)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], maxhop)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testRecvTrafficClassAndHopLimit(self):
        # Test receiving traffic class and hop limit as ancillary data.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240)

    @testRecvTrafficClassAndHopLimit.client_skip
    def _testRecvTrafficClassAndHopLimit(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
        # Test receiving traffic class and hop limit, using
        # CMSG_SPACE() to calculate buffer size.
        self.checkTrafficClassAndHopLimit(
            ancbufsize=socket.CMSG_SPACE(SIZEOF_INT) * 2)

    @testRecvTrafficClassAndHopLimitCMSG_SPACE.client_skip
    def _testRecvTrafficClassAndHopLimitCMSG_SPACE(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSetTrafficClassAndHopLimit(self):
        # Test setting traffic class and hop limit on outgoing packet,
        # and receiving them at the other end.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
                                          maxhop=self.hop_limit)

    @testSetTrafficClassAndHopLimit.client_skip
    def _testSetTrafficClassAndHopLimit(self):
        # Send MSG with traffic class and hop limit supplied as two
        # ancillary items.
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.assertEqual(
            self.sendmsgToServer([MSG],
                                 [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                                   array.array("i", [self.traffic_class])),
                                  (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                                   array.array("i", [self.hop_limit]))]),
            len(MSG))

    @requireAttrs(socket.socket, "sendmsg")
    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testOddCmsgSize(self):
        # Try to send ancillary data with first item one byte too
        # long.  Fall back to sending with correct size if this fails,
        # and check that second item was handled correctly.
        self.checkTrafficClassAndHopLimit(ancbufsize=10240,
                                          maxhop=self.hop_limit)

    @testOddCmsgSize.client_skip
    def _testOddCmsgSize(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        try:
            # First attempt: traffic class item padded with one extra
            # byte.
            nbytes = self.sendmsgToServer(
                [MSG],
                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                  array.array("i", [self.traffic_class]).tobytes() + b"\x00"),
                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                  array.array("i", [self.hop_limit]))])
        except OSError as e:
            # The odd-sized item was refused; send again with both
            # items correctly sized.
            self.assertIsInstance(e.errno, int)
            nbytes = self.sendmsgToServer(
                [MSG],
                [(socket.IPPROTO_IPV6, socket.IPV6_TCLASS,
                  array.array("i", [self.traffic_class])),
                 (socket.IPPROTO_IPV6, socket.IPV6_HOPLIMIT,
                  array.array("i", [self.hop_limit]))])
        # Check the byte count for whichever send went through, not
        # only for the fallback.
        self.assertEqual(nbytes, len(MSG))

    # Tests for proper handling of truncated ancillary data

    def checkHopLimitTruncatedHeader(self, ancbufsize, ignoreflags=0):
        # Receive hop limit into ancbufsize bytes of ancillary data
        # space, which should be too small to contain the ancillary
        # data header (if ancbufsize is None, pass no second argument
        # to recvmsg()).  Check that data is MSG, MSG_CTRUNC is set
        # (unless included in ignoreflags), and no ancillary data is
        # returned.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        args = () if ancbufsize is None else (ancbufsize,)
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), *args)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.assertEqual(ancdata, [])
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testCmsgTruncNoBufSize(self):
        # Check that no ancillary data is received when no ancillary
        # buffer size is provided.
        self.checkHopLimitTruncatedHeader(ancbufsize=None,
                                          # BSD seems to set
                                          # MSG_CTRUNC only if an item
                                          # has been partially
                                          # received.
                                          ignoreflags=socket.MSG_CTRUNC)

    @testCmsgTruncNoBufSize.client_skip
    def _testCmsgTruncNoBufSize(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc0(self):
        # Check that no ancillary data is received when ancillary
        # buffer size is zero.
        self.checkHopLimitTruncatedHeader(ancbufsize=0,
                                          ignoreflags=socket.MSG_CTRUNC)

    @testSingleCmsgTrunc0.client_skip
    def _testSingleCmsgTrunc0(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    # Check that no ancillary data is returned for various non-zero
    # (but still too small) buffer sizes.

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc1(self):
        self.checkHopLimitTruncatedHeader(ancbufsize=1)

    @testSingleCmsgTrunc1.client_skip
    def _testSingleCmsgTrunc1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTrunc2Int(self):
        self.checkHopLimitTruncatedHeader(ancbufsize=2 * SIZEOF_INT)

    @testSingleCmsgTrunc2Int.client_skip
    def _testSingleCmsgTrunc2Int(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTruncLen0Minus1(self):
        self.checkHopLimitTruncatedHeader(ancbufsize=socket.CMSG_LEN(0) - 1)

    @testSingleCmsgTruncLen0Minus1.client_skip
    def _testSingleCmsgTruncLen0Minus1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT")
    def testSingleCmsgTruncInData(self):
        # Test truncation of a control message inside its associated
        # data.  The message may be returned with its data truncated,
        # or not returned at all.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.misc_event.set()
        # Buffer is one byte short of CMSG_LEN() for a single int.
        msg, ancdata, flags, addr = self.doRecvmsg(
            self.serv_sock, len(MSG), socket.CMSG_LEN(SIZEOF_INT) - 1)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

        self.assertLessEqual(len(ancdata), 1)
        if ancdata:
            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
            self.assertEqual(cmsg_type, socket.IPV6_HOPLIMIT)
            self.assertLess(len(cmsg_data), SIZEOF_INT)

    @testSingleCmsgTruncInData.client_skip
    def _testSingleCmsgTruncInData(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    def checkTruncatedSecondHeader(self, ancbufsize, ignoreflags=0):
        # Receive traffic class and hop limit into ancbufsize bytes of
        # ancillary data space, which should be large enough to
        # contain the first item, but too small to contain the header
        # of the second.  Check that data is MSG, MSG_CTRUNC is set
        # (unless included in ignoreflags), and only one ancillary
        # data item is returned.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        msg, ancdata, flags, addr = self.doRecvmsg(self.serv_sock,
                                                   len(MSG), ancbufsize)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC,
                        ignore=ignoreflags)

        self.assertEqual(len(ancdata), 1)
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        # Either of the two items may be the one that was delivered.
        self.assertIn(cmsg_type, {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT})
        self.assertEqual(len(cmsg_data), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(cmsg_data)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], 255)

    # Try the above test with various buffer sizes.

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc0(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT),
                                        ignoreflags=socket.MSG_CTRUNC)

    @testSecondCmsgTrunc0.client_skip
    def _testSecondCmsgTrunc0(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc1(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) + 1)

    @testSecondCmsgTrunc1.client_skip
    def _testSecondCmsgTrunc1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTrunc2Int(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
                                        2 * SIZEOF_INT)

    @testSecondCmsgTrunc2Int.client_skip
    def _testSecondCmsgTrunc2Int(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTruncLen0Minus1(self):
        self.checkTruncatedSecondHeader(socket.CMSG_SPACE(SIZEOF_INT) +
                                        socket.CMSG_LEN(0) - 1)

    @testSecondCmsgTruncLen0Minus1.client_skip
    def _testSecondCmsgTruncLen0Minus1(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)

    @requireAttrs(socket, "CMSG_SPACE", "IPV6_RECVHOPLIMIT", "IPV6_HOPLIMIT",
                  "IPV6_RECVTCLASS", "IPV6_TCLASS")
    def testSecondCmsgTruncInData(self):
        # Test truncation of the second of two control messages inside
        # its associated data.
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVHOPLIMIT, 1)
        self.serv_sock.setsockopt(socket.IPPROTO_IPV6,
                                  socket.IPV6_RECVTCLASS, 1)
        self.misc_event.set()
        # Room for one complete item, plus a second item that is one
        # byte short of CMSG_LEN() for an int.
        msg, ancdata, flags, addr = self.doRecvmsg(
            self.serv_sock, len(MSG),
            socket.CMSG_SPACE(SIZEOF_INT) + socket.CMSG_LEN(SIZEOF_INT) - 1)

        self.assertEqual(msg, MSG)
        self.checkRecvmsgAddress(addr, self.cli_addr)
        self.checkFlags(flags, eor=True, checkset=socket.MSG_CTRUNC)

        # Types not seen yet; remove() raises KeyError on a repeated
        # or unexpected type.
        cmsg_types = {socket.IPV6_TCLASS, socket.IPV6_HOPLIMIT}

        # The first item must be complete.
        cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
        self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
        cmsg_types.remove(cmsg_type)
        self.assertEqual(len(cmsg_data), SIZEOF_INT)
        a = array.array("i")
        a.frombytes(cmsg_data)
        self.assertGreaterEqual(a[0], 0)
        self.assertLessEqual(a[0], 255)

        # The second item, if returned at all, must have truncated
        # data.
        if ancdata:
            cmsg_level, cmsg_type, cmsg_data = ancdata.pop(0)
            self.assertEqual(cmsg_level, socket.IPPROTO_IPV6)
            cmsg_types.remove(cmsg_type)
            self.assertLess(len(cmsg_data), SIZEOF_INT)

        self.assertEqual(ancdata, [])

    @testSecondCmsgTruncInData.client_skip
    def _testSecondCmsgTruncInData(self):
        self.assertTrue(self.misc_event.wait(timeout=self.fail_timeout))
        self.sendToServer(MSG)
4238
4239
4240# Derive concrete test classes for different socket types.
4241
class SendrecvmsgUDPTestBase(SendrecvmsgDgramFlagsBase,
                             SendrecvmsgConnectionlessBase,
                             ThreadedSocketTestMixin, UDPTestBase):
    # Shared base for the UDP sendmsg()/recvmsg*() test classes.  It
    # only combines the listed base classes and adds nothing itself.
    pass
4246
@requireAttrs(socket.socket, "sendmsg")
class SendmsgUDPTest(SendmsgConnectionlessTests, SendrecvmsgUDPTestBase):
    # Runs the SendmsgConnectionlessTests cases on the UDP base.
    pass
4250
@requireAttrs(socket.socket, "recvmsg")
class RecvmsgUDPTest(RecvmsgTests, SendrecvmsgUDPTestBase):
    # Runs the RecvmsgTests cases on the UDP base.
    pass
4254
@requireAttrs(socket.socket, "recvmsg_into")
class RecvmsgIntoUDPTest(RecvmsgIntoTests, SendrecvmsgUDPTestBase):
    # Runs the RecvmsgIntoTests cases on the UDP base.
    pass
4258
4259
class SendrecvmsgUDP6TestBase(SendrecvmsgDgramFlagsBase,
                              SendrecvmsgConnectionlessBase,
                              ThreadedSocketTestMixin, UDP6TestBase):
    # Shared base for the sendmsg()/recvmsg*() test classes built on
    # UDP6TestBase.

    def checkRecvmsgAddress(self, addr1, addr2):
        # Compare the address a message was received from (addr1)
        # with the peer's address (addr2).  The last element of each,
        # the scope ID, is left out of the comparison.
        received = addr1[:-1]
        expected = addr2[:-1]
        self.assertEqual(received, expected)
4268
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
class SendmsgUDP6Test(SendmsgConnectionlessTests, SendrecvmsgUDP6TestBase):
    # Runs the SendmsgConnectionlessTests cases on the UDP6 base;
    # skipped when IPv6 is not enabled.
    pass
4274
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgUDP6Test(RecvmsgTests, SendrecvmsgUDP6TestBase):
    # Runs the RecvmsgTests cases on the UDP6 base; skipped when IPv6
    # is not enabled.
    pass
4280
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgIntoUDP6Test(RecvmsgIntoTests, SendrecvmsgUDP6TestBase):
    # Runs the RecvmsgIntoTests cases on the UDP6 base; skipped when
    # IPv6 is not enabled.
    pass
4286
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgRFC3542AncillaryUDP6Test(RFC3542AncillaryTest,
                                      SendrecvmsgUDP6TestBase):
    # Runs the RFC3542AncillaryTest cases on the UDP6 base; skipped
    # when IPv6 is not enabled.
    pass
4294
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgIntoRFC3542AncillaryUDP6Test(RecvmsgIntoMixin,
                                          RFC3542AncillaryTest,
                                          SendrecvmsgUDP6TestBase):
    # Runs the RFC3542AncillaryTest cases on the UDP6 base with
    # RecvmsgIntoMixin first in the bases (presumably so that
    # recvmsg_into() does the receiving -- confirm against the mixin).
    # Skipped when IPv6 is not enabled.
    pass
4303
4304
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
class SendrecvmsgUDPLITETestBase(SendrecvmsgDgramFlagsBase,
                             SendrecvmsgConnectionlessBase,
                             ThreadedSocketTestMixin, UDPLITETestBase):
    # Shared base for the UDPLITE sendmsg()/recvmsg*() test classes.
    # It only combines the listed base classes; skipped unless
    # HAVE_SOCKET_UDPLITE is true.
    pass
4311
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
@requireAttrs(socket.socket, "sendmsg")
class SendmsgUDPLITETest(SendmsgConnectionlessTests, SendrecvmsgUDPLITETestBase):
    # Runs the SendmsgConnectionlessTests cases on the UDPLITE base;
    # skipped unless HAVE_SOCKET_UDPLITE is true.
    pass
4317
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
@requireAttrs(socket.socket, "recvmsg")
class RecvmsgUDPLITETest(RecvmsgTests, SendrecvmsgUDPLITETestBase):
    # Runs the RecvmsgTests cases on the UDPLITE base; skipped unless
    # HAVE_SOCKET_UDPLITE is true.
    pass
4323
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
@requireAttrs(socket.socket, "recvmsg_into")
class RecvmsgIntoUDPLITETest(RecvmsgIntoTests, SendrecvmsgUDPLITETestBase):
    # Runs the RecvmsgIntoTests cases on the UDPLITE base; skipped
    # unless HAVE_SOCKET_UDPLITE is true.
    pass
4329
4330
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
class SendrecvmsgUDPLITE6TestBase(SendrecvmsgDgramFlagsBase,
                              SendrecvmsgConnectionlessBase,
                              ThreadedSocketTestMixin, UDPLITE6TestBase):
    # Shared base for the sendmsg()/recvmsg*() test classes built on
    # UDPLITE6TestBase.

    def checkRecvmsgAddress(self, addr1, addr2):
        # Compare the address a message was received from (addr1)
        # with the peer's address (addr2).  The last element of each,
        # the scope ID, is left out of the comparison.
        received = addr1[:-1]
        expected = addr2[:-1]
        self.assertEqual(received, expected)
4341
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
class SendmsgUDPLITE6Test(SendmsgConnectionlessTests, SendrecvmsgUDPLITE6TestBase):
    # Runs the SendmsgConnectionlessTests cases on the UDPLITE6 base;
    # skipped unless IPv6 is enabled and HAVE_SOCKET_UDPLITE is true.
    pass
4349
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgUDPLITE6Test(RecvmsgTests, SendrecvmsgUDPLITE6TestBase):
    # Runs the RecvmsgTests cases on the UDPLITE6 base; skipped unless
    # IPv6 is enabled and HAVE_SOCKET_UDPLITE is true.
    pass
4357
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
@requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgIntoUDPLITE6Test(RecvmsgIntoTests, SendrecvmsgUDPLITE6TestBase):
    # Runs the RecvmsgIntoTests cases on the UDPLITE6 base; skipped
    # unless IPv6 is enabled and HAVE_SOCKET_UDPLITE is true.
    pass
4365
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgRFC3542AncillaryUDPLITE6Test(RFC3542AncillaryTest,
                                      SendrecvmsgUDPLITE6TestBase):
    # Runs the RFC3542AncillaryTest cases on the UDPLITE6 base;
    # skipped unless IPv6 is enabled and HAVE_SOCKET_UDPLITE is true.
    pass
4375
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test.')
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
@requireAttrs(socket, "IPPROTO_IPV6")
@requireSocket("AF_INET6", "SOCK_DGRAM")
class RecvmsgIntoRFC3542AncillaryUDPLITE6Test(RecvmsgIntoMixin,
                                          RFC3542AncillaryTest,
                                          SendrecvmsgUDPLITE6TestBase):
    # Runs the RFC3542AncillaryTest cases on the UDPLITE6 base with
    # RecvmsgIntoMixin first in the bases (presumably so that
    # recvmsg_into() does the receiving -- confirm against the mixin).
    # Skipped unless IPv6 is enabled and HAVE_SOCKET_UDPLITE is true.
    pass
4386
4387
class SendrecvmsgTCPTestBase(SendrecvmsgConnectedBase,
                             ConnectedStreamTestMixin, TCPTestBase):
    # Shared base for the TCP sendmsg()/recvmsg*() test classes.  It
    # only combines the listed base classes and adds nothing itself.
    pass
4391
@requireAttrs(socket.socket, "sendmsg")
class SendmsgTCPTest(SendmsgStreamTests, SendrecvmsgTCPTestBase):
    # Runs the SendmsgStreamTests cases on the TCP base.
    pass
4395
@requireAttrs(socket.socket, "recvmsg")
class RecvmsgTCPTest(RecvmsgTests, RecvmsgGenericStreamTests,
                     SendrecvmsgTCPTestBase):
    # Runs the RecvmsgTests and RecvmsgGenericStreamTests cases on the
    # TCP base.
    pass
4400
@requireAttrs(socket.socket, "recvmsg_into")
class RecvmsgIntoTCPTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                         SendrecvmsgTCPTestBase):
    # Runs the RecvmsgIntoTests and RecvmsgGenericStreamTests cases on
    # the TCP base.
    pass
4405
4406
class SendrecvmsgSCTPStreamTestBase(SendrecvmsgSCTPFlagsBase,
                                    SendrecvmsgConnectedBase,
                                    ConnectedStreamTestMixin, SCTPStreamBase):
    # Shared base for the SCTP stream sendmsg()/recvmsg*() test
    # classes.  It only combines the listed base classes.
    pass
4411
@requireAttrs(socket.socket, "sendmsg")
@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
class SendmsgSCTPStreamTest(SendmsgStreamTests, SendrecvmsgSCTPStreamTestBase):
    # Runs the SendmsgStreamTests cases on the SCTP stream base;
    # skipped on AIX.
    pass
4417
@requireAttrs(socket.socket, "recvmsg")
@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
class RecvmsgSCTPStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
                            SendrecvmsgSCTPStreamTestBase):
    # Runs the RecvmsgTests and RecvmsgGenericStreamTests cases on the
    # SCTP stream base; skipped on AIX.

    def testRecvmsgEOF(self):
        # Run the inherited test, but report an ENOTCONN failure as a
        # skip; any other OSError propagates unchanged.
        try:
            super(RecvmsgSCTPStreamTest, self).testRecvmsgEOF()
        except OSError as exc:
            if exc.errno == errno.ENOTCONN:
                # skipTest() raises, so the re-raise below is not
                # reached in this case.
                self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
            raise
4431
@requireAttrs(socket.socket, "recvmsg_into")
@unittest.skipIf(AIX, "IPPROTO_SCTP: [Errno 62] Protocol not supported on AIX")
@requireSocket("AF_INET", "SOCK_STREAM", "IPPROTO_SCTP")
class RecvmsgIntoSCTPStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                                SendrecvmsgSCTPStreamTestBase):
    # Runs the RecvmsgIntoTests and RecvmsgGenericStreamTests cases on
    # the SCTP stream base; skipped on AIX.

    def testRecvmsgEOF(self):
        # Run the inherited test, but report an ENOTCONN failure as a
        # skip; any other OSError propagates unchanged.
        try:
            super(RecvmsgIntoSCTPStreamTest, self).testRecvmsgEOF()
        except OSError as exc:
            if exc.errno == errno.ENOTCONN:
                # skipTest() raises, so the re-raise below is not
                # reached in this case.
                self.skipTest("sporadic ENOTCONN (kernel issue?) - see issue #13876")
            raise
4445
4446
class SendrecvmsgUnixStreamTestBase(SendrecvmsgConnectedBase,
                                    ConnectedStreamTestMixin, UnixStreamBase):
    # Shared base for the Unix stream sendmsg()/recvmsg*() test
    # classes.  It only combines the listed base classes.
    pass
4450
@requireAttrs(socket.socket, "sendmsg")
@requireAttrs(socket, "AF_UNIX")
class SendmsgUnixStreamTest(SendmsgStreamTests, SendrecvmsgUnixStreamTestBase):
    # Runs the SendmsgStreamTests cases on the Unix stream base.
    pass
4455
@requireAttrs(socket.socket, "recvmsg")
@requireAttrs(socket, "AF_UNIX")
class RecvmsgUnixStreamTest(RecvmsgTests, RecvmsgGenericStreamTests,
                            SendrecvmsgUnixStreamTestBase):
    # Runs the RecvmsgTests and RecvmsgGenericStreamTests cases on the
    # Unix stream base.
    pass
4461
@requireAttrs(socket.socket, "recvmsg_into")
@requireAttrs(socket, "AF_UNIX")
class RecvmsgIntoUnixStreamTest(RecvmsgIntoTests, RecvmsgGenericStreamTests,
                                SendrecvmsgUnixStreamTestBase):
    # Runs the RecvmsgIntoTests and RecvmsgGenericStreamTests cases on
    # the Unix stream base.
    pass
4467
@requireAttrs(socket.socket, "sendmsg", "recvmsg")
@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
class RecvmsgSCMRightsStreamTest(SCMRightsTest, SendrecvmsgUnixStreamTestBase):
    # Runs the SCMRightsTest cases on the Unix stream base.
    pass
4472
@requireAttrs(socket.socket, "sendmsg", "recvmsg_into")
@requireAttrs(socket, "AF_UNIX", "SOL_SOCKET", "SCM_RIGHTS")
class RecvmsgIntoSCMRightsStreamTest(RecvmsgIntoMixin, SCMRightsTest,
                                     SendrecvmsgUnixStreamTestBase):
    # Runs the SCMRightsTest cases on the Unix stream base with
    # RecvmsgIntoMixin first in the bases (presumably so that
    # recvmsg_into() does the receiving -- confirm against the mixin).
    pass
4478
4479
4480# Test interrupting the interruptible send/receive methods with a
4481# signal when a timeout is set.  These tests avoid having multiple
4482# threads alive during the test so that the OS cannot deliver the
4483# signal to the wrong one.
4484
class InterruptedTimeoutBase:
    # Base class for interrupted send/receive tests.  setUp() points
    # SIGALRM at a handler that raises ZeroDivisionError and registers
    # a cleanup that reinstalls the handler that was there before.

    # Timeout for socket operations
    timeout = support.LOOPBACK_TIMEOUT

    def setUp(self):
        super().setUp()

        def raise_on_alarm(signum, frame):
            1 / 0

        previous_handler = signal.signal(signal.SIGALRM, raise_on_alarm)
        self.addCleanup(signal.signal, signal.SIGALRM, previous_handler)

    # setAlarm() schedules delivery of SIGALRM after the given number
    # of seconds, or cancels it if zero; alarm_time is a suitable
    # delay for whichever mechanism is used.  setitimer() is used when
    # it is available, alarm() otherwise.
    if not hasattr(signal, "setitimer"):
        # Old systems may deliver the alarm up to one second early
        alarm_time = 2

        def setAlarm(self, seconds):
            signal.alarm(seconds)
    else:
        alarm_time = 0.05

        def setAlarm(self, seconds):
            signal.setitimer(signal.ITIMER_REAL, seconds)
4513
4514
4515# Require siginterrupt() in order to ensure that system calls are
4516# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
class InterruptedRecvTimeoutTest(InterruptedTimeoutBase, UDPTestBase):
    # Test interrupting the recv*() methods with signals when a
    # timeout is set.

    def setUp(self):
        super().setUp()
        # Give the receiving socket a timeout, so that the calls
        # under test are made on a socket with a timeout set.
        self.serv.settimeout(self.timeout)

    def checkInterruptedRecv(self, func, *args, **kwargs):
        # Check that func(*args, **kwargs) is interrupted by SIGALRM:
        # the handler installed by InterruptedTimeoutBase.setUp()
        # raises ZeroDivisionError, and that exception must come out
        # of the call.
        try:
            self.setAlarm(self.alarm_time)
            with self.assertRaises(ZeroDivisionError):
                func(*args, **kwargs)
        finally:
            # Cancel the alarm whether or not the check passed.
            self.setAlarm(0)

    def testInterruptedRecvTimeout(self):
        self.checkInterruptedRecv(self.serv.recv, 1024)

    def testInterruptedRecvIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recv_into, bytearray(1024))

    def testInterruptedRecvfromTimeout(self):
        self.checkInterruptedRecv(self.serv.recvfrom, 1024)

    def testInterruptedRecvfromIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recvfrom_into, bytearray(1024))

    @requireAttrs(socket.socket, "recvmsg")
    def testInterruptedRecvmsgTimeout(self):
        self.checkInterruptedRecv(self.serv.recvmsg, 1024)

    @requireAttrs(socket.socket, "recvmsg_into")
    def testInterruptedRecvmsgIntoTimeout(self):
        self.checkInterruptedRecv(self.serv.recvmsg_into, [bytearray(1024)])
4557
4558
4559# Require siginterrupt() in order to ensure that system calls are
4560# interrupted by default.
@requireAttrs(signal, "siginterrupt")
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
                     "Don't have signal.alarm or signal.setitimer")
class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
                                 ThreadSafeCleanupTestCase,
                                 SocketListeningTestMixin, TCPTestBase):
    # Each test keeps calling a send*() method on a connected socket
    # that has a timeout set until SIGALRM interrupts one of the
    # calls.

    def setUp(self):
        super().setUp()
        self.serv_conn = self.newSocket()
        self.addCleanup(self.serv_conn.close)
        # The connection is completed by a helper thread, which is
        # joined before the test body runs so that only one thread is
        # left to receive the signal.
        connector = threading.Thread(target=self.doConnect)
        connector.start()
        self.cli_conn, _ = self.serv.accept()
        self.addCleanup(self.cli_conn.close)
        connector.join()
        self.serv_conn.settimeout(self.timeout)

    def doConnect(self):
        self.serv_conn.connect(self.serv_addr)

    def checkInterruptedSend(self, func, *args, **kwargs):
        # Call func(*args, **kwargs) over and over, re-arming the
        # alarm before every call, until the ZeroDivisionError raised
        # by the SIGALRM handler installed in
        # InterruptedTimeoutBase.setUp() propagates out of the loop.
        # The alarm is cancelled again whatever happens.
        try:
            with self.assertRaises(ZeroDivisionError):
                while True:
                    self.setAlarm(self.alarm_time)
                    func(*args, **kwargs)
        finally:
            self.setAlarm(0)

    # Issue #12958: The following tests have problems on OS X prior to 10.7
    @support.requires_mac_ver(10, 7)
    def testInterruptedSendTimeout(self):
        payload = b"a"*512
        self.checkInterruptedSend(self.serv_conn.send, payload)

    @support.requires_mac_ver(10, 7)
    def testInterruptedSendtoTimeout(self):
        # A real address is passed because Python's sendto() wrapper
        # does not accept a zero-length one; POSIX requires the
        # address to be ignored anyway, since the socket is
        # connection-mode.
        payload = b"a"*512
        self.checkInterruptedSend(self.serv_conn.sendto, payload,
                                  self.serv_addr)

    @support.requires_mac_ver(10, 7)
    @requireAttrs(socket.socket, "sendmsg")
    def testInterruptedSendmsgTimeout(self):
        buffers = [b"a"*512]
        self.checkInterruptedSend(self.serv_conn.sendmsg, buffers)
4617
4618
class TCPCloserTest(ThreadedTCPSocketTest):
    # After the accepted connection is closed, the connecting socket
    # must become readable and recv() on it must return b''.

    def testClose(self):
        conn, _ = self.serv.accept()
        conn.close()

        client = self.cli
        readable, _, _ = select.select([client], [], [], 1.0)
        self.assertEqual(readable, [client])
        self.assertEqual(client.recv(1), b'')

        # Calling close() many times should be safe.
        for _ in range(2):
            conn.close()

    def _testClose(self):
        self.cli.connect((HOST, self.port))
        time.sleep(1.0)
4637
4638
class BasicSocketPairTest(SocketPairTest):
    # Checks the default attributes of self.serv and self.cli and
    # sends MSG in each direction between them.

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def _check_defaults(self, sock):
        # The expected family is AF_UNIX when the socket module has
        # it, AF_INET otherwise.
        expected_family = (socket.AF_UNIX if hasattr(socket, 'AF_UNIX')
                           else socket.AF_INET)
        self.assertIsInstance(sock, socket.socket)
        self.assertEqual(sock.family, expected_family)
        self.assertEqual(sock.type, socket.SOCK_STREAM)
        self.assertEqual(sock.proto, 0)

    def _testDefaults(self):
        self._check_defaults(self.cli)

    def testDefaults(self):
        self._check_defaults(self.serv)

    def testRecv(self):
        self.assertEqual(self.serv.recv(1024), MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        self.serv.send(MSG)

    def _testSend(self):
        self.assertEqual(self.cli.recv(1024), MSG)
4672
4673
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    # Tests for setblocking()/settimeout() and for accept() and recv()
    # on non-blocking sockets.  Each testX() method is paired with a
    # _testX() method; self.event is used by the testX() side to tell
    # the _testX() side when to go ahead.

    def __init__(self, methodName='runTest'):
        self.event = threading.Event()
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def assert_sock_timeout(self, sock, timeout):
        # Check that sock reports the given timeout and that its
        # blocking state, both as seen from Python and (where fcntl is
        # available) on the file descriptor, is consistent with it.
        self.assertEqual(sock.gettimeout(), timeout)

        blocking = (timeout != 0.0)
        self.assertEqual(sock.getblocking(), blocking)

        if fcntl is not None:
            # When a Python socket has a non-zero timeout, it's switched
            # internally to a non-blocking mode. Later, sock.sendall(),
            # sock.recv(), and other socket operations use a select() call and
            # handle EWOULDBLOCK/EAGAIN on all socket operations. That's how
            # timeouts are enforced.
            fd_blocking = (timeout is None)

            # F_GETFL only reads the flags, so no argument is passed.
            flag = fcntl.fcntl(sock, fcntl.F_GETFL)
            self.assertEqual(not bool(flag & os.O_NONBLOCK), fd_blocking)

    def testSetBlocking(self):
        # Test setblocking() and settimeout() methods
        self.serv.setblocking(True)
        self.assert_sock_timeout(self.serv, None)

        self.serv.setblocking(False)
        self.assert_sock_timeout(self.serv, 0.0)

        self.serv.settimeout(None)
        self.assert_sock_timeout(self.serv, None)

        self.serv.settimeout(0)
        self.assert_sock_timeout(self.serv, 0)

        self.serv.settimeout(10)
        self.assert_sock_timeout(self.serv, 10)

        self.serv.settimeout(0)
        self.assert_sock_timeout(self.serv, 0)

    def _testSetBlocking(self):
        pass

    @support.cpython_only
    def testSetBlocking_overflow(self):
        # Issue 15989
        import _testcapi
        if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
            self.skipTest('needs UINT_MAX < ULONG_MAX')

        self.serv.setblocking(False)
        self.assertEqual(self.serv.gettimeout(), 0.0)

        # A value that does not fit in an unsigned int must still be
        # treated as true, i.e. switch the socket to blocking mode.
        self.serv.setblocking(_testcapi.UINT_MAX + 1)
        self.assertIsNone(self.serv.gettimeout())

    _testSetBlocking_overflow = support.cpython_only(_testSetBlocking)

    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
                         'test needs socket.SOCK_NONBLOCK')
    @support.requires_linux_version(2, 6, 28)
    def testInitNonBlocking(self):
        # create a socket with SOCK_NONBLOCK
        self.serv.close()
        self.serv = socket.socket(socket.AF_INET,
                                  socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
        self.assert_sock_timeout(self.serv, 0)

    def _testInitNonBlocking(self):
        pass

    def testInheritFlagsBlocking(self):
        # bpo-7995: when accept() is called on a listening socket that
        # has a timeout and the default timeout is None, the resulting
        # socket must be blocking.
        with socket_setdefaulttimeout(None):
            self.serv.settimeout(10)
            conn, _ = self.serv.accept()
            self.addCleanup(conn.close)
            self.assertIsNone(conn.gettimeout())

    def _testInheritFlagsBlocking(self):
        self.cli.connect((HOST, self.port))

    def testInheritFlagsTimeout(self):
        # bpo-7995: when accept() is called on a listening socket that
        # has a timeout and a default timeout is set, the resulting
        # socket must inherit the default timeout, not the listening
        # socket's one.
        default_timeout = 20.0
        with socket_setdefaulttimeout(default_timeout):
            self.serv.settimeout(10)
            conn, _ = self.serv.accept()
            self.addCleanup(conn.close)
            self.assertEqual(conn.gettimeout(), default_timeout)

    def _testInheritFlagsTimeout(self):
        self.cli.connect((HOST, self.port))

    def testAccept(self):
        # Testing non-blocking accept
        self.serv.setblocking(False)

        # connect() didn't start: non-blocking accept() fails
        start_time = time.monotonic()
        with self.assertRaises(BlockingIOError):
            self.serv.accept()
        dt = time.monotonic() - start_time
        self.assertLess(dt, 1.0)

        # Let _testAccept() connect now.
        self.event.set()

        readable, _, _ = select.select([self.serv], [], [],
                                       support.LONG_TIMEOUT)
        if self.serv not in readable:
            self.fail("Error trying to do accept after select.")

        # connect() completed: non-blocking accept() doesn't block
        conn, _ = self.serv.accept()
        self.addCleanup(conn.close)
        self.assertIsNone(conn.gettimeout())

    def _testAccept(self):
        # don't connect before event is set to check
        # that non-blocking accept() raises BlockingIOError
        self.event.wait()

        self.cli.connect((HOST, self.port))

    def testRecv(self):
        # Testing non-blocking recv
        conn, _ = self.serv.accept()
        self.addCleanup(conn.close)
        conn.setblocking(False)

        # the client hasn't sent data yet: non-blocking recv() fails
        with self.assertRaises(BlockingIOError):
            conn.recv(len(MSG))

        # Let _testRecv() send now.
        self.event.set()

        readable, _, _ = select.select([conn], [], [], support.LONG_TIMEOUT)
        if conn not in readable:
            self.fail("Error during select call to non-blocking socket.")

        # the client has sent data: non-blocking recv() doesn't block
        msg = conn.recv(len(MSG))
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.cli.connect((HOST, self.port))

        # don't send anything before event is set to check
        # that non-blocking recv() raises BlockingIOError
        self.event.wait()

        # send data: recv() will no longer block
        self.cli.sendall(MSG)
4832
4833
class FileObjectClassTestCase(SocketConnectedTest):
    """Unit tests for the object returned by socket.makefile()

    self.read_file is the io object that setUp() obtains from
    makefile() on self.cli_conn.  The test*() methods read from it.

    self.write_file is the io object that clientSetUp() obtains from
    makefile() on self.serv_conn.  The _test*() methods write to it,
    and what they write is what the matching test*() method reads.

    The class attributes below are the arguments given to makefile()
    and the messages written and expected; subclasses override them.
    """

    bufsize = -1 # Use default buffer size
    encoding = 'utf-8'
    errors = 'strict'
    newline = None

    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'wb'
    write_msg = MSG

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def _makefile_for(self, conn, mode):
        # makefile() on conn with this class's buffering and text
        # settings.
        return conn.makefile(
            mode, self.bufsize,
            encoding=self.encoding,
            errors=self.errors,
            newline=self.newline)

    def _write_msg_and_flush(self):
        # Write the whole message to write_file and flush it out.
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def setUp(self):
        self.evt1 = threading.Event()
        self.evt2 = threading.Event()
        self.serv_finished = threading.Event()
        self.cli_finished = threading.Event()
        SocketConnectedTest.setUp(self)
        self.read_file = self._makefile_for(self.cli_conn, self.read_mode)

    def tearDown(self):
        self.serv_finished.set()
        self.read_file.close()
        self.assertTrue(self.read_file.closed)
        self.read_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.write_file = self._makefile_for(self.serv_conn, self.write_mode)

    def clientTearDown(self):
        self.cli_finished.set()
        self.write_file.close()
        self.assertTrue(self.write_file.closed)
        self.write_file = None
        SocketConnectedTest.clientTearDown(self)

    def testReadAfterTimeout(self):
        # Issue #7322: A file object must disallow further reads
        # after a timeout has occurred.
        self.cli_conn.settimeout(1)
        self.read_file.read(3)
        # First read raises a timeout
        with self.assertRaises(TimeoutError):
            self.read_file.read(1)
        # Second read is disallowed
        with self.assertRaises(OSError) as caught:
            self.read_file.read(1)
        self.assertIn("cannot read from timed out object",
                      str(caught.exception))

    def _testReadAfterTimeout(self):
        # Only three characters are sent; the wait keeps this side
        # from finishing until tearDown() sets serv_finished.
        self.write_file.write(self.write_msg[0:3])
        self.write_file.flush()
        self.serv_finished.wait()

    def testSmallRead(self):
        # Performing small file read test
        head = self.read_file.read(len(self.read_msg)-3)
        tail = self.read_file.read(3)
        self.assertEqual(head + tail, self.read_msg)

    def _testSmallRead(self):
        self._write_msg_and_flush()

    def testFullRead(self):
        # read until EOF
        self.assertEqual(self.read_file.read(), self.read_msg)

    def _testFullRead(self):
        self.write_file.write(self.write_msg)
        self.write_file.close()

    def testUnbufferedRead(self):
        # Performing unbuffered file read test
        collected = type(self.read_msg)()
        while char := self.read_file.read(1):
            collected += char
        self.assertEqual(collected, self.read_msg)

    def _testUnbufferedRead(self):
        self._write_msg_and_flush()

    def testReadline(self):
        # Performing file readline test
        self.assertEqual(self.read_file.readline(), self.read_msg)

    def _testReadline(self):
        self._write_msg_and_flush()

    def testCloseAfterMakefile(self):
        # The file returned by makefile should keep the socket open.
        self.cli_conn.close()
        # read until EOF
        self.assertEqual(self.read_file.read(), self.read_msg)

    def _testCloseAfterMakefile(self):
        self._write_msg_and_flush()

    def testMakefileAfterMakefileClose(self):
        # Closing the file object must leave the socket usable.
        self.read_file.close()
        received = self.cli_conn.recv(len(MSG))
        expected = self.read_msg
        if isinstance(expected, str):
            received = received.decode()
        self.assertEqual(received, expected)

    def _testMakefileAfterMakefileClose(self):
        self._write_msg_and_flush()

    def testClosedAttr(self):
        self.assertFalse(self.read_file.closed)

    def _testClosedAttr(self):
        self.assertFalse(self.write_file.closed)

    def testAttributes(self):
        reader = self.read_file
        self.assertEqual(reader.mode, self.read_mode)
        self.assertEqual(reader.name, self.cli_conn.fileno())

    def _testAttributes(self):
        writer = self.write_file
        self.assertEqual(writer.mode, self.write_mode)
        self.assertEqual(writer.name, self.serv_conn.fileno())

    def testRealClose(self):
        self.read_file.close()
        with self.assertRaises(ValueError):
            self.read_file.fileno()
        self.cli_conn.close()
        with self.assertRaises(OSError):
            self.cli_conn.getsockname()

    def _testRealClose(self):
        pass
4995
4996
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):

    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that http.client relies on this
    when reading multiple requests from the same socket.

    Tests that only make sense for unbuffered file objects
    (non-blocking reads and writes among them) are added here."""

    bufsize = 0 # Use unbuffered mode

    def testUnbufferedReadline(self):
        # Read a line, create a new file object, read another line with it
        first_line = self.read_file.readline()
        self.assertEqual(first_line, b"A. " + self.write_msg)
        self.read_file = self.cli_conn.makefile('rb', 0)
        second_line = self.read_file.readline()
        self.assertEqual(second_line, b"B. " + self.write_msg)

    def _testUnbufferedReadline(self):
        for prefix in (b"A. ", b"B. "):
            self.write_file.write(prefix + self.write_msg)
        self.write_file.flush()

    def testMakefileClose(self):
        # The file returned by makefile should keep the socket open...
        self.cli_conn.close()
        self.assertEqual(self.cli_conn.recv(1024), self.read_msg)
        # ...until the file is itself closed
        self.read_file.close()
        with self.assertRaises(OSError):
            self.cli_conn.recv(1024)

    def _testMakefileClose(self):
        self.write_file.write(self.write_msg)
        self.write_file.flush()

    def testMakefileCloseSocketDestroy(self):
        # Closing the file object must release exactly one reference
        # to the socket.
        refs_while_open = sys.getrefcount(self.cli_conn)
        self.read_file.close()
        refs_once_closed = sys.getrefcount(self.cli_conn)
        self.assertEqual(refs_while_open - 1, refs_once_closed)

    def _testMakefileCloseSocketDestroy(self):
        pass

    # Non-blocking ops
    # NOTE: to set `read_file` as non-blocking, we must call
    # `cli_conn.setblocking` and vice-versa (see setUp / clientSetUp).

    def testSmallReadNonBlocking(self):
        head_size = len(self.read_msg) - 3
        self.cli_conn.setblocking(False)
        # Nothing has been written yet: both calls report "no data".
        self.assertIsNone(self.read_file.readinto(bytearray(10)))
        self.assertIsNone(self.read_file.read(head_size))
        self.evt1.set()
        self.evt2.wait(1.0)
        head = self.read_file.read(head_size)
        if head is None:
            # Data not arrived (can happen under Windows), wait a bit
            time.sleep(0.5)
            head = self.read_file.read(head_size)
        tail_buf = bytearray(10)
        count = self.read_file.readinto(tail_buf)
        self.assertEqual(count, 3)
        self.assertEqual(head + tail_buf[:count], self.read_msg)
        # The whole message has been consumed: "no data" again.
        self.assertIsNone(self.read_file.readinto(bytearray(16)))
        self.assertIsNone(self.read_file.read(1))

    def _testSmallReadNonBlocking(self):
        self.evt1.wait(1.0)
        self.write_file.write(self.write_msg)
        self.write_file.flush()
        self.evt2.set()
        # Avoid closing the socket before the server test has finished,
        # otherwise system recv() will return 0 instead of EWOULDBLOCK.
        self.serv_finished.wait(5.0)

    def testWriteNonBlocking(self):
        self.cli_finished.wait(5.0)
        # The client thread can't skip directly - the SkipTest exception
        # would appear as a failure.
        if self.serv_skipped:
            self.skipTest(self.serv_skipped)

    def _testWriteNonBlocking(self):
        self.serv_skipped = None
        self.serv_conn.setblocking(False)
        # Try to saturate the socket buffer pipe with repeated large writes.
        big_chunk = b"x" * support.SOCK_MAX_SIZE
        max_attempts = 10
        # The first write() succeeds since a chunk of data can be buffered
        written = self.write_file.write(big_chunk)
        self.assertGreater(written, 0)
        for _ in range(max_attempts):
            written = self.write_file.write(big_chunk)
            if written is None:
                # write() wrote nothing at all: this is the condition
                # the test was trying to reach.
                break
            self.assertGreater(written, 0)
        else:
            # Let us know that this test didn't manage to establish
            # the expected conditions. This is not a failure in itself but,
            # if it happens repeatedly, the test should be fixed.
            self.serv_skipped = "failed to saturate the socket buffer"
5103
5104
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    # Repeat the tests from FileObjectClassTestCase with bufsize==1.

    bufsize = 1 # Default-buffered for reading; line-buffered for writing
5108
5109
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    # Repeat the tests from FileObjectClassTestCase with bufsize==2.

    bufsize = 2 # Exercise the buffering code
5113
5114
class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() with a text-mode reader.

    The reading file is opened in text mode and is expected to yield
    MSG decoded as UTF-8; the writing file stays binary.
    """

    read_mode = 'r'
    read_msg = MSG.decode('utf-8')
    write_mode = 'wb'
    write_msg = MSG
    # Passed to makefile() as its newline argument.
    newline = ''
5123
5124
class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() with a text-mode writer.

    The writing file is opened in text mode and is given MSG decoded
    as UTF-8; the reading file stays binary and is expected to yield
    the MSG bytes.
    """

    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'w'
    write_msg = MSG.decode('utf-8')
    # Passed to makefile() as its newline argument.
    newline = ''
5133
5134
class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() with text mode on both sides.

    Both the reading and the writing file are opened in text mode and
    exchange MSG decoded as UTF-8.
    """

    read_mode = 'r'
    read_msg = MSG.decode('utf-8')
    write_mode = 'w'
    write_msg = MSG.decode('utf-8')
    # Passed to makefile() as its newline argument.
    newline = ''
5143
5144
class NetworkConnectionTest:
    """Mixin whose client side connects with socket.create_connection()."""

    def clientSetUp(self):
        # self.port is not defined in this class: BasicTCPTest2 below
        # inherits both from this class and from BasicTCPTest, which
        # is where the attribute comes from.
        connection = socket.create_connection((HOST, self.port))
        self.cli = connection
        self.serv_conn = connection
5153
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.

    The BasicTCPTest tests are repeated with the clientSetUp() of
    NetworkConnectionTest, which connects the client through
    socket.create_connection().
    """
5157
class NetworkConnectionNoServer(unittest.TestCase):
    # Errors reported by connect() and create_connection() when the
    # connection cannot be established.

    class MockSocket(socket.socket):
        # socket.socket whose connect() always times out.
        def connect(self, *args):
            raise TimeoutError('timed out')

    @contextlib.contextmanager
    def mocked_socket_module(self):
        """Temporarily replace socket.socket with MockSocket."""
        real_socket_class = socket.socket
        socket.socket = self.MockSocket
        try:
            yield
        finally:
            socket.socket = real_socket_class

    def test_connect(self):
        unused_port = socket_helper.find_unused_port()
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(client.close)
        with self.assertRaises(OSError) as caught:
            client.connect((HOST, unused_port))
        self.assertEqual(caught.exception.errno, errno.ECONNREFUSED)

    def test_create_connection(self):
        # Issue #9792: errors raised by create_connection() should have
        # a proper errno attribute.
        unused_port = socket_helper.find_unused_port()
        with self.assertRaises(OSError) as caught:
            socket.create_connection((HOST, unused_port))

        # Issue #16257: create_connection() calls getaddrinfo() against
        # 'localhost'.  This may result in an IPV6 addr being returned
        # as well as an IPV4 one:
        #   >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
        #   >>> [(2,  2, 0, '', ('127.0.0.1', 41230)),
        #        (26, 2, 0, '', ('::1', 41230, 0, 0))]
        #
        # create_connection() tries each of the returned addresses in
        # turn and, if it cannot connect to any of them, propagates the
        # last exception it encountered.
        #
        # On Solaris, ENETUNREACH is returned in this circumstance instead
        # of ECONNREFUSED, which is why the set of acceptable errnos is
        # taken from socket_helper rather than hard-coded.
        acceptable = socket_helper.get_socket_conn_refused_errs()
        self.assertIn(caught.exception.errno, acceptable)

    def test_create_connection_all_errors(self):
        unused_port = socket_helper.find_unused_port()
        with self.assertRaises(ExceptionGroup,
                               msg='expected connection to fail') as caught:
            socket.create_connection((HOST, unused_port), all_errors=True)
        group = caught.exception

        self.assertIsInstance(group, ExceptionGroup)
        for error in group.exceptions:
            self.assertIsInstance(error, OSError)

        addresses = socket.getaddrinfo(
            'localhost', unused_port, 0, socket.SOCK_STREAM)
        # assert that we got an exception for each address
        self.assertEqual(len(addresses), len(group.exceptions))

    def test_create_connection_timeout(self):
        # Issue #9792: create_connection() should not recast timeout errors
        # as generic socket errors.
        with self.mocked_socket_module():
            try:
                socket.create_connection((HOST, 1234))
            except TimeoutError:
                pass
            except OSError as err:
                # EAFNOSUPPORT is tolerated only when IPv6 is disabled.
                ipv6_unavailable = (not socket_helper.IPV6_ENABLED
                                    and err.errno == errno.EAFNOSUPPORT)
                if not ipv6_unavailable:
                    raise
            else:
                self.fail('TimeoutError not raised')
5237
5238
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
    # Checks the attributes (family, source address, timeout) of the
    # socket returned by socket.create_connection().  The checks are
    # made in the _test*() methods; every matching test*() method is
    # _justAccept(), which accepts the connection and closes it.

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        self.source_port = socket_helper.find_unused_port()

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def _justAccept(self):
        conn, _ = self.serv.accept()
        conn.close()

    testFamily = _justAccept
    def _testFamily(self):
        self.cli = socket.create_connection((HOST, self.port),
                            timeout=support.LOOPBACK_TIMEOUT)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.family, socket.AF_INET)

    testSourceAddress = _justAccept
    def _testSourceAddress(self):
        self.cli = socket.create_connection((HOST, self.port),
                            timeout=support.LOOPBACK_TIMEOUT,
                            source_address=('', self.source_port))
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.getsockname()[1], self.source_port)
        # The port number being used is sufficient to show that the bind()
        # call happened.

    testTimeoutDefault = _justAccept
    def _testTimeoutDefault(self):
        # passing no explicit timeout uses socket's global default
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(42)
        try:
            self.cli = socket.create_connection((HOST, self.port))
            self.addCleanup(self.cli.close)
        finally:
            # Put the global default back even if connecting failed.
            socket.setdefaulttimeout(None)
        self.assertEqual(self.cli.gettimeout(), 42)

    testTimeoutNone = _justAccept
    def _testTimeoutNone(self):
        # None timeout means the same as sock.settimeout(None)
        self.assertIsNone(socket.getdefaulttimeout())
        socket.setdefaulttimeout(30)
        try:
            self.cli = socket.create_connection((HOST, self.port), timeout=None)
            self.addCleanup(self.cli.close)
        finally:
            # Put the global default back even if connecting failed.
            socket.setdefaulttimeout(None)
        self.assertIsNone(self.cli.gettimeout())

    testTimeoutValueNamed = _justAccept
    def _testTimeoutValueNamed(self):
        # timeout given as a keyword argument
        self.cli = socket.create_connection((HOST, self.port), timeout=30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)

    testTimeoutValueNonamed = _justAccept
    def _testTimeoutValueNonamed(self):
        # timeout given as the second positional argument
        self.cli = socket.create_connection((HOST, self.port), 30)
        self.addCleanup(self.cli.close)
        self.assertEqual(self.cli.gettimeout(), 30)
5308
5309
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
    # The accepting side waits three seconds before sending b"done!".
    # A client connected without a timeout must receive the data; a
    # client connected with a one second timeout must get a
    # TimeoutError from recv().

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        pass

    def clientTearDown(self):
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)

    def testInsideTimeout(self):
        conn, _ = self.serv.accept()
        self.addCleanup(conn.close)
        time.sleep(3)
        conn.send(b"done!")
    testOutsideTimeout = testInsideTimeout

    def _testInsideTimeout(self):
        sock = socket.create_connection((HOST, self.port))
        self.cli = sock
        self.assertEqual(sock.recv(5), b"done!")

    def _testOutsideTimeout(self):
        sock = socket.create_connection((HOST, self.port), timeout=1)
        self.cli = sock
        with self.assertRaises(TimeoutError):
            sock.recv(5)
5339
5340
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on a listening TCP socket."""

    def testTCPTimeout(self):
        # With a positive timeout and no incoming connection, accept() must
        # raise TimeoutError.  The message goes through ``msg`` so that it is
        # really reported on failure; passed positionally to assertRaises()
        # it would only be forwarded to the callable and silently dropped.
        self.serv.settimeout(1.0)
        with self.assertRaises(
                TimeoutError,
                msg="Error generating a timeout exception (TCP)"):
            self.serv.accept()

    def testTimeoutZero(self):
        # With a zero timeout accept() must fail immediately with a plain
        # OSError, not with TimeoutError (a subclass of OSError, hence
        # tested first).
        try:
            self.serv.settimeout(0.0)
            self.serv.accept()
        except TimeoutError:
            self.fail("caught timeout instead of error (TCP)")
        except OSError:
            pass
        except Exception:
            self.fail("caught unexpected exception (TCP)")
        else:
            self.fail("accept() returned success when we did not expect it")

    @unittest.skipUnless(hasattr(signal, 'alarm'),
                         'test needs signal.alarm()')
    def testInterruptedTimeout(self):
        # XXX I don't know how to do this test on MSWindows or any other
        # platform that doesn't support signal.alarm() or os.kill(), though
        # the bug should have existed on all platforms.
        self.serv.settimeout(5.0)   # must be longer than alarm
        class Alarm(Exception):
            pass
        def alarm_handler(signum, frame):
            # Raised from the signal handler, i.e. inside the blocked accept().
            raise Alarm
        old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
        try:
            try:
                signal.alarm(2)    # POSIX allows alarm to be up to 1 second early
                self.serv.accept()
            except TimeoutError:
                self.fail("caught timeout instead of Alarm")
            except Alarm:
                pass
            except Exception:
                self.fail("caught other exception instead of Alarm:"
                          " %s(%s):\n%s" %
                          (sys.exc_info()[:2] + (traceback.format_exc(),)))
            else:
                self.fail("nothing caught")
            finally:
                signal.alarm(0)         # shut off alarm
        except Alarm:
            # The alarm fired outside accept(), e.g. while handling above.
            self.fail("got Alarm in wrong place")
        finally:
            # no alarm can be pending.  Safe to restore old handler.
            signal.signal(signal.SIGALRM, old_alarm)
5397
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on a UDP socket that receives nothing."""

    def testUDPTimeout(self):
        # With a positive timeout and no datagram pending, recv() must raise
        # TimeoutError.  The message goes through ``msg`` so that it is
        # really reported on failure; passed positionally to assertRaises()
        # it would only be forwarded to the callable and silently dropped.
        self.serv.settimeout(1.0)
        with self.assertRaises(
                TimeoutError,
                msg="Error generating a timeout exception (UDP)"):
            self.serv.recv(1024)

    def testTimeoutZero(self):
        # With a zero timeout recv() must fail immediately with a plain
        # OSError, not with TimeoutError (a subclass of OSError, hence
        # tested first).
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except TimeoutError:
            self.fail("caught timeout instead of error (UDP)")
        except OSError:
            pass
        except Exception:
            self.fail("caught unexpected exception (UDP)")
        else:
            self.fail("recv() returned success when we did not expect it")
5420
@unittest.skipUnless(HAVE_SOCKET_UDPLITE,
          'UDPLITE sockets required for this test.')
class UDPLITETimeoutTest(SocketUDPLITETest):
    """Timeout behaviour of recv() on a UDPLITE socket that receives nothing."""

    def testUDPLITETimeout(self):
        # With a positive timeout and no datagram pending, recv() must raise
        # TimeoutError.  The message goes through ``msg`` so that it is
        # really reported on failure; passed positionally to assertRaises()
        # it would only be forwarded to the callable and silently dropped.
        self.serv.settimeout(1.0)
        with self.assertRaises(
                TimeoutError,
                msg="Error generating a timeout exception (UDPLITE)"):
            self.serv.recv(1024)

    def testTimeoutZero(self):
        # With a zero timeout recv() must fail immediately with a plain
        # OSError, not with TimeoutError (a subclass of OSError, hence
        # tested first).
        try:
            self.serv.settimeout(0.0)
            self.serv.recv(1024)
        except TimeoutError:
            self.fail("caught timeout instead of error (UDPLITE)")
        except OSError:
            pass
        except Exception:
            self.fail("caught unexpected exception (UDPLITE)")
        else:
            self.fail("recv() returned success when we did not expect it")
5445
5446class TestExceptions(unittest.TestCase):
5447
5448    def testExceptionTree(self):
5449        self.assertTrue(issubclass(OSError, Exception))
5450        self.assertTrue(issubclass(socket.herror, OSError))
5451        self.assertTrue(issubclass(socket.gaierror, OSError))
5452        self.assertTrue(issubclass(socket.timeout, OSError))
5453        self.assertIs(socket.error, OSError)
5454        self.assertIs(socket.timeout, TimeoutError)
5455
5456    def test_setblocking_invalidfd(self):
5457        # Regression test for issue #28471
5458
5459        sock0 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
5460        sock = socket.socket(
5461            socket.AF_INET, socket.SOCK_STREAM, 0, sock0.fileno())
5462        sock0.close()
5463        self.addCleanup(sock.detach)
5464
5465        with self.assertRaises(OSError):
5466            sock.setblocking(False)
5467
5468
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
    """AF_UNIX sockets bound to names that start with a NUL byte."""

    UNIX_PATH_MAX = 108

    def testLinuxAbstractNamespace(self):
        # Both ends of a connection must report the abstract name as bytes.
        name = b"\x00python-test-hello\x00\xff"
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as listener:
            listener.bind(name)
            listener.listen()
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
                client.connect(listener.getsockname())
                accepted = listener.accept()[0]
                with accepted:
                    self.assertEqual(listener.getsockname(), name)
                    self.assertEqual(client.getpeername(), name)

    def testMaxName(self):
        # A name of exactly UNIX_PATH_MAX bytes (NUL included) is accepted.
        padding = b"h" * (self.UNIX_PATH_MAX - 1)
        name = b"\x00" + padding
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind(name)
            self.assertEqual(sock.getsockname(), name)

    def testNameOverflow(self):
        # One character more than UNIX_PATH_MAX must be rejected.
        too_long = "\x00" + "h" * self.UNIX_PATH_MAX
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            with self.assertRaises(OSError):
                sock.bind(too_long)

    def testStrName(self):
        # Check that an abstract name can be passed as a string.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind("\x00python\x00test\x00")
            self.assertEqual(sock.getsockname(), b"\x00python\x00test\x00")

    def testBytearrayName(self):
        # Check that an abstract name can be passed as a bytearray.
        name = bytearray(b"\x00python\x00test\x00")
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.bind(name)
            self.assertEqual(sock.getsockname(), b"\x00python\x00test\x00")

    def testAutobind(self):
        # Check that binding to an empty string binds to an available address
        # in the abstract namespace as specified in unix(7) "Autobind feature".
        pattern = b"^\0[0-9a-f]{5}"
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as first:
            first.bind("")
            self.assertRegex(first.getsockname(), pattern)
            # Each socket is bound to a different abstract address.
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as second:
                second.bind("")
                self.assertRegex(second.getsockname(), pattern)
                self.assertNotEqual(first.getsockname(),
                                    second.getsockname())
5523
5524
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
class TestUnixDomain(unittest.TestCase):
    """Binding an AF_UNIX stream socket to filesystem pathnames."""

    def setUp(self):
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    def tearDown(self):
        self.sock.close()

    def encoded(self, path):
        # Return the given path encoded in the file system encoding,
        # or skip the test if this is not possible.
        try:
            return os.fsencode(path)
        except UnicodeEncodeError:
            self.skipTest(
                "Pathname {0!a} cannot be represented in file "
                "system encoding {1!r}".format(
                    path, sys.getfilesystemencoding()))

    def bind(self, sock, path):
        # Bind the socket, skipping the test (instead of failing it) when
        # the path is rejected as too long for an AF_UNIX address.
        try:
            socket_helper.bind_unix_socket(sock, path)
        except OSError as e:
            if str(e) == "AF_UNIX path too long":
                self.skipTest(
                    "Pathname {0!a} is too long to serve as an AF_UNIX path"
                    .format(path))
            else:
                raise

    def testUnbound(self):
        # Issue #30205 (note getsockname() can return None on OS X)
        self.assertIn(self.sock.getsockname(), ('', None))

    def testStrAddr(self):
        # Test binding to and retrieving a normal string pathname.
        path = os.path.abspath(os_helper.TESTFN)
        self.bind(self.sock, path)
        self.addCleanup(os_helper.unlink, path)
        self.assertEqual(self.sock.getsockname(), path)

    def testBytesAddr(self):
        # Test binding to a bytes pathname.
        path = os.path.abspath(os_helper.TESTFN)
        self.bind(self.sock, self.encoded(path))
        self.addCleanup(os_helper.unlink, path)
        self.assertEqual(self.sock.getsockname(), path)

    def testSurrogateescapeBind(self):
        # Test binding to a valid non-ASCII pathname, with the
        # non-ASCII bytes supplied using surrogateescape encoding.
        path = os.path.abspath(os_helper.TESTFN_UNICODE)
        b = self.encoded(path)
        self.bind(self.sock, b.decode("ascii", "surrogateescape"))
        self.addCleanup(os_helper.unlink, path)
        self.assertEqual(self.sock.getsockname(), path)

    def testUnencodableAddr(self):
        # Test binding to a pathname that cannot be encoded in the
        # file system encoding.
        if os_helper.TESTFN_UNENCODABLE is None:
            self.skipTest("No unencodable filename available")
        path = os.path.abspath(os_helper.TESTFN_UNENCODABLE)
        self.bind(self.sock, path)
        self.addCleanup(os_helper.unlink, path)
        self.assertEqual(self.sock.getsockname(), path)

    # This test is skipped ON Linux, so the reason must say why it does not
    # apply there rather than call it Linux specific.
    @unittest.skipIf(sys.platform == 'linux',
                     'Linux binds an empty address to an abstract name '
                     '(see TestLinuxAbstractNamespace.testAutobind)')
    def testEmptyAddress(self):
        # Test that binding empty address fails.
        self.assertRaises(OSError, self.sock.bind, "")
5598
5599
class BufferIOTest(SocketConnectedTest):
    """
    Exercise recv_into() and recvfrom_into() with several buffer types.

    Each test* method receives on cli_conn; the matching _test* method
    sends MSG on serv_conn.
    """
    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecvIntoArray(self):
        target = array.array("B", [0] * len(MSG))
        count = self.cli_conn.recv_into(target)
        self.assertEqual(count, len(MSG))
        received = target.tobytes()[:len(MSG)]
        self.assertEqual(received, MSG)

    def _testRecvIntoArray(self):
        self.serv_conn.send(bytes(MSG))

    def testRecvIntoBytearray(self):
        target = bytearray(1024)
        count = self.cli_conn.recv_into(target)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoBytearray = _testRecvIntoArray

    def testRecvIntoMemoryview(self):
        target = bytearray(1024)
        view = memoryview(target)
        count = self.cli_conn.recv_into(view)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvIntoMemoryview = _testRecvIntoArray

    def testRecvFromIntoArray(self):
        target = array.array("B", [0] * len(MSG))
        count, _ = self.cli_conn.recvfrom_into(target)
        self.assertEqual(count, len(MSG))
        received = target.tobytes()[:len(MSG)]
        self.assertEqual(received, MSG)

    def _testRecvFromIntoArray(self):
        self.serv_conn.send(bytes(MSG))

    def testRecvFromIntoBytearray(self):
        target = bytearray(1024)
        count, _ = self.cli_conn.recvfrom_into(target)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoBytearray = _testRecvFromIntoArray

    def testRecvFromIntoMemoryview(self):
        target = bytearray(1024)
        view = memoryview(target)
        count, _ = self.cli_conn.recvfrom_into(view)
        self.assertEqual(count, len(MSG))
        self.assertEqual(target[:len(MSG)], MSG)

    _testRecvFromIntoMemoryview = _testRecvFromIntoArray

    def testRecvFromIntoSmallBuffer(self):
        # See issue #20246.
        # Asking for more bytes than the buffer can hold must be rejected.
        small = bytearray(8)
        with self.assertRaises(ValueError):
            self.cli_conn.recvfrom_into(small, 1024)

    def _testRecvFromIntoSmallBuffer(self):
        self.serv_conn.send(MSG)

    def testRecvFromIntoEmptyBuffer(self):
        # A zero-length buffer is accepted with and without an explicit size.
        empty = bytearray()
        self.cli_conn.recvfrom_into(empty)
        self.cli_conn.recvfrom_into(empty, 0)

    _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
5681
5682
# Values used to build the TIPC address tuples in the tests below:
# TIPC_STYPE is the second element of every address, and TIPC_LOWER /
# TIPC_UPPER bound the range the servers bind to (clients address the
# midpoint of that range).
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210
5686
def isTipcAvailable():
    """Tell whether TIPC sockets can be tested on this host.

    Returns True only when the socket module defines AF_TIPC and
    /proc/modules lists the "tipc" kernel module.  The module is not loaded
    automatically on Ubuntu and probably other Linux distros.
    """
    if hasattr(socket, "AF_TIPC"):
        try:
            modules = open("/proc/modules", encoding="utf-8")
        except (FileNotFoundError, IsADirectoryError, PermissionError):
            # A missing, unreadable or non-regular /proc/modules simply
            # means we cannot confirm that the module is loaded.
            return False
        with modules:
            return any(entry.startswith("tipc ") for entry in modules)
    return False
5706
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
    """Single-message exchange between two TIPC SOCK_RDM sockets."""

    def testRDM(self):
        server = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        client = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
        for endpoint in (server, client):
            self.addCleanup(endpoint.close)

        # The server binds to the whole [TIPC_LOWER, TIPC_UPPER] range ...
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                     TIPC_LOWER, TIPC_UPPER))

        # ... and the client addresses the middle of that range.
        midpoint = TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2)
        client.sendto(MSG, (socket.TIPC_ADDR_NAME, TIPC_STYPE, midpoint, 0))

        payload, sender = server.recvfrom(1024)

        self.assertEqual(client.getsockname(), sender)
        self.assertEqual(payload, MSG)
5729
5730
@unittest.skipUnless(isTipcAvailable(),
                     "TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
    """Stream exchange over a TIPC SOCK_STREAM connection.

    setUp() listens and accepts on the server side; clientSetUp() connects
    from the client side, and the test pair sends MSG across.
    """
    def __init__(self, methodName = 'runTest'):
        unittest.TestCase.__init__(self, methodName = methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        # Server side: bind to the [TIPC_LOWER, TIPC_UPPER] range and listen.
        self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.srv.close)
        self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
                TIPC_LOWER, TIPC_UPPER)
        self.srv.bind(srvaddr)
        self.srv.listen()
        # Signal readiness *before* the accept() call below; the client
        # compensates for the gap with a short sleep (see clientSetUp).
        self.serverExplicitReady()
        self.conn, self.connaddr = self.srv.accept()
        self.addCleanup(self.conn.close)

    def clientSetUp(self):
        # There is a hittable race between serverExplicitReady() and the
        # accept() call; sleep a little while to avoid it, otherwise
        # we could get an exception
        time.sleep(0.1)
        self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
        self.addCleanup(self.cli.close)
        # Connect to the midpoint of the range the server is bound to.
        addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
                TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0)
        self.cli.connect(addr)
        self.cliaddr = self.cli.getsockname()

    def testStream(self):
        # Server side: the data and the peer address must match the client.
        msg = self.conn.recv(1024)
        self.assertEqual(msg, MSG)
        self.assertEqual(self.cliaddr, self.connaddr)

    def _testStream(self):
        # Client side: send the message, then close the connection.
        self.cli.send(MSG)
        self.cli.close()
5770
5771
class ContextManagersTest(ThreadedTCPSocketTest):
    """Sockets used as context managers must be closed on exit."""

    def testSocketClass(self):
        # Server-side counterpart of _testSocketClass().  unittest only
        # collects methods whose names start with "test", so without this
        # method the client-side assertions below were never executed.
        # The server has nothing to do for this test.
        pass

    def _testSocketClass(self):
        # base test
        with socket.socket() as sock:
            self.assertFalse(sock._closed)
        self.assertTrue(sock._closed)
        # close inside with block
        with socket.socket() as sock:
            sock.close()
        self.assertTrue(sock._closed)
        # exception inside with block
        with socket.socket() as sock:
            self.assertRaises(OSError, sock.sendall, b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionBase(self):
        # Server side: echo back whatever the client sends.
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        data = conn.recv(1024)
        conn.sendall(data)

    def _testCreateConnectionBase(self):
        # Client side: the socket is usable inside the block, closed after.
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock:
            self.assertFalse(sock._closed)
            sock.sendall(b'foo')
            self.assertEqual(sock.recv(1024), b'foo')
        self.assertTrue(sock._closed)

    def testCreateConnectionClose(self):
        # Server side: the client closes without sending, so recv() may
        # simply return no data before the (empty) echo.
        conn, addr = self.serv.accept()
        self.addCleanup(conn.close)
        data = conn.recv(1024)
        conn.sendall(data)

    def _testCreateConnectionClose(self):
        # Client side: closing inside the block must not break the exit.
        address = self.serv.getsockname()
        with socket.create_connection(address) as sock:
            sock.close()
        self.assertTrue(sock._closed)
        self.assertRaises(OSError, sock.sendall, b'foo')
5814
5815
class InheritanceTest(unittest.TestCase):
    """Inheritable flag of socket file descriptors."""

    @unittest.skipUnless(hasattr(socket, "SOCK_CLOEXEC"),
                         "SOCK_CLOEXEC not defined")
    @support.requires_linux_version(2, 6, 28)
    def test_SOCK_CLOEXEC(self):
        # The flag must be stripped from .type and make the fd non-inheritable.
        kind = socket.SOCK_STREAM | socket.SOCK_CLOEXEC
        with socket.socket(socket.AF_INET, kind) as s:
            self.assertEqual(s.type, socket.SOCK_STREAM)
            self.assertFalse(s.get_inheritable())

    def test_default_inheritable(self):
        with socket.socket() as sock:
            self.assertEqual(sock.get_inheritable(), False)

    def test_dup(self):
        # A duplicate must be non-inheritable too, even once the original
        # socket has been closed.
        with socket.socket() as sock:
            duplicate = sock.dup()
            sock.close()
            with duplicate:
                self.assertEqual(duplicate.get_inheritable(), False)

    def test_set_inheritable(self):
        with socket.socket() as sock:
            for wanted in (True, False):
                sock.set_inheritable(wanted)
                self.assertEqual(sock.get_inheritable(), wanted)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_get_inheritable_cloexec(self):
        with socket.socket() as sock:
            fd = sock.fileno()
            self.assertEqual(sock.get_inheritable(), False)

            # clear FD_CLOEXEC flag
            current = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, current & ~fcntl.FD_CLOEXEC)

            self.assertEqual(sock.get_inheritable(), True)

    @unittest.skipIf(fcntl is None, "need fcntl")
    def test_set_inheritable_cloexec(self):
        with socket.socket() as sock:
            fd = sock.fileno()
            cloexec = fcntl.FD_CLOEXEC
            # Set by default ...
            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec,
                             cloexec)

            # ... and cleared once the socket is made inheritable.
            sock.set_inheritable(True)
            self.assertEqual(fcntl.fcntl(fd, fcntl.F_GETFD) & cloexec,
                             0)

    def test_socketpair(self):
        first, second = socket.socketpair()
        self.addCleanup(first.close)
        self.addCleanup(second.close)
        self.assertEqual(first.get_inheritable(), False)
        self.assertEqual(second.get_inheritable(), False)
5881
5882
@unittest.skipUnless(hasattr(socket, "SOCK_NONBLOCK"),
                     "SOCK_NONBLOCK not defined")
class NonblockConstantTest(unittest.TestCase):
    """Interaction of SOCK_NONBLOCK, setblocking() and socket timeouts."""

    def checkNonblock(self, s, nonblock=True, timeout=0.0):
        """Check that socket *s* is in the expected blocking state.

        With *nonblock* true, the descriptor must have O_NONBLOCK set and
        gettimeout() must equal *timeout*; otherwise the descriptor must be
        blocking with no timeout.  In both cases SOCK_NONBLOCK must not
        show up in s.type.
        """
        if nonblock:
            self.assertEqual(s.type, socket.SOCK_STREAM)
            self.assertEqual(s.gettimeout(), timeout)
            self.assertTrue(
                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
            if timeout == 0:
                # timeout == 0: means that getblocking() must be False.
                self.assertFalse(s.getblocking())
            else:
                # If timeout > 0, the socket will be in a "blocking" mode
                # from the standpoint of the Python API.  For Python socket
                # object, "blocking" means that operations like 'sock.recv()'
                # will block.  Internally, file descriptors for
                # "blocking" Python sockets *with timeouts* are in a
                # *non-blocking* mode, and 'sock.recv()' uses 'select()'
                # and handles EWOULDBLOCK/EAGAIN to enforce the timeout.
                self.assertTrue(s.getblocking())
        else:
            self.assertEqual(s.type, socket.SOCK_STREAM)
            self.assertEqual(s.gettimeout(), None)
            self.assertFalse(
                fcntl.fcntl(s, fcntl.F_GETFL, os.O_NONBLOCK) & os.O_NONBLOCK)
            self.assertTrue(s.getblocking())

    @support.requires_linux_version(2, 6, 28)
    def test_SOCK_NONBLOCK(self):
        # a lot of it seems silly and redundant, but I wanted to test that
        # changing back and forth worked ok
        with socket.socket(socket.AF_INET,
                           socket.SOCK_STREAM | socket.SOCK_NONBLOCK) as s:
            self.checkNonblock(s)
            s.setblocking(True)
            self.checkNonblock(s, nonblock=False)
            s.setblocking(False)
            self.checkNonblock(s)
            s.settimeout(None)
            self.checkNonblock(s, nonblock=False)
            s.settimeout(2.0)
            self.checkNonblock(s, timeout=2.0)
            s.setblocking(True)
            self.checkNonblock(s, nonblock=False)
        # defaulttimeout
        t = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.0)
            with socket.socket() as s:
                self.checkNonblock(s)
            socket.setdefaulttimeout(None)
            with socket.socket() as s:
                self.checkNonblock(s, False)
            socket.setdefaulttimeout(2.0)
            with socket.socket() as s:
                self.checkNonblock(s, timeout=2.0)
            socket.setdefaulttimeout(None)
            with socket.socket() as s:
                self.checkNonblock(s, False)
        finally:
            # The default timeout is process-wide: restore it even when a
            # check above fails, so later tests are not affected.
            socket.setdefaulttimeout(t)
5943
5944
@unittest.skipUnless(os.name == "nt", "Windows specific")
@unittest.skipUnless(multiprocessing, "need multiprocessing")
class TestSocketSharing(SocketTCPTest):
    """Tests for socket.share() / socket.fromshare()."""

    # This must be classmethod and not staticmethod or multiprocessing
    # won't be able to bootstrap it.
    @classmethod
    def remoteProcessServer(cls, q):
        # Runs in the child process.  The queue delivers, in this order,
        # the shared socket data and the message to send back.
        # Recreate socket from shared data
        sdata = q.get()
        message = q.get()

        s = socket.fromshare(sdata)
        s2, c = s.accept()

        # Send the message
        s2.sendall(message)
        s2.close()
        s.close()

    def testShare(self):
        # Transfer the listening server socket to another process
        # and service it from there.

        # Create process:
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=self.remoteProcessServer, args=(q,))
        p.start()

        # Get the shared socket data
        # (share() takes the target pid, so the process must be started first)
        data = self.serv.share(p.pid)

        # Pass the shared socket to the other process
        # The address is read before our own copy of the socket is closed.
        addr = self.serv.getsockname()
        self.serv.close()
        q.put(data)

        # The data that the server will send us
        message = b"slapmahfro"
        q.put(message)

        # Connect
        s = socket.create_connection(addr)
        # Read until the remote side closes the connection.
        m = []
        while True:
            data = s.recv(100)
            if not data:
                break
            m.append(data)
        s.close()
        received = b"".join(m)
        self.assertEqual(received, message)
        p.join()

    def testShareLength(self):
        # Share data that is too short or too long must be rejected.
        data = self.serv.share(os.getpid())
        self.assertRaises(ValueError, socket.fromshare, data[:-1])
        self.assertRaises(ValueError, socket.fromshare, data+b"foo")

    def compareSockets(self, org, other):
        # Assert that 'other' (recreated from share data) matches 'org'.
        # socket sharing is expected to work only for blocking socket
        # since the internal python timeout value isn't transferred.
        self.assertEqual(org.gettimeout(), None)
        self.assertEqual(org.gettimeout(), other.gettimeout())

        self.assertEqual(org.family, other.family)
        self.assertEqual(org.type, other.type)
        # If the user specified "0" for proto, then
        # internally windows will have picked the correct value.
        # Python introspection on the socket however will still return
        # 0.  For the shared socket, the python value is recreated
        # from the actual value, so it may not compare correctly.
        if org.proto != 0:
            self.assertEqual(org.proto, other.proto)

    def testShareLocal(self):
        # Share the server socket with our own process and compare.
        data = self.serv.share(os.getpid())
        s = socket.fromshare(data)
        try:
            self.compareSockets(self.serv, s)
        finally:
            s.close()

    def testTypes(self):
        # Repeat the local share/compare for every supported combination
        # of address family and socket type.
        families = [socket.AF_INET, socket.AF_INET6]
        types = [socket.SOCK_STREAM, socket.SOCK_DGRAM]
        for f in families:
            for t in types:
                try:
                    source = socket.socket(f, t)
                except OSError:
                    continue # This combination is not supported
                try:
                    data = source.share(os.getpid())
                    shared = socket.fromshare(data)
                    try:
                        self.compareSockets(source, shared)
                    finally:
                        shared.close()
                finally:
                    source.close()
6046
6047
6048class SendfileUsingSendTest(ThreadedTCPSocketTest):
6049    """
6050    Test the send() implementation of socket.sendfile().
6051    """
6052
6053    FILESIZE = (10 * 1024 * 1024)  # 10 MiB
6054    BUFSIZE = 8192
6055    FILEDATA = b""
6056    TIMEOUT = support.LOOPBACK_TIMEOUT
6057
6058    @classmethod
6059    def setUpClass(cls):
6060        def chunks(total, step):
6061            assert total >= step
6062            while total > step:
6063                yield step
6064                total -= step
6065            if total:
6066                yield total
6067
6068        chunk = b"".join([random.choice(string.ascii_letters).encode()
6069                          for i in range(cls.BUFSIZE)])
6070        with open(os_helper.TESTFN, 'wb') as f:
6071            for csize in chunks(cls.FILESIZE, cls.BUFSIZE):
6072                f.write(chunk)
6073        with open(os_helper.TESTFN, 'rb') as f:
6074            cls.FILEDATA = f.read()
6075            assert len(cls.FILEDATA) == cls.FILESIZE
6076
6077    @classmethod
6078    def tearDownClass(cls):
6079        os_helper.unlink(os_helper.TESTFN)
6080
6081    def accept_conn(self):
6082        self.serv.settimeout(support.LONG_TIMEOUT)
6083        conn, addr = self.serv.accept()
6084        conn.settimeout(self.TIMEOUT)
6085        self.addCleanup(conn.close)
6086        return conn
6087
6088    def recv_data(self, conn):
6089        received = []
6090        while True:
6091            chunk = conn.recv(self.BUFSIZE)
6092            if not chunk:
6093                break
6094            received.append(chunk)
6095        return b''.join(received)
6096
6097    def meth_from_sock(self, sock):
6098        # Depending on the mixin class being run return either send()
6099        # or sendfile() method implementation.
6100        return getattr(sock, "_sendfile_use_send")
6101
6102    # regular file
6103
6104    def _testRegularFile(self):
6105        address = self.serv.getsockname()
6106        file = open(os_helper.TESTFN, 'rb')
6107        with socket.create_connection(address) as sock, file as file:
6108            meth = self.meth_from_sock(sock)
6109            sent = meth(file)
6110            self.assertEqual(sent, self.FILESIZE)
6111            self.assertEqual(file.tell(), self.FILESIZE)
6112
6113    def testRegularFile(self):
6114        conn = self.accept_conn()
6115        data = self.recv_data(conn)
6116        self.assertEqual(len(data), self.FILESIZE)
6117        self.assertEqual(data, self.FILEDATA)
6118
6119    # non regular file
6120
6121    def _testNonRegularFile(self):
6122        address = self.serv.getsockname()
6123        file = io.BytesIO(self.FILEDATA)
6124        with socket.create_connection(address) as sock, file as file:
6125            sent = sock.sendfile(file)
6126            self.assertEqual(sent, self.FILESIZE)
6127            self.assertEqual(file.tell(), self.FILESIZE)
6128            self.assertRaises(socket._GiveupOnSendfile,
6129                              sock._sendfile_use_sendfile, file)
6130
6131    def testNonRegularFile(self):
6132        conn = self.accept_conn()
6133        data = self.recv_data(conn)
6134        self.assertEqual(len(data), self.FILESIZE)
6135        self.assertEqual(data, self.FILEDATA)
6136
6137    # empty file
6138
6139    def _testEmptyFileSend(self):
6140        address = self.serv.getsockname()
6141        filename = os_helper.TESTFN + "2"
6142        with open(filename, 'wb'):
6143            self.addCleanup(os_helper.unlink, filename)
6144        file = open(filename, 'rb')
6145        with socket.create_connection(address) as sock, file as file:
6146            meth = self.meth_from_sock(sock)
6147            sent = meth(file)
6148            self.assertEqual(sent, 0)
6149            self.assertEqual(file.tell(), 0)
6150
6151    def testEmptyFileSend(self):
6152        conn = self.accept_conn()
6153        data = self.recv_data(conn)
6154        self.assertEqual(data, b"")
6155
6156    # offset
6157
6158    def _testOffset(self):
6159        address = self.serv.getsockname()
6160        file = open(os_helper.TESTFN, 'rb')
6161        with socket.create_connection(address) as sock, file as file:
6162            meth = self.meth_from_sock(sock)
6163            sent = meth(file, offset=5000)
6164            self.assertEqual(sent, self.FILESIZE - 5000)
6165            self.assertEqual(file.tell(), self.FILESIZE)
6166
6167    def testOffset(self):
6168        conn = self.accept_conn()
6169        data = self.recv_data(conn)
6170        self.assertEqual(len(data), self.FILESIZE - 5000)
6171        self.assertEqual(data, self.FILEDATA[5000:])
6172
6173    # count
6174
6175    def _testCount(self):
6176        address = self.serv.getsockname()
6177        file = open(os_helper.TESTFN, 'rb')
6178        sock = socket.create_connection(address,
6179                                        timeout=support.LOOPBACK_TIMEOUT)
6180        with sock, file:
6181            count = 5000007
6182            meth = self.meth_from_sock(sock)
6183            sent = meth(file, count=count)
6184            self.assertEqual(sent, count)
6185            self.assertEqual(file.tell(), count)
6186
6187    def testCount(self):
6188        count = 5000007
6189        conn = self.accept_conn()
6190        data = self.recv_data(conn)
6191        self.assertEqual(len(data), count)
6192        self.assertEqual(data, self.FILEDATA[:count])
6193
6194    # count small
6195
6196    def _testCountSmall(self):
6197        address = self.serv.getsockname()
6198        file = open(os_helper.TESTFN, 'rb')
6199        sock = socket.create_connection(address,
6200                                        timeout=support.LOOPBACK_TIMEOUT)
6201        with sock, file:
6202            count = 1
6203            meth = self.meth_from_sock(sock)
6204            sent = meth(file, count=count)
6205            self.assertEqual(sent, count)
6206            self.assertEqual(file.tell(), count)
6207
6208    def testCountSmall(self):
6209        count = 1
6210        conn = self.accept_conn()
6211        data = self.recv_data(conn)
6212        self.assertEqual(len(data), count)
6213        self.assertEqual(data, self.FILEDATA[:count])
6214
6215    # count + offset
6216
6217    def _testCountWithOffset(self):
6218        address = self.serv.getsockname()
6219        file = open(os_helper.TESTFN, 'rb')
6220        with socket.create_connection(address, timeout=2) as sock, file as file:
6221            count = 100007
6222            meth = self.meth_from_sock(sock)
6223            sent = meth(file, offset=2007, count=count)
6224            self.assertEqual(sent, count)
6225            self.assertEqual(file.tell(), count + 2007)
6226
6227    def testCountWithOffset(self):
6228        count = 100007
6229        conn = self.accept_conn()
6230        data = self.recv_data(conn)
6231        self.assertEqual(len(data), count)
6232        self.assertEqual(data, self.FILEDATA[2007:count+2007])
6233
6234    # non blocking sockets are not supposed to work
6235
6236    def _testNonBlocking(self):
6237        address = self.serv.getsockname()
6238        file = open(os_helper.TESTFN, 'rb')
6239        with socket.create_connection(address) as sock, file as file:
6240            sock.setblocking(False)
6241            meth = self.meth_from_sock(sock)
6242            self.assertRaises(ValueError, meth, file)
6243            self.assertRaises(ValueError, sock.sendfile, file)
6244
6245    def testNonBlocking(self):
6246        conn = self.accept_conn()
6247        if conn.recv(8192):
6248            self.fail('was not supposed to receive any data')
6249
6250    # timeout (non-triggered)
6251
6252    def _testWithTimeout(self):
6253        address = self.serv.getsockname()
6254        file = open(os_helper.TESTFN, 'rb')
6255        sock = socket.create_connection(address,
6256                                        timeout=support.LOOPBACK_TIMEOUT)
6257        with sock, file:
6258            meth = self.meth_from_sock(sock)
6259            sent = meth(file)
6260            self.assertEqual(sent, self.FILESIZE)
6261
6262    def testWithTimeout(self):
6263        conn = self.accept_conn()
6264        data = self.recv_data(conn)
6265        self.assertEqual(len(data), self.FILESIZE)
6266        self.assertEqual(data, self.FILEDATA)
6267
6268    # timeout (triggered)
6269
6270    def _testWithTimeoutTriggeredSend(self):
6271        address = self.serv.getsockname()
6272        with open(os_helper.TESTFN, 'rb') as file:
6273            with socket.create_connection(address) as sock:
6274                sock.settimeout(0.01)
6275                meth = self.meth_from_sock(sock)
6276                self.assertRaises(TimeoutError, meth, file)
6277
6278    def testWithTimeoutTriggeredSend(self):
6279        conn = self.accept_conn()
6280        conn.recv(88192)
6281        # bpo-45212: the wait here needs to be longer than the client-side timeout (0.01s)
6282        time.sleep(1)
6283
6284    # errors
6285
    def _test_errors(self):
        # Nothing to do here: test_errors() only works on local,
        # unconnected sockets and never exchanges data with a peer.
        # NOTE(review): presumably this stub exists because the threaded
        # test base class expects a `_test*` counterpart -- confirm.
        pass
6288
6289    def test_errors(self):
6290        with open(os_helper.TESTFN, 'rb') as file:
6291            with socket.socket(type=socket.SOCK_DGRAM) as s:
6292                meth = self.meth_from_sock(s)
6293                self.assertRaisesRegex(
6294                    ValueError, "SOCK_STREAM", meth, file)
6295        with open(os_helper.TESTFN, encoding="utf-8") as file:
6296            with socket.socket() as s:
6297                meth = self.meth_from_sock(s)
6298                self.assertRaisesRegex(
6299                    ValueError, "binary mode", meth, file)
6300        with open(os_helper.TESTFN, 'rb') as file:
6301            with socket.socket() as s:
6302                meth = self.meth_from_sock(s)
6303                self.assertRaisesRegex(TypeError, "positive integer",
6304                                       meth, file, count='2')
6305                self.assertRaisesRegex(TypeError, "positive integer",
6306                                       meth, file, count=0.1)
6307                self.assertRaisesRegex(ValueError, "positive integer",
6308                                       meth, file, count=0)
6309                self.assertRaisesRegex(ValueError, "positive integer",
6310                                       meth, file, count=-1)
6311
6312
@unittest.skipUnless(hasattr(os, "sendfile"),
                     'os.sendfile() required for this test.')
class SendfileUsingSendfileTest(SendfileUsingSendTest):
    """
    Run the inherited tests against the socket's private
    _sendfile_use_sendfile() implementation.
    """
    def meth_from_sock(self, sock):
        # Select the os.sendfile()-based implementation for every
        # inherited test.
        return sock._sendfile_use_sendfile
6321
6322
@unittest.skipUnless(HAVE_SOCKET_ALG, 'AF_ALG required')
class LinuxKernelCryptoAPI(unittest.TestCase):
    # tests for AF_ALG
    def create_alg(self, typ, name):
        # Return an AF_ALG socket bound to algorithm (typ, name).
        #
        # Raises unittest.SkipTest when bind() reports FileNotFoundError
        # (type / algorithm not available).  The socket is closed on every
        # bind() failure so it never leaks.
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        try:
            sock.bind((typ, name))
        except FileNotFoundError as e:
            # type / algorithm is not available
            sock.close()
            # Pass one formatted string: extra positional arguments would
            # make the skip reason render as a tuple repr.
            raise unittest.SkipTest(
                f"AF_ALG {typ!r} algorithm {name!r} is not available: {e}")
        except BaseException:
            # Any other failure: do not leak the socket, then propagate.
            sock.close()
            raise
        else:
            return sock

    # bpo-31705: On kernel older than 4.5, sendto() failed with ENOKEY,
    # at least on ppc64le architecture
    @support.requires_linux_version(4, 5)
    def test_sha256(self):
        # SHA-256 digest of b"abc"
        expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
                                 "177a9cb410ff61f20015ad")
        with self.create_alg('hash', 'sha256') as algo:
            # one-shot: the whole message in a single sendall()
            op, _ = algo.accept()
            with op:
                op.sendall(b"abc")
                self.assertEqual(op.recv(512), expected)

            # incremental: feed the message byte by byte with MSG_MORE and
            # finish with an empty send
            op, _ = algo.accept()
            with op:
                op.send(b'a', socket.MSG_MORE)
                op.send(b'b', socket.MSG_MORE)
                op.send(b'c', socket.MSG_MORE)
                op.send(b'')
                self.assertEqual(op.recv(512), expected)

    def test_hmac_sha1(self):
        expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
        with self.create_alg('hash', 'hmac(sha1)') as algo:
            # the key is set on the bound socket before accept()
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
            op, _ = algo.accept()
            with op:
                op.sendall(b"what do ya want for nothing?")
                self.assertEqual(op.recv(512), expected)

    # Although it should work with 3.19 and newer the test blocks on
    # Ubuntu 15.10 with Kernel 4.2.0-19.
    @support.requires_linux_version(4, 3)
    def test_aes_cbc(self):
        key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
        iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
        msg = b"Single block msg"
        ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
        msglen = len(msg)
        with self.create_alg('skcipher', 'cbc(aes)') as algo:
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)

            # encrypt: control message first (MSG_MORE), then the payload
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 flags=socket.MSG_MORE)
                op.sendall(msg)
                self.assertEqual(op.recv(msglen), ciphertext)

            # decrypt: control message and payload in a single call
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg([ciphertext],
                                 op=socket.ALG_OP_DECRYPT, iv=iv)
                self.assertEqual(op.recv(msglen), msg)

            # long message
            multiplier = 1024
            longmsg = [msg] * multiplier
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(longmsg,
                                 op=socket.ALG_OP_ENCRYPT, iv=iv)
                enc = op.recv(msglen * multiplier)
            self.assertEqual(len(enc), msglen * multiplier)
            self.assertEqual(enc[:msglen], ciphertext)

            # round trip: decrypting the long ciphertext restores the input
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg([enc],
                                 op=socket.ALG_OP_DECRYPT, iv=iv)
                dec = op.recv(msglen * multiplier)
            self.assertEqual(len(dec), msglen * multiplier)
            self.assertEqual(dec, msg * multiplier)

    @support.requires_linux_version(4, 9)  # see issue29324
    def test_aead_aes_gcm(self):
        key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
        iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
        plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
        assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
        expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
        expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')

        taglen = len(expected_tag)
        assoclen = len(assoc)

        with self.create_alg('aead', 'gcm(aes)') as algo:
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
                            None, taglen)

            # send assoc, plain and tag buffer in separate steps
            op, _ = algo.accept()
            with op:
                op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 assoclen=assoclen, flags=socket.MSG_MORE)
                op.sendall(assoc, socket.MSG_MORE)
                op.sendall(plain)
                res = op.recv(assoclen + len(plain) + taglen)
                # the ciphertext sits between the leading assoclen bytes
                # and the trailing tag
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # now with msg
            op, _ = algo.accept()
            with op:
                msg = assoc + plain
                op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
                                 assoclen=assoclen)
                res = op.recv(assoclen + len(plain) + taglen)
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # create anc data manually
            pack_uint32 = struct.Struct('I').pack
            op, _ = algo.accept()
            with op:
                msg = assoc + plain
                op.sendmsg(
                    [msg],
                    ([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
                     [socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
                     [socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
                    )
                )
                res = op.recv(len(msg) + taglen)
                self.assertEqual(expected_ct, res[assoclen:-taglen])
                self.assertEqual(expected_tag, res[-taglen:])

            # decrypt and verify
            op, _ = algo.accept()
            with op:
                msg = assoc + expected_ct + expected_tag
                op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
                                 assoclen=assoclen)
                res = op.recv(len(msg) - taglen)
                self.assertEqual(plain, res[assoclen:])

    @support.requires_linux_version(4, 3)  # see test_aes_cbc
    def test_drbg_pr_sha256(self):
        # deterministic random bit generator, prediction resistance, sha256
        with self.create_alg('rng', 'drbg_pr_sha256') as algo:
            extra_seed = os.urandom(32)
            algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
            op, _ = algo.accept()
            with op:
                rn = op.recv(32)
                self.assertEqual(len(rn), 32)

    def test_sendmsg_afalg_args(self):
        # Every call below is expected to raise TypeError.
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        with sock:
            with self.assertRaises(TypeError):
                sock.sendmsg_afalg()

            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=None)

            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(1)

            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)

            with self.assertRaises(TypeError):
                sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)

    def test_length_restriction(self):
        # bpo-35050, off-by-one error in length check
        sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        self.addCleanup(sock.close)

        # salg_type[14]
        # 13 characters pass the length check (bind then fails with
        # FileNotFoundError); 14 characters are rejected as too long.
        with self.assertRaises(FileNotFoundError):
            sock.bind(("t" * 13, "name"))
        with self.assertRaisesRegex(ValueError, "type too long"):
            sock.bind(("t" * 14, "name"))

        # salg_name[64]
        # same boundary for the name field: 63 pass, 64 are too long
        with self.assertRaises(FileNotFoundError):
            sock.bind(("type", "n" * 63))
        with self.assertRaisesRegex(ValueError, "name too long"):
            sock.bind(("type", "n" * 64))
6517
6518
@unittest.skipUnless(sys.platform == 'darwin', 'macOS specific test')
class TestMacOSTCPFlags(unittest.TestCase):
    """Check for the TCP_KEEPALIVE constant on macOS."""

    def test_tcp_keepalive(self):
        # The constant must exist and have a truthy value.
        keepalive = socket.TCP_KEEPALIVE
        self.assertTrue(keepalive)
6523
6524
@unittest.skipUnless(sys.platform.startswith("win"), "requires Windows")
class TestMSWindowsTCPFlags(unittest.TestCase):
    """Detect TCP* names in the socket module that are not listed here."""

    # Names starting with 'TCP' that the socket module is known to provide
    # on Windows.
    knownTCPFlags = {
        # available since long time ago
        'TCP_MAXSEG',
        'TCP_NODELAY',
        # available starting with Windows 10 1607
        'TCP_FASTOPEN',
        # available starting with Windows 10 1703
        'TCP_KEEPCNT',
        # available starting with Windows 10 1709
        'TCP_KEEPIDLE',
        'TCP_KEEPINTVL',
    }

    def test_new_tcp_flags(self):
        # Any 'TCP'-prefixed name missing from knownTCPFlags fails the test.
        unknown = [name for name in dir(socket)
                   if name.startswith('TCP')
                   and name not in self.knownTCPFlags]

        self.assertEqual([], unknown,
            "New TCP flags were discovered. See bpo-32394 for more information")
6546
6547
class CreateServerTest(unittest.TestCase):
    """Checks of the sockets returned by socket.create_server()."""

    def test_address(self):
        # The socket must be bound to the requested host and port.
        port = socket_helper.find_unused_port()
        with socket.create_server(("127.0.0.1", port)) as sock:
            self.assertEqual(sock.getsockname()[0], "127.0.0.1")
            self.assertEqual(sock.getsockname()[1], port)
        if socket_helper.IPV6_ENABLED:
            with socket.create_server(("::1", port),
                                      family=socket.AF_INET6) as sock:
                self.assertEqual(sock.getsockname()[0], "::1")
                self.assertEqual(sock.getsockname()[1], port)

    def test_family_and_type(self):
        with socket.create_server(("127.0.0.1", 0)) as sock:
            self.assertEqual(sock.family, socket.AF_INET)
            self.assertEqual(sock.type, socket.SOCK_STREAM)
        if socket_helper.IPV6_ENABLED:
            with socket.create_server(("::1", 0), family=socket.AF_INET6) as s:
                self.assertEqual(s.family, socket.AF_INET6)
                # Check the IPv6 socket itself, not the already closed
                # IPv4 socket from the block above.
                self.assertEqual(s.type, socket.SOCK_STREAM)

    def test_reuse_port(self):
        if not hasattr(socket, "SO_REUSEPORT"):
            # Without SO_REUSEPORT, asking for reuse_port must be rejected.
            with self.assertRaises(ValueError):
                socket.create_server(("localhost", 0), reuse_port=True)
        else:
            # The option is off by default and on when requested.
            with socket.create_server(("localhost", 0)) as sock:
                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
                self.assertEqual(opt, 0)
            with socket.create_server(("localhost", 0), reuse_port=True) as sock:
                opt = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT)
                self.assertNotEqual(opt, 0)

    @unittest.skipIf(not hasattr(_socket, 'IPPROTO_IPV6') or
                     not hasattr(_socket, 'IPV6_V6ONLY'),
                     "IPV6_V6ONLY option not supported")
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test_ipv6_only_default(self):
        with socket.create_server(("::1", 0), family=socket.AF_INET6) as sock:
            # assertTrue rather than a bare ``assert``: assert statements
            # are stripped when Python runs with -O.
            self.assertTrue(
                sock.getsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY))

    @unittest.skipIf(not socket.has_dualstack_ipv6(),
                     "dualstack_ipv6 not supported")
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test_dualstack_ipv6_family(self):
        with socket.create_server(("::1", 0), family=socket.AF_INET6,
                                  dualstack_ipv6=True) as sock:
            self.assertEqual(sock.family, socket.AF_INET6)
6597
6598
class CreateServerFunctionalTest(unittest.TestCase):
    """Echo round trips against sockets made by socket.create_server()."""

    # Timeout applied to the listening socket, the client socket, the
    # event wait and the thread join.
    timeout = support.LOOPBACK_TIMEOUT

    def echo_server(self, sock):
        # Serve a single connection on `sock` in a background thread: the
        # first chunk received (if any) is sent straight back.
        ready = threading.Event()

        def serve_once(listener):
            with listener:
                conn, _ = listener.accept()
                with conn:
                    ready.wait(self.timeout)
                    payload = conn.recv(1024)
                    if payload:
                        conn.sendall(payload)

        sock.settimeout(self.timeout)
        worker = threading.Thread(target=serve_once, args=(sock, ))
        worker.start()
        self.addCleanup(worker.join, self.timeout)
        ready.set()

    def echo_client(self, addr, family):
        # Connect to `addr`, send b'foo' and expect it to be echoed back.
        client = socket.socket(family=family)
        with client:
            client.settimeout(self.timeout)
            client.connect(addr)
            client.sendall(b'foo')
            reply = client.recv(1024)
            self.assertEqual(reply, b'foo')

    def test_tcp4(self):
        port = socket_helper.find_unused_port()
        server = socket.create_server(("", port))
        with server as sock:
            self.echo_server(sock)
            self.echo_client(("127.0.0.1", port), socket.AF_INET)

    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test_tcp6(self):
        port = socket_helper.find_unused_port()
        server = socket.create_server(("", port), family=socket.AF_INET6)
        with server as sock:
            self.echo_server(sock)
            self.echo_client(("::1", port), socket.AF_INET6)

    # --- dual stack tests

    @unittest.skipUnless(socket.has_dualstack_ipv6(),
                         "dualstack_ipv6 not supported")
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test_dual_stack_client_v4(self):
        port = socket_helper.find_unused_port()
        server = socket.create_server(("", port), family=socket.AF_INET6,
                                      dualstack_ipv6=True)
        with server as sock:
            self.echo_server(sock)
            self.echo_client(("127.0.0.1", port), socket.AF_INET)

    @unittest.skipUnless(socket.has_dualstack_ipv6(),
                         "dualstack_ipv6 not supported")
    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 required for this test')
    def test_dual_stack_client_v6(self):
        port = socket_helper.find_unused_port()
        server = socket.create_server(("", port), family=socket.AF_INET6,
                                      dualstack_ipv6=True)
        with server as sock:
            self.echo_server(sock)
            self.echo_client(("::1", port), socket.AF_INET6)
6662
@requireAttrs(socket, "send_fds")
@requireAttrs(socket, "recv_fds")
@requireAttrs(socket, "AF_UNIX")
class SendRecvFdsTests(unittest.TestCase):
    """Round trip of file descriptors through send_fds() / recv_fds()."""

    def testSendAndRecvFds(self):
        def close_pipes(pipes):
            # close both ends of every pipe, read end first
            for pair in pipes:
                for fd in pair:
                    os.close(fd)

        def close_fds(fds):
            for fd in fds:
                os.close(fd)

        # send 10 file descriptors
        pipes = [os.pipe() for _ in range(10)]
        self.addCleanup(close_pipes, pipes)
        read_ends = [pair[0] for pair in pipes]

        # use a UNIX socket pair to exchange file descriptors locally
        sender, receiver = socket.socketpair(socket.AF_UNIX,
                                             socket.SOCK_STREAM)
        with sender, receiver:
            socket.send_fds(sender, [MSG], read_ends)
            # request more data and file descriptors than expected
            msg, received_fds, flags, addr = socket.recv_fds(
                receiver, len(MSG) * 2, len(read_ends) * 2)
            self.addCleanup(close_fds, received_fds)

        self.assertEqual(msg, MSG)
        self.assertEqual(len(received_fds), len(read_ends))
        self.assertEqual(flags, 0)
        # don't test addr

        # test that file descriptors are connected
        for index, (_, write_end) in enumerate(pipes):
            os.write(write_end, str(index).encode())

        for index, received in enumerate(received_fds):
            data = os.read(received, 100)
            self.assertEqual(data,  str(index).encode())
6703
6704
6705def setUpModule():
6706    thread_info = threading_helper.threading_setup()
6707    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
6708
6709
if __name__ == "__main__":
    # Run the tests of this module when the file is executed as a script.
    unittest.main()
6712