/*
 * Copyright 2018 Google
 * SPDX-License-Identifier: MIT
 */
#include "ring_buffer.h"

#include <errno.h>
#include <string.h>
#ifdef _MSC_VER
#include "aemu/base/msvc.h"
#else
#include <sys/time.h>
#endif

#ifdef __x86_64__
#include <emmintrin.h>
#endif

#ifdef _WIN32
#include <windows.h>
#else
#include <sched.h>
#include <unistd.h>
#endif

#define RING_BUFFER_MASK (RING_BUFFER_SIZE - 1)

#define RING_BUFFER_VERSION 1

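/*
 * A lock-free ring buffer for one producer and one consumer: writes only ever
 * advance write_pos and reads only ever advance read_pos. Both positions are
 * free-running 32-bit counters that are reduced to buffer offsets with a
 * power-of-two mask, so RING_BUFFER_SIZE must be a power of two. One slot is
 * always left unused so a full ring can be told apart from an empty one.
 */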
void ring_buffer_init(struct ring_buffer* r) {
    r->host_version = 1;
    r->write_pos = 0;
    r->read_pos = 0;

    r->read_live_count = 0;
    r->read_yield_count = 0;
    r->read_sleep_us_count = 0;

    r->state = 0;
}

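// Reduces a free-running index to an offset inside the ring. Relies on
// RING_BUFFER_SIZE being a power of two.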
static uint32_t get_ring_pos(uint32_t index) { return index & RING_BUFFER_MASK; }

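// There is room for |bytes| when at least that much space lies between the
// write position and the reader. The "- 1" keeps one slot unused so that
// write_pos == read_pos always means "empty", never "full".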
bool ring_buffer_can_write(const struct ring_buffer* r, uint32_t bytes) {
    uint32_t read_view;
    __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
    return get_ring_pos(read_view - r->write_pos - 1) >= bytes;
}

bool ring_buffer_can_read(const struct ring_buffer* r, uint32_t bytes) {
    uint32_t write_view;
    __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
    return get_ring_pos(write_view - r->read_pos) >= bytes;
}

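// Copies up to |steps| blocks of |step_size| bytes from |data| into the ring.
// Returns the number of whole steps written; if the ring fills up early,
// errno is set to -EAGAIN and the partial step count is returned. A step that
// wraps past the end of the buffer is split into two memcpy calls.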
long ring_buffer_write(struct ring_buffer* r, const void* data, uint32_t step_size,
                       uint32_t steps) {
    const uint8_t* data_bytes = (const uint8_t*)data;
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_can_write(r, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        // Needs to be split up into 2 writes for the edge case.
        uint32_t available_at_end = RING_BUFFER_SIZE - get_ring_pos(r->write_pos);

        if (step_size > available_at_end) {
            uint32_t remaining = step_size - available_at_end;
            memcpy(&r->buf[get_ring_pos(r->write_pos)], data_bytes + i * step_size,
                   available_at_end);
            memcpy(&r->buf[get_ring_pos(r->write_pos + available_at_end)],
                   data_bytes + i * step_size + available_at_end, remaining);
        } else {
            memcpy(&r->buf[get_ring_pos(r->write_pos)], data_bytes + i * step_size, step_size);
        }

        __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

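// Mirror image of ring_buffer_write(): copies up to |steps| blocks of
// |step_size| bytes out of the ring into |data|, returning the number of
// whole steps read and setting errno to -EAGAIN on a short read.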
long ring_buffer_read(struct ring_buffer* r, void* data, uint32_t step_size, uint32_t steps) {
    uint8_t* data_bytes = (uint8_t*)data;
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_can_read(r, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        // Needs to be split up into 2 reads for the edge case.
        uint32_t available_at_end = RING_BUFFER_SIZE - get_ring_pos(r->read_pos);

        if (step_size > available_at_end) {
            uint32_t remaining = step_size - available_at_end;
            memcpy(data_bytes + i * step_size, &r->buf[get_ring_pos(r->read_pos)],
                   available_at_end);
            memcpy(data_bytes + i * step_size + available_at_end,
                   &r->buf[get_ring_pos(r->read_pos + available_at_end)], remaining);
        } else {
            memcpy(data_bytes + i * step_size, &r->buf[get_ring_pos(r->read_pos)], step_size);
        }

        __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

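// Advances the write position without copying any payload, for callers that
// fill the buffer themselves. Same step/return/errno convention as
// ring_buffer_write().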
long ring_buffer_advance_write(struct ring_buffer* r, uint32_t step_size, uint32_t steps) {
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_can_write(r, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

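// Advances the read position without copying, skipping over data that is
// already in the ring.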
long ring_buffer_advance_read(struct ring_buffer* r, uint32_t step_size, uint32_t steps) {
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_can_read(r, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

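// Returns floor(log2(size)): the largest shift such that (1 << shift) does
// not exceed |size|. Views therefore round a non-power-of-two size down.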
uint32_t ring_buffer_calc_shift(uint32_t size) {
    uint32_t shift = 0;
    while ((1 << shift) < size) {
        ++shift;
    }

    // If size is not a power of 2, round down.
    if ((1 << shift) > size) {
        --shift;
    }
    return shift;
}

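// Initializes |r| together with a view |v| that aliases the caller-provided
// buffer |buf|. The usable size is |size| rounded down to a power of two so
// that masking with v->mask stays valid.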
void ring_buffer_view_init(struct ring_buffer* r, struct ring_buffer_view* v, uint8_t* buf,
                           uint32_t size) {
    uint32_t shift = ring_buffer_calc_shift(size);

    ring_buffer_init(r);

    v->buf = buf;
    v->size = (1 << shift);
    v->mask = (1 << shift) - 1;
}

void ring_buffer_init_view_only(struct ring_buffer_view* v, uint8_t* buf, uint32_t size) {
    uint32_t shift = ring_buffer_calc_shift(size);

    v->buf = buf;
    v->size = (1 << shift);
    v->mask = (1 << shift) - 1;
}

uint32_t ring_buffer_view_get_ring_pos(const struct ring_buffer_view* v, uint32_t index) {
    return index & v->mask;
}

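// View-based variants of can_write/can_read: same logic as the fixed-size
// versions above, but masked against the view's own size.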
bool ring_buffer_view_can_write(const struct ring_buffer* r, const struct ring_buffer_view* v,
                                uint32_t bytes) {
    uint32_t read_view;
    __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
    return ring_buffer_view_get_ring_pos(v, read_view - r->write_pos - 1) >= bytes;
}

bool ring_buffer_view_can_read(const struct ring_buffer* r, const struct ring_buffer_view* v,
                               uint32_t bytes) {
    uint32_t write_view;
    __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
    return ring_buffer_view_get_ring_pos(v, write_view - r->read_pos) >= bytes;
}

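// Number of bytes currently readable (or writable), for either a view-backed
// ring (|v| non-NULL) or the inline buffer (|v| NULL).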
uint32_t ring_buffer_available_read(const struct ring_buffer* r, const struct ring_buffer_view* v) {
    uint32_t write_view;
    __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
    if (v) {
        return ring_buffer_view_get_ring_pos(v, write_view - r->read_pos);
    } else {
        return get_ring_pos(write_view - r->read_pos);
    }
}

uint32_t ring_buffer_available_write(const struct ring_buffer* r,
                                     const struct ring_buffer_view* v) {
    uint32_t read_view;
    __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
    if (v) {
        return ring_buffer_view_get_ring_pos(v, read_view - r->write_pos - 1);
    } else {
        return get_ring_pos(read_view - r->write_pos - 1);
    }
}

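// Copies |wanted_bytes| of pending data into |res| without advancing read_pos
// (a peek). Returns 0 on success, -1 if fewer than |wanted_bytes| are
// currently available.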
int ring_buffer_copy_contents(const struct ring_buffer* r, const struct ring_buffer_view* v,
                              uint32_t wanted_bytes, uint8_t* res) {
    uint32_t total_available = ring_buffer_available_read(r, v);
    uint32_t available_at_end = 0;

    if (v) {
        available_at_end = v->size - ring_buffer_view_get_ring_pos(v, r->read_pos);
    } else {
        // The copy below starts at read_pos, so the wrap point must be
        // computed from read_pos as well.
        available_at_end = RING_BUFFER_SIZE - get_ring_pos(r->read_pos);
    }

    if (total_available < wanted_bytes) {
        return -1;
    }

    if (v) {
        if (wanted_bytes > available_at_end) {
            uint32_t remaining = wanted_bytes - available_at_end;
            memcpy(res, &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)], available_at_end);
            memcpy(res + available_at_end,
                   &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos + available_at_end)],
                   remaining);
        } else {
            memcpy(res, &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)], wanted_bytes);
        }
    } else {
        if (wanted_bytes > available_at_end) {
            uint32_t remaining = wanted_bytes - available_at_end;
            memcpy(res, &r->buf[get_ring_pos(r->read_pos)], available_at_end);
            memcpy(res + available_at_end, &r->buf[get_ring_pos(r->read_pos + available_at_end)],
                   remaining);
        } else {
            memcpy(res, &r->buf[get_ring_pos(r->read_pos)], wanted_bytes);
        }
    }
    return 0;
}

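// View-backed counterpart of ring_buffer_write(): same stepping, wrap
// handling and -EAGAIN convention, but addressed through v->buf and v->mask.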
long ring_buffer_view_write(struct ring_buffer* r, struct ring_buffer_view* v, const void* data,
                            uint32_t step_size, uint32_t steps) {
    const uint8_t* data_bytes = (const uint8_t*)data;
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_view_can_write(r, v, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        // Needs to be split up into 2 writes for the edge case.
        uint32_t available_at_end = v->size - ring_buffer_view_get_ring_pos(v, r->write_pos);

        if (step_size > available_at_end) {
            uint32_t remaining = step_size - available_at_end;
            memcpy(&v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos)],
                   data_bytes + i * step_size, available_at_end);
            memcpy(&v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos + available_at_end)],
                   data_bytes + i * step_size + available_at_end, remaining);
        } else {
            memcpy(&v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos)],
                   data_bytes + i * step_size, step_size);
        }

        __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

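// View-backed counterpart of ring_buffer_read().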
long ring_buffer_view_read(struct ring_buffer* r, struct ring_buffer_view* v, void* data,
                           uint32_t step_size, uint32_t steps) {
    uint8_t* data_bytes = (uint8_t*)data;
    uint32_t i;

    for (i = 0; i < steps; ++i) {
        if (!ring_buffer_view_can_read(r, v, step_size)) {
            errno = -EAGAIN;
            return (long)i;
        }

        // Needs to be split up into 2 reads for the edge case.
        uint32_t available_at_end = v->size - ring_buffer_view_get_ring_pos(v, r->read_pos);

        if (step_size > available_at_end) {
            uint32_t remaining = step_size - available_at_end;
            memcpy(data_bytes + i * step_size,
                   &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)], available_at_end);
            memcpy(data_bytes + i * step_size + available_at_end,
                   &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos + available_at_end)],
                   remaining);
        } else {
            memcpy(data_bytes + i * step_size,
                   &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)], step_size);
        }
        __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
    }

    errno = 0;
    return (long)steps;
}

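// Backoff helpers used while waiting for the other side: a CPU-friendly
// yield, a short sleep, and a microsecond timestamp for timeout accounting.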
void ring_buffer_yield() {
#ifdef _WIN32
    _mm_pause();
#else
    sched_yield();
#endif
}

static void ring_buffer_sleep() {
#ifdef _WIN32
    Sleep(2);
#else
    usleep(2000);
#endif
}

static uint64_t ring_buffer_curr_us() {
    uint64_t res;
    struct timeval tv;
    gettimeofday(&tv, NULL);
    res = tv.tv_sec * 1000000ULL + tv.tv_usec;
    return res;
}

static const uint32_t yield_backoff_us = 1000;
static const uint32_t sleep_backoff_us = 2000;

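// Blocks until |bytes| can be written or |timeout_us| elapses. The wait
// escalates gradually: spin (with a pause on x86-64), then ring_buffer_yield()
// after yield_backoff_us, then 2 ms sleeps after sleep_backoff_us, giving up
// once timeout_us has passed.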
bool ring_buffer_wait_write(const struct ring_buffer* r, const struct ring_buffer_view* v,
                            uint32_t bytes, uint64_t timeout_us) {
    uint64_t start_us = ring_buffer_curr_us();
    uint64_t curr_wait_us;

    bool can_write = v ? ring_buffer_view_can_write(r, v, bytes) : ring_buffer_can_write(r, bytes);

    while (!can_write) {
#ifdef __x86_64__
        _mm_pause();
#endif
        curr_wait_us = ring_buffer_curr_us() - start_us;

        if (curr_wait_us > yield_backoff_us) {
            ring_buffer_yield();
        }

        if (curr_wait_us > sleep_backoff_us) {
            ring_buffer_sleep();
        }

        if (curr_wait_us > timeout_us) {
            return false;
        }

        can_write = v ? ring_buffer_view_can_write(r, v, bytes) : ring_buffer_can_write(r, bytes);
    }

    return true;
}

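// Same escalation as ring_buffer_wait_write(), but for readable data; it also
// updates the read_* statistics counters (the const is cast away only for
// that bookkeeping).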
bool ring_buffer_wait_read(const struct ring_buffer* r, const struct ring_buffer_view* v,
                           uint32_t bytes, uint64_t timeout_us) {
    uint64_t start_us = ring_buffer_curr_us();
    uint64_t curr_wait_us;

    bool can_read = v ? ring_buffer_view_can_read(r, v, bytes) : ring_buffer_can_read(r, bytes);

    while (!can_read) {
        // TODO(bohu): find aarch64 equivalent
#ifdef __x86_64__
        _mm_pause();
#endif
        curr_wait_us = ring_buffer_curr_us() - start_us;

        if (curr_wait_us > yield_backoff_us) {
            ring_buffer_yield();
            ((struct ring_buffer*)r)->read_yield_count++;
        }

        if (curr_wait_us > sleep_backoff_us) {
            ring_buffer_sleep();
            ((struct ring_buffer*)r)->read_sleep_us_count += 2000;
        }

        if (curr_wait_us > timeout_us) {
            return false;
        }

        can_read = v ? ring_buffer_view_can_read(r, v, bytes) : ring_buffer_can_read(r, bytes);
    }

    ((struct ring_buffer*)r)->read_live_count++;
    return true;
}

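// Picks the chunk size for the *_fully transfers: at most half the ring
// capacity, so a pending chunk can always fit once the other side drains,
// and never more than the bytes remaining.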
static uint32_t get_step_size(struct ring_buffer* r, struct ring_buffer_view* v, uint32_t bytes) {
    uint32_t available = v ? (v->size >> 1) : (RING_BUFFER_SIZE >> 1);
    uint32_t res = available < bytes ? available : bytes;

    return res;
}

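// Blocking convenience wrappers: transfer all |bytes| with no abort condition.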
void ring_buffer_write_fully(struct ring_buffer* r, struct ring_buffer_view* v, const void* data,
                             uint32_t bytes) {
    ring_buffer_write_fully_with_abort(r, v, data, bytes, 0, 0);
}

void ring_buffer_read_fully(struct ring_buffer* r, struct ring_buffer_view* v, void* data,
                            uint32_t bytes) {
    ring_buffer_read_fully_with_abort(r, v, data, bytes, 0, 0);
}

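// Writes |bytes| in chunks, blocking (without timeout) until each chunk fits.
// After every chunk, *abort_ptr is compared against abort_value; on a match
// the transfer stops and the number of bytes already written is returned.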
uint32_t ring_buffer_write_fully_with_abort(struct ring_buffer* r, struct ring_buffer_view* v,
                                            const void* data, uint32_t bytes, uint32_t abort_value,
                                            const volatile uint32_t* abort_ptr) {
    uint32_t candidate_step = get_step_size(r, v, bytes);
    uint32_t processed = 0;

    const uint8_t* src = (const uint8_t*)data;

    while (processed < bytes) {
        if (bytes - processed < candidate_step) {
            candidate_step = bytes - processed;
        }

        long processed_here = 0;
        ring_buffer_wait_write(r, v, candidate_step, (uint64_t)(-1));

        if (v) {
            processed_here = ring_buffer_view_write(r, v, src + processed, candidate_step, 1);
        } else {
            processed_here = ring_buffer_write(r, src + processed, candidate_step, 1);
        }

        processed += processed_here ? candidate_step : 0;

        if (abort_ptr && (abort_value == *abort_ptr)) {
            return processed;
        }
    }

    return processed;
}

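// Read-side counterpart: drains |bytes| in chunks, checking the abort word
// between chunks, and returns how many bytes were actually read.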
uint32_t ring_buffer_read_fully_with_abort(struct ring_buffer* r, struct ring_buffer_view* v,
                                           void* data, uint32_t bytes, uint32_t abort_value,
                                           const volatile uint32_t* abort_ptr) {
    uint32_t candidate_step = get_step_size(r, v, bytes);
    uint32_t processed = 0;

    uint8_t* dst = (uint8_t*)data;

    while (processed < bytes) {
#ifdef __x86_64__
        _mm_pause();
#endif
        if (bytes - processed < candidate_step) {
            candidate_step = bytes - processed;
        }

        long processed_here = 0;
        ring_buffer_wait_read(r, v, candidate_step, (uint64_t)(-1));

        if (v) {
            processed_here = ring_buffer_view_read(r, v, dst + processed, candidate_step, 1);
        } else {
            processed_here = ring_buffer_read(r, dst + processed, candidate_step, 1);
        }

        processed += processed_here ? candidate_step : 0;

        if (abort_ptr && (abort_value == *abort_ptr)) {
            return processed;
        }
    }

    return processed;
}

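// Producer/consumer handshake on r->state using seq-cst atomics. The producer
// flips between the PRODUCER_IDLE and PRODUCER_ACTIVE states; the consumer
// moves an idle ring to CONSUMER_HANGING_UP and then CONSUMER_HUNG_UP, from
// which the producer can re-acquire it via
// ring_buffer_producer_acquire_from_hangup(). (State names abbreviate the
// RING_BUFFER_SYNC_* constants.)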
void ring_buffer_sync_init(struct ring_buffer* r) {
    __atomic_store_n(&r->state, RING_BUFFER_SYNC_PRODUCER_IDLE, __ATOMIC_SEQ_CST);
}

bool ring_buffer_producer_acquire(struct ring_buffer* r) {
    uint32_t expected_idle = RING_BUFFER_SYNC_PRODUCER_IDLE;
    bool success =
        __atomic_compare_exchange_n(&r->state, &expected_idle, RING_BUFFER_SYNC_PRODUCER_ACTIVE,
                                    false /* strong */, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
    return success;
}

bool ring_buffer_producer_acquire_from_hangup(struct ring_buffer* r) {
    uint32_t expected_hangup = RING_BUFFER_SYNC_CONSUMER_HUNG_UP;
    bool success =
        __atomic_compare_exchange_n(&r->state, &expected_hangup, RING_BUFFER_SYNC_PRODUCER_ACTIVE,
                                    false /* strong */, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
    return success;
}

void ring_buffer_producer_wait_hangup(struct ring_buffer* r) {
    while (__atomic_load_n(&r->state, __ATOMIC_SEQ_CST) != RING_BUFFER_SYNC_CONSUMER_HUNG_UP) {
        ring_buffer_yield();
    }
}

void ring_buffer_producer_idle(struct ring_buffer* r) {
    __atomic_store_n(&r->state, RING_BUFFER_SYNC_PRODUCER_IDLE, __ATOMIC_SEQ_CST);
}

bool ring_buffer_consumer_hangup(struct ring_buffer* r) {
    uint32_t expected_idle = RING_BUFFER_SYNC_PRODUCER_IDLE;
    bool success =
        __atomic_compare_exchange_n(&r->state, &expected_idle, RING_BUFFER_SYNC_CONSUMER_HANGING_UP,
                                    false /* strong */, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
    return success;
}

void ring_buffer_consumer_wait_producer_idle(struct ring_buffer* r) {
    while (__atomic_load_n(&r->state, __ATOMIC_SEQ_CST) != RING_BUFFER_SYNC_PRODUCER_IDLE) {
        ring_buffer_yield();
    }
}

void ring_buffer_consumer_hung_up(struct ring_buffer* r) {
    __atomic_store_n(&r->state, RING_BUFFER_SYNC_CONSUMER_HUNG_UP, __ATOMIC_SEQ_CST);
}

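/*
 * Usage sketch (illustrative only, not part of this translation unit): one
 * thread producing bytes into an inline ring_buffer while another drains it.
 * The message payload below is hypothetical; only the ring_buffer_* calls
 * come from this file.
 *
 *   struct ring_buffer rb;
 *   ring_buffer_init(&rb);
 *
 *   // Producer side: block until the whole message has been written.
 *   const char msg[] = "hello";
 *   ring_buffer_write_fully(&rb, NULL, msg, sizeof(msg));
 *
 *   // Consumer side: block until the same number of bytes is available.
 *   char out[sizeof(msg)];
 *   ring_buffer_read_fully(&rb, NULL, out, sizeof(out));
 *
 *   // Non-blocking variants return the number of completed steps and set
 *   // errno to -EAGAIN when the ring is full (or empty, for reads):
 *   if (ring_buffer_write(&rb, msg, sizeof(msg), 1) == 0) {
 *       // Ring was full; retry later or fall back to the blocking call.
 *   }
 */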