1 /*
2 * Copyright (C) 2015 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <future>
18 #include "fec_private.h"
19
20 struct process_info {
21 int id;
22 fec_handle* f;
23 uint8_t* buf;
24 size_t count;
25 uint64_t offset;
26 read_func func;
27 ssize_t rc;
28 size_t errors;
29 };
30
31 /* thread function */
__process(process_info * p)32 static process_info* __process(process_info* p) {
33 debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, p->offset + p->count);
34
35 p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
36 return p;
37 }
38
39 /* launches a maximum number of threads to process a read */
process(fec_handle * f,uint8_t * buf,size_t count,uint64_t offset,read_func func)40 ssize_t process(fec_handle* f, uint8_t* buf, size_t count, uint64_t offset, read_func func) {
41 check(f);
42 check(buf);
43 check(func);
44
45 if (count == 0) {
46 return 0;
47 }
48
49 int threads = sysconf(_SC_NPROCESSORS_ONLN);
50
51 if (threads < WORK_MIN_THREADS) {
52 threads = WORK_MIN_THREADS;
53 } else if (threads > WORK_MAX_THREADS) {
54 threads = WORK_MAX_THREADS;
55 }
56
57 uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
58 size_t blocks = fec_div_round_up(offset + count - start, FEC_BLOCKSIZE);
59
60 /* start at most one thread per block we're accessing */
61 if ((size_t)threads > blocks) {
62 threads = (int)blocks;
63 }
64
65 size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
66 size_t left = count;
67 uint64_t pos = offset;
68 uint64_t end = start + count_per_thread;
69
70 debug("max %d threads, %zu bytes per thread (total %zu spanning %zu blocks)", threads,
71 count_per_thread, count, blocks);
72
73 std::vector<std::future<process_info*>> handles;
74 process_info info[threads];
75 ssize_t rc = 0;
76
77 /* start threads to process queue */
78 for (int i = 0; i < threads && left > 0; ++i) {
79 info[i].id = i;
80 info[i].f = f;
81 info[i].buf = &buf[pos - offset];
82 info[i].count = (size_t)(end - pos);
83 info[i].offset = pos;
84 info[i].func = func;
85 info[i].rc = -1;
86 info[i].errors = 0;
87
88 if (info[i].count > left) {
89 info[i].count = left;
90 }
91
92 handles.push_back(std::async(std::launch::async, __process, &info[i]));
93
94 pos = end;
95 end += count_per_thread;
96 left -= info[i].count;
97 }
98
99 ssize_t nread = 0;
100
101 /* wait for all threads to complete */
102 for (auto&& future : handles) {
103 process_info* p = future.get();
104 if (!p || p->rc == -1) {
105 rc = -1;
106 } else {
107 nread += p->rc;
108 f->errors += p->errors;
109 }
110 }
111
112 if (left > 0 || rc == -1) {
113 errno = EIO;
114 return -1;
115 }
116
117 return nread;
118 }
119