1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import logging 18 19from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferList 20from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferStream 21from acts.controllers.monsoon_lib.sampling.engine.assembly_line import DevNullBufferStream 22from acts.controllers.monsoon_lib.sampling.engine.assembly_line import IndexedBuffer 23 24 25class Transformer(object): 26 """An object that represents how to transform a given buffer into a result. 27 28 Attributes: 29 output_stream: The stream to output data to upon transformation. 30 Defaults to a DevNullBufferStream. 31 """ 32 33 def __init__(self): 34 self.output_stream = DevNullBufferStream(None) 35 36 def set_output_stream(self, output_stream): 37 """Sets the Transformer's output stream to the given output stream.""" 38 self.output_stream = output_stream 39 40 def transform(self, input_stream): 41 """Transforms input_stream data and passes it to self.output_stream. 42 43 Args: 44 input_stream: The BufferStream of input data this transformer should 45 transform. Note that the type of data stored within BufferStream 46 is not guaranteed to be in the format expected, much like STDIN 47 is not guaranteed to be the format a process expects. However, 48 for performance, users should expect the data to be properly 49 formatted anyway. 50 """ 51 input_stream.initialize() 52 self.output_stream.initialize() 53 class_name = self.__class__.__qualname__ 54 try: 55 logging.debug('%s transformer beginning.', class_name) 56 self.on_begin() 57 logging.debug('%s transformation started.', class_name) 58 self._transform(input_stream) 59 except Exception: 60 # TODO(markdr): Get multi-process error reporting to play nicer. 61 logging.exception('%s ran into an exception.', class_name) 62 raise 63 finally: 64 logging.debug('%s transformation ended.', class_name) 65 self.on_end() 66 logging.debug('%s finished.', class_name) 67 68 def _transform_buffer(self, buffer): 69 """Transforms a given buffer. 70 71 The implementation can either: 72 73 1) Return the transformed buffer. Can be either in-place or a new 74 buffer. 75 76 2) Return a BufferList: a list of transformed buffers. This is useful 77 for grouping data together for faster operations. 78 79 Args: 80 buffer: The buffer to transform 81 82 Returns: 83 either a buffer or a BufferList. See detailed documentation. 84 """ 85 raise NotImplementedError() 86 87 def _on_end_of_stream(self, input_stream): 88 """To be called when the input stream has sent the end of stream signal. 89 90 This is particularly useful for flushing any stored memory into the 91 output stream. 92 93 Args: 94 input_stream: the stream that was closed. 95 """ 96 # By default, this function closes the output stream. 97 self.output_stream.end_stream() 98 99 def _transform(self, input_stream): 100 """Should call _transform_buffer within this function.""" 101 raise NotImplementedError() 102 103 def on_begin(self): 104 """A function called before the transform loop begins.""" 105 106 def on_end(self): 107 """A function called after the transform loop has ended.""" 108 109 110class SourceTransformer(Transformer): 111 """The base class for generating data in an AssemblyLine. 112 113 Note that any Transformer will be able to generate data, but this class is 114 a generic way to send data. 115 116 Attributes: 117 _buffer_size: The buffer size for each IndexedBuffer sent over the 118 output stream. 119 """ 120 121 def __init__(self): 122 super().__init__() 123 # Defaulted to 64, which is small enough to be passed within the .6ms 124 # window, but large enough so that it does not spam the queue. 125 self._buffer_size = 64 126 127 def _transform(self, _): 128 """Generates data and sends it to the output stream.""" 129 buffer_index = 0 130 while True: 131 indexed_buffer = IndexedBuffer(buffer_index, self._buffer_size) 132 buffer = self._transform_buffer(indexed_buffer.buffer) 133 if buffer is BufferStream.END: 134 break 135 indexed_buffer.buffer = buffer 136 self.output_stream.add_indexed_buffer(indexed_buffer) 137 buffer_index += 1 138 139 self.output_stream.end_stream() 140 141 def _transform_buffer(self, buffer): 142 """Fills the passed-in buffer with data.""" 143 raise NotImplementedError() 144 145 146class SequentialTransformer(Transformer): 147 """A transformer that receives input in sequential order. 148 149 Attributes: 150 _next_index: The index of the next IndexedBuffer that should be read. 151 """ 152 153 def __init__(self): 154 super().__init__() 155 self._next_index = 0 156 157 def _transform(self, input_stream): 158 while True: 159 indexed_buffer = input_stream.remove_indexed_buffer() 160 if indexed_buffer is BufferStream.END: 161 break 162 buffer_or_buffers = self._transform_buffer(indexed_buffer.buffer) 163 if buffer_or_buffers is not None: 164 self._send_buffers(buffer_or_buffers) 165 166 self._on_end_of_stream(input_stream) 167 168 def _send_buffers(self, buffer_or_buffer_list): 169 """Sends buffers over to the output_stream. 170 171 Args: 172 buffer_or_buffer_list: A BufferList or buffer object. Note that if 173 buffer is None, it is effectively an end-of-stream signal. 174 """ 175 if not isinstance(buffer_or_buffer_list, BufferList): 176 # Assume a single buffer was returned 177 buffer_or_buffer_list = BufferList([buffer_or_buffer_list]) 178 179 buffer_list = buffer_or_buffer_list 180 for buffer in buffer_list: 181 new_buffer = IndexedBuffer(self._next_index, buffer) 182 self.output_stream.add_indexed_buffer(new_buffer) 183 self._next_index += 1 184 185 def _transform_buffer(self, buffer): 186 raise NotImplementedError() 187 188 189class ParallelTransformer(Transformer): 190 """A Transformer that is capable of running in parallel. 191 192 Buffers received may be unordered. For ordered input, use 193 SequentialTransformer. 194 """ 195 196 def _transform(self, input_stream): 197 while True: 198 indexed_buffer = input_stream.remove_indexed_buffer() 199 if indexed_buffer is None: 200 break 201 buffer = self._transform_buffer(indexed_buffer.buffer) 202 indexed_buffer.buffer = buffer 203 self.output_stream.add_indexed_buffer(indexed_buffer) 204 205 self._on_end_of_stream(input_stream) 206 207 def _transform_buffer(self, buffer): 208 """Transforms a given buffer. 209 210 Note that ParallelTransformers can NOT return a BufferList. This is a 211 limitation with the current indexing system. If the input buffer is 212 replaced with multiple buffers, later transformers will not know what 213 the proper order of buffers is. 214 215 Args: 216 buffer: The buffer to transform 217 218 Returns: 219 either None or a buffer. See detailed documentation. 220 """ 221 raise NotImplementedError() 222