1 /*
2 * Copyright 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "Codec2-InputBufferManager-Aidl"
19 #include <android-base/logging.h>
20
21 #include <codec2/aidl/InputBufferManager.h>
22
23 #include <aidl/android/hardware/media/c2/IComponentListener.h>
24 #include <android-base/logging.h>
25
26 #include <C2Buffer.h>
27 #include <C2Work.h>
28
29 #include <chrono>
30
31 namespace aidl {
32 namespace android {
33 namespace hardware {
34 namespace media {
35 namespace c2 {
36 namespace utils {
37
38 using namespace ::android;
39
registerFrameData(const std::shared_ptr<IComponentListener> & listener,const C2FrameData & input)40 void InputBufferManager::registerFrameData(
41 const std::shared_ptr<IComponentListener>& listener,
42 const C2FrameData& input) {
43 getInstance()._registerFrameData(listener, input);
44 }
45
unregisterFrameData(const std::weak_ptr<IComponentListener> & listener,const C2FrameData & input)46 void InputBufferManager::unregisterFrameData(
47 const std::weak_ptr<IComponentListener>& listener,
48 const C2FrameData& input) {
49 getInstance()._unregisterFrameData(listener, input);
50 }
51
unregisterFrameData(const std::weak_ptr<IComponentListener> & listener)52 void InputBufferManager::unregisterFrameData(
53 const std::weak_ptr<IComponentListener>& listener) {
54 getInstance()._unregisterFrameData(listener);
55 }
56
setNotificationInterval(nsecs_t notificationIntervalNs)57 void InputBufferManager::setNotificationInterval(
58 nsecs_t notificationIntervalNs) {
59 getInstance()._setNotificationInterval(notificationIntervalNs);
60 }
61
_registerFrameData(const std::shared_ptr<IComponentListener> & listener,const C2FrameData & input)62 void InputBufferManager::_registerFrameData(
63 const std::shared_ptr<IComponentListener>& listener,
64 const C2FrameData& input) {
65 uint64_t frameIndex = input.ordinal.frameIndex.peeku();
66 LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- called with "
67 << "listener @ 0x" << std::hex << listener.get()
68 << ", frameIndex = " << std::dec << frameIndex
69 << ".";
70 std::lock_guard<std::mutex> lock(mMutex);
71
72 std::set<TrackedBuffer*> &bufferIds =
73 mTrackedBuffersMap[listener][frameIndex];
74
75 for (size_t i = 0; i < input.buffers.size(); ++i) {
76 if (!input.buffers[i]) {
77 LOG(VERBOSE) << "InputBufferManager::_registerFrameData -- "
78 << "Input buffer at index " << i << " is null.";
79 continue;
80 }
81 TrackedBuffer *bufferId =
82 new TrackedBuffer(listener, frameIndex, i, input.buffers[i]);
83 mTrackedBufferCache.emplace(bufferId);
84 bufferIds.emplace(bufferId);
85
86 c2_status_t status = input.buffers[i]->registerOnDestroyNotify(
87 onBufferDestroyed,
88 reinterpret_cast<void*>(bufferId));
89 if (status != C2_OK) {
90 LOG(DEBUG) << "InputBufferManager::_registerFrameData -- "
91 << "registerOnDestroyNotify() failed "
92 << "(listener @ 0x" << std::hex << listener.get()
93 << ", frameIndex = " << std::dec << frameIndex
94 << ", bufferIndex = " << i
95 << ") => status = " << status
96 << ".";
97 }
98 }
99
100 mDeathNotifications.emplace(
101 listener,
102 DeathNotifications(
103 mNotificationIntervalNs.load(std::memory_order_relaxed)));
104 }
105
106 // Remove a pair (listener, frameIndex) from mTrackedBuffersMap and
107 // mDeathNotifications. This implies all bufferIndices are removed.
108 //
109 // This is called from onWorkDone() and flush().
_unregisterFrameData(const std::weak_ptr<IComponentListener> & listener,const C2FrameData & input)110 void InputBufferManager::_unregisterFrameData(
111 const std::weak_ptr<IComponentListener>& listener,
112 const C2FrameData& input) {
113 uint64_t frameIndex = input.ordinal.frameIndex.peeku();
114 LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData -- called with "
115 << "frameIndex = " << frameIndex << ".";
116 std::lock_guard<std::mutex> lock(mMutex);
117
118 auto findListener = mTrackedBuffersMap.find(listener);
119 if (findListener != mTrackedBuffersMap.end()) {
120 std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds
121 = findListener->second;
122 auto findFrameIndex = frameIndex2BufferIds.find(frameIndex);
123 if (findFrameIndex != frameIndex2BufferIds.end()) {
124 std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
125 for (TrackedBuffer* bufferId : bufferIds) {
126 std::shared_ptr<C2Buffer> buffer = bufferId->buffer.lock();
127 if (buffer) {
128 c2_status_t status = buffer->unregisterOnDestroyNotify(
129 onBufferDestroyed,
130 reinterpret_cast<void*>(bufferId));
131 if (status != C2_OK) {
132 LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
133 << "-- unregisterOnDestroyNotify() failed "
134 << "(frameIndex = " << bufferId->frameIndex
135 << ", bufferIndex = " << bufferId->bufferIndex
136 << ") => status = " << status
137 << ".";
138 }
139 }
140 mTrackedBufferCache.erase(bufferId);
141 delete bufferId;
142 }
143
144 frameIndex2BufferIds.erase(findFrameIndex);
145 if (frameIndex2BufferIds.empty()) {
146 mTrackedBuffersMap.erase(findListener);
147 }
148 }
149 }
150
151 auto findListenerD = mDeathNotifications.find(listener);
152 if (findListenerD != mDeathNotifications.end()) {
153 DeathNotifications &deathNotifications = findListenerD->second;
154 auto findFrameIndex = deathNotifications.indices.find(frameIndex);
155 if (findFrameIndex != deathNotifications.indices.end()) {
156 std::vector<size_t> &bufferIndices = findFrameIndex->second;
157 deathNotifications.count -= bufferIndices.size();
158 deathNotifications.indices.erase(findFrameIndex);
159 }
160 }
161 }
162
163 // Remove listener from mTrackedBuffersMap and mDeathNotifications. This implies
164 // all frameIndices and bufferIndices are removed.
165 //
166 // This is called when the component cleans up all input buffers, i.e., when
167 // reset(), release(), stop() or ~Component() is called.
_unregisterFrameData(const std::weak_ptr<IComponentListener> & listener)168 void InputBufferManager::_unregisterFrameData(
169 const std::weak_ptr<IComponentListener>& listener) {
170 LOG(VERBOSE) << "InputBufferManager::_unregisterFrameData.";
171 std::lock_guard<std::mutex> lock(mMutex);
172
173 auto findListener = mTrackedBuffersMap.find(listener);
174 if (findListener != mTrackedBuffersMap.end()) {
175 std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds =
176 findListener->second;
177 for (auto findFrameIndex = frameIndex2BufferIds.begin();
178 findFrameIndex != frameIndex2BufferIds.end();
179 ++findFrameIndex) {
180 std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
181 for (TrackedBuffer* bufferId : bufferIds) {
182 std::shared_ptr<C2Buffer> buffer = bufferId->buffer.lock();
183 if (buffer) {
184 c2_status_t status = buffer->unregisterOnDestroyNotify(
185 onBufferDestroyed,
186 reinterpret_cast<void*>(bufferId));
187 if (status != C2_OK) {
188 LOG(DEBUG) << "InputBufferManager::_unregisterFrameData "
189 << "-- unregisterOnDestroyNotify() failed "
190 << "(frameIndex = " << bufferId->frameIndex
191 << ", bufferIndex = " << bufferId->bufferIndex
192 << ") => status = " << status
193 << ".";
194 }
195 mTrackedBufferCache.erase(bufferId);
196 delete bufferId;
197 }
198 }
199 }
200 mTrackedBuffersMap.erase(findListener);
201 }
202
203 mDeathNotifications.erase(listener);
204 }
205
206 // Set mNotificationIntervalNs.
_setNotificationInterval(nsecs_t notificationIntervalNs)207 void InputBufferManager::_setNotificationInterval(
208 nsecs_t notificationIntervalNs) {
209 mNotificationIntervalNs.store(
210 notificationIntervalNs,
211 std::memory_order_relaxed);
212 }
213
214 // Move a buffer from mTrackedBuffersMap to mDeathNotifications.
215 // This is called when a registered C2Buffer object is destroyed.
onBufferDestroyed(const C2Buffer * buf,void * arg)216 void InputBufferManager::onBufferDestroyed(const C2Buffer* buf, void* arg) {
217 getInstance()._onBufferDestroyed(buf, arg);
218 }
219
_onBufferDestroyed(const C2Buffer * buf,void * arg)220 void InputBufferManager::_onBufferDestroyed(const C2Buffer* buf, void* arg) {
221 if (!buf || !arg) {
222 LOG(WARNING) << "InputBufferManager::_onBufferDestroyed -- called with "
223 << "null argument (s): "
224 << "buf @ 0x" << std::hex << buf
225 << ", arg @ 0x" << std::hex << arg
226 << std::dec << ".";
227 return;
228 }
229
230 std::lock_guard<std::mutex> lock(mMutex);
231 TrackedBuffer *bufferId = reinterpret_cast<TrackedBuffer*>(arg);
232
233 if (mTrackedBufferCache.find(bufferId) == mTrackedBufferCache.end()) {
234 LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- called with "
235 << "unregistered buffer: "
236 << "buf @ 0x" << std::hex << buf
237 << ", arg @ 0x" << std::hex << arg
238 << std::dec << ".";
239 return;
240 }
241
242 LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- called with "
243 << "buf @ 0x" << std::hex << buf
244 << ", arg @ 0x" << std::hex << arg
245 << std::dec << " -- "
246 << ", frameIndex = " << bufferId->frameIndex
247 << ", bufferIndex = " << bufferId->bufferIndex
248 << ".";
249 auto findListener = mTrackedBuffersMap.find(bufferId->listener);
250 if (findListener == mTrackedBuffersMap.end()) {
251 LOG(VERBOSE) << "InputBufferManager::_onBufferDestroyed -- "
252 << "received invalid listener: "
253 << " (frameIndex = " << bufferId->frameIndex
254 << ", bufferIndex = " << bufferId->bufferIndex
255 << ").";
256 return;
257 }
258
259 std::map<uint64_t, std::set<TrackedBuffer*>> &frameIndex2BufferIds
260 = findListener->second;
261 auto findFrameIndex = frameIndex2BufferIds.find(bufferId->frameIndex);
262 if (findFrameIndex == frameIndex2BufferIds.end()) {
263 LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
264 << "received invalid frame index: "
265 << "frameIndex = " << bufferId->frameIndex
266 << ", bufferIndex = " << bufferId->bufferIndex
267 << ").";
268 return;
269 }
270
271 std::set<TrackedBuffer*> &bufferIds = findFrameIndex->second;
272 auto findBufferId = bufferIds.find(bufferId);
273 if (findBufferId == bufferIds.end()) {
274 LOG(DEBUG) << "InputBufferManager::_onBufferDestroyed -- "
275 << "received invalid buffer index: "
276 << "bufferIndex = " << bufferId->bufferIndex
277 << " (frameIndex = " << bufferId->frameIndex
278 << ").";
279 return;
280 }
281
282 bufferIds.erase(findBufferId);
283 if (bufferIds.empty()) {
284 frameIndex2BufferIds.erase(findFrameIndex);
285 if (frameIndex2BufferIds.empty()) {
286 mTrackedBuffersMap.erase(findListener);
287 }
288 }
289
290 DeathNotifications &deathNotifications = mDeathNotifications[bufferId->listener];
291 deathNotifications.indices[bufferId->frameIndex].emplace_back(bufferId->bufferIndex);
292 ++deathNotifications.count;
293 mOnBufferDestroyed.notify_one();
294
295 mTrackedBufferCache.erase(bufferId);
296 delete bufferId;
297 }
298
299 // Notify the clients about buffer destructions.
300 // Return false if all destructions have been notified.
301 // Return true and set timeToRetry to the time point to wait for before
302 // retrying if some destructions have not been notified.
processNotifications(nsecs_t * timeToRetryNs)303 bool InputBufferManager::processNotifications(nsecs_t* timeToRetryNs) {
304
305 struct Notification {
306 std::shared_ptr<IComponentListener> listener;
307 std::vector<IComponentListener::InputBuffer> inputBuffers;
308 Notification(const std::shared_ptr<IComponentListener>& l, size_t s)
309 : listener(l), inputBuffers(s) {}
310 };
311 std::list<Notification> notifications;
312 nsecs_t notificationIntervalNs =
313 mNotificationIntervalNs.load(std::memory_order_relaxed);
314
315 bool retry = false;
316 {
317 std::lock_guard<std::mutex> lock(mMutex);
318 *timeToRetryNs = notificationIntervalNs;
319 nsecs_t timeNowNs = systemTime();
320 for (auto it = mDeathNotifications.begin();
321 it != mDeathNotifications.end(); ) {
322 std::shared_ptr<IComponentListener> listener = it->first.lock();
323 if (!listener) {
324 ++it;
325 continue;
326 }
327 DeathNotifications &deathNotifications = it->second;
328
329 nsecs_t timeSinceLastNotifiedNs =
330 timeNowNs - deathNotifications.lastSentNs;
331 // If not enough time has passed since the last callback, leave the
332 // notifications for this listener untouched for now and retry
333 // later.
334 if (timeSinceLastNotifiedNs < notificationIntervalNs) {
335 retry = true;
336 *timeToRetryNs = std::min(*timeToRetryNs,
337 notificationIntervalNs - timeSinceLastNotifiedNs);
338 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
339 << "Notifications for listener @ "
340 << std::hex << listener.get()
341 << " will be postponed.";
342 ++it;
343 continue;
344 }
345
346 // If enough time has passed since the last notification to this
347 // listener but there are currently no pending notifications, the
348 // listener can be removed from mDeathNotifications---there is no
349 // need to keep track of the last notification time anymore.
350 if (deathNotifications.count == 0) {
351 it = mDeathNotifications.erase(it);
352 continue;
353 }
354
355 // Create the argument for the callback.
356 notifications.emplace_back(listener, deathNotifications.count);
357 std::vector<IComponentListener::InputBuffer> &inputBuffers =
358 notifications.back().inputBuffers;
359 size_t i = 0;
360 for (std::pair<const uint64_t, std::vector<size_t>>& p :
361 deathNotifications.indices) {
362 uint64_t frameIndex = p.first;
363 const std::vector<size_t> &bufferIndices = p.second;
364 for (const size_t& bufferIndex : bufferIndices) {
365 IComponentListener::InputBuffer &inputBuffer
366 = inputBuffers[i++];
367 inputBuffer.arrayIndex = bufferIndex;
368 inputBuffer.frameIndex = frameIndex;
369 }
370 }
371
372 // Clear deathNotifications for this listener and set retry to true
373 // so processNotifications will be called again. This will
374 // guarantee that a listener with no pending notifications will
375 // eventually be removed from mDeathNotifications after
376 // mNotificationIntervalNs nanoseconds has passed.
377 retry = true;
378 deathNotifications.indices.clear();
379 deathNotifications.count = 0;
380 deathNotifications.lastSentNs = timeNowNs;
381 ++it;
382 }
383 }
384
385 // Call onInputBuffersReleased() outside the lock to avoid deadlock.
386 for (const Notification& notification : notifications) {
387 if (!notification.listener->onInputBuffersReleased(
388 notification.inputBuffers).isOk()) {
389 // This may trigger if the client has died.
390 LOG(DEBUG) << "InputBufferManager::processNotifications -- "
391 << "failed to send death notifications to "
392 << "listener @ 0x" << std::hex
393 << notification.listener.get()
394 << std::dec << ".";
395 } else {
396 #if LOG_NDEBUG == 0
397 std::stringstream inputBufferLog;
398 for (const IComponentListener::InputBuffer& inputBuffer :
399 notification.inputBuffers) {
400 inputBufferLog << " (" << inputBuffer.frameIndex
401 << ", " << inputBuffer.arrayIndex
402 << ")";
403 }
404 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
405 << "death notifications sent to "
406 << "listener @ 0x" << std::hex
407 << notification.listener.get()
408 << std::dec
409 << " with these (frameIndex, bufferIndex) pairs:"
410 << inputBufferLog.str();
411 #endif
412 }
413 }
414 #if LOG_NDEBUG == 0
415 if (retry) {
416 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
417 << "will retry again in " << *timeToRetryNs << "ns.";
418 } else {
419 LOG(VERBOSE) << "InputBufferManager::processNotifications -- "
420 << "no pending death notifications.";
421 }
422 #endif
423 return retry;
424 }
425
main()426 void InputBufferManager::main() {
427 LOG(VERBOSE) << "InputBufferManager main -- started.";
428 nsecs_t timeToRetryNs;
429 while (true) {
430 std::unique_lock<std::mutex> lock(mMutex);
431 while (mDeathNotifications.empty()) {
432 mOnBufferDestroyed.wait(lock);
433 }
434 lock.unlock();
435 while (processNotifications(&timeToRetryNs)) {
436 std::this_thread::sleep_for(
437 std::chrono::nanoseconds(timeToRetryNs));
438 }
439 }
440 }
441
InputBufferManager()442 InputBufferManager::InputBufferManager()
443 : mMainThread{&InputBufferManager::main, this} {
444 }
445
getInstance()446 InputBufferManager& InputBufferManager::getInstance() {
447 static InputBufferManager instance{};
448 return instance;
449 }
450
451 } // namespace utils
452 } // namespace c2
453 } // namespace media
454 } // namespace hardware
455 } // namespace android
456 } // namespace aidl
457
458