xref: /aosp_15_r20/external/mesa3d/src/gfxstream/aemu/ring_buffer.cpp (revision 6104692788411f58d303aa86923a9ff6ecaded22)
1 /*
2  * Copyright 2018 Google
3  * SPDX-License-Identifier: MIT
4  */
5 #include "ring_buffer.h"
6 
7 #include <errno.h>
8 #include <string.h>
9 #ifdef _MSC_VER
10 #include "aemu/base/msvc.h"
11 #else
12 #include <sys/time.h>
13 #endif
14 
15 #ifdef __x86_64__
16 #include <emmintrin.h>
17 #endif
18 
19 #ifdef _WIN32
20 #include <windows.h>
21 #else
22 #include <sched.h>
23 #include <unistd.h>
24 #endif
25 
// Bit mask used by get_ring_pos() to fold a position counter into the built-in
// buffer. Assumes RING_BUFFER_SIZE is a power of two — TODO confirm in ring_buffer.h.
#define RING_BUFFER_MASK (RING_BUFFER_SIZE - 1)

// NOTE(review): not referenced by the code visible in this file;
// ring_buffer_init() stores the literal 1 into host_version.
#define RING_BUFFER_VERSION 1
29 
ring_buffer_init(struct ring_buffer * r)30 void ring_buffer_init(struct ring_buffer* r) {
31     r->host_version = 1;
32     r->write_pos = 0;
33     r->read_pos = 0;
34 
35     r->read_live_count = 0;
36     r->read_yield_count = 0;
37     r->read_sleep_us_count = 0;
38 
39     r->state = 0;
40 }
41 
get_ring_pos(uint32_t index)42 static uint32_t get_ring_pos(uint32_t index) { return index & RING_BUFFER_MASK; }
43 
ring_buffer_can_write(const struct ring_buffer * r,uint32_t bytes)44 bool ring_buffer_can_write(const struct ring_buffer* r, uint32_t bytes) {
45     uint32_t read_view;
46     __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
47     return get_ring_pos(read_view - r->write_pos - 1) >= bytes;
48 }
49 
ring_buffer_can_read(const struct ring_buffer * r,uint32_t bytes)50 bool ring_buffer_can_read(const struct ring_buffer* r, uint32_t bytes) {
51     uint32_t write_view;
52     __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
53     return get_ring_pos(write_view - r->read_pos) >= bytes;
54 }
55 
ring_buffer_write(struct ring_buffer * r,const void * data,uint32_t step_size,uint32_t steps)56 long ring_buffer_write(struct ring_buffer* r, const void* data, uint32_t step_size,
57                        uint32_t steps) {
58     const uint8_t* data_bytes = (const uint8_t*)data;
59     uint32_t i;
60 
61     for (i = 0; i < steps; ++i) {
62         if (!ring_buffer_can_write(r, step_size)) {
63             errno = -EAGAIN;
64             return (long)i;
65         }
66 
67         // Needs to be split up into 2 writes for the edge case.
68         uint32_t available_at_end = RING_BUFFER_SIZE - get_ring_pos(r->write_pos);
69 
70         if (step_size > available_at_end) {
71             uint32_t remaining = step_size - available_at_end;
72             memcpy(&r->buf[get_ring_pos(r->write_pos)], data_bytes + i * step_size,
73                    available_at_end);
74             memcpy(&r->buf[get_ring_pos(r->write_pos + available_at_end)],
75                    data_bytes + i * step_size + available_at_end, remaining);
76         } else {
77             memcpy(&r->buf[get_ring_pos(r->write_pos)], data_bytes + i * step_size, step_size);
78         }
79 
80         __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
81     }
82 
83     errno = 0;
84     return (long)steps;
85 }
86 
ring_buffer_read(struct ring_buffer * r,void * data,uint32_t step_size,uint32_t steps)87 long ring_buffer_read(struct ring_buffer* r, void* data, uint32_t step_size, uint32_t steps) {
88     uint8_t* data_bytes = (uint8_t*)data;
89     uint32_t i;
90 
91     for (i = 0; i < steps; ++i) {
92         if (!ring_buffer_can_read(r, step_size)) {
93             errno = -EAGAIN;
94             return (long)i;
95         }
96 
97         // Needs to be split up into 2 reads for the edge case.
98         uint32_t available_at_end = RING_BUFFER_SIZE - get_ring_pos(r->read_pos);
99 
100         if (step_size > available_at_end) {
101             uint32_t remaining = step_size - available_at_end;
102             memcpy(data_bytes + i * step_size, &r->buf[get_ring_pos(r->read_pos)],
103                    available_at_end);
104             memcpy(data_bytes + i * step_size + available_at_end,
105                    &r->buf[get_ring_pos(r->read_pos + available_at_end)], remaining);
106         } else {
107             memcpy(data_bytes + i * step_size, &r->buf[get_ring_pos(r->read_pos)], step_size);
108         }
109 
110         __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
111     }
112 
113     errno = 0;
114     return (long)steps;
115 }
116 
ring_buffer_advance_write(struct ring_buffer * r,uint32_t step_size,uint32_t steps)117 long ring_buffer_advance_write(struct ring_buffer* r, uint32_t step_size, uint32_t steps) {
118     uint32_t i;
119 
120     for (i = 0; i < steps; ++i) {
121         if (!ring_buffer_can_write(r, step_size)) {
122             errno = -EAGAIN;
123             return (long)i;
124         }
125 
126         __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
127     }
128 
129     errno = 0;
130     return (long)steps;
131 }
132 
ring_buffer_advance_read(struct ring_buffer * r,uint32_t step_size,uint32_t steps)133 long ring_buffer_advance_read(struct ring_buffer* r, uint32_t step_size, uint32_t steps) {
134     uint32_t i;
135 
136     for (i = 0; i < steps; ++i) {
137         if (!ring_buffer_can_read(r, step_size)) {
138             errno = -EAGAIN;
139             return (long)i;
140         }
141 
142         __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
143     }
144 
145     errno = 0;
146     return (long)steps;
147 }
148 
// Returns floor(log2(size)): the largest shift such that (1 << shift) <= size.
// A size of 0 has no such shift and yields 0.
//
// All shifts are performed on an unsigned 32-bit one and capped at 31, so
// sizes of 2^31 and above are handled without shifting into the sign bit or
// by the full width of the type.
uint32_t ring_buffer_calc_shift(uint32_t size) {
    uint32_t shift = 0;

    if (size == 0) {
        // Avoid decrementing shift below zero in the adjustment step.
        return 0;
    }

    while (shift < 31 && ((uint32_t)1 << shift) < size) {
        ++shift;
    }

    // if size is not a power of 2, round down to the previous power.
    if (((uint32_t)1 << shift) > size) {
        --shift;
    }
    return shift;
}
161 
ring_buffer_view_init(struct ring_buffer * r,struct ring_buffer_view * v,uint8_t * buf,uint32_t size)162 void ring_buffer_view_init(struct ring_buffer* r, struct ring_buffer_view* v, uint8_t* buf,
163                            uint32_t size) {
164     uint32_t shift = ring_buffer_calc_shift(size);
165 
166     ring_buffer_init(r);
167 
168     v->buf = buf;
169     v->size = (1 << shift);
170     v->mask = (1 << shift) - 1;
171 }
172 
ring_buffer_init_view_only(struct ring_buffer_view * v,uint8_t * buf,uint32_t size)173 void ring_buffer_init_view_only(struct ring_buffer_view* v, uint8_t* buf, uint32_t size) {
174     uint32_t shift = ring_buffer_calc_shift(size);
175 
176     v->buf = buf;
177     v->size = (1 << shift);
178     v->mask = (1 << shift) - 1;
179 }
180 
ring_buffer_view_get_ring_pos(const struct ring_buffer_view * v,uint32_t index)181 uint32_t ring_buffer_view_get_ring_pos(const struct ring_buffer_view* v, uint32_t index) {
182     return index & v->mask;
183 }
184 
ring_buffer_view_can_write(const struct ring_buffer * r,const struct ring_buffer_view * v,uint32_t bytes)185 bool ring_buffer_view_can_write(const struct ring_buffer* r, const struct ring_buffer_view* v,
186                                 uint32_t bytes) {
187     uint32_t read_view;
188     __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
189     return ring_buffer_view_get_ring_pos(v, read_view - r->write_pos - 1) >= bytes;
190 }
191 
ring_buffer_view_can_read(const struct ring_buffer * r,const struct ring_buffer_view * v,uint32_t bytes)192 bool ring_buffer_view_can_read(const struct ring_buffer* r, const struct ring_buffer_view* v,
193                                uint32_t bytes) {
194     uint32_t write_view;
195     __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
196     return ring_buffer_view_get_ring_pos(v, write_view - r->read_pos) >= bytes;
197 }
198 
ring_buffer_available_read(const struct ring_buffer * r,const struct ring_buffer_view * v)199 uint32_t ring_buffer_available_read(const struct ring_buffer* r, const struct ring_buffer_view* v) {
200     uint32_t write_view;
201     __atomic_load(&r->write_pos, &write_view, __ATOMIC_SEQ_CST);
202     if (v) {
203         return ring_buffer_view_get_ring_pos(v, write_view - r->read_pos);
204     } else {
205         return get_ring_pos(write_view - r->read_pos);
206     }
207 }
208 
ring_buffer_available_write(const struct ring_buffer * r,const struct ring_buffer_view * v)209 uint32_t ring_buffer_available_write(const struct ring_buffer* r,
210                                      const struct ring_buffer_view* v) {
211     uint32_t read_view;
212     __atomic_load(&r->read_pos, &read_view, __ATOMIC_SEQ_CST);
213     if (v) {
214         return ring_buffer_view_get_ring_pos(v, read_view - r->write_pos - 1);
215     } else {
216         return get_ring_pos(read_view - r->write_pos - 1);
217     }
218 }
219 
ring_buffer_copy_contents(const struct ring_buffer * r,const struct ring_buffer_view * v,uint32_t wanted_bytes,uint8_t * res)220 int ring_buffer_copy_contents(const struct ring_buffer* r, const struct ring_buffer_view* v,
221                               uint32_t wanted_bytes, uint8_t* res) {
222     uint32_t total_available = ring_buffer_available_read(r, v);
223     uint32_t available_at_end = 0;
224 
225     if (v) {
226         available_at_end = v->size - ring_buffer_view_get_ring_pos(v, r->read_pos);
227     } else {
228         available_at_end = RING_BUFFER_SIZE - get_ring_pos(r->write_pos);
229     }
230 
231     if (total_available < wanted_bytes) {
232         return -1;
233     }
234 
235     if (v) {
236         if (wanted_bytes > available_at_end) {
237             uint32_t remaining = wanted_bytes - available_at_end;
238             memcpy(res, &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)], available_at_end);
239             memcpy(res + available_at_end,
240                    &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos + available_at_end)],
241                    remaining);
242         } else {
243             memcpy(res, &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)], wanted_bytes);
244         }
245     } else {
246         if (wanted_bytes > available_at_end) {
247             uint32_t remaining = wanted_bytes - available_at_end;
248             memcpy(res, &r->buf[get_ring_pos(r->read_pos)], available_at_end);
249             memcpy(res + available_at_end, &r->buf[get_ring_pos(r->read_pos + available_at_end)],
250                    remaining);
251         } else {
252             memcpy(res, &r->buf[get_ring_pos(r->read_pos)], wanted_bytes);
253         }
254     }
255     return 0;
256 }
257 
ring_buffer_view_write(struct ring_buffer * r,struct ring_buffer_view * v,const void * data,uint32_t step_size,uint32_t steps)258 long ring_buffer_view_write(struct ring_buffer* r, struct ring_buffer_view* v, const void* data,
259                             uint32_t step_size, uint32_t steps) {
260     uint8_t* data_bytes = (uint8_t*)data;
261     uint32_t i;
262 
263     for (i = 0; i < steps; ++i) {
264         if (!ring_buffer_view_can_write(r, v, step_size)) {
265             errno = -EAGAIN;
266             return (long)i;
267         }
268 
269         // Needs to be split up into 2 writes for the edge case.
270         uint32_t available_at_end = v->size - ring_buffer_view_get_ring_pos(v, r->write_pos);
271 
272         if (step_size > available_at_end) {
273             uint32_t remaining = step_size - available_at_end;
274             memcpy(&v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos)],
275                    data_bytes + i * step_size, available_at_end);
276             memcpy(&v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos + available_at_end)],
277                    data_bytes + i * step_size + available_at_end, remaining);
278         } else {
279             memcpy(&v->buf[ring_buffer_view_get_ring_pos(v, r->write_pos)],
280                    data_bytes + i * step_size, step_size);
281         }
282 
283         __atomic_add_fetch(&r->write_pos, step_size, __ATOMIC_SEQ_CST);
284     }
285 
286     errno = 0;
287     return (long)steps;
288 }
289 
ring_buffer_view_read(struct ring_buffer * r,struct ring_buffer_view * v,void * data,uint32_t step_size,uint32_t steps)290 long ring_buffer_view_read(struct ring_buffer* r, struct ring_buffer_view* v, void* data,
291                            uint32_t step_size, uint32_t steps) {
292     uint8_t* data_bytes = (uint8_t*)data;
293     uint32_t i;
294 
295     for (i = 0; i < steps; ++i) {
296         if (!ring_buffer_view_can_read(r, v, step_size)) {
297             errno = -EAGAIN;
298             return (long)i;
299         }
300 
301         // Needs to be split up into 2 reads for the edge case.
302         uint32_t available_at_end = v->size - ring_buffer_view_get_ring_pos(v, r->read_pos);
303 
304         if (step_size > available_at_end) {
305             uint32_t remaining = step_size - available_at_end;
306             memcpy(data_bytes + i * step_size,
307                    &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)], available_at_end);
308             memcpy(data_bytes + i * step_size + available_at_end,
309                    &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos + available_at_end)],
310                    remaining);
311         } else {
312             memcpy(data_bytes + i * step_size,
313                    &v->buf[ring_buffer_view_get_ring_pos(v, r->read_pos)], step_size);
314         }
315         __atomic_add_fetch(&r->read_pos, step_size, __ATOMIC_SEQ_CST);
316     }
317 
318     errno = 0;
319     return (long)steps;
320 }
321 
ring_buffer_yield()322 void ring_buffer_yield() {
323 #ifdef _WIN32
324     _mm_pause();
325 #else
326     sched_yield();
327 #endif
328 }
329 
ring_buffer_sleep()330 static void ring_buffer_sleep() {
331 #ifdef _WIN32
332     Sleep(2);
333 #else
334     usleep(2000);
335 #endif
336 }
337 
// Returns the current time reported by gettimeofday(), expressed as a single
// microsecond count (seconds * 1,000,000 + microseconds).
static uint64_t ring_buffer_curr_us() {
    struct timeval now;
    gettimeofday(&now, NULL);

    const uint64_t whole_seconds_us = now.tv_sec * 1000000ULL;
    return whole_seconds_us + now.tv_usec;
}
345 
// Elapsed wait time, in microseconds, beyond which the wait loops call
// ring_buffer_yield() on every iteration.
static const uint32_t yield_backoff_us = 1000;
// Elapsed wait time, in microseconds, beyond which the wait loops additionally
// call ring_buffer_sleep() on every iteration.
static const uint32_t sleep_backoff_us = 2000;
348 
ring_buffer_wait_write(const struct ring_buffer * r,const struct ring_buffer_view * v,uint32_t bytes,uint64_t timeout_us)349 bool ring_buffer_wait_write(const struct ring_buffer* r, const struct ring_buffer_view* v,
350                             uint32_t bytes, uint64_t timeout_us) {
351     uint64_t start_us = ring_buffer_curr_us();
352     uint64_t curr_wait_us;
353 
354     bool can_write = v ? ring_buffer_view_can_write(r, v, bytes) : ring_buffer_can_write(r, bytes);
355 
356     while (!can_write) {
357 #ifdef __x86_64
358         _mm_pause();
359 #endif
360         curr_wait_us = ring_buffer_curr_us() - start_us;
361 
362         if (curr_wait_us > yield_backoff_us) {
363             ring_buffer_yield();
364         }
365 
366         if (curr_wait_us > sleep_backoff_us) {
367             ring_buffer_sleep();
368         }
369 
370         if (curr_wait_us > timeout_us) {
371             return false;
372         }
373 
374         can_write = v ? ring_buffer_view_can_write(r, v, bytes) : ring_buffer_can_write(r, bytes);
375     }
376 
377     return true;
378 }
379 
ring_buffer_wait_read(const struct ring_buffer * r,const struct ring_buffer_view * v,uint32_t bytes,uint64_t timeout_us)380 bool ring_buffer_wait_read(const struct ring_buffer* r, const struct ring_buffer_view* v,
381                            uint32_t bytes, uint64_t timeout_us) {
382     uint64_t start_us = ring_buffer_curr_us();
383     uint64_t curr_wait_us;
384 
385     bool can_read = v ? ring_buffer_view_can_read(r, v, bytes) : ring_buffer_can_read(r, bytes);
386 
387     while (!can_read) {
388         // TODO(bohu): find aarch64 equivalent
389 #ifdef __x86_64
390         _mm_pause();
391 #endif
392         curr_wait_us = ring_buffer_curr_us() - start_us;
393 
394         if (curr_wait_us > yield_backoff_us) {
395             ring_buffer_yield();
396             ((struct ring_buffer*)r)->read_yield_count++;
397         }
398 
399         if (curr_wait_us > sleep_backoff_us) {
400             ring_buffer_sleep();
401             ((struct ring_buffer*)r)->read_sleep_us_count += 2000;
402         }
403 
404         if (curr_wait_us > timeout_us) {
405             return false;
406         }
407 
408         can_read = v ? ring_buffer_view_can_read(r, v, bytes) : ring_buffer_can_read(r, bytes);
409     }
410 
411     ((struct ring_buffer*)r)->read_live_count++;
412     return true;
413 }
414 
get_step_size(struct ring_buffer * r,struct ring_buffer_view * v,uint32_t bytes)415 static uint32_t get_step_size(struct ring_buffer* r, struct ring_buffer_view* v, uint32_t bytes) {
416     uint32_t available = v ? (v->size >> 1) : (RING_BUFFER_SIZE >> 1);
417     uint32_t res = available < bytes ? available : bytes;
418 
419     return res;
420 }
421 
ring_buffer_write_fully(struct ring_buffer * r,struct ring_buffer_view * v,const void * data,uint32_t bytes)422 void ring_buffer_write_fully(struct ring_buffer* r, struct ring_buffer_view* v, const void* data,
423                              uint32_t bytes) {
424     ring_buffer_write_fully_with_abort(r, v, data, bytes, 0, 0);
425 }
426 
ring_buffer_read_fully(struct ring_buffer * r,struct ring_buffer_view * v,void * data,uint32_t bytes)427 void ring_buffer_read_fully(struct ring_buffer* r, struct ring_buffer_view* v, void* data,
428                             uint32_t bytes) {
429     ring_buffer_read_fully_with_abort(r, v, data, bytes, 0, 0);
430 }
431 
ring_buffer_write_fully_with_abort(struct ring_buffer * r,struct ring_buffer_view * v,const void * data,uint32_t bytes,uint32_t abort_value,const volatile uint32_t * abort_ptr)432 uint32_t ring_buffer_write_fully_with_abort(struct ring_buffer* r, struct ring_buffer_view* v,
433                                             const void* data, uint32_t bytes, uint32_t abort_value,
434                                             const volatile uint32_t* abort_ptr) {
435     uint32_t candidate_step = get_step_size(r, v, bytes);
436     uint32_t processed = 0;
437 
438     uint8_t* dst = (uint8_t*)data;
439 
440     while (processed < bytes) {
441         if (bytes - processed < candidate_step) {
442             candidate_step = bytes - processed;
443         }
444 
445         long processed_here = 0;
446         ring_buffer_wait_write(r, v, candidate_step, (uint64_t)(-1));
447 
448         if (v) {
449             processed_here = ring_buffer_view_write(r, v, dst + processed, candidate_step, 1);
450         } else {
451             processed_here = ring_buffer_write(r, dst + processed, candidate_step, 1);
452         }
453 
454         processed += processed_here ? candidate_step : 0;
455 
456         if (abort_ptr && (abort_value == *abort_ptr)) {
457             return processed;
458         }
459     }
460 
461     return processed;
462 }
463 
ring_buffer_read_fully_with_abort(struct ring_buffer * r,struct ring_buffer_view * v,void * data,uint32_t bytes,uint32_t abort_value,const volatile uint32_t * abort_ptr)464 uint32_t ring_buffer_read_fully_with_abort(struct ring_buffer* r, struct ring_buffer_view* v,
465                                            void* data, uint32_t bytes, uint32_t abort_value,
466                                            const volatile uint32_t* abort_ptr) {
467     uint32_t candidate_step = get_step_size(r, v, bytes);
468     uint32_t processed = 0;
469 
470     uint8_t* dst = (uint8_t*)data;
471 
472     while (processed < bytes) {
473 #ifdef __x86_64
474         _mm_pause();
475 #endif
476         if (bytes - processed < candidate_step) {
477             candidate_step = bytes - processed;
478         }
479 
480         long processed_here = 0;
481         ring_buffer_wait_read(r, v, candidate_step, (uint64_t)(-1));
482 
483         if (v) {
484             processed_here = ring_buffer_view_read(r, v, dst + processed, candidate_step, 1);
485         } else {
486             processed_here = ring_buffer_read(r, dst + processed, candidate_step, 1);
487         }
488 
489         processed += processed_here ? candidate_step : 0;
490 
491         if (abort_ptr && (abort_value == *abort_ptr)) {
492             return processed;
493         }
494     }
495 
496     return processed;
497 }
498 
ring_buffer_sync_init(struct ring_buffer * r)499 void ring_buffer_sync_init(struct ring_buffer* r) {
500     __atomic_store_n(&r->state, RING_BUFFER_SYNC_PRODUCER_IDLE, __ATOMIC_SEQ_CST);
501 }
502 
ring_buffer_producer_acquire(struct ring_buffer * r)503 bool ring_buffer_producer_acquire(struct ring_buffer* r) {
504     uint32_t expected_idle = RING_BUFFER_SYNC_PRODUCER_IDLE;
505     bool success =
506         __atomic_compare_exchange_n(&r->state, &expected_idle, RING_BUFFER_SYNC_PRODUCER_ACTIVE,
507                                     false /* strong */, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
508     return success;
509 }
510 
ring_buffer_producer_acquire_from_hangup(struct ring_buffer * r)511 bool ring_buffer_producer_acquire_from_hangup(struct ring_buffer* r) {
512     uint32_t expected_hangup = RING_BUFFER_SYNC_CONSUMER_HUNG_UP;
513     bool success =
514         __atomic_compare_exchange_n(&r->state, &expected_hangup, RING_BUFFER_SYNC_PRODUCER_ACTIVE,
515                                     false /* strong */, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
516     return success;
517 }
518 
ring_buffer_producer_wait_hangup(struct ring_buffer * r)519 void ring_buffer_producer_wait_hangup(struct ring_buffer* r) {
520     while (__atomic_load_n(&r->state, __ATOMIC_SEQ_CST) != RING_BUFFER_SYNC_CONSUMER_HUNG_UP) {
521         ring_buffer_yield();
522     }
523 }
524 
ring_buffer_producer_idle(struct ring_buffer * r)525 void ring_buffer_producer_idle(struct ring_buffer* r) {
526     __atomic_store_n(&r->state, RING_BUFFER_SYNC_PRODUCER_IDLE, __ATOMIC_SEQ_CST);
527 }
528 
ring_buffer_consumer_hangup(struct ring_buffer * r)529 bool ring_buffer_consumer_hangup(struct ring_buffer* r) {
530     uint32_t expected_idle = RING_BUFFER_SYNC_PRODUCER_IDLE;
531     bool success =
532         __atomic_compare_exchange_n(&r->state, &expected_idle, RING_BUFFER_SYNC_CONSUMER_HANGING_UP,
533                                     false /* strong */, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
534     return success;
535 }
536 
ring_buffer_consumer_wait_producer_idle(struct ring_buffer * r)537 void ring_buffer_consumer_wait_producer_idle(struct ring_buffer* r) {
538     while (__atomic_load_n(&r->state, __ATOMIC_SEQ_CST) != RING_BUFFER_SYNC_PRODUCER_IDLE) {
539         ring_buffer_yield();
540     }
541 }
542 
ring_buffer_consumer_hung_up(struct ring_buffer * r)543 void ring_buffer_consumer_hung_up(struct ring_buffer* r) {
544     __atomic_store_n(&r->state, RING_BUFFER_SYNC_CONSUMER_HUNG_UP, __ATOMIC_SEQ_CST);
545 }
546