# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Client for a LogDog Butler streamserver.

A StreamClient is created from a streamserver URI (see `create`) and is used to
open TEXT, BINARY and DATAGRAM streams on the Butler.
"""

import collections
import contextlib
import json
import os
import posixpath
import socket
import sys
import threading
import time

from . import streamname, varint


if sys.platform == "win32":
  from ctypes import GetLastError

_PY2 = sys.version_info[0] == 2

# `collections.abc` is a submodule: a bare `import collections` does not
# guarantee it is loaded, so import it explicitly. Python 2 has no
# `collections.abc` and exposes `Mapping` directly on `collections`.
try:
  from collections.abc import Mapping as _MAPPING
except ImportError:  # Python 2
  _MAPPING = collections.Mapping

_StreamParamsBase = collections.namedtuple(
    '_StreamParamsBase', ('name', 'type', 'content_type', 'tags'))


# Magic number at the beginning of a Butler stream
#
# See "ProtocolFrameHeaderMagic" in:
# <luci-go>/logdog/client/butlerlib/streamproto
BUTLER_MAGIC = b'BTLR1\x1e'


class StreamParams(_StreamParamsBase):
  """Defines the set of parameters to apply to a new stream."""

  # A text content stream.
  TEXT = 'text'
  # A binary content stream.
  BINARY = 'binary'
  # A datagram content stream.
  DATAGRAM = 'datagram'

  @classmethod
  def make(cls, **kwargs):
    """Returns (StreamParams): A new StreamParams instance with supplied values.

    Any parameter that isn't supplied will be set to None.

    Args:
      kwargs (dict): Named parameters to apply.
    """
    return cls(**{f: kwargs.get(f) for f in cls._fields})

  def validate(self):
    """Raises (ValueError): if the parameters are not valid."""
    streamname.validate_stream_name(self.name)

    if self.type not in (self.TEXT, self.BINARY, self.DATAGRAM):
      raise ValueError('Invalid type (%s)' % (self.type,))

    if self.tags is not None:
      if not isinstance(self.tags, _MAPPING):
        raise ValueError('Invalid tags type (%s)' % (self.tags,))
      for k, v in self.tags.items():
        streamname.validate_tag(k, v)

  def to_json(self):
    """Returns (str): The JSON representation of the StreamParams.

    Converts stream parameters to JSON for Butler consumption. The optional
    `contentType` and `tags` keys are omitted when their values are None.

    Raises:
      ValueError: if these parameters are not valid.
    """
    self.validate()

    obj = {
        'name': self.name,
        'type': self.type,
    }
    if self.content_type is not None:
      obj['contentType'] = self.content_type
    if self.tags is not None:
      # `validate` accepts any Mapping, but `json.dumps` only serializes dicts.
      obj['tags'] = dict(self.tags)

    # `ensure_ascii=True` escapes every non-ASCII character, so the output is
    # plain ASCII (and therefore also valid UTF-8), which is what Butler wants.
    return json.dumps(obj, sort_keys=True, ensure_ascii=True, indent=None)


class StreamProtocolRegistry(object):
  """Registry of streamserver URI protocols and their client classes."""

  def __init__(self):
    # Maps protocol (str) -> StreamClient subclass.
    self._registry = {}

  def register_protocol(self, protocol, client_cls):
    """Registers `client_cls` as the client class for `protocol`.

    Args:
      protocol (str): The URI protocol (the text before the first ':').
      client_cls (type): A StreamClient subclass.

    Raises:
      KeyError: if a client class is already registered for `protocol`.
    """
    assert issubclass(client_cls, StreamClient)
    if self._registry.get(protocol) is not None:
      raise KeyError('Duplicate protocol registered.')
    self._registry[protocol] = client_cls

  def create(self, uri, **kwargs):
    """Returns (StreamClient): A stream client for the specified URI.

    The URI has the form "<protocol>:<value>". The client class registered in
    this registry for <protocol> is instantiated from <value>.

    Args:
      uri (str): The streamserver URI.
      kwargs: keyword arguments to forward to the stream. See
          StreamClient.__init__.

    Raises:
      ValueError: if the supplied URI references an invalid or improperly
          configured streamserver.
    """
    parts = uri.split(':', 1)
    if len(parts) != 2:
      # Report the URI as supplied, not the result of splitting it.
      raise ValueError('Invalid stream server URI [%s]' % (uri,))
    protocol, value = parts

    client_cls = self._registry.get(protocol)
    if not client_cls:
      raise ValueError('Unknown stream client protocol (%s)' % (protocol,))
    return client_cls._create(value, **kwargs)


# Default (global) registry.
_default_registry = StreamProtocolRegistry()
create = _default_registry.create


class StreamClient(object):
  """Abstract base class for a streamserver client."""

  class _StreamBase(object):
    """ABC for StreamClient streams."""

    def __init__(self, stream_client, params):
      self._stream_client = stream_client
      self._params = params

    @property
    def params(self):
      """Returns (StreamParams): The stream parameters."""
      return self._params

    @property
    def path(self):
      """Returns (streamname.StreamPath): The stream path.

      Raises:
        ValueError: if the stream path is invalid, or if the stream prefix is
            not defined in the client.
      """
      return self._stream_client.get_stream_path(self._params.name)

    def get_viewer_url(self):
      """Returns (str): The viewer URL for this stream.

      Raises:
        KeyError: if information needed to construct the URL is missing.
        ValueError: if the stream prefix or name do not form a valid stream
            path.
      """
      return self._stream_client.get_viewer_url(self._params.name)

  class _BasicStream(_StreamBase):
    """Wraps a basic file descriptor, offering "write" and "close"."""

    def __init__(self, stream_client, params, fd):
      super(StreamClient._BasicStream, self).__init__(stream_client, params)
      self._fd = fd

    @property
    def fd(self):
      """Returns (file): The underlying raw connection."""
      return self._fd

    def fileno(self):
      return self._fd.fileno()

    def write(self, data):
      return self._fd.write(data)

    def close(self):
      return self._fd.close()

  class _TextStream(_BasicStream):
    """Extends _BasicStream, ensuring data written is UTF-8 text."""

    def write(self, data):
      """Writes `data` to the stream, encoding text as UTF-8.

      Raises:
        ValueError: if `data` is not text (or, on Python 2, a byte string).
      """
      if _PY2 and isinstance(data, str):
        # byte string is unfortunately accepted in py2 because of
        # undifferentiated usage of `str` and `unicode` but it should be
        # discontinued in py3. User should switch to binary stream instead
        # if there's a need to write bytes.
        return self._fd.write(data)
      elif _PY2 and isinstance(data, unicode):  # pylint: disable=undefined-variable
        return self._fd.write(data.encode('utf-8'))
      elif not _PY2 and isinstance(data, str):
        return self._fd.write(data.encode('utf-8'))
      else:
        raise ValueError('expect str, got %r that is type %s' % (
            data,
            type(data),
        ))

  class _DatagramStream(_StreamBase):
    """Wraps a stream object to write length-prefixed datagrams."""

    def __init__(self, stream_client, params, fd):
      super(StreamClient._DatagramStream, self).__init__(stream_client, params)
      self._fd = fd

    def send(self, data):
      """Writes one datagram: its uvarint length, followed by `data`."""
      varint.write_uvarint(self._fd, len(data))
      self._fd.write(data)

    def close(self):
      return self._fd.close()

  def __init__(self, project=None, prefix=None, coordinator_host=None,
               namespace=''):
    """Constructs a new base StreamClient instance.

    Args:
      project (str or None): If not None, the name of the log stream project.
      prefix (str or None): If not None, the log stream session prefix.
      coordinator_host (str or None): If not None, the name of the Coordinator
          host that this stream client is bound to. This will be used to
          construct viewer URLs for generated streams.
      namespace (str): The prefix to apply to all streams opened by this
          client.
    """
    self._project = project
    self._prefix = prefix
    self._coordinator_host = coordinator_host
    self._namespace = namespace

    # Guards `_names`, the set of stream names opened through this client.
    self._name_lock = threading.Lock()
    self._names = set()

  @property
  def project(self):
    """Returns (str or None): The stream project, or None if not configured."""
    return self._project

  @property
  def prefix(self):
    """Returns (str or None): The stream prefix, or None if not configured."""
    return self._prefix

  @property
  def coordinator_host(self):
    """Returns (str or None): The coordinator host, or None if not configured.
    """
    return self._coordinator_host

  @property
  def namespace(self):
    """Returns (str): The namespace for all streams opened by this client.

    Empty if not configured.
    """
    return self._namespace

  def get_stream_path(self, name):
    """Returns (streamname.StreamPath): The stream path.

    Args:
      name (str): The name of the stream.

    Raises:
      KeyError: if information needed to construct the path is missing.
      ValueError: if the stream path is invalid, or if the stream prefix is
          not defined in the client.
    """
    if not self._prefix:
      raise KeyError('Stream prefix is not configured')
    return streamname.StreamPath.make(self._prefix, name)

  def get_viewer_url(self, name):
    """Returns (str): The LogDog viewer URL for the named stream.

    Args:
      name (str): The name of the stream. This can also be a query glob.

    Raises:
      KeyError: if information needed to construct the URL is missing.
      ValueError: if the stream prefix or name do not form a valid stream
          path.
    """
    if not self._coordinator_host:
      raise KeyError('Coordinator host is not configured')
    if not self._project:
      raise KeyError('Stream project is not configured')

    return streamname.get_logdog_viewer_url(
        self._coordinator_host, self._project, self.get_stream_path(name))

  def _register_new_stream(self, name):
    """Registers a new stream name.

    The Butler will internally reject any duplicate stream names. However,
    there isn't really feedback when this happens except a closed stream
    client. This is a client-side check to provide a more user-friendly
    experience in the event that a user attempts to register a duplicate
    stream name.

    Note that this is imperfect, as something else could register stream
    names with the same Butler instance and this library has no means of
    tracking. This is a best-effort experience, not a reliable check.

    Args:
      name (str): The name of the stream.

    Raises:
      ValueError if the stream name has already been registered.
    """
    with self._name_lock:
      if name in self._names:
        raise ValueError("Duplicate stream name [%s]" % (name,))
      self._names.add(name)

  @classmethod
  def _create(cls, value, **kwargs):
    """Returns (StreamClient): A new stream client instance.

    Validates the streamserver parameters and creates a new StreamClient
    instance that connects to them.

    Implementing classes must override this.
    """
    raise NotImplementedError()

  def _connect_raw(self):
    """Returns (file): A new file-like stream.

    Creates a new raw connection to the streamserver. This connection MUST not
    have any data written to it past initialization (if needed) when it has
    been returned.

    The file-like object must implement `write`, `fileno`, `flush`, and
    `close`.

    Implementing classes must override this.
    """
    raise NotImplementedError()

  def new_connection(self, params):
    """Returns (file): A new configured stream.

    The returned object implements (minimally) `write` and `close`.

    Creates a new LogDog stream with the specified parameters by opening a raw
    connection and writing the stream header to it: BUTLER_MAGIC, then the
    uvarint length of the JSON-encoded parameters, then those parameters.

    Args:
      params (StreamParams): The parameters to use with the new connection.

    Raises:
      ValueError if the stream name has already been used, or if the
      parameters are not valid.
    """
    # Serialize (and thereby validate) the parameters before claiming the
    # name, so invalid parameters do not permanently reserve a valid name.
    params_bytes = params.to_json().encode('utf-8')
    self._register_new_stream(params.name)

    fobj = self._connect_raw()
    try:
      fobj.write(BUTLER_MAGIC)
      varint.write_uvarint(fobj, len(params_bytes))
      fobj.write(params_bytes)
    except BaseException:
      # Don't leak the raw connection if the header could not be written.
      fobj.close()
      raise
    return fobj

  @contextlib.contextmanager
  def text(self, name, **kwargs):
    """Context manager to create, use, and teardown a TEXT stream.

    This context manager creates a new butler TEXT stream with the specified
    parameters, yields it, and closes it on teardown.

    Args:
      name (str): the LogDog name of the stream.
      kwargs (dict): Log stream parameters. These may be any keyword arguments
          accepted by `open_text`.

    Returns (file): A file-like object to a Butler UTF-8 text stream supporting
        `write`.
    """
    fobj = None
    try:
      fobj = self.open_text(name, **kwargs)
      yield fobj
    finally:
      if fobj is not None:
        fobj.close()

  def open_text(self, name, content_type=None, tags=None):
    """Returns (file): A file-like object for a single text stream.

    This creates a new butler TEXT stream with the specified parameters. The
    stream name is joined onto this client's namespace.

    Args:
      name (str): the LogDog name of the stream.
      content_type (str): The optional content type of the stream. If None, a
          default content type will be chosen by the Butler.
      tags (dict): An optional key/value dictionary pair of LogDog stream tags.

    Returns (file): A file-like object to a Butler text stream. This object can
        have UTF-8 text content written to it with its `write` method, and must
        be closed when finished using its `close` method.
    """
    params = StreamParams.make(
        name=posixpath.join(self._namespace, name),
        type=StreamParams.TEXT,
        content_type=content_type,
        tags=tags)
    return self._TextStream(self, params, self.new_connection(params))

  @contextlib.contextmanager
  def binary(self, name, **kwargs):
    """Context manager to create, use, and teardown a BINARY stream.

    This context manager creates a new butler BINARY stream with the specified
    parameters, yields it, and closes it on teardown.

    Args:
      name (str): the LogDog name of the stream.
      kwargs (dict): Log stream parameters. These may be any keyword arguments
          accepted by `open_binary`.

    Returns (file): A file-like object to a Butler binary stream supporting
        `write`.
    """
    fobj = None
    try:
      fobj = self.open_binary(name, **kwargs)
      yield fobj
    finally:
      if fobj is not None:
        fobj.close()

  def open_binary(self, name, content_type=None, tags=None):
    """Returns (file): A file-like object for a single binary stream.

    This creates a new butler BINARY stream with the specified parameters. The
    stream name is joined onto this client's namespace.

    Args:
      name (str): the LogDog name of the stream.
      content_type (str): The optional content type of the stream. If None, a
          default content type will be chosen by the Butler.
      tags (dict): An optional key/value dictionary pair of LogDog stream tags.

    Returns (file): A file-like object to a Butler binary stream. This object
        can have raw bytes written to it with its `write` method, and must be
        closed when finished using its `close` method.
    """
    params = StreamParams.make(
        name=posixpath.join(self._namespace, name),
        type=StreamParams.BINARY,
        content_type=content_type,
        tags=tags)
    return self._BasicStream(self, params, self.new_connection(params))

  @contextlib.contextmanager
  def datagram(self, name, **kwargs):
    """Context manager to create, use, and teardown a DATAGRAM stream.

    This context manager creates a new butler DATAGRAM stream with the
    specified parameters, yields it, and closes it on teardown.

    Args:
      name (str): the LogDog name of the stream.
      kwargs (dict): Log stream parameters. These may be any keyword arguments
          accepted by `open_datagram`.

    Returns (_DatagramStream): A datagram stream object. Datagrams can be
        written to it using its `send` method.
    """
    fobj = None
    try:
      fobj = self.open_datagram(name, **kwargs)
      yield fobj
    finally:
      if fobj is not None:
        fobj.close()

  def open_datagram(self, name, content_type=None, tags=None):
    """Creates a new butler DATAGRAM stream with the specified parameters.

    The stream name is joined onto this client's namespace.

    Args:
      name (str): the LogDog name of the stream.
      content_type (str): The optional content type of the stream. If None, a
          default content type will be chosen by the Butler.
      tags (dict): An optional key/value dictionary pair of LogDog stream tags.

    Returns (_DatagramStream): A datagram stream object. Datagrams can be
        written to it using its `send` method. This object must be closed when
        finished by using its `close` method.
    """
    params = StreamParams.make(
        name=posixpath.join(self._namespace, name),
        type=StreamParams.DATAGRAM,
        content_type=content_type,
        tags=tags)
    return self._DatagramStream(self, params, self.new_connection(params))


class _NamedPipeStreamClient(StreamClient):
  """A StreamClient implementation that connects to a Windows named pipe."""

  # Win32 ERROR_PIPE_BUSY: all instances of the pipe are currently busy.
  ERROR_PIPE_BUSY = 231

  def __init__(self, name, **kwargs):
    r"""Initializes a new Windows named pipe stream client.

    Args:
      name (str): The name of the Windows named pipe, without the "\\.\pipe\"
          prefix (e.g., "name" connects to "\\.\pipe\name").
      kwargs: forwarded to StreamClient.__init__.
    """
    super(_NamedPipeStreamClient, self).__init__(**kwargs)
    self._name = '\\\\.\\pipe\\' + name

  @classmethod
  def _create(cls, value, **kwargs):
    return cls(value, **kwargs)

  def _connect_raw(self):
    # This is a similar procedure to the one in
    # https://github.com/microsoft/go-winio/blob/master/pipe.go (tryDialPipe):
    # while the pipe reports busy, retry every 1ms; any other error is raised.
    while True:
      try:
        return open(self._name, 'wb+', buffering=0)
      except (OSError, IOError):
        if GetLastError() != self.ERROR_PIPE_BUSY:
          raise
      time.sleep(0.001)  # 1ms


_default_registry.register_protocol('net.pipe', _NamedPipeStreamClient)


class _UnixDomainSocketStreamClient(StreamClient):
  """A StreamClient implementation that uses a UNIX domain socket."""

  class SocketFile(object):
    """A write-only file-like object that writes to a UNIX socket."""

    def __init__(self, sock):
      self._sock = sock

    def fileno(self):
      # Like a real file, return the integer descriptor, not the socket.
      return self._sock.fileno()

    def write(self, data):
      self._sock.sendall(data)

    def flush(self):
      # `write` hands data straight to sendall(); there is no local buffer.
      pass

    def close(self):
      self._sock.close()

  def __init__(self, path, **kwargs):
    """Initializes a new UNIX domain socket stream client.

    Args:
      path (str): The path to the named UNIX domain socket.
      kwargs: forwarded to StreamClient.__init__.
    """
    super(_UnixDomainSocketStreamClient, self).__init__(**kwargs)
    self._path = path

  @classmethod
  def _create(cls, value, **kwargs):
    if not os.path.exists(value):
      raise ValueError('UNIX domain socket [%s] does not exist.' % (value,))
    return cls(value, **kwargs)

  def _connect_raw(self):
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
      sock.connect(self._path)
    except BaseException:
      # Don't leak the socket if the connection could not be established.
      sock.close()
      raise
    return self.SocketFile(sock)


_default_registry.register_protocol('unix', _UnixDomainSocketStreamClient)