1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferList
20from acts.controllers.monsoon_lib.sampling.engine.assembly_line import BufferStream
21from acts.controllers.monsoon_lib.sampling.engine.assembly_line import DevNullBufferStream
22from acts.controllers.monsoon_lib.sampling.engine.assembly_line import IndexedBuffer
23
24
class Transformer:
    """Base class describing how buffers from a stream become results.

    transform() is the public entry point: it initializes both streams, runs
    the on_begin hook, delegates the actual work to _transform, and always
    runs the on_end hook afterwards. Subclasses are expected to override
    _transform and _transform_buffer, both of which raise NotImplementedError
    here.

    Attributes:
        output_stream: The stream transformed data is written to. Set to a
            DevNullBufferStream until set_output_stream replaces it.
    """

    def __init__(self):
        self.output_stream = DevNullBufferStream(None)

    def set_output_stream(self, output_stream):
        """Replaces the stream this Transformer writes its results to."""
        self.output_stream = output_stream

    def on_begin(self):
        """Hook invoked by transform() before the transform loop begins."""

    def on_end(self):
        """Hook invoked by transform() after the transform loop has ended."""

    def transform(self, input_stream):
        """Runs the transformation from input_stream into self.output_stream.

        Both streams are initialized first. on_begin, _transform and on_end
        are then called in that order; on_end runs from a finally clause, so
        it is called even when on_begin or _transform raises. Exceptions are
        logged and re-raised to the caller.

        Args:
            input_stream: The BufferStream of input data this transformer
                should transform. The data held by the stream is not
                guaranteed to be in the expected format (much like STDIN is
                not guaranteed to match what a process expects), but for
                performance reasons implementations should assume it is
                well-formed.
        """
        input_stream.initialize()
        self.output_stream.initialize()

        cls = self.__class__
        name = cls.__qualname__
        try:
            logging.debug('%s transformer beginning.', name)
            self.on_begin()
            logging.debug('%s transformation started.', name)
            self._transform(input_stream)
        except Exception:
            # TODO(markdr): Get multi-process error reporting to play nicer.
            logging.exception('%s ran into an exception.', name)
            raise
        finally:
            # Runs on both the success and the failure path.
            logging.debug('%s transformation ended.', name)
            self.on_end()
            logging.debug('%s finished.', name)

    def _transform(self, input_stream):
        """The transform loop; should call _transform_buffer from within.

        Args:
            input_stream: The stream handed to transform().

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def _transform_buffer(self, buffer):
        """Transforms a single buffer.

        An implementation may return either of the following:

        1) The transformed buffer itself, whether modified in place or newly
           created.

        2) A BufferList, i.e. a list of transformed buffers. Grouping data
           this way is useful for faster operations.

        Args:
            buffer: The buffer to transform

        Returns:
            either a buffer or a BufferList, as described above.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def _on_end_of_stream(self, input_stream):
        """Handles the end-of-stream signal from the input stream.

        Overriding this is a convenient place to flush any data still held in
        memory into the output stream. The default implementation only ends
        the output stream.

        Args:
            input_stream: the stream that was closed.
        """
        self.output_stream.end_stream()
