xref: /aosp_15_r20/system/extras/libfec/fec_process.cpp (revision 288bf5226967eb3dac5cce6c939ccc2a7f2b4fe5)
1 /*
2  * Copyright (C) 2015 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <future>
18 #include "fec_private.h"
19 
20 struct process_info {
21     int id;
22     fec_handle* f;
23     uint8_t* buf;
24     size_t count;
25     uint64_t offset;
26     read_func func;
27     ssize_t rc;
28     size_t errors;
29 };
30 
31 /* thread function  */
__process(process_info * p)32 static process_info* __process(process_info* p) {
33     debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, p->offset + p->count);
34 
35     p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
36     return p;
37 }
38 
39 /* launches a maximum number of threads to process a read */
process(fec_handle * f,uint8_t * buf,size_t count,uint64_t offset,read_func func)40 ssize_t process(fec_handle* f, uint8_t* buf, size_t count, uint64_t offset, read_func func) {
41     check(f);
42     check(buf);
43     check(func);
44 
45     if (count == 0) {
46         return 0;
47     }
48 
49     int threads = sysconf(_SC_NPROCESSORS_ONLN);
50 
51     if (threads < WORK_MIN_THREADS) {
52         threads = WORK_MIN_THREADS;
53     } else if (threads > WORK_MAX_THREADS) {
54         threads = WORK_MAX_THREADS;
55     }
56 
57     uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
58     size_t blocks = fec_div_round_up(offset + count - start, FEC_BLOCKSIZE);
59 
60     /* start at most one thread per block we're accessing */
61     if ((size_t)threads > blocks) {
62         threads = (int)blocks;
63     }
64 
65     size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
66     size_t left = count;
67     uint64_t pos = offset;
68     uint64_t end = start + count_per_thread;
69 
70     debug("max %d threads, %zu bytes per thread (total %zu spanning %zu blocks)", threads,
71           count_per_thread, count, blocks);
72 
73     std::vector<std::future<process_info*>> handles;
74     process_info info[threads];
75     ssize_t rc = 0;
76 
77     /* start threads to process queue */
78     for (int i = 0; i < threads && left > 0; ++i) {
79         info[i].id = i;
80         info[i].f = f;
81         info[i].buf = &buf[pos - offset];
82         info[i].count = (size_t)(end - pos);
83         info[i].offset = pos;
84         info[i].func = func;
85         info[i].rc = -1;
86         info[i].errors = 0;
87 
88         if (info[i].count > left) {
89             info[i].count = left;
90         }
91 
92         handles.push_back(std::async(std::launch::async, __process, &info[i]));
93 
94         pos = end;
95         end += count_per_thread;
96         left -= info[i].count;
97     }
98 
99     ssize_t nread = 0;
100 
101     /* wait for all threads to complete */
102     for (auto&& future : handles) {
103         process_info* p = future.get();
104         if (!p || p->rc == -1) {
105             rc = -1;
106         } else {
107             nread += p->rc;
108             f->errors += p->errors;
109         }
110     }
111 
112     if (left > 0 || rc == -1) {
113         errno = EIO;
114         return -1;
115     }
116 
117     return nread;
118 }
119