/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd_readahead.h"

#include <pthread.h>

#include "android-base/properties.h"
#include "snapuserd_core.h"
#include "utility.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

ReadAhead::ReadAhead(const std::string& cow_device, const std::string& backing_device,
                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd,
                     uint32_t cow_op_merge_size) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    misc_name_ = misc_name;
    snapuserd_ = snapuserd;
    cow_op_merge_size_ = cow_op_merge_size;
}

void ReadAhead::CheckOverlap(const CowOperation* cow_op) {
    uint64_t source_offset;
    if (!reader_->GetSourceOffset(cow_op, &source_offset)) {
        SNAP_LOG(ERROR) << "ReadAhead operation has no source offset: " << *cow_op;
        return;
    }

    uint64_t source_block = GetBlockFromOffset(header_, source_offset);
    bool misaligned = (GetBlockRelativeOffset(header_, source_offset) != 0);

    if (dest_blocks_.count(cow_op->new_block) || source_blocks_.count(source_block) ||
        (misaligned && source_blocks_.count(source_block + 1))) {
        overlap_ = true;
    }

    dest_blocks_.insert(source_block);
    if (source_offset > 0) {
        dest_blocks_.insert(source_block + 1);
    }
    source_blocks_.insert(cow_op->new_block);
}
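
// Illustrative example (hypothetical block numbers, not from the source): suppose op A
// copies source block 10 to new_block 20, and a later op B in the same RA window copies
// source block 20 to new_block 30. When B is checked, its source block (20) is already
// recorded as A's destination, so overlap_ is set - the window contains ops whose source
// data would be clobbered if the blocks were merged in place without staging them first.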

int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
                                    std::vector<uint64_t>& blocks,
                                    std::vector<const CowOperation*>& xor_op_vec) {
    int num_ops = *pending_ops;
    if (cow_op_merge_size_ != 0) {
        num_ops = std::min(static_cast<int>(cow_op_merge_size_), *pending_ops);
    }

    int nr_consecutive = 0;
    bool is_ops_present = (!RAIterDone() && num_ops);

    if (!is_ops_present) {
        return nr_consecutive;
    }

    // Get the first block with offset
    const CowOperation* cow_op = GetRAOpIter();

    if (!reader_->GetSourceOffset(cow_op, source_offset)) {
        SNAP_LOG(ERROR) << "PrepareNextReadAhead operation has no source offset: " << *cow_op;
        return nr_consecutive;
    }
    if (cow_op->type() == kCowXorOp) {
        xor_op_vec.push_back(cow_op);
    }

    RAIterNext();
    num_ops -= 1;
    nr_consecutive = 1;
    blocks.push_back(cow_op->new_block);

    if (!overlap_) {
        CheckOverlap(cow_op);
    }

    /*
     * Find number of consecutive blocks
     */
    while (!RAIterDone() && num_ops) {
        const CowOperation* op = GetRAOpIter();
        uint64_t next_offset;
        if (!reader_->GetSourceOffset(op, &next_offset)) {
            SNAP_LOG(ERROR) << "PrepareNextReadAhead operation has no source offset: " << *op;
            break;
        }

        // Check for consecutive blocks
        if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
            break;
        }

        if (op->type() == kCowXorOp) {
            xor_op_vec.push_back(op);
        }

        nr_consecutive += 1;
        num_ops -= 1;
        blocks.push_back(op->new_block);
        RAIterNext();

        if (!overlap_) {
            CheckOverlap(op);
        }
    }

    return nr_consecutive;
}
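
// Illustrative example (hypothetical offsets): if the next three ordered ops read from
// source offsets 0x10000, 0x11000 and 0x12000 with BLOCK_SZ == 4096, PrepareNextReadAhead
// returns 3 and the caller can issue a single 12K read; an op whose source offset is
// 0x20000 breaks the run and starts the next batch.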

class [[nodiscard]] AutoNotifyReadAheadFailed {
  public:
    AutoNotifyReadAheadFailed(std::shared_ptr<SnapshotHandler> snapuserd) : snapuserd_(snapuserd) {}

    ~AutoNotifyReadAheadFailed() {
        if (cancelled_) {
            return;
        }
        snapuserd_->ReadAheadIOFailed();
    }

    void Cancel() { cancelled_ = true; }

  private:
    std::shared_ptr<SnapshotHandler> snapuserd_;
    bool cancelled_ = false;
};
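
// Usage sketch (mirrors the call sites below; DoReadAheadIo() is a hypothetical stand-in):
//
//   AutoNotifyReadAheadFailed notify(snapuserd_);
//   if (!DoReadAheadIo()) {
//       return false;  // destructor runs -> snapuserd_->ReadAheadIOFailed()
//   }
//   notify.Cancel();
//   return true;       // success path, no failure notification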

bool ReadAhead::ReconstructDataFromCow() {
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    loff_t metadata_offset = 0;
    loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
    int num_ops = 0;
    int total_blocks_merged = 0;

    // This memcpy is important as metadata_buffer_ will be an unaligned address and will fault
    // on 32-bit systems
    std::unique_ptr<uint8_t[]> metadata_buffer =
            std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
    memcpy(metadata_buffer.get(), metadata_buffer_, snapuserd_->GetBufferMetadataSize());

    while (true) {
        struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
                (char*)metadata_buffer.get() + metadata_offset);

        // Done reading metadata
        if (bm->new_block == 0 && bm->file_offset == 0) {
            break;
        }

        loff_t buffer_offset = bm->file_offset - start_data_offset;
        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
        read_ahead_buffer_map[bm->new_block] = bufptr;
        num_ops += 1;
        total_blocks_merged += 1;

        metadata_offset += sizeof(struct ScratchMetadata);
    }

    AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);

    // We are done re-constructing the mapping; however, we need to make sure
    // all the COW operations to-be merged are present in the re-constructed
    // mapping.
    while (!RAIterDone()) {
        const CowOperation* op = GetRAOpIter();
        if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
            num_ops -= 1;
            RAIterNext();
            continue;
        }

        // Verify that we have covered all the ops which were re-constructed
        // from COW device - These are the ops which are being
        // re-constructed after crash.
        if (num_ops != 0) {
            SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recovered."
                            << " Pending ops: " << num_ops;
            return false;
        }

        break;
    }

    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);

    snapuserd_->FinishReconstructDataFromCow();

    if (!snapuserd_->ReadAheadIOCompleted(true)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        return false;
    }

    snapuserd_->RaThreadStarted();
    SNAP_LOG(INFO) << "ReconstructDataFromCow success";
    notify_read_ahead_failed.Cancel();
    return true;
}

/*
 * With io_uring, the data flow is slightly different.
 *
 * The data flow is as follows:
 *
 * 1: Queue the I/O requests to be read from the backing source device.
 * This is done by retrieving an SQE entry from the ring and populating
 * the SQE entry. Note that the I/O is not submitted yet.
 *
 * 2: Once the ring is full (aka queue_depth), we submit all the queued
 * I/O requests with a single system call. This essentially cuts down
 * "queue_depth" system calls to a single system call.
 *
 * 3: Once the I/O is submitted, the user-space thread works on processing
 * the XOR operations. This happens in parallel while the I/O requests are
 * serviced by the kernel. This is ok because, for XOR operations, we first
 * need to retrieve the compressed data from the COW block device. Thus, we
 * have offloaded the backing source I/O to the kernel while user-space
 * fetches the data for the XOR operations in parallel.
 *
 * 4: After the XOR data has been read from the COW device, poll the
 * completion queue for all the submitted I/O. If the I/O has already
 * completed, the user-space thread just reads the CQE entries from the
 * ring without making any system call. If none of the I/O has completed
 * yet, the user-space thread makes a system call and waits for completions.
 *
 * Flow diagram:
 *                                                    SQ-RING
 *  SQE1 <----------- Fetch SQE1 Entry ----------  |SQE1|SQE2|SQE3|
 *
 *  SQE1  ------------ Populate SQE1 Entry ------> |SQE1-X|SQE2|SQE3|
 *
 *  SQE2 <----------- Fetch SQE2 Entry ----------  |SQE1-X|SQE2|SQE3|
 *
 *  SQE2  ------------ Populate SQE2 Entry ------> |SQE1-X|SQE2-X|SQE3|
 *
 *  SQE3 <----------- Fetch SQE3 Entry ----------  |SQE1-X|SQE2-X|SQE3|
 *
 *  SQE3  ------------ Populate SQE3 Entry ------> |SQE1-X|SQE2-X|SQE3-X|
 *
 *  Submit-IO ---------------------------------->  |SQE1-X|SQE2-X|SQE3-X|
 *     |                                                  |
 *     |                                        Process I/O entries in kernel
 *     |                                                  |
 *  Retrieve XOR                                          |
 *  data from COW                                         |
 *     |                                                  |
 *     |                                                  |
 *  Fetch CQ completions
 *     |                                              CQ-RING
 *                                               |CQE1-X|CQE2-X|CQE3-X|
 *                                                        |
 *   CQE1 <------------ Fetch CQE1 Entry         |CQE1|CQE2-X|CQE3-X|
 *   CQE2 <------------ Fetch CQE2 Entry         |CQE1|CQE2|CQE3-X|
 *   CQE3 <------------ Fetch CQE3 Entry         |CQE1|CQE2|CQE3|
 *    |
 *    |
 *  Continue with the next set of operations in the ring
 */
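
// A minimal, self-contained sketch of the liburing pattern described above (illustrative
// only; error handling and batching elided, and `fd`, `buf` and `QUEUE_DEPTH` are assumed):
//
//   struct io_uring ring;
//   io_uring_queue_init(QUEUE_DEPTH, &ring, 0);               // set up SQ/CQ rings
//
//   struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);       // fetch an SQE slot
//   io_uring_prep_read(sqe, fd, buf, BLOCK_SZ, /*offset=*/0); // populate it
//   io_uring_submit(&ring);                                   // one syscall for all queued SQEs
//
//   struct io_uring_cqe* cqe;
//   io_uring_wait_cqe(&ring, &cqe);                           // reap a completion
//   io_uring_cqe_seen(&ring, cqe);                            // mark it consumed
//
//   io_uring_queue_exit(&ring);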

bool ReadAhead::ReadAheadAsyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    int pending_sqe = queue_depth_;
    int pending_ios_to_submit = 0;

    size_t xor_op_index = 0;
    size_t block_index = 0;

    loff_t offset = 0;

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;
        struct io_uring_sqe* sqe;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);

        if (linear_blocks != 0) {
            size_t io_size = (linear_blocks * BLOCK_SZ);

            // Get an SQE entry from the ring and populate the I/O variables
            sqe = io_uring_get_sqe(ring_.get());
            if (!sqe) {
                SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
                return false;
            }

            io_uring_prep_read(sqe, backing_store_fd_.get(),
                               (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                               source_offset);

            buffer_offset += io_size;
            num_ops -= linear_blocks;
            total_blocks_merged_ += linear_blocks;

            pending_sqe -= 1;
            pending_ios_to_submit += 1;
            sqe->flags |= IOSQE_ASYNC;
        }

        // pending_sqe == 0 : Ring is full
        //
        // num_ops == 0 : All the COW ops in this batch are processed - Submit
        // pending I/O requests in the ring
        //
        // linear_blocks == 0 : All the COW ops processing is done. Submit
        // pending I/O requests in the ring
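        //
        // For example (illustrative numbers): with queue_depth_ == 8 and an RA window
        // that yields 20 runs of consecutive blocks, the runs are submitted in batches
        // of 8, 8 and 4 - the first two because the ring fills up, the last one because
        // num_ops reaches 0.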
        if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
            // Submit the IO for all the COW ops in a single syscall
            int ret = io_uring_submit(ring_.get());
            if (ret != pending_ios_to_submit) {
                SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead:"
                                 << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                return false;
            }

            int pending_ios_to_complete = pending_ios_to_submit;
            pending_ios_to_submit = 0;

            bool xor_processing_required = (xor_op_vec.size() > 0);

            // Read XOR data from COW file in parallel when I/O's are in-flight
            if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
                SNAP_LOG(ERROR) << "ReadXorData failed";
                return false;
            }

            // Fetch I/O completions
            if (!ReapIoCompletions(pending_ios_to_complete)) {
                SNAP_LOG(ERROR) << "ReapIoCompletions failed";
                return false;
            }

            // Retrieve XOR'ed data
            if (xor_processing_required) {
                ProcessXorData(block_index, xor_op_index, xor_op_vec, ra_temp_buffer_.get(),
                               offset);
            }

            // All the I/O in the ring is processed.
            pending_sqe = queue_depth_;
        }

        if (linear_blocks == 0) {
            break;
        }
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        return true;
    }

    CHECK(blocks_.size() == total_blocks_merged_);

    UpdateScratchMetadata();

    return true;
}

void ReadAhead::UpdateScratchMetadata() {
    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        uint64_t new_block = blocks_[block_index];
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after a crash; it marks the end of the
    // metadata contents during reconstruction.
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;
}
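
// Illustrative layout (hypothetical values): with blocks_ = {100, 250, 251} and the data
// region starting at GetBufferDataOffset() == D, the scratch metadata ends up as
// (100, D), (250, D + 4096), (251, D + 8192), followed by a zeroed terminator entry.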

bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
    bool status = true;

    // Reap I/O completions
    while (pending_ios_to_complete) {
        struct io_uring_cqe* cqe;

        // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
        // these error codes are not truly I/O errors; we can retry them
        // by re-populating the SQE entries and submitting the I/O
        // request back. However, we don't do that now; instead we
        // will fall back to synchronous I/O.
        int ret = io_uring_wait_cqe(ring_.get(), &cqe);
        if (ret) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << strerror(-ret);
            status = false;
            break;
        }

        if (cqe->res < 0) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed with res: " << cqe->res;
            status = false;
            break;
        }

        io_uring_cqe_seen(ring_.get(), cqe);
        pending_ios_to_complete -= 1;
    }

    return status;
}

void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                               std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                               loff_t& buffer_offset) {
    using WordType = std::conditional_t<sizeof(void*) == sizeof(uint64_t), uint64_t, uint32_t>;
    loff_t xor_buf_offset = 0;

    while (block_xor_index < blocks_.size()) {
        void* bufptr = static_cast<void*>((char*)buffer + buffer_offset);
        uint64_t new_block = blocks_[block_xor_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Pointer to the data read from base device
                auto buffer_words = reinterpret_cast<WordType*>(bufptr);
                // Get the xor'ed data read from COW device
                auto xor_data_words = reinterpret_cast<WordType*>(
                        (char*)bufsink_.GetPayloadBufPtr() + xor_buf_offset);
                auto num_words = BLOCK_SZ / sizeof(WordType);

                for (size_t i = 0; i < num_words; i++) {
                    buffer_words[i] ^= xor_data_words[i];
                }

                // Move to next XOR op
                xor_index += 1;
                xor_buf_offset += BLOCK_SZ;
            }
        }

        buffer_offset += BLOCK_SZ;
        block_xor_index += 1;
    }

    bufsink_.ResetBufferOffset();
}
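
// Informal sketch of the XOR-op math assumed here: the COW stores xor_data such that
// new_data == base_data ^ xor_data, so XOR'ing the block just read from the backing store
// with the payload read from the COW recovers the new block contents in place.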

bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
                            std::vector<const CowOperation*>& xor_op_vec) {
    // Process the XOR ops in parallel - We will be reading data
    // from COW file for XOR ops processing.
    while (block_index < blocks_.size()) {
        uint64_t new_block = blocks_[block_index];

        if (xor_op_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_op_index];
            if (xor_op->new_block == new_block) {
                void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: "
                                    << xor_op->new_block;
                    return false;
                }
                if (ssize_t rv = reader_->ReadData(xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
                            << ", return value: " << rv;
                    return false;
                }

                xor_op_index += 1;
            }
        }
        block_index += 1;
    }
    return true;
}

bool ReadAhead::ReadAheadSyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
        if (linear_blocks == 0) {
            // No more blocks to read
            SNAP_LOG(DEBUG) << "Read-ahead completed";
            break;
        }

        size_t io_size = (linear_blocks * BLOCK_SZ);

        // Read the consecutive set of blocks from the base device in one shot
        if (!android::base::ReadFullyAtOffset(backing_store_fd_,
                                              (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                                              source_offset)) {
            SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
                             << backing_store_device_ << " at block: " << source_offset / BLOCK_SZ
                             << " offset: " << source_offset % BLOCK_SZ
                             << " buffer_offset: " << buffer_offset << " io_size: " << io_size
                             << " buf-addr: " << read_ahead_buffer_;
            return false;
        }

        buffer_offset += io_size;
        total_blocks_merged_ += linear_blocks;
        num_ops -= linear_blocks;
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        notify_read_ahead_failed.Cancel();
        return true;
    }

    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    loff_t offset = 0;
    CHECK(blocks_.size() == total_blocks_merged_);

    size_t xor_index = 0;
    BufferSink bufsink;
    bufsink.Initialize(BLOCK_SZ * 2);

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        void* bufptr = static_cast<void*>((char*)ra_temp_buffer_.get() + offset);
        uint64_t new_block = blocks_[block_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Read the xor'ed data from COW
                void* buffer = bufsink.GetPayloadBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer";
                    return false;
                }
                if (ssize_t rv = reader_->ReadData(xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
                            << ", return value: " << rv;
                    return false;
                }
                // Pointer to the data read from base device
                uint8_t* read_buffer = reinterpret_cast<uint8_t*>(bufptr);
                // Get the xor'ed data read from COW device
                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());

                // Retrieve the original data
                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
                    read_buffer[byte_offset] ^= xor_data[byte_offset];
                }

                // Move to next XOR op
                xor_index += 1;
            }
        }

        offset += BLOCK_SZ;
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // Verify that all the xor blocks were scanned to retrieve the original data
    CHECK(xor_index == xor_op_vec.size());

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after a crash; it marks the end of the
    // metadata contents during reconstruction.
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;

    notify_read_ahead_failed.Cancel();
    return true;
}

bool ReadAhead::ReadAheadIOStart() {
    // Check if the data has to be constructed from the COW file.
    // This will be true only once during boot up after a crash
    // during merge.
    if (snapuserd_->ShouldReconstructDataFromCow()) {
        return ReconstructDataFromCow();
    }

    bool retry = false;
    bool ra_status;

    // Start Async read-ahead
    if (read_ahead_async_) {
        ra_status = ReadAheadAsyncIO();
        if (!ra_status) {
            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - Falling back to synchronous I/O";
            FinalizeIouring();
            RAResetIter(total_blocks_merged_);
            retry = true;
            read_ahead_async_ = false;
        }
    }

    // Check if we need to fall back and retry the merge
    //
    // If the device doesn't support async operations, we
    // will directly enter here (aka devices with 4.x kernels)

    const bool ra_sync_required = (retry || !read_ahead_async_);

    if (ra_sync_required) {
        ra_status = ReadAheadSyncIO();
        if (!ra_status) {
            SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
            return false;
        }
    }

    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;

    // Wait for the merge to finish for the previous RA window. We shouldn't
    // be touching the scratch space until the merge of the previous RA window
    // is complete. If there is a crash during this time frame, merge should
    // resume based on the contents of the scratch space.
    if (!snapuserd_->WaitForMergeReady()) {
        SNAP_LOG(VERBOSE) << "ReadAhead failed to wait for merge ready";
        return false;
    }

    // Acquire the buffer lock before doing the memcpy to the scratch buffer.
    // Although by now the snapshot-merge thread shouldn't be working on this
    // scratch space, we take this additional measure to ensure that the buffer
    // is not being used by the merge thread at this point. See b/377819507.
    {
        std::lock_guard<std::mutex> buffer_lock(snapuserd_->GetBufferLock());
        // Copy the data to scratch space
        memcpy(metadata_buffer_, ra_temp_meta_buffer_.get(), snapuserd_->GetBufferMetadataSize());
        memcpy(read_ahead_buffer_, ra_temp_buffer_.get(), total_blocks_merged_ * BLOCK_SZ);

        loff_t offset = 0;
        std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
        read_ahead_buffer_map.clear();

        for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
            void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
            uint64_t new_block = blocks_[block_index];

            read_ahead_buffer_map[new_block] = bufptr;
            offset += BLOCK_SZ;
        }

        total_ra_blocks_completed_ += total_blocks_merged_;
        snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
    }

    // Flush the scratch data - Technically, we should flush only for overlapping
    // blocks; however, since this region is mmap'ed, the dirty pages can still
    // get flushed to disk at any random point in time. Instead, make sure
    // the data in scratch is in the correct state before the merge thread resumes.
    //
    // Notify the Merge thread to resume merging this window
    if (!snapuserd_->ReadAheadIOCompleted(true)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        snapuserd_->ReadAheadIOFailed();
        return false;
    }

    return true;
}

bool ReadAhead::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        SNAP_LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    // For xor ops processing
    bufsink_.Initialize(PAYLOAD_BUFFER_SZ * 2);
    read_ahead_async_ = true;

    SNAP_LOG(INFO) << "Read-ahead: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void ReadAhead::FinalizeIouring() {
    if (read_ahead_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

bool ReadAhead::RunThread() {
    SNAP_LOG(INFO) << "ReadAhead thread started.";

    pthread_setname_np(pthread_self(), "ReadAhead");

    if (!InitializeFds()) {
        return false;
    }

    InitializeBuffer();

    if (!InitReader()) {
        return false;
    }

    InitializeRAIter();

    InitializeIouring();

    if (!SetThreadPriority(ANDROID_PRIORITY_BACKGROUND)) {
        SNAP_PLOG(ERROR) << "Failed to set thread priority";
    }

    if (!SetProfiles({"CPUSET_SP_BACKGROUND"})) {
        SNAP_PLOG(ERROR) << "Failed to assign task profile to readahead thread";
    }

    SNAP_LOG(INFO) << "ReadAhead processing.";
    while (!RAIterDone()) {
        if (!ReadAheadIOStart()) {
            break;
        }
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << " ReadAhead thread terminating.";
    return true;
}

// Initialization
bool ReadAhead::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    if (cow_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    return true;
}

bool ReadAhead::InitReader() {
    reader_ = snapuserd_->CloneReaderForWorker();

    if (!reader_->InitForMerge(std::move(cow_fd_))) {
        return false;
    }
    header_ = reader_->GetHeader();
    return true;
}

void ReadAhead::InitializeRAIter() {
    cowop_iter_ = reader_->GetOpIter(true);
}

bool ReadAhead::RAIterDone() {
    if (cowop_iter_->AtEnd()) {
        return true;
    }

    const CowOperation* cow_op = GetRAOpIter();

    if (!IsOrderedOp(*cow_op)) {
        return true;
    }

    return false;
}

void ReadAhead::RAIterNext() {
    cowop_iter_->Next();
}

void ReadAhead::RAResetIter(uint64_t num_blocks) {
    while (num_blocks && !cowop_iter_->AtBegin()) {
        cowop_iter_->Prev();
        num_blocks -= 1;
    }
}

const CowOperation* ReadAhead::GetRAOpIter() {
    return cowop_iter_->Get();
}

void ReadAhead::InitializeBuffer() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    // Map the scratch space region into memory
    metadata_buffer_ =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
    read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    ra_temp_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
    ra_temp_meta_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
}

}  // namespace snapshot
}  // namespace android