1import socket
2
3
class TransportSocket:
    """Restricted, socket-like facade over a transport's underlying socket.

    Instances are intended to be handed out by APIs such as
    `transport.get_extra_info('socket')`.  Only a subset of the socket
    interface is forwarded to the wrapped object; operations that could
    disrupt the owner of the socket (for example "socket.close()") are
    deliberately not exposed, and the timeout/blocking setters only
    accept non-blocking values.
    """

    __slots__ = ('_sock',)

    def __init__(self, sock: socket.socket):
        """Store *sock*; every forwarded call is delegated to it."""
        self._sock = sock

    @property
    def family(self):
        """The ``family`` attribute of the wrapped socket."""
        return self._sock.family

    @property
    def type(self):
        """The ``type`` attribute of the wrapped socket."""
        return self._sock.type

    @property
    def proto(self):
        """The ``proto`` attribute of the wrapped socket."""
        return self._sock.proto

    def __repr__(self):
        """Describe the socket, adding local/peer addresses when available."""
        fields = [
            f"<asyncio.TransportSocket fd={self.fileno()}",
            f"family={self.family!s}",
            f"type={self.type!s}",
            f"proto={self.proto}",
        ]

        # Addresses are only queried while the descriptor is not -1.
        if self.fileno() != -1:
            lookups = (
                ("laddr", self.getsockname),
                ("raddr", self.getpeername),
            )
            for label, lookup in lookups:
                # Best effort: a failing address lookup simply leaves the
                # corresponding field out of the representation.
                try:
                    address = lookup()
                    if address:
                        fields.append(f"{label}={address}")
                except OSError:
                    pass

        return ", ".join(fields) + ">"

    def __getstate__(self):
        """Always raise TypeError: these objects refuse to be serialized."""
        raise TypeError("Cannot serialize asyncio.TransportSocket object")

    def fileno(self):
        """Return ``fileno()`` of the wrapped socket."""
        return self._sock.fileno()

    def dup(self):
        """Return ``dup()`` of the wrapped socket."""
        return self._sock.dup()

    def get_inheritable(self):
        """Return ``get_inheritable()`` of the wrapped socket."""
        return self._sock.get_inheritable()

    def shutdown(self, how):
        """Forward ``shutdown(how)`` to the wrapped socket; returns None."""
        # Exposed because asyncio currently has no high-level transport
        # API for shutting a connection down.
        self._sock.shutdown(how)

    def getsockopt(self, *args, **kwargs):
        """Forward all arguments to ``getsockopt()`` and return its result."""
        return self._sock.getsockopt(*args, **kwargs)

    def setsockopt(self, *args, **kwargs):
        """Forward all arguments to ``setsockopt()``; returns None."""
        self._sock.setsockopt(*args, **kwargs)

    def getpeername(self):
        """Return ``getpeername()`` of the wrapped socket."""
        return self._sock.getpeername()

    def getsockname(self):
        """Return ``getsockname()`` of the wrapped socket."""
        return self._sock.getsockname()

    def getsockbyname(self):
        """Return ``getsockbyname()`` of the wrapped object."""
        # NOTE(review): standard socket objects do not appear to define
        # getsockbyname(), so this presumably raises AttributeError when
        # wrapping a real socket -- confirm whether this is intended.
        return self._sock.getsockbyname()

    def settimeout(self, value):
        """Accept only a timeout equal to 0; the wrapped socket is untouched.

        Raises:
            ValueError: if *value* does not compare equal to 0.
        """
        is_zero = value == 0
        if not is_zero:
            raise ValueError(
                'settimeout(): only 0 timeout is allowed on transport sockets')

    def gettimeout(self):
        """Return 0, the only timeout this wrapper reports."""
        return 0

    def setblocking(self, flag):
        """Accept only a falsy *flag*; the wrapped socket is untouched.

        Raises:
            ValueError: if *flag* is truthy.
        """
        if flag:
            raise ValueError(
                'setblocking(): transport sockets cannot be blocking')
99