108
109
class SourceTransformer(Transformer):
    """Base class for Transformers that generate data for an AssemblyLine.

    Any Transformer is able to generate data; this class simply provides a
    generic way of sending it. The input stream passed to _transform is
    ignored.

    Attributes:
        _buffer_size: The buffer size for each IndexedBuffer sent over the
            output stream.
    """

    def __init__(self):
        super().__init__()
        # 64 is the default: small enough to be passed within the .6ms
        # window, yet large enough that the queue is not spammed.
        self._buffer_size = 64

    def _transform_buffer(self, buffer):
        """Fills the passed-in buffer with data.

        Args:
            buffer: The buffer of a freshly created IndexedBuffer.

        Returns:
            The buffer to send, or BufferStream.END to stop generating data.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def _transform(self, _):
        """Generates data and sends it to the output stream.

        A new IndexedBuffer is created for each iteration, filled by
        _transform_buffer, and sent with an index that counts up from zero.
        Once _transform_buffer returns BufferStream.END the output stream is
        ended and generation stops.
        """
        index = 0
        while True:
            outgoing = IndexedBuffer(index, self._buffer_size)
            payload = self._transform_buffer(outgoing.buffer)
            if payload is BufferStream.END:
                # Nothing more to generate; close the stream and finish.
                self.output_stream.end_stream()
                return
            outgoing.buffer = payload
            self.output_stream.add_indexed_buffer(outgoing)
            index += 1
144
145
class SequentialTransformer(Transformer):
    """A transformer that receives input in sequential order.

    Outgoing buffers are numbered by this transformer itself, independent of
    the index each incoming buffer carried.

    Attributes:
        _next_index: The index that will be given to the next IndexedBuffer
            written to the output stream.
    """

    def __init__(self):
        super().__init__()
        self._next_index = 0

    def _transform_buffer(self, buffer):
        """Transforms a single buffer; must be provided by subclasses.

        Args:
            buffer: The buffer to transform

        Returns:
            A buffer, a BufferList, or None when there is nothing to send for
            this input.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError()

    def _send_buffers(self, buffer_or_buffer_list):
        """Sends buffers over to the output_stream.

        Every buffer is wrapped in a new IndexedBuffer carrying the next
        sequential index before being added to the output stream.

        Args:
            buffer_or_buffer_list: A BufferList or buffer object. Anything
                that is not a BufferList is treated as one single buffer.
        """
        if isinstance(buffer_or_buffer_list, BufferList):
            outgoing = buffer_or_buffer_list
        else:
            # Assume a single buffer was returned
            outgoing = BufferList([buffer_or_buffer_list])

        for payload in outgoing:
            wrapped = IndexedBuffer(self._next_index, payload)
            self.output_stream.add_indexed_buffer(wrapped)
            self._next_index += 1

    def _transform(self, input_stream):
        """Transforms buffers from input_stream until the end of stream.

        Results of _transform_buffer that are None are skipped; everything
        else is forwarded through _send_buffers. _on_end_of_stream is called
        once BufferStream.END has been read.

        Args:
            input_stream: The stream to read IndexedBuffers from.
        """
        while True:
            incoming = input_stream.remove_indexed_buffer()
            if incoming is BufferStream.END:
                break
            result = self._transform_buffer(incoming.buffer)
            if result is None:
                # Nothing to emit for this input buffer.
                continue
            self._send_buffers(result)

        self._on_end_of_stream(input_stream)
187
188
class ParallelTransformer(Transformer):
    """A Transformer that is capable of running in parallel.

    Buffers received may be unordered. For ordered input, use
    SequentialTransformer.
    """

    def _transform(self, input_stream):
        """Transforms buffers from input_stream until the end of stream.

        Each IndexedBuffer read from input_stream has its buffer replaced by
        the result of _transform_buffer and is then forwarded, as the same
        IndexedBuffer object, to self.output_stream. _on_end_of_stream is
        called once the end-of-stream signal has been read.

        Args:
            input_stream: The stream to read IndexedBuffers from.
        """
        while True:
            indexed_buffer = input_stream.remove_indexed_buffer()
            # Compare against the shared end-of-stream sentinel, as the other
            # transformers in this module do, rather than a literal None.
            if indexed_buffer is BufferStream.END:
                break
            # The result is forwarded whatever its value, so every index that
            # was read is also written.
            indexed_buffer.buffer = self._transform_buffer(
                indexed_buffer.buffer)
            self.output_stream.add_indexed_buffer(indexed_buffer)

        self._on_end_of_stream(input_stream)

    def _transform_buffer(self, buffer):
        """Transforms a given buffer.

        Note that ParallelTransformers can NOT return a BufferList. This is a
        limitation with the current indexing system. If the input buffer is
        replaced with multiple buffers, later transformers will not know what
        the proper order of buffers is.

        Args:
            buffer: The buffer to transform

        Returns:
            The transformed buffer. Whatever is returned is stored on the
            IndexedBuffer that was read and sent to the output stream.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError()
222