xref: /aosp_15_r20/external/angle/third_party/logdog/logdog/stream.py (revision 8975f5c5ed3d1c378011245431ada316dfb6f244)
1# Copyright 2016 The LUCI Authors. All rights reserved.
2# Use of this source code is governed under the Apache License, Version 2.0
3# that can be found in the LICENSE file.
4
5import collections
6import contextlib
7import json
8import os
9import posixpath
10import socket
11import sys
12import threading
13import time
14
15from . import streamname, varint
16
17
18if sys.platform == "win32":
19    from ctypes import GetLastError
20
21_PY2 = sys.version_info[0] == 2
22_MAPPING = collections.Mapping if _PY2 else collections.abc.Mapping
23
24_StreamParamsBase = collections.namedtuple('_StreamParamsBase',
25                                           ('name', 'type', 'content_type', 'tags'))
26
27
28# Magic number at the beginning of a Butler stream
29#
30# See "ProtocolFrameHeaderMagic" in:
31# <luci-go>/logdog/client/butlerlib/streamproto
32BUTLER_MAGIC = b'BTLR1\x1e'
33
34
35class StreamParams(_StreamParamsBase):
36    """Defines the set of parameters to apply to a new stream."""
37
38    # A text content stream.
39    TEXT = 'text'
40    # A binary content stream.
41    BINARY = 'binary'
42    # A datagram content stream.
43    DATAGRAM = 'datagram'
44
45    @classmethod
46    def make(cls, **kwargs):
47        """Returns (StreamParams): A new StreamParams instance with supplied values.
48
49    Any parameter that isn't supplied will be set to None.
50
51    Args:
52      kwargs (dict): Named parameters to apply.
53    """
54        return cls(**{f: kwargs.get(f) for f in cls._fields})
55
56    def validate(self):
57        """Raises (ValueError): if the parameters are not valid."""
58        streamname.validate_stream_name(self.name)
59
60        if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM):
61            raise ValueError('Invalid type (%s)' % (self.type,))
62
63        if self.tags is not None:
64            if not isinstance(self.tags, _MAPPING):
65                raise ValueError('Invalid tags type (%s)' % (self.tags,))
66            for k, v in self.tags.items():
67                streamname.validate_tag(k, v)
68
69    def to_json(self):
70        """Returns (str): The JSON representation of the StreamParams.
71
72    Converts stream parameters to JSON for Butler consumption.
73
74    Raises:
75      ValueError: if these parameters are not valid.
76    """
77        self.validate()
78
79        obj = {
80            'name': self.name,
81            'type': self.type,
82        }
83
84        def _maybe_add(key, value):
85            if value is not None:
86                obj[key] = value
87
88        _maybe_add('contentType', self.content_type)
89        _maybe_add('tags', self.tags)
90
91        # Note that "dumps' will dump UTF-8 by default, which is what Butler wants.
92        return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)
93
94
95class StreamProtocolRegistry(object):
96    """Registry of streamserver URI protocols and their client classes.
97  """
98
99    def __init__(self):
100        self._registry = {}
101
102    def register_protocol(self, protocol, client_cls):
103        assert issubclass(client_cls, StreamClient)
104        if self._registry.get(protocol) is not None:
105            raise KeyError('Duplicate protocol registered.')
106        self._registry[protocol] = client_cls
107
108    def create(self, uri, **kwargs):
109        """Returns (StreamClient): A stream client for the specified URI.
110
111    This uses the default StreamProtocolRegistry to instantiate a StreamClient
112    for the specified URI.
113
114    Args:
115      uri (str): The streamserver URI.
116      kwargs: keyword arguments to forward to the stream. See
117          StreamClient.__init__.
118
119    Raises:
120      ValueError: if the supplied URI references an invalid or improperly
121          configured streamserver.
122    """
123        uri = uri.split(':', 1)
124        if len(uri) != 2:
125            raise ValueError('Invalid stream server URI [%s]' % (uri,))
126        protocol, value = uri
127
128        client_cls = self._registry.get(protocol)
129        if not client_cls:
130            raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
131        return client_cls._create(value, **kwargs)
132
133
134# Default (global) registry.
135_default_registry = StreamProtocolRegistry()
136create = _default_registry.create
137
138
139class StreamClient(object):
140    """Abstract base class for a streamserver client.
141  """
142
143    class _StreamBase(object):
144        """ABC for StreamClient streams."""
145
146        def __init__(self, stream_client, params):
147            self._stream_client = stream_client
148            self._params = params
149
150        @property
151        def params(self):
152            """Returns (StreamParams): The stream parameters."""
153            return self._params
154
155        @property
156        def path(self):
157            """Returns (streamname.StreamPath): The stream path.
158
159      Raises:
160        ValueError: if the stream path is invalid, or if the stream prefix is
161            not defined in the client.
162      """
163            return self._stream_client.get_stream_path(self._params.name)
164
165        def get_viewer_url(self):
166            """Returns (str): The viewer URL for this stream.
167
168      Raises:
169        KeyError: if information needed to construct the URL is missing.
170        ValueError: if the stream prefix or name do not form a valid stream
171            path.
172      """
173            return self._stream_client.get_viewer_url(self._params.name)
174
175    class _BasicStream(_StreamBase):
176        """Wraps a basic file descriptor, offering "write" and "close"."""
177
178        def __init__(self, stream_client, params, fd):
179            super(StreamClient._BasicStream, self).__init__(stream_client, params)
180            self._fd = fd
181
182        @property
183        def fd(self):
184            return self._fd
185
186        def fileno(self):
187            return self._fd.fileno()
188
189        def write(self, data):
190            return self._fd.write(data)
191
192        def close(self):
193            return self._fd.close()
194
195    class _TextStream(_BasicStream):
196        """Extends _BasicStream, ensuring data written is UTF-8 text."""
197
198        def __init__(self, stream_client, params, fd):
199            super(StreamClient._TextStream, self).__init__(stream_client, params, fd)
200            self._fd = fd
201
202        def write(self, data):
203            if _PY2 and isinstance(data, str):
204                # byte string is unfortunately accepted in py2 because of
205                # undifferentiated usage of `str` and `unicode` but it should be
206                # discontinued in py3. User should switch to binary stream instead
207                # if there's a need to write bytes.
208                return self._fd.write(data)
209            elif _PY2 and isinstance(data, unicode):
210                return self._fd.write(data.encode('utf-8'))
211            elif not _PY2 and isinstance(data, str):
212                return self._fd.write(data.encode('utf-8'))
213            else:
214                raise ValueError('expect str, got %r that is type %s' % (
215                    data,
216                    type(data),
217                ))
218
219    class _DatagramStream(_StreamBase):
220        """Wraps a stream object to write length-prefixed datagrams."""
221
222        def __init__(self, stream_client, params, fd):
223            super(StreamClient._DatagramStream, self).__init__(stream_client, params)
224            self._fd = fd
225
226        def send(self, data):
227            varint.write_uvarint(self._fd, len(data))
228            self._fd.write(data)
229
230        def close(self):
231            return self._fd.close()
232
233    def __init__(self, project=None, prefix=None, coordinator_host=None, namespace=''):
234        """Constructs a new base StreamClient instance.
235
236    Args:
237      project (str or None): If not None, the name of the log stream project.
238      prefix (str or None): If not None, the log stream session prefix.
239      coordinator_host (str or None): If not None, the name of the Coordinator
240          host that this stream client is bound to. This will be used to
241          construct viewer URLs for generated streams.
242      namespace (str): The prefix to apply to all streams opened by this client.
243    """
244        self._project = project
245        self._prefix = prefix
246        self._coordinator_host = coordinator_host
247        self._namespace = namespace
248
249        self._name_lock = threading.Lock()
250        self._names = set()
251
252    @property
253    def project(self):
254        """Returns (str or None): The stream project, or None if not configured."""
255        return self._project
256
257    @property
258    def prefix(self):
259        """Returns (str or None): The stream prefix, or None if not configured."""
260        return self._prefix
261
262    @property
263    def coordinator_host(self):
264        """Returns (str or None): The coordinator host, or None if not configured.
265    """
266        return self._coordinator_host
267
268    @property
269    def namespace(self):
270        """Returns (str): The namespace for all streams opened by this client.
271    Empty if not configured.
272    """
273        return self._namespace
274
275    def get_stream_path(self, name):
276        """Returns (streamname.StreamPath): The stream path.
277
278    Args:
279      name (str): The name of the stream.
280
281    Raises:
282      KeyError: if information needed to construct the path is missing.
283      ValueError: if the stream path is invalid, or if the stream prefix is
284          not defined in the client.
285    """
286        if not self._prefix:
287            raise KeyError('Stream prefix is not configured')
288        return streamname.StreamPath.make(self._prefix, name)
289
290    def get_viewer_url(self, name):
291        """Returns (str): The LogDog viewer URL for the named stream.
292
293    Args:
294      name (str): The name of the stream. This can also be a query glob.
295
296    Raises:
297      KeyError: if information needed to construct the URL is missing.
298      ValueError: if the stream prefix or name do not form a valid stream
299          path.
300    """
301        if not self._coordinator_host:
302            raise KeyError('Coordinator host is not configured')
303        if not self._project:
304            raise KeyError('Stream project is not configured')
305
306        return streamname.get_logdog_viewer_url(self._coordinator_host, self._project,
307                                                self.get_stream_path(name))
308
309    def _register_new_stream(self, name):
310        """Registers a new stream name.
311
312    The Butler will internally reject any duplicate stream names. However, there
313    isn't really feedback when this happens except a closed stream client. This
314    is a client-side check to provide a more user-friendly experience in the
315    event that a user attempts to register a duplicate stream name.
316
317    Note that this is imperfect, as something else could register stream names
318    with the same Butler instance and this library has no means of tracking.
319    This is a best-effort experience, not a reliable check.
320
321    Args:
322      name (str): The name of the stream.
323
324    Raises:
325      ValueError if the stream name has already been registered.
326    """
327        with self._name_lock:
328            if name in self._names:
329                raise ValueError("Duplicate stream name [%s]" % (name,))
330            self._names.add(name)
331
332    @classmethod
333    def _create(cls, value, **kwargs):
334        """Returns (StreamClient): A new stream client instance.
335
336    Validates the streamserver parameters and creates a new StreamClient
337    instance that connects to them.
338
339    Implementing classes must override this.
340    """
341        raise NotImplementedError()
342
343    def _connect_raw(self):
344        """Returns (file): A new file-like stream.
345
346    Creates a new raw connection to the streamserver. This connection MUST not
347    have any data written to it past initialization (if needed) when it has been
348    returned.
349
350    The file-like object must implement `write`, `fileno`, `flush`, and `close`.
351
352    Implementing classes must override this.
353    """
354        raise NotImplementedError()
355
356    def new_connection(self, params):
357        """Returns (file): A new configured stream.
358
359    The returned object implements (minimally) `write` and `close`.
360
361    Creates a new LogDog stream with the specified parameters.
362
363    Args:
364      params (StreamParams): The parameters to use with the new connection.
365
366    Raises:
367      ValueError if the stream name has already been used, or if the parameters
368      are not valid.
369    """
370        self._register_new_stream(params.name)
371        params_bytes = params.to_json().encode('utf-8')
372
373        fobj = self._connect_raw()
374        fobj.write(BUTLER_MAGIC)
375        varint.write_uvarint(fobj, len(params_bytes))
376        fobj.write(params_bytes)
377        return fobj
378
379    @contextlib.contextmanager
380    def text(self, name, **kwargs):
381        """Context manager to create, use, and teardown a TEXT stream.
382
383    This context manager creates a new butler TEXT stream with the specified
384    parameters, yields it, and closes it on teardown.
385
386    Args:
387      name (str): the LogDog name of the stream.
388      kwargs (dict): Log stream parameters. These may be any keyword arguments
389          accepted by `open_text`.
390
391    Returns (file): A file-like object to a Butler UTF-8 text stream supporting
392        `write`.
393    """
394        fobj = None
395        try:
396            fobj = self.open_text(name, **kwargs)
397            yield fobj
398        finally:
399            if fobj is not None:
400                fobj.close()
401
402    def open_text(self, name, content_type=None, tags=None):
403        """Returns (file): A file-like object for a single text stream.
404
405    This creates a new butler TEXT stream with the specified parameters.
406
407    Args:
408      name (str): the LogDog name of the stream.
409      content_type (str): The optional content type of the stream. If None, a
410          default content type will be chosen by the Butler.
411      tags (dict): An optional key/value dictionary pair of LogDog stream tags.
412
413    Returns (file): A file-like object to a Butler text stream. This object can
414        have UTF-8 text content written to it with its `write` method, and must
415        be closed when finished using its `close` method.
416    """
417        params = StreamParams.make(
418            name=posixpath.join(self._namespace, name),
419            type=StreamParams.TEXT,
420            content_type=content_type,
421            tags=tags)
422        return self._TextStream(self, params, self.new_connection(params))
423
424    @contextlib.contextmanager
425    def binary(self, name, **kwargs):
426        """Context manager to create, use, and teardown a BINARY stream.
427
428    This context manager creates a new butler BINARY stream with the specified
429    parameters, yields it, and closes it on teardown.
430
431    Args:
432      name (str): the LogDog name of the stream.
433      kwargs (dict): Log stream parameters. These may be any keyword arguments
434          accepted by `open_binary`.
435
436    Returns (file): A file-like object to a Butler binary stream supporting
437        `write`.
438    """
439        fobj = None
440        try:
441            fobj = self.open_binary(name, **kwargs)
442            yield fobj
443        finally:
444            if fobj is not None:
445                fobj.close()
446
447    def open_binary(self, name, content_type=None, tags=None):
448        """Returns (file): A file-like object for a single binary stream.
449
450    This creates a new butler BINARY stream with the specified parameters.
451
452    Args:
453      name (str): the LogDog name of the stream.
454      content_type (str): The optional content type of the stream. If None, a
455          default content type will be chosen by the Butler.
456      tags (dict): An optional key/value dictionary pair of LogDog stream tags.
457
458    Returns (file): A file-like object to a Butler binary stream. This object
459        can have UTF-8 content written to it with its `write` method, and must
460        be closed when finished using its `close` method.
461    """
462        params = StreamParams.make(
463            name=posixpath.join(self._namespace, name),
464            type=StreamParams.BINARY,
465            content_type=content_type,
466            tags=tags)
467        return self._BasicStream(self, params, self.new_connection(params))
468
469    @contextlib.contextmanager
470    def datagram(self, name, **kwargs):
471        """Context manager to create, use, and teardown a DATAGRAM stream.
472
473    This context manager creates a new butler DATAAGRAM stream with the
474    specified parameters, yields it, and closes it on teardown.
475
476    Args:
477      name (str): the LogDog name of the stream.
478      kwargs (dict): Log stream parameters. These may be any keyword arguments
479          accepted by `open_datagram`.
480
481    Returns (_DatagramStream): A datagram stream object. Datagrams can be
482        written to it using its `send` method.
483    """
484        fobj = None
485        try:
486            fobj = self.open_datagram(name, **kwargs)
487            yield fobj
488        finally:
489            if fobj is not None:
490                fobj.close()
491
492    def open_datagram(self, name, content_type=None, tags=None):
493        """Creates a new butler DATAGRAM stream with the specified parameters.
494
495    Args:
496      name (str): the LogDog name of the stream.
497      content_type (str): The optional content type of the stream. If None, a
498          default content type will be chosen by the Butler.
499      tags (dict): An optional key/value dictionary pair of LogDog stream tags.
500
501    Returns (_DatagramStream): A datagram stream object. Datagrams can be
502        written to it using its `send` method. This object must be closed when
503        finished by using its `close` method.
504    """
505        params = StreamParams.make(
506            name=posixpath.join(self._namespace, name),
507            type=StreamParams.DATAGRAM,
508            content_type=content_type,
509            tags=tags)
510        return self._DatagramStream(self, params, self.new_connection(params))
511
512
513class _NamedPipeStreamClient(StreamClient):
514    """A StreamClient implementation that connects to a Windows named pipe.
515  """
516
517    def __init__(self, name, **kwargs):
518        r"""Initializes a new Windows named pipe stream client.
519
520    Args:
521      name (str): The name of the Windows named pipe to use (e.g., "\\.\name")
522    """
523        super(_NamedPipeStreamClient, self).__init__(**kwargs)
524        self._name = '\\\\.\\pipe\\' + name
525
526    @classmethod
527    def _create(cls, value, **kwargs):
528        return cls(value, **kwargs)
529
530    ERROR_PIPE_BUSY = 231
531
532    def _connect_raw(self):
533        # This is a similar procedure to the one in
534        #   https://github.com/microsoft/go-winio/blob/master/pipe.go (tryDialPipe)
535        while True:
536            try:
537                return open(self._name, 'wb+', buffering=0)
538            except (OSError, IOError):
539                if GetLastError() != self.ERROR_PIPE_BUSY:
540                    raise
541            time.sleep(0.001)  # 1ms
542
543
544_default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)
545
546
547class _UnixDomainSocketStreamClient(StreamClient):
548    """A StreamClient implementation that uses a UNIX domain socket.
549  """
550
551    class SocketFile(object):
552        """A write-only file-like object that writes to a UNIX socket."""
553
554        def __init__(self, sock):
555            self._sock = sock
556
557        def fileno(self):
558            return self._sock
559
560        def write(self, data):
561            self._sock.sendall(data)
562
563        def flush(self):
564            pass
565
566        def close(self):
567            self._sock.close()
568
569    def __init__(self, path, **kwargs):
570        """Initializes a new UNIX domain socket stream client.
571
572    Args:
573      path (str): The path to the named UNIX domain socket.
574    """
575        super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
576        self._path = path
577
578    @classmethod
579    def _create(cls, value, **kwargs):
580        if not os.path.exists(value):
581            raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
582        return cls(value, **kwargs)
583
584    def _connect_raw(self):
585        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
586        sock.connect(self._path)
587        return self.SocketFile(sock)
588
589
590_default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)
591