1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define TRACE_TAG INCREMENTAL
18 
19 #include "incremental_server.h"
20 
21 #include <android-base/endian.h>
22 #include <android-base/strings.h>
23 #include <inttypes.h>
24 #include <lz4.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 
30 #include <array>
31 #include <deque>
32 #include <fstream>
33 #include <thread>
34 #include <type_traits>
35 #include <unordered_set>
36 
37 #include "adb.h"
38 #include "adb_client.h"
39 #include "adb_io.h"
40 #include "adb_trace.h"
41 #include "adb_unique_fd.h"
42 #include "adb_utils.h"
43 #include "incremental_utils.h"
44 #include "sysdeps.h"
45 
46 namespace incremental {
47 
48 static constexpr int kHashesPerBlock = kBlockSize / kDigestSize;
49 static constexpr int kCompressedSizeMax = kBlockSize * 0.95;
50 static constexpr int8_t kTypeData = 0;
51 static constexpr int8_t kTypeHash = 1;
52 static constexpr int8_t kCompressionNone = 0;
53 static constexpr int8_t kCompressionLZ4 = 1;
54 static constexpr int kCompressBound = std::max(kBlockSize, LZ4_COMPRESSBOUND(kBlockSize));
55 static constexpr auto kReadBufferSize = 128 * 1024;
56 static constexpr int kPollTimeoutMillis = 300000;  // 5 minutes
57 
58 using BlockSize = int16_t;
59 using FileId = int16_t;
60 using BlockIdx = int32_t;
61 using NumBlocks = int32_t;
62 using BlockType = int8_t;
63 using CompressionType = int8_t;
64 using RequestType = int16_t;
65 using ChunkHeader = int32_t;
66 using MagicType = uint32_t;
67 
68 static constexpr MagicType INCR = 0x494e4352;  // LE INCR
69 
70 static constexpr RequestType SERVING_COMPLETE = 0;
71 static constexpr RequestType BLOCK_MISSING = 1;
72 static constexpr RequestType PREFETCH = 2;
73 static constexpr RequestType DESTROY = 3;
74 
roundDownToBlockOffset(int64_t val)75 static constexpr inline int64_t roundDownToBlockOffset(int64_t val) {
76     return val & ~(kBlockSize - 1);
77 }
78 
roundUpToBlockOffset(int64_t val)79 static constexpr inline int64_t roundUpToBlockOffset(int64_t val) {
80     return roundDownToBlockOffset(val + kBlockSize - 1);
81 }
82 
numBytesToNumBlocks(int64_t bytes)83 static constexpr inline NumBlocks numBytesToNumBlocks(int64_t bytes) {
84     return roundUpToBlockOffset(bytes) / kBlockSize;
85 }
86 
blockIndexToOffset(BlockIdx blockIdx)87 static constexpr inline off64_t blockIndexToOffset(BlockIdx blockIdx) {
88     return static_cast<off64_t>(blockIdx) * kBlockSize;
89 }
90 
91 template <typename T>
toBigEndian(T t)92 static inline constexpr T toBigEndian(T t) {
93     using unsigned_type = std::make_unsigned_t<T>;
94     if constexpr (std::is_same_v<T, int16_t>) {
95         return htobe16(static_cast<unsigned_type>(t));
96     } else if constexpr (std::is_same_v<T, int32_t>) {
97         return htobe32(static_cast<unsigned_type>(t));
98     } else if constexpr (std::is_same_v<T, int64_t>) {
99         return htobe64(static_cast<unsigned_type>(t));
100     } else {
101         return t;
102     }
103 }
104 
105 template <typename T>
readBigEndian(void * data)106 static inline constexpr T readBigEndian(void* data) {
107     using unsigned_type = std::make_unsigned_t<T>;
108     if constexpr (std::is_same_v<T, int16_t>) {
109         return static_cast<T>(be16toh(*reinterpret_cast<unsigned_type*>(data)));
110     } else if constexpr (std::is_same_v<T, int32_t>) {
111         return static_cast<T>(be32toh(*reinterpret_cast<unsigned_type*>(data)));
112     } else if constexpr (std::is_same_v<T, int64_t>) {
113         return static_cast<T>(be64toh(*reinterpret_cast<unsigned_type*>(data)));
114     } else {
115         return T();
116     }
117 }
118 
119 // Received from device
120 // !Does not include magic!
121 struct RequestCommand {
122     RequestType request_type;  // 2 bytes
123     FileId file_id;            // 2 bytes
124     union {
125         BlockIdx block_idx;
126         NumBlocks num_blocks;
127     };  // 4 bytes
128 } __attribute__((packed));
129 
130 // Placed before actual data bytes of each block
131 struct ResponseHeader {
132     FileId file_id;                    // 2 bytes
133     BlockType block_type;              // 1 byte
134     CompressionType compression_type;  // 1 byte
135     BlockIdx block_idx;                // 4 bytes
136     BlockSize block_size;              // 2 bytes
137 
responseSizeForincremental::ResponseHeader138     static constexpr size_t responseSizeFor(size_t dataSize) {
139         return dataSize + sizeof(ResponseHeader);
140     }
141 } __attribute__((packed));
142 
143 template <size_t Size = kBlockSize>
144 struct BlockBuffer {
145     ResponseHeader header;
146     char data[Size];
147 } __attribute__((packed));
148 
149 // Holds streaming state for a file
150 class File {
151   public:
152     // Plain file
File(const char * filepath,FileId id,int64_t size,unique_fd fd,int64_t tree_offset,unique_fd tree_fd)153     File(const char* filepath, FileId id, int64_t size, unique_fd fd, int64_t tree_offset,
154          unique_fd tree_fd)
155         : File(filepath, id, size, tree_offset) {
156         this->fd_ = std::move(fd);
157         this->tree_fd_ = std::move(tree_fd);
158         priority_blocks_ = PriorityBlocksForFile(filepath, fd_.get(), size);
159     }
ReadDataBlock(BlockIdx block_idx,void * buf,bool * is_zip_compressed) const160     int64_t ReadDataBlock(BlockIdx block_idx, void* buf, bool* is_zip_compressed) const {
161         int64_t bytes_read = -1;
162         const off64_t offsetStart = blockIndexToOffset(block_idx);
163         bytes_read = adb_pread(fd_, buf, kBlockSize, offsetStart);
164         return bytes_read;
165     }
ReadTreeBlock(BlockIdx block_idx,void * buf) const166     int64_t ReadTreeBlock(BlockIdx block_idx, void* buf) const {
167         int64_t bytes_read = -1;
168         const off64_t offsetStart = tree_offset_ + blockIndexToOffset(block_idx);
169         bytes_read = adb_pread(tree_fd_, buf, kBlockSize, offsetStart);
170         return bytes_read;
171     }
172 
PriorityBlocks() const173     const std::vector<BlockIdx>& PriorityBlocks() const { return priority_blocks_; }
174 
hasTree() const175     bool hasTree() const { return tree_fd_.ok(); }
176 
177     std::vector<bool> sentBlocks;
178     NumBlocks sentBlocksCount = 0;
179 
180     std::vector<bool> sentTreeBlocks;
181 
182     const char* const filepath;
183     const FileId id;
184     const int64_t size;
185 
186   private:
File(const char * filepath,FileId id,int64_t size,int64_t tree_offset)187     File(const char* filepath, FileId id, int64_t size, int64_t tree_offset)
188         : filepath(filepath), id(id), size(size), tree_offset_(tree_offset) {
189         sentBlocks.resize(numBytesToNumBlocks(size));
190         sentTreeBlocks.resize(verity_tree_blocks_for_file(size));
191     }
192     unique_fd fd_;
193     std::vector<BlockIdx> priority_blocks_;
194 
195     unique_fd tree_fd_;
196     const int64_t tree_offset_;
197 };
198 
199 class IncrementalServer {
200   public:
IncrementalServer(unique_fd adb_fd,unique_fd output_fd,std::vector<File> files)201     IncrementalServer(unique_fd adb_fd, unique_fd output_fd, std::vector<File> files)
202         : adb_fd_(std::move(adb_fd)), output_fd_(std::move(output_fd)), files_(std::move(files)) {
203         buffer_.reserve(kReadBufferSize);
204         pendingBlocksBuffer_.resize(kChunkFlushSize + 2 * kBlockSize);
205         pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
206     }
207 
208     bool Serve();
209 
210   private:
211     struct PrefetchState {
212         const File* file;
213         BlockIdx overallIndex = 0;
214         BlockIdx overallEnd = 0;
215         BlockIdx priorityIndex = 0;
216 
PrefetchStateincremental::IncrementalServer::PrefetchState217         explicit PrefetchState(const File& f, BlockIdx start, int count)
218             : file(&f),
219               overallIndex(start),
220               overallEnd(std::min<BlockIdx>(start + count, f.sentBlocks.size())) {}
221 
PrefetchStateincremental::IncrementalServer::PrefetchState222         explicit PrefetchState(const File& f)
223             : PrefetchState(f, 0, (BlockIdx)f.sentBlocks.size()) {}
224 
doneincremental::IncrementalServer::PrefetchState225         bool done() const {
226             const bool overallSent = (overallIndex >= overallEnd);
227             if (file->PriorityBlocks().empty()) {
228                 return overallSent;
229             }
230             return overallSent && (priorityIndex >= (BlockIdx)file->PriorityBlocks().size());
231         }
232     };
233 
234     bool SkipToRequest(void* buffer, size_t* size, bool blocking);
235     std::optional<RequestCommand> ReadRequest(bool blocking);
236 
erase_buffer_head(int count)237     void erase_buffer_head(int count) { buffer_.erase(buffer_.begin(), buffer_.begin() + count); }
238 
239     enum class SendResult { Sent, Skipped, Error };
240     SendResult SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush = false);
241 
242     bool SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx);
243     bool SendTreeBlocksForDataBlock(FileId fileId, BlockIdx blockIdx);
244 
245     bool SendDone();
246     void RunPrefetching();
247 
248     void Send(const void* data, size_t size, bool flush);
249     void Flush();
250     using TimePoint = decltype(std::chrono::high_resolution_clock::now());
251     bool ServingComplete(std::optional<TimePoint> startTime, int missesCount, int missesSent);
252 
253     unique_fd const adb_fd_;
254     unique_fd const output_fd_;
255     std::vector<File> files_;
256 
257     // Incoming data buffer.
258     std::vector<char> buffer_;
259 
260     std::deque<PrefetchState> prefetches_;
261     int compressed_ = 0, uncompressed_ = 0;
262     long long sentSize_ = 0;
263 
264     static constexpr auto kChunkFlushSize = 31 * kBlockSize;
265 
266     std::vector<char> pendingBlocksBuffer_;
267     char* pendingBlocks_ = nullptr;
268 
269     // True when client notifies that all the data has been received
270     bool servingComplete_ = false;
271 };
272 
SkipToRequest(void * buffer,size_t * size,bool blocking)273 bool IncrementalServer::SkipToRequest(void* buffer, size_t* size, bool blocking) {
274     while (true) {
275         // Looking for INCR magic.
276         bool magic_found = false;
277         int bcur = 0;
278         int bsize = buffer_.size();
279         for (bcur = 0; bcur + 4 < bsize; ++bcur) {
280             uint32_t magic = be32toh(*(uint32_t*)(buffer_.data() + bcur));
281             if (magic == INCR) {
282                 magic_found = true;
283                 break;
284             }
285         }
286 
287         if (bcur > 0) {
288             // output the rest.
289             (void)WriteFdExactly(output_fd_, buffer_.data(), bcur);
290             erase_buffer_head(bcur);
291         }
292 
293         if (magic_found && buffer_.size() >= *size + sizeof(INCR)) {
294             // fine, return
295             memcpy(buffer, buffer_.data() + sizeof(INCR), *size);
296             erase_buffer_head(*size + sizeof(INCR));
297             return true;
298         }
299 
300         adb_pollfd pfd = {adb_fd_.get(), POLLIN, 0};
301         auto res = adb_poll(&pfd, 1, blocking ? kPollTimeoutMillis : 0);
302 
303         if (res != 1) {
304             auto err = errno;
305             (void)WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
306             if (res < 0) {
307                 D("Failed to poll: %s", strerror(err));
308                 return false;
309             }
310             if (blocking) {
311                 fprintf(stderr, "Timed out waiting for data from device.\n");
312             }
313             if (blocking && servingComplete_) {
314                 // timeout waiting from client. Serving is complete, so quit.
315                 return false;
316             }
317             *size = 0;
318             return true;
319         }
320 
321         bsize = buffer_.size();
322         buffer_.resize(kReadBufferSize);
323         int r = adb_read(adb_fd_, buffer_.data() + bsize, kReadBufferSize - bsize);
324         if (r > 0) {
325             buffer_.resize(bsize + r);
326             continue;
327         }
328 
329         D("Failed to read from fd %d: %d. Exit", adb_fd_.get(), errno);
330         break;
331     }
332     // socket is closed. print remaining messages
333     WriteFdExactly(output_fd_, buffer_.data(), buffer_.size());
334     return false;
335 }
336 
ReadRequest(bool blocking)337 std::optional<RequestCommand> IncrementalServer::ReadRequest(bool blocking) {
338     uint8_t commandBuf[sizeof(RequestCommand)];
339     auto size = sizeof(commandBuf);
340     if (!SkipToRequest(&commandBuf, &size, blocking)) {
341         return {{DESTROY}};
342     }
343     if (size < sizeof(RequestCommand)) {
344         return {};
345     }
346     RequestCommand request;
347     request.request_type = readBigEndian<RequestType>(&commandBuf[0]);
348     request.file_id = readBigEndian<FileId>(&commandBuf[2]);
349     request.block_idx = readBigEndian<BlockIdx>(&commandBuf[4]);
350     return request;
351 }
352 
SendTreeBlocksForDataBlock(const FileId fileId,const BlockIdx blockIdx)353 bool IncrementalServer::SendTreeBlocksForDataBlock(const FileId fileId, const BlockIdx blockIdx) {
354     auto& file = files_[fileId];
355     if (!file.hasTree()) {
356         return true;
357     }
358     const int32_t data_block_count = numBytesToNumBlocks(file.size);
359 
360     const int32_t total_nodes_count(file.sentTreeBlocks.size());
361     const int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
362 
363     const int32_t leaf_nodes_offset = total_nodes_count - leaf_nodes_count;
364 
365     // Leaf level, sending only 1 block.
366     const int32_t leaf_idx = leaf_nodes_offset + blockIdx / kHashesPerBlock;
367     if (file.sentTreeBlocks[leaf_idx]) {
368         return true;
369     }
370     if (!SendTreeBlock(fileId, blockIdx, leaf_idx)) {
371         return false;
372     }
373     file.sentTreeBlocks[leaf_idx] = true;
374 
375     // Non-leaf, sending EVERYTHING. This should be done only once.
376     if (leaf_nodes_offset == 0 || file.sentTreeBlocks[0]) {
377         return true;
378     }
379 
380     for (int32_t i = 0; i < leaf_nodes_offset; ++i) {
381         if (!SendTreeBlock(fileId, blockIdx, i)) {
382             return false;
383         }
384         file.sentTreeBlocks[i] = true;
385     }
386     return true;
387 }
388 
SendTreeBlock(FileId fileId,int32_t fileBlockIdx,BlockIdx blockIdx)389 bool IncrementalServer::SendTreeBlock(FileId fileId, int32_t fileBlockIdx, BlockIdx blockIdx) {
390     const auto& file = files_[fileId];
391 
392     BlockBuffer buffer;
393     const int64_t bytesRead = file.ReadTreeBlock(blockIdx, buffer.data);
394     if (bytesRead <= 0) {
395         fprintf(stderr, "Failed to get data for %s.idsig at blockIdx=%d.\n", file.filepath,
396                 blockIdx);
397         return false;
398     }
399 
400     buffer.header.compression_type = kCompressionNone;
401     buffer.header.block_type = kTypeHash;
402     buffer.header.file_id = toBigEndian(fileId);
403     buffer.header.block_size = toBigEndian(int16_t(bytesRead));
404     buffer.header.block_idx = toBigEndian(blockIdx);
405 
406     Send(&buffer, ResponseHeader::responseSizeFor(bytesRead), /*flush=*/false);
407 
408     return true;
409 }
410 
SendDataBlock(FileId fileId,BlockIdx blockIdx,bool flush)411 auto IncrementalServer::SendDataBlock(FileId fileId, BlockIdx blockIdx, bool flush) -> SendResult {
412     auto& file = files_[fileId];
413     if (blockIdx >= static_cast<long>(file.sentBlocks.size())) {
414         // may happen as we schedule some extra blocks for reported page misses
415         D("Skipped reading file %s at block %" PRId32 " (past end).", file.filepath, blockIdx);
416         return SendResult::Skipped;
417     }
418     if (file.sentBlocks[blockIdx]) {
419         return SendResult::Skipped;
420     }
421 
422     if (!SendTreeBlocksForDataBlock(fileId, blockIdx)) {
423         return SendResult::Error;
424     }
425 
426     BlockBuffer raw;
427     bool isZipCompressed = false;
428     const int64_t bytesRead = file.ReadDataBlock(blockIdx, raw.data, &isZipCompressed);
429     if (bytesRead < 0) {
430         fprintf(stderr, "Failed to get data for %s at blockIdx=%d (%d).\n", file.filepath, blockIdx,
431                 errno);
432         return SendResult::Error;
433     }
434 
435     BlockBuffer<kCompressBound> compressed;
436     int16_t compressedSize = 0;
437     if (!isZipCompressed) {
438         compressedSize = LZ4_compress_default(raw.data, compressed.data, bytesRead, kCompressBound);
439     }
440     int16_t blockSize;
441     ResponseHeader* header;
442     if (compressedSize > 0 && compressedSize < kCompressedSizeMax) {
443         ++compressed_;
444         blockSize = compressedSize;
445         header = &compressed.header;
446         header->compression_type = kCompressionLZ4;
447     } else {
448         ++uncompressed_;
449         blockSize = bytesRead;
450         header = &raw.header;
451         header->compression_type = kCompressionNone;
452     }
453 
454     header->block_type = kTypeData;
455     header->file_id = toBigEndian(fileId);
456     header->block_size = toBigEndian(blockSize);
457     header->block_idx = toBigEndian(blockIdx);
458 
459     file.sentBlocks[blockIdx] = true;
460     file.sentBlocksCount += 1;
461     Send(header, ResponseHeader::responseSizeFor(blockSize), flush);
462 
463     return SendResult::Sent;
464 }
465 
SendDone()466 bool IncrementalServer::SendDone() {
467     ResponseHeader header;
468     header.file_id = -1;
469     header.block_type = 0;
470     header.compression_type = 0;
471     header.block_idx = 0;
472     header.block_size = 0;
473     Send(&header, sizeof(header), true);
474     return true;
475 }
476 
RunPrefetching()477 void IncrementalServer::RunPrefetching() {
478     constexpr auto kPrefetchBlocksPerIteration = 128;
479 
480     int blocksToSend = kPrefetchBlocksPerIteration;
481     while (!prefetches_.empty() && blocksToSend > 0) {
482         auto& prefetch = prefetches_.front();
483         const auto& file = *prefetch.file;
484         const auto& priority_blocks = file.PriorityBlocks();
485         if (!priority_blocks.empty()) {
486             for (auto& i = prefetch.priorityIndex;
487                  blocksToSend > 0 && i < (BlockIdx)priority_blocks.size(); ++i) {
488                 if (auto res = SendDataBlock(file.id, priority_blocks[i]);
489                     res == SendResult::Sent) {
490                     --blocksToSend;
491                 } else if (res == SendResult::Error) {
492                     fprintf(stderr, "Failed to send priority block %" PRId32 "\n", i);
493                 }
494             }
495         }
496         for (auto& i = prefetch.overallIndex; blocksToSend > 0 && i < prefetch.overallEnd; ++i) {
497             if (auto res = SendDataBlock(file.id, i); res == SendResult::Sent) {
498                 --blocksToSend;
499             } else if (res == SendResult::Error) {
500                 fprintf(stderr, "Failed to send block %" PRId32 "\n", i);
501             }
502         }
503         if (prefetch.done()) {
504             prefetches_.pop_front();
505         }
506     }
507 }
508 
Send(const void * data,size_t size,bool flush)509 void IncrementalServer::Send(const void* data, size_t size, bool flush) {
510     pendingBlocks_ = std::copy_n(static_cast<const char*>(data), size, pendingBlocks_);
511     if (flush || pendingBlocks_ - pendingBlocksBuffer_.data() > kChunkFlushSize) {
512         Flush();
513     }
514 }
515 
Flush()516 void IncrementalServer::Flush() {
517     auto dataBytes = pendingBlocks_ - (pendingBlocksBuffer_.data() + sizeof(ChunkHeader));
518     if (dataBytes == 0) {
519         return;
520     }
521 
522     *(ChunkHeader*)pendingBlocksBuffer_.data() = toBigEndian<int32_t>(dataBytes);
523     auto totalBytes = sizeof(ChunkHeader) + dataBytes;
524     if (!WriteFdExactly(adb_fd_, pendingBlocksBuffer_.data(), totalBytes)) {
525         fprintf(stderr, "Failed to write %d bytes\n", int(totalBytes));
526     }
527     sentSize_ += totalBytes;
528     pendingBlocks_ = pendingBlocksBuffer_.data() + sizeof(ChunkHeader);
529 }
530 
ServingComplete(std::optional<TimePoint> startTime,int missesCount,int missesSent)531 bool IncrementalServer::ServingComplete(std::optional<TimePoint> startTime, int missesCount,
532                                         int missesSent) {
533     servingComplete_ = true;
534     using namespace std::chrono;
535     auto endTime = high_resolution_clock::now();
536     D("Streaming completed.\n"
537       "Misses: %d, of those unique: %d; sent compressed: %d, uncompressed: "
538       "%d, mb: %.3f\n"
539       "Total time taken: %.3fms",
540       missesCount, missesSent, compressed_, uncompressed_, sentSize_ / 1024.0 / 1024.0,
541       duration_cast<microseconds>(endTime - (startTime ? *startTime : endTime)).count() / 1000.0);
542     return true;
543 }
544 
Serve()545 bool IncrementalServer::Serve() {
546     // Initial handshake to verify connection is still alive
547     if (!SendOkay(adb_fd_)) {
548         fprintf(stderr, "Connection is dead. Abort.\n");
549         return false;
550     }
551 
552     std::unordered_set<FileId> prefetchedFiles;
553     bool doneSent = false;
554     int missesCount = 0;
555     int missesSent = 0;
556 
557     using namespace std::chrono;
558     std::optional<TimePoint> startTime;
559 
560     while (true) {
561         if (!doneSent && prefetches_.empty() &&
562             std::all_of(files_.begin(), files_.end(), [](const File& f) {
563                 return f.sentBlocksCount == NumBlocks(f.sentBlocks.size());
564             })) {
565             fprintf(stderr, "All files should be loaded. Notifying the device.\n");
566             SendDone();
567             doneSent = true;
568         }
569 
570         const bool blocking = prefetches_.empty();
571         if (blocking) {
572             // We've no idea how long the blocking call is, so let's flush whatever is still unsent.
573             Flush();
574         }
575         auto request = ReadRequest(blocking);
576 
577         if (!startTime) {
578             startTime = high_resolution_clock::now();
579         }
580 
581         if (request) {
582             FileId fileId = request->file_id;
583             BlockIdx blockIdx = request->block_idx;
584 
585             switch (request->request_type) {
586                 case DESTROY: {
587                     // Stop everything.
588                     return true;
589                 }
590                 case SERVING_COMPLETE: {
591                     // Not stopping the server here.
592                     ServingComplete(startTime, missesCount, missesSent);
593                     break;
594                 }
595                 case BLOCK_MISSING: {
596                     ++missesCount;
597                     // Sends one single block ASAP.
598                     if (fileId < 0 || fileId >= (FileId)files_.size() || blockIdx < 0 ||
599                         blockIdx >= (BlockIdx)files_[fileId].sentBlocks.size()) {
600                         fprintf(stderr,
601                                 "Received invalid data request for file_id %" PRId16
602                                 " block_idx %" PRId32 ".\n",
603                                 fileId, blockIdx);
604                         break;
605                     }
606 
607                     if (VLOG_IS_ON(INCREMENTAL)) {
608                         auto& file = files_[fileId];
609                         auto posP = std::find(file.PriorityBlocks().begin(),
610                                               file.PriorityBlocks().end(), blockIdx);
611                         D("\tMISSING BLOCK: reading file %d block %04d (in priority: %d of %d)",
612                           (int)fileId, (int)blockIdx,
613                           posP == file.PriorityBlocks().end()
614                                   ? -1
615                                   : int(posP - file.PriorityBlocks().begin()),
616                           int(file.PriorityBlocks().size()));
617                     }
618 
619                     if (auto res = SendDataBlock(fileId, blockIdx, true);
620                         res == SendResult::Error) {
621                         fprintf(stderr, "Failed to send block %" PRId32 ".\n", blockIdx);
622                     } else if (res == SendResult::Sent) {
623                         ++missesSent;
624                         // Make sure we send more pages from this place onward, in case if the OS is
625                         // reading a bigger block.
626                         prefetches_.emplace_front(files_[fileId], blockIdx + 1, 7);
627                     }
628                     break;
629                 }
630                 case PREFETCH: {
631                     // Start prefetching for a file
632                     if (fileId < 0) {
633                         fprintf(stderr,
634                                 "Received invalid prefetch request for file_id %" PRId16 "\n",
635                                 fileId);
636                         break;
637                     }
638                     if (!prefetchedFiles.insert(fileId).second) {
639                         fprintf(stderr,
640                                 "Received duplicate prefetch request for file_id %" PRId16 "\n",
641                                 fileId);
642                         break;
643                     }
644                     D("Received prefetch request for file_id %" PRId16 ".", fileId);
645                     prefetches_.emplace_back(files_[fileId]);
646                     break;
647                 }
648                 default:
649                     fprintf(stderr, "Invalid request %" PRId16 ",%" PRId16 ",%" PRId32 ".\n",
650                             request->request_type, fileId, blockIdx);
651                     break;
652             }
653         }
654 
655         RunPrefetching();
656     }
657 }
658 
open_fd(const char * filepath)659 static std::pair<unique_fd, int64_t> open_fd(const char* filepath) {
660     struct stat st;
661     if (stat(filepath, &st)) {
662         error_exit("inc-server: failed to stat input file '%s'.", filepath);
663     }
664 
665     unique_fd fd(adb_open(filepath, O_RDONLY));
666     if (fd < 0) {
667         error_exit("inc-server: failed to open file '%s'.", filepath);
668     }
669 
670     return {std::move(fd), st.st_size};
671 }
672 
open_signature(int64_t file_size,const char * filepath)673 static std::pair<unique_fd, int64_t> open_signature(int64_t file_size, const char* filepath) {
674     std::string signature_file(filepath);
675     signature_file += IDSIG;
676 
677     unique_fd fd(adb_open(signature_file.c_str(), O_RDONLY));
678     if (fd < 0) {
679         D("No signature file found for '%s'('%s')", filepath, signature_file.c_str());
680         return {};
681     }
682 
683     auto [tree_offset, tree_size] = skip_id_sig_headers(fd);
684     if (auto expected = verity_tree_size_for_file(file_size); tree_size != expected) {
685         error_exit("Verity tree size mismatch in signature file: %s [was %lld, expected %lld].\n",
686                    signature_file.c_str(), (long long)tree_size, (long long)expected);
687     }
688 
689     int32_t data_block_count = numBytesToNumBlocks(file_size);
690     int32_t leaf_nodes_count = (data_block_count + kHashesPerBlock - 1) / kHashesPerBlock;
691     D("Verity tree loaded: %s, tree size: %d (%d blocks, %d leafs)", signature_file.c_str(),
692       int(tree_size), int(numBytesToNumBlocks(tree_size)), int(leaf_nodes_count));
693 
694     return {std::move(fd), tree_offset};
695 }
696 
serve(int connection_fd,int output_fd,int argc,const char ** argv)697 bool serve(int connection_fd, int output_fd, int argc, const char** argv) {
698     auto connection_ufd = unique_fd(connection_fd);
699     auto output_ufd = unique_fd(output_fd);
700     if (argc <= 0) {
701         error_exit("inc-server: must specify at least one file.");
702     }
703 
704     std::vector<File> files;
705     files.reserve(argc);
706     for (int i = 0; i < argc; ++i) {
707         auto filepath = argv[i];
708 
709         auto [file_fd, file_size] = open_fd(filepath);
710         auto [sign_fd, sign_offset] = open_signature(file_size, filepath);
711 
712         files.emplace_back(filepath, i, file_size, std::move(file_fd), sign_offset,
713                            std::move(sign_fd));
714     }
715 
716     IncrementalServer server(std::move(connection_ufd), std::move(output_ufd), std::move(files));
717     printf("Serving...\n");
718     fclose(stdin);
719     fclose(stdout);
720     return server.Serve();
721 }
722 
723 }  // namespace incremental
724