/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define TRACE_TAG INCREMENTAL

#include "incremental_server.h"

#include <android-base/endian.h>
#include <android-base/strings.h>
#include <inttypes.h>
#include <lz4.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <array>
#include <deque>
#include <fstream>
#include <thread>
#include <type_traits>
#include <unordered_set>

#include "adb.h"
#include "adb_client.h"
#include "adb_io.h"
#include "adb_trace.h"
#include "adb_unique_fd.h"
#include "adb_utils.h"
#include "incremental_utils.h"
#include "sysdeps.h"

namespace incremental {

static constexpr int kHashesPerBlock = kBlockSize / kDigestSize;
static constexpr int kCompressedSizeMax = kBlockSize * 0.95;
static constexpr int8_t kTypeData = 0;
static constexpr int8_t kTypeHash = 1;
static constexpr int8_t kCompressionNone = 0;
static constexpr int8_t kCompressionLZ4 = 1;
static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
static constexpr auto kReadBufferSize = 128 * 1024;
static constexpr int kPollTimeoutMillis = 300000;  // 5 minutes

using BlockSize = int16_t;
using FileId = int16_t;
using BlockIdx = int32_t;
using NumBlocks = int32_t;
using BlockType = int8_t;
using CompressionType = int8_t;
using RequestType = int16_t;
using ChunkHeader = int32_t;
using MagicType = uint32_t;

static constexpr MagicType INCR = 0x494e4352;  // LE INCR

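// Request types coming in from the device; DESTROY is also synthesized locally by ReadRequest()
// when the connection goes away.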
static constexpr RequestType SERVING_COMPLETE = 0;
static constexpr RequestType BLOCK_MISSING = 1;
static constexpr RequestType PREFETCH = 2;
static constexpr RequestType DESTROY = 3;

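// Block/offset arithmetic helpers. Note: the rounding relies on kBlockSize being a power of two.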
static constexpr inline int64_t roundDownToBlockOffset(int64_t val) {
    return val & ~(kBlockSize - 1);
}

static constexpr inline int64_t roundUpToBlockOffset(int64_t val) {
    return roundDownToBlockOffset(val + kBlockSize - 1);
}

static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) {
    return roundUpToBlockOffset(bytes) / kBlockSize;
}

static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) {
    return static_cast<off64_t>(blockIdx) * kBlockSize;
}

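// All wire-format integers are big-endian; these helpers convert between host order and the
// signed types used throughout this file.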
template <typename T>
static inline constexpr T toBigEndian(T t) {
    using unsigned_type = std::make_unsigned_t<T>;
    if constexpr (std::is_same_v<T, int16_t>) {
        return htobe16(static_cast<unsigned_type>(t));
    } else if constexpr (std::is_same_v<T, int32_t>) {
        return htobe32(static_cast<unsigned_type>(t));
    } else if constexpr (std::is_same_v<T, int64_t>) {
        return htobe64(static_cast<unsigned_type>(t));
    } else {
        return t;
    }
}

template <typename T>
static inline constexpr T readBigEndian(void* data) {
    using unsigned_type = std::make_unsigned_t<T>;
    if constexpr (std::is_same_v<T, int16_t>) {
        return static_cast<T>(be16toh(*reinterpret_cast<unsigned_type*>(data)));
    } else if constexpr (std::is_same_v<T, int32_t>) {
        return static_cast<T>(be32toh(*reinterpret_cast<unsigned_type*>(data)));
    } else if constexpr (std::is_same_v<T, int64_t>) {
        return static_cast<T>(be64toh(*reinterpret_cast<unsigned_type*>(data)));
    } else {
        return T();
    }
}

// Received from device
// !Does not include magic!
struct RequestCommand {
    RequestType request_type;  // 2 bytes
    FileId file_id;            // 2 bytes
    union {
        BlockIdx block_idx;
        NumBlocks num_blocks;
    };  // 4 bytes
} __attribute__((packed));

// Placed before actual data bytes of each block
struct ResponseHeader {
    FileId file_id;                    // 2 bytes
    BlockType block_type;              // 1 byte
    CompressionType compression_type;  // 1 byte
    BlockIdx block_idx;                // 4 bytes
    BlockSize block_size;              // 2 bytes

    static constexpr size_t responseSizeFor(size_t dataSize) {
        return dataSize + sizeof(ResponseHeader);
    }
} __attribute__((packed));

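// A response header plus its payload in a single contiguous, packed buffer, so a whole block
// can be queued with one Send() call.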
template <size_t Size = kBlockSize>
struct BlockBuffer {
    ResponseHeader header;
    char data[Size];
} __attribute__((packed));

// Holds streaming state for a file
class File {
  public:
    // Plain file
    File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset,
         unique_fd tree_fd)
        : File(filepath, id, size, tree_offset) {
        this->fd_ = std::move(fd);
        this->tree_fd_ = std::move(tree_fd);
        priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
    }
    int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const {
        int64_t bytes_read = -1;
        const off64_t offsetStart = blockIndexToOffset(block_idx);
        bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart);
        return bytes_read;
    }
    int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const {
        int64_t bytes_read = -1;
        const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx);
        bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart);
        return bytes_read;
    }

    const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }

    bool hasTree() const { return tree_fd_.ok(); }

    std::vector<bool> sentBlocks;
    NumBlocks sentBlocksCount = 0;

    std::vector<bool> sentTreeBlocks;

    const char* const filepath;
    const FileId id;
    const int64_t size;

  private:
    File(const char* filepath, FileId id, int64_t size, int64_t tree_offset)
        : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) {
        sentBlocks.resize(numBytesToNumBlocks(size));
        sentTreeBlocks.resize(verity_tree_blocks_for_file(size));
    }
    unique_fd fd_;
    std::vector<BlockIdx> priority_blocks_;

    unique_fd tree_fd_;
    const int64_t tree_offset_;
};

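// Drives a streaming session: reads block requests from the device over adb_fd_, answers them
// with (optionally LZ4-compressed) data blocks and hash-tree blocks, and forwards any other
// bytes on the stream to output_fd_.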
class IncrementalServer {
  public:
    IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
        : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
        buffer_.reserve(kReadBufferSize);
        pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize);
        pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
    }

    bool Serve();

  private:
    struct PrefetchState {
        const File* file;
        BlockIdx overallIndex = 0;
        BlockIdx overallEnd = 0;
        BlockIdx priorityIndex = 0;

        explicit PrefetchState(const File& f, BlockIdx start, int count)
            : file(&f),
              overallIndex(start),
              overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {}

        explicit PrefetchState(const File& f)
            : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {}

        bool done() const {
            const bool overallSent = (overallIndex >= overallEnd);
            if (file->PriorityBlocks().empty()) {
                return overallSent;
            }
            return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size());
        }
    };

    bool SkipToRequest(void* buffer, size_t* size, bool blocking);
    std::optional<RequestCommand> ReadRequest(bool blocking);

    void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); }

    enum class SendResult { Sent, Skipped, Error };
    SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);

    bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx);
    bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx);

    bool SendDone();
    void RunPrefetching();

    void Send(const void* data, size_t size, bool flush);
    void Flush();
    using TimePoint = decltype(std::chrono::high_resolution_clock::now());
    bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent);

    unique_fd const adb_fd_;
    unique_fd const output_fd_;
    std::vector<File> files_;

    // Incoming data buffer.
    std::vector<char> buffer_;

    std::deque<PrefetchState> prefetches_;
    int compressed_ = 0, uncompressed_ = 0;
    long long sentSize_ = 0;

    static constexpr auto kChunkFlushSize = 31 * kBlockSize;

    std::vector<char> pendingBlocksBuffer_;
    char* pendingBlocks_ = nullptr;

    // True when client notifies that all the data has been received
    bool servingComplete_ = false;
};

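// Scans the incoming adb stream for the INCR magic. Bytes that precede the magic are not
// request data and are forwarded verbatim to output_fd_; once a full request follows the magic,
// it is copied into |buffer|. Returns true with *size == 0 when no request is available yet, and
// false once the connection is closed or unusable.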
bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) {
    while (true) {
        // Looking for INCR magic.
        bool magic_found = false;
        int bcur = 0;
        int bsize = buffer_.size();
        for (bcur = 0; bcur + 4 < bsize; ++bcur) {
            uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur));
            if (magic == INCR) {
                magic_found = true;
                break;
            }
        }

        if (bcur > 0) {
            // Forward everything before the magic to the output.
            (void)WriteFdExactly(output_fd_, buffer_.data(), bcur);
            erase_buffer_head(bcur);
        }

        if (magic_found && buffer_.size() >= *size + sizeof(INCR)) {
            // fine, return
            memcpy(buffer, buffer_.data() + sizeof(INCR), *size);
            erase_buffer_head(*size + sizeof(INCR));
            return true;
        }

        adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0};
        auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);

        if (res != 1) {
            auto err = errno;
            (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
            if (res < 0) {
                D("Failed to poll: %s", strerror(err));
                return false;
            }
            if (blocking) {
                fprintf(stderr, "Timed out waiting for data from device.\n");
            }
            if (blocking && servingComplete_) {
                // Timed out waiting for the client; serving is complete, so quit.
                return false;
            }
            *size = 0;
            return true;
        }

        bsize = buffer_.size();
        buffer_.resize(kReadBufferSize);
        int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize);
        if (r > 0) {
            buffer_.resize(bsize + r);
            continue;
        }

        D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno);
        break;
    }
    // Socket is closed; print remaining messages.
    WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
    return false;
}

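// Reads the next request if one is available. Returns a DESTROY request when the connection goes
// away, and an empty optional when no complete request has arrived yet.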
std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
    uint8_t commandBuf[sizeof(RequestCommand)];
    auto size = sizeof(commandBuf);
    if (!SkipToRequest(&commandBuf, &size, blocking)) {
        return {{DESTROY}};
    }
    if (size < sizeof(RequestCommand)) {
        return {};
    }
    RequestCommand request;
    request.request_type = readBigEndian<RequestType>(&commandBuf[0]);
    request.file_id = readBigEndian<FileId>(&commandBuf[2]);
    request.block_idx = readBigEndian<BlockIdx>(&commandBuf[4]);
    return request;
}

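// Before a data block goes out, make sure the device has the verity tree blocks needed to verify
// it: the leaf hash block covering this data block, plus (once per file) all non-leaf levels.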
bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) {
    auto& file = files_[fileId];
    if (!file.hasTree()) {
        return true;
    }
    const int32_t data_block_count = numBytesToNumBlocks(file.size);

    const int32_t total_nodes_count(file.sentTreeBlocks.size());
    const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;

    const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;

    // Leaf level, sending only 1 block.
    const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock;
    if (file.sentTreeBlocks[leaf_idx]) {
        return true;
    }
    if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) {
        return false;
    }
    file.sentTreeBlocks[leaf_idx] = true;

    // Non-leaf, sending EVERYTHING. This should be done only once.
    if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) {
        return true;
    }

    for (int32_t i = 0; i < leaf_nodes_offset; ++i) {
        if (!SendTreeBlock(fileId, blockIdx, i)) {
            return false;
        }
        file.sentTreeBlocks[i] = true;
    }
    return true;
}

bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) {
    const auto& file = files_[fileId];

    BlockBuffer buffer;
    const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data);
    if (bytesRead <= 0) {
        fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath,
                blockIdx);
        return false;
    }

    buffer.header.compression_type = kCompressionNone;
    buffer.header.block_type = kTypeHash;
    buffer.header.file_id = toBigEndian(fileId);
    buffer.header.block_size = toBigEndian(int16_t(bytesRead));
    buffer.header.block_idx = toBigEndian(blockIdx);

    Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false);

    return true;
}

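// Reads one data block, LZ4-compresses it if that shrinks it enough, and queues it for sending
// together with any verity tree blocks it depends on. Already-sent and out-of-range blocks are
// skipped.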
auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
    auto& file = files_[fileId];
    if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
        // may happen as we schedule some extra blocks for reported page misses
        D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx);
        return SendResult::Skipped;
    }
    if (file.sentBlocks[blockIdx]) {
        return SendResult::Skipped;
    }

    if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) {
        return SendResult::Error;
    }

    BlockBuffer raw;
    bool isZipCompressed = false;
    const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed);
    if (bytesRead < 0) {
        fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx,
                errno);
        return SendResult::Error;
    }

    BlockBuffer<kCompressBound> compressed;
    int16_t compressedSize = 0;
    if (!isZipCompressed) {
        compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound);
    }
    int16_t blockSize;
    ResponseHeader* header;
    if (compressedSize > 0 && compressedSize < kCompressedSizeMax) {
        ++compressed_;
        blockSize = compressedSize;
        header = &compressed.header;
        header->compression_type = kCompressionLZ4;
    } else {
        ++uncompressed_;
        blockSize = bytesRead;
        header = &raw.header;
        header->compression_type = kCompressionNone;
    }

    header->block_type = kTypeData;
    header->file_id = toBigEndian(fileId);
    header->block_size = toBigEndian(blockSize);
    header->block_idx = toBigEndian(blockIdx);

    file.sentBlocks[blockIdx] = true;
    file.sentBlocksCount += 1;
    Send(header, ResponseHeader::responseSizeFor(blockSize), flush);

    return SendResult::Sent;
}

bool IncrementalServer::SendDone() {
    ResponseHeader header;
    header.file_id = -1;
    header.block_type = 0;
    header.compression_type = 0;
    header.block_idx = 0;
    header.block_size = 0;
    Send(&header, sizeof(header), true);
    return true;
}

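// Sends up to a fixed number of not-yet-sent blocks from the front of the prefetch queue:
// priority blocks first, then the rest of the requested range in order.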
void IncrementalServer::RunPrefetching() {
    constexpr auto kPrefetchBlocksPerIteration = 128;

    int blocksToSend = kPrefetchBlocksPerIteration;
    while (!prefetches_.empty() && blocksToSend > 0) {
        auto& prefetch = prefetches_.front();
        const auto& file = *prefetch.file;
        const auto& priority_blocks = file.PriorityBlocks();
        if (!priority_blocks.empty()) {
            for (auto& i = prefetch.priorityIndex;
                 blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
                if (auto res = SendDataBlock(file.id, priority_blocks[i]);
                    res == SendResult::Sent) {
                    --blocksToSend;
                } else if (res == SendResult::Error) {
                    fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
                }
            }
        }
        for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
            if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) {
                --blocksToSend;
            } else if (res == SendResult::Error) {
                fprintf(stderr, "Failed to send block %" PRId32 "\n", i);
            }
        }
        if (prefetch.done()) {
            prefetches_.pop_front();
        }
    }
}

void IncrementalServer::Send(const void* data, size_t size, bool flush) {
    pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_);
    if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) {
        Flush();
    }
}

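// Writes out everything accumulated in pendingBlocksBuffer_, prefixed with a big-endian
// ChunkHeader holding the chunk's payload size in bytes.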
void IncrementalServer::Flush() {
    auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader));
    if (dataBytes == 0) {
        return;
    }

    *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes);
    auto totalBytes = sizeof(ChunkHeader) + dataBytes;
    if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) {
        fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes));
    }
    sentSize_ += totalBytes;
    pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
}

bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
                                        int missesSent) {
    servingComplete_ = true;
    using namespace std::chrono;
    auto endTime = high_resolution_clock::now();
    D("Streaming completed.\n"
      "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
      "%d, mb: %.3f\n"
      "Total time taken: %.3fms",
      missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
      duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
    return true;
}

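// Main serving loop: alternates between answering device requests (block misses, prefetch
// requests, completion notifications) and background prefetching, until the device asks the
// server to shut down.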
bool IncrementalServer::Serve() {
    // Initial handshake to verify connection is still alive
    if (!SendOkay(adb_fd_)) {
        fprintf(stderr, "Connection is dead. Abort.\n");
        return false;
    }

    std::unordered_set<FileId> prefetchedFiles;
    bool doneSent = false;
    int missesCount = 0;
    int missesSent = 0;

    using namespace std::chrono;
    std::optional<TimePoint> startTime;

    while (true) {
        if (!doneSent && prefetches_.empty() &&
            std::all_of(files_.begin(), files_.end(), [](const File& f) {
                return f.sentBlocksCount == NumBlocks(f.sentBlocks.size());
            })) {
            fprintf(stderr, "All files should be loaded. Notifying the device.\n");
            SendDone();
            doneSent = true;
        }

        const bool blocking = prefetches_.empty();
        if (blocking) {
            // We've no idea how long the blocking call is, so let's flush whatever is still unsent.
            Flush();
        }
        auto request = ReadRequest(blocking);

        if (!startTime) {
            startTime = high_resolution_clock::now();
        }

        if (request) {
            FileId fileId = request->file_id;
            BlockIdx blockIdx = request->block_idx;

            switch (request->request_type) {
                case DESTROY: {
                    // Stop everything.
                    return true;
                }
                case SERVING_COMPLETE: {
                    // Not stopping the server here.
                    ServingComplete(startTime, missesCount, missesSent);
                    break;
                }
                case BLOCK_MISSING: {
                    ++missesCount;
                    // Send a single block ASAP.
                    if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 ||
                        blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) {
                        fprintf(stderr,
                                "Received invalid data request for file_id %" PRId16
                                " block_idx %" PRId32 ".\n",
                                fileId, blockIdx);
                        break;
                    }

                    if (VLOG_IS_ON(INCREMENTAL)) {
                        auto& file = files_[fileId];
                        auto posP = std::find(file.PriorityBlocks().begin(),
                                              file.PriorityBlocks().end(), blockIdx);
                        D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)",
                          (int)fileId, (int)blockIdx,
                          posP == file.PriorityBlocks().end()
                                  ? -1
                                  : int(posP - file.PriorityBlocks().begin()),
                          int(file.PriorityBlocks().size()));
                    }

                    if (auto res = SendDataBlock(fileId, blockIdx, true);
                        res == SendResult::Error) {
                        fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx);
                    } else if (res == SendResult::Sent) {
                        ++missesSent;
                        // Make sure we send more pages from this place onward, in case the OS is
                        // reading a bigger block.
                        prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7);
                    }
                    break;
                }
                case PREFETCH: {
                    // Start prefetching for a file
                    if (fileId < 0) {
                        fprintf(stderr,
                                "Received invalid prefetch request for file_id %" PRId16 "\n",
                                fileId);
                        break;
                    }
                    if (!prefetchedFiles.insert(fileId).second) {
                        fprintf(stderr,
                                "Received duplicate prefetch request for file_id %" PRId16 "\n",
                                fileId);
                        break;
                    }
                    D("Received prefetch request for file_id %" PRId16 ".", fileId);
                    prefetches_.emplace_back(files_[fileId]);
                    break;
                }
                default:
                    fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n",
                            request->request_type, fileId, blockIdx);
                    break;
            }
        }

        RunPrefetching();
    }
}

static std::pair<unique_fd, int64_t> open_fd(const char* filepath) {
    struct stat st;
    if (stat(filepath, &st)) {
        error_exit("inc-server: failed to stat input file '%s'.", filepath);
    }

    unique_fd fd(adb_open(filepath, O_RDONLY));
    if (fd < 0) {
        error_exit("inc-server: failed to open file '%s'.", filepath);
    }

    return {std::move(fd), st.st_size};
}

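// Opens the corresponding signature (IDSIG) file, skips its headers, and checks that the embedded
// verity tree has the expected size for the data file. Returns an empty pair when there is no
// signature file.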
static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) {
    std::string signature_file(filepath);
    signature_file += IDSIG;

    unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
    if (fd < 0) {
        D("No signature file found for '%s'('%s')", filepath, signature_file.c_str());
        return {};
    }

    auto [tree_offset, tree_size] = skip_id_sig_headers(fd);
    if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) {
        error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n",
                   signature_file.c_str(), (long long)tree_size, (long long)expected);
    }

    int32_t data_block_count = numBytesToNumBlocks(file_size);
    int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
    D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(),
      int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count));

    return {std::move(fd), tree_offset};
}

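// Entry point: takes ownership of the connection and output fds, opens every file listed in argv
// (and its signature, if present), and runs the streaming server until the device is done.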
bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
    auto connection_ufd = unique_fd(connection_fd);
    auto output_ufd = unique_fd(output_fd);
    if (argc <= 0) {
        error_exit("inc-server: must specify at least one file.");
    }

    std::vector<File> files;
    files.reserve(argc);
    for (int i = 0; i < argc; ++i) {
        auto filepath = argv[i];

        auto [file_fd, file_size] = open_fd(filepath);
        auto [sign_fd, sign_offset] = open_signature(file_size, filepath);

        files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset,
                           std::move(sign_fd));
    }

    IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));
    printf("Serving...\n");
    fclose(stdin);
    fclose(stdout);
    return server.Serve();
}

}  // namespace incremental