/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd_readahead.h"

#include <pthread.h>

#include "android-base/properties.h"
#include "snapuserd_core.h"
#include "utility.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

ReadAhead::ReadAhead(const std::string& cow_device, const std::string& backing_device,
                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd,
                     uint32_t cow_op_merge_size) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    misc_name_ = misc_name;
    snapuserd_ = snapuserd;
    cow_op_merge_size_ = cow_op_merge_size;
}

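// Track the source and destination blocks touched by the ordered ops in the
// current read-ahead window. If an op in this window reads from a block that
// another op in the same window writes to (or vice versa), the window is
// flagged as overlapping; in that case the staged copy in scratch space is
// what preserves the source data once the merge starts rewriting those blocks
// (see the flush note in ReadAheadIOStart()).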
void ReadAhead::CheckOverlap(const CowOperation* cow_op) {
    uint64_t source_offset;
    if (!reader_->GetSourceOffset(cow_op, &source_offset)) {
        SNAP_LOG(ERROR) << "ReadAhead operation has no source offset: " << *cow_op;
        return;
    }

    uint64_t source_block = GetBlockFromOffset(header_, source_offset);
    bool misaligned = (GetBlockRelativeOffset(header_, source_offset) != 0);

    if (dest_blocks_.count(cow_op->new_block) || source_blocks_.count(source_block) ||
        (misaligned && source_blocks_.count(source_block + 1))) {
        overlap_ = true;
    }

    dest_blocks_.insert(source_block);
    if (source_offset > 0) {
        dest_blocks_.insert(source_block + 1);
    }
    source_blocks_.insert(cow_op->new_block);
}

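// Starting at the current iterator position, coalesce ordered ops whose source
// data is laid out consecutively on the backing device so they can be fetched
// with a single read. Returns the number of blocks coalesced, appends the
// destination blocks (and any XOR ops encountered) to the caller's vectors,
// and sets *source_offset to the offset of the first block in the run.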
int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
                                    std::vector<uint64_t>& blocks,
                                    std::vector<const CowOperation*>& xor_op_vec) {
    int num_ops = *pending_ops;
    if (cow_op_merge_size_ != 0) {
        num_ops = std::min(static_cast<int>(cow_op_merge_size_), *pending_ops);
    }

    int nr_consecutive = 0;
    bool is_ops_present = (!RAIterDone() && num_ops);

    if (!is_ops_present) {
        return nr_consecutive;
    }

    // Get the first block with offset
    const CowOperation* cow_op = GetRAOpIter();

    if (!reader_->GetSourceOffset(cow_op, source_offset)) {
        SNAP_LOG(ERROR) << "PrepareNextReadAhead operation has no source offset: " << *cow_op;
        return nr_consecutive;
    }
    if (cow_op->type() == kCowXorOp) {
        xor_op_vec.push_back(cow_op);
    }

    RAIterNext();
    num_ops -= 1;
    nr_consecutive = 1;
    blocks.push_back(cow_op->new_block);

    if (!overlap_) {
        CheckOverlap(cow_op);
    }

    /*
     * Find the number of consecutive blocks
     */
    while (!RAIterDone() && num_ops) {
        const CowOperation* op = GetRAOpIter();
        uint64_t next_offset;
        if (!reader_->GetSourceOffset(op, &next_offset)) {
            SNAP_LOG(ERROR) << "PrepareNextReadAhead operation has no source offset: " << *op;
            break;
        }

        // Check for consecutive blocks
        if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
            break;
        }

        if (op->type() == kCowXorOp) {
            xor_op_vec.push_back(op);
        }

        nr_consecutive += 1;
        num_ops -= 1;
        blocks.push_back(op->new_block);
        RAIterNext();

        if (!overlap_) {
            CheckOverlap(op);
        }
    }

    return nr_consecutive;
}

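// Scope guard: notifies the SnapshotHandler that read-ahead I/O failed when it
// goes out of scope, unless Cancel() was called on the success path. This
// keeps the failure notification in one place across the early returns in the
// functions below.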
class [[nodiscard]] AutoNotifyReadAheadFailed {
  public:
    AutoNotifyReadAheadFailed(std::shared_ptr<SnapshotHandler> snapuserd) : snapuserd_(snapuserd) {}

    ~AutoNotifyReadAheadFailed() {
        if (cancelled_) {
            return;
        }
        snapuserd_->ReadAheadIOFailed();
    }

    void Cancel() { cancelled_ = true; }

  private:
    std::shared_ptr<SnapshotHandler> snapuserd_;
    bool cancelled_ = false;
};

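// Crash-recovery path: rather than re-reading the backing device, rebuild the
// block -> buffer mapping for the previous read-ahead window from what was
// persisted in scratch space (the metadata entries plus the staged data)
// before the crash, then hand the window back to the merge thread.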
bool ReadAhead::ReconstructDataFromCow() {
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    loff_t metadata_offset = 0;
    loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
    int num_ops = 0;
    int total_blocks_merged = 0;

    // This memcpy is important as metadata_buffer_ will be an unaligned address and will fault
    // on 32-bit systems
    std::unique_ptr<uint8_t[]> metadata_buffer =
            std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
    memcpy(metadata_buffer.get(), metadata_buffer_, snapuserd_->GetBufferMetadataSize());

    while (true) {
        struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
                (char*)metadata_buffer.get() + metadata_offset);

        // Done reading metadata
        if (bm->new_block == 0 && bm->file_offset == 0) {
            break;
        }

        loff_t buffer_offset = bm->file_offset - start_data_offset;
        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
        read_ahead_buffer_map[bm->new_block] = bufptr;
        num_ops += 1;
        total_blocks_merged += 1;

        metadata_offset += sizeof(struct ScratchMetadata);
    }

    AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);

    // We are done re-constructing the mapping; however, we need to make sure
    // all the COW operations to-be merged are present in the re-constructed
    // mapping.
    while (!RAIterDone()) {
        const CowOperation* op = GetRAOpIter();
        if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
            num_ops -= 1;
            RAIterNext();
            continue;
        }

        // Verify that we have covered all the ops which were re-constructed
        // from the COW device - these are the ops which are being
        // re-constructed after the crash.
        if (num_ops != 0) {
            SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recovered."
                            << " Pending ops: " << num_ops;
            return false;
        }

        break;
    }

    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);

    snapuserd_->FinishReconstructDataFromCow();

    if (!snapuserd_->ReadAheadIOCompleted(true)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        return false;
    }

    snapuserd_->RaThreadStarted();
    SNAP_LOG(INFO) << "ReconstructDataFromCow success";
    notify_read_ahead_failed.Cancel();
    return true;
}

/*
 * With io_uring, the data flow is slightly different.
 *
 * The data flow is as follows:
 *
 * 1: Queue the I/O requests to be read from the backing source device.
 * This is done by retrieving an SQE entry from the ring and populating
 * the SQE entry. Note that the I/O is not submitted yet.
 *
 * 2: Once the ring is full (aka queue_depth), we submit all the queued
 * I/O requests with a single system call. This essentially cuts down
 * "queue_depth" system calls to a single system call.
 *
 * 3: Once the I/O is submitted, the user-space thread works on processing
 * the XOR operations. This happens in parallel while the I/O requests are
 * serviced by the kernel. This is ok because, for XOR operations, we first
 * need to retrieve the compressed data from the COW block device. Thus, we
 * have offloaded the backing source I/O to the kernel and user-space works
 * in parallel on fetching the data for the XOR operations.
 *
 * 4: After the XOR operations are read from the COW device, poll the
 * completion queue for all the I/O submitted. If the I/Os have already
 * completed, the user-space thread just reads the CQE entries from the ring
 * without doing any system call. If none of the I/Os have completed yet,
 * the user-space thread does a system call and waits for I/O completions.
 *
 * Flow diagram:
 *                                                       SQ-RING
 *  SQE1 <----------- Fetch SQE1 Entry ----------  |SQE1||SQE2|SQE3|
 *
 *  SQE1  ------------ Populate SQE1 Entry ------> |SQE1-X||SQE2|SQE3|
 *
 *  SQE2 <----------- Fetch SQE2 Entry ----------  |SQE1-X||SQE2|SQE3|
 *
 *  SQE2  ------------ Populate SQE2 Entry ------> |SQE1-X||SQE2-X|SQE3|
 *
 *  SQE3 <----------- Fetch SQE3 Entry ----------  |SQE1-X||SQE2-X|SQE3|
 *
 *  SQE3  ------------ Populate SQE3 Entry ------> |SQE1-X||SQE2-X|SQE3-X|
 *
 *  Submit-IO ---------------------------------->  |SQE1-X||SQE2-X|SQE3-X|
 *     |                                                  |
 *     |                                   Process I/O entries in kernel
 *     |                                                  |
 *  Retrieve XOR                                          |
 *  data from COW                                         |
 *     |                                                  |
 *     |                                                  |
 *  Fetch CQ completions
 *     |                                               CQ-RING
 *                                              |CQE1-X||CQE2-X|CQE3-X|
 *     |
 *   CQE1 <------------ Fetch CQE1 Entry        |CQE1||CQE2-X|CQE3-X|
 *   CQE2 <------------ Fetch CQE2 Entry        |CQE1||CQE2-|CQE3-X|
 *   CQE3 <------------ Fetch CQE3 Entry        |CQE1||CQE2-|CQE3-|
 *      |
 *      |
 *  Continue the next set of operations in the RING
 */

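// Async read-ahead using io_uring. Steps (1) and (2) above are handled by the
// main loop below, step (3) by ReadXorData(), and step (4) by
// ReapIoCompletions(); ProcessXorData() then combines the staged base-device
// data with the payloads read from the COW.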
bool ReadAhead::ReadAheadAsyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    int pending_sqe = queue_depth_;
    int pending_ios_to_submit = 0;

    size_t xor_op_index = 0;
    size_t block_index = 0;

    loff_t offset = 0;

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;
        struct io_uring_sqe* sqe;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);

        if (linear_blocks != 0) {
            size_t io_size = (linear_blocks * BLOCK_SZ);

            // Get an SQE entry from the ring and populate the I/O variables
            sqe = io_uring_get_sqe(ring_.get());
            if (!sqe) {
                SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
                return false;
            }

            io_uring_prep_read(sqe, backing_store_fd_.get(),
                               (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                               source_offset);

            buffer_offset += io_size;
            num_ops -= linear_blocks;
            total_blocks_merged_ += linear_blocks;

            pending_sqe -= 1;
            pending_ios_to_submit += 1;
            sqe->flags |= IOSQE_ASYNC;
        }

        // pending_sqe == 0 : Ring is full
        //
        // num_ops == 0 : All the COW ops in this batch are processed - Submit
        // pending I/O requests in the ring
        //
        // linear_blocks == 0 : All the COW ops processing is done. Submit
        // pending I/O requests in the ring
        if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
            // Submit the I/O for all the COW ops in a single syscall
            int ret = io_uring_submit(ring_.get());
            if (ret != pending_ios_to_submit) {
                SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead:"
                                 << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                return false;
            }

            int pending_ios_to_complete = pending_ios_to_submit;
            pending_ios_to_submit = 0;

            bool xor_processing_required = (xor_op_vec.size() > 0);

            // Read XOR data from the COW file in parallel while the I/Os are in flight
            if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
                SNAP_LOG(ERROR) << "ReadXorData failed";
                return false;
            }

            // Fetch I/O completions
            if (!ReapIoCompletions(pending_ios_to_complete)) {
                SNAP_LOG(ERROR) << "ReapIoCompletions failed";
                return false;
            }

            // Retrieve XOR'ed data
            if (xor_processing_required) {
                ProcessXorData(block_index, xor_op_index, xor_op_vec, ra_temp_buffer_.get(),
                               offset);
            }

            // All the I/O in the ring is processed.
            pending_sqe = queue_depth_;
        }

        if (linear_blocks == 0) {
            break;
        }
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        return true;
    }

    CHECK(blocks_.size() == total_blocks_merged_);

    UpdateScratchMetadata();

    return true;
}

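// Write the (new_block, scratch file offset) pairs for this window into the
// temporary metadata buffer. A zero-filled terminator entry marks the end so
// that ReconstructDataFromCow() knows where to stop after a crash.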
void ReadAhead::UpdateScratchMetadata() {
    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        uint64_t new_block = blocks_[block_index];
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after crash. This indicates end of
    // reading metadata contents when re-constructing the data
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;
}

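// Drain the completion queue for every I/O submitted in this batch. Any
// failure here makes ReadAheadAsyncIO() return false, which in turn makes the
// caller fall back to the synchronous read-ahead path.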
bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
    bool status = true;

    // Reap I/O completions
    while (pending_ios_to_complete) {
        struct io_uring_cqe* cqe;

        // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
        // these error codes are not truly I/O errors; we can retry them
        // by re-populating the SQE entries and submitting the I/O
        // request back. However, we don't do that now; instead we
        // will fall back to synchronous I/O.
        int ret = io_uring_wait_cqe(ring_.get(), &cqe);
        if (ret) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << strerror(-ret);
            status = false;
            break;
        }

        if (cqe->res < 0) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed with res: " << cqe->res;
            status = false;
            break;
        }

        io_uring_cqe_seen(ring_.get(), cqe);
        pending_ios_to_complete -= 1;
    }

    return status;
}

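// Walk the blocks of the current batch and, for every block that corresponds
// to an XOR op, combine the data read from the base device (in `buffer`) with
// the XOR payload previously staged in bufsink_ to recover the original block
// contents. The XOR is applied one machine word at a time.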
void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                               std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                               loff_t& buffer_offset) {
    using WordType = std::conditional_t<sizeof(void*) == sizeof(uint64_t), uint64_t, uint32_t>;
    loff_t xor_buf_offset = 0;

    while (block_xor_index < blocks_.size()) {
        void* bufptr = static_cast<void*>((char*)buffer + buffer_offset);
        uint64_t new_block = blocks_[block_xor_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Pointer to the data read from base device
                auto buffer_words = reinterpret_cast<WordType*>(bufptr);
                // Get the xor'ed data read from COW device
                auto xor_data_words = reinterpret_cast<WordType*>(
                        (char*)bufsink_.GetPayloadBufPtr() + xor_buf_offset);
                auto num_words = BLOCK_SZ / sizeof(WordType);

                for (auto i = 0; i < num_words; i++) {
                    buffer_words[i] ^= xor_data_words[i];
                }

                // Move to next XOR op
                xor_index += 1;
                xor_buf_offset += BLOCK_SZ;
            }
        }

        buffer_offset += BLOCK_SZ;
        block_xor_index += 1;
    }

    bufsink_.ResetBufferOffset();
}

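// Read the raw XOR payloads for this window from the COW device into bufsink_
// while the base-device reads submitted via io_uring are still in flight.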
bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
                            std::vector<const CowOperation*>& xor_op_vec) {
    // Process the XOR ops in parallel - We will be reading data
    // from COW file for XOR ops processing.
    while (block_index < blocks_.size()) {
        uint64_t new_block = blocks_[block_index];

        if (xor_op_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_op_index];
            if (xor_op->new_block == new_block) {
                void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer for block: "
                                    << xor_op->new_block;
                    return false;
                }
                if (ssize_t rv = reader_->ReadData(xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
                            << ", return value: " << rv;
                    return false;
                }

                xor_op_index += 1;
            }
        }
        block_index += 1;
    }
    return true;
}

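// Synchronous read-ahead path. Used when io_uring is not available on the
// device and as the fallback when the async path fails: each run of
// consecutive source blocks is read with a single pread, and XOR payloads are
// read and applied inline, one block at a time.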
bool ReadAhead::ReadAheadSyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    AutoNotifyReadAheadFailed notify_read_ahead_failed(snapuserd_);

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
        if (linear_blocks == 0) {
            // No more blocks to read
            SNAP_LOG(DEBUG) << "Read-ahead completed....";
            break;
        }

        size_t io_size = (linear_blocks * BLOCK_SZ);

        // Read the consecutive set of blocks from the base device in one shot
        if (!android::base::ReadFullyAtOffset(backing_store_fd_,
                                              (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                                              source_offset)) {
            SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
                             << backing_store_device_ << " at block: " << source_offset / BLOCK_SZ
                             << " offset: " << source_offset % BLOCK_SZ
                             << " buffer_offset: " << buffer_offset << " io_size: " << io_size
                             << " buf-addr: " << read_ahead_buffer_;
            return false;
        }

        buffer_offset += io_size;
        total_blocks_merged_ += linear_blocks;
        num_ops -= linear_blocks;
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        notify_read_ahead_failed.Cancel();
        return true;
    }

    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    loff_t offset = 0;
    CHECK(blocks_.size() == total_blocks_merged_);

    size_t xor_index = 0;
    BufferSink bufsink;
    bufsink.Initialize(BLOCK_SZ * 2);

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        void* bufptr = static_cast<void*>((char*)ra_temp_buffer_.get() + offset);
        uint64_t new_block = blocks_[block_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Read the xor'ed data from COW
                void* buffer = bufsink.GetPayloadBuffer(BLOCK_SZ);
                if (!buffer) {
                    SNAP_LOG(ERROR) << "ReadAhead - failed to allocate buffer";
                    return false;
                }
                if (ssize_t rv = reader_->ReadData(xor_op, buffer, BLOCK_SZ); rv != BLOCK_SZ) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block
                            << ", return value: " << rv;
                    return false;
                }
                // Pointer to the data read from base device
                uint8_t* read_buffer = reinterpret_cast<uint8_t*>(bufptr);
                // Get the xor'ed data read from COW device
                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());

                // Retrieve the original data
                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
                    read_buffer[byte_offset] ^= xor_data[byte_offset];
                }

                // Move to next XOR op
                xor_index += 1;
            }
        }

        offset += BLOCK_SZ;
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // Verify that all the xor blocks were scanned to retrieve the original data
    CHECK(xor_index == xor_op_vec.size());

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after crash. This indicates end of
    // reading metadata contents when re-constructing the data
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;

    notify_read_ahead_failed.Cancel();
    return true;
}

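// Drive one read-ahead window end to end: reconstruct from scratch space after
// a crash, otherwise stage the window via io_uring (falling back to the
// synchronous path on failure), wait for the merge of the previous window to
// finish, publish the staged data and metadata into the mmap'ed scratch
// region, and finally notify the merge thread that this window is ready.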
bool ReadAhead::ReadAheadIOStart() {
    // Check if the data has to be re-constructed from the COW file.
    // This will be true only once during boot up after a crash
    // during merge.
    if (snapuserd_->ShouldReconstructDataFromCow()) {
        return ReconstructDataFromCow();
    }

    bool retry = false;
    bool ra_status;

    // Start async read-ahead
    if (read_ahead_async_) {
        ra_status = ReadAheadAsyncIO();
        if (!ra_status) {
            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - Falling back to synchronous I/O";
            FinalizeIouring();
            RAResetIter(total_blocks_merged_);
            retry = true;
            read_ahead_async_ = false;
        }
    }

    // Check if we need to fall back and retry the merge
    //
    // If the device doesn't support async operations, we
    // will directly enter here (aka devices with 4.x kernels)

    const bool ra_sync_required = (retry || !read_ahead_async_);

    if (ra_sync_required) {
        ra_status = ReadAheadSyncIO();
        if (!ra_status) {
            SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
            return false;
        }
    }

    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;

    // Wait for the merge to finish for the previous RA window. We shouldn't
    // be touching the scratch space until the merge of the previous RA window
    // is complete. If there is a crash during this time frame, merge should
    // resume based on the contents of the scratch space.
    if (!snapuserd_->WaitForMergeReady()) {
        SNAP_LOG(VERBOSE) << "ReadAhead failed to wait for merge ready";
        return false;
    }

    // Acquire the buffer lock before doing the memcpy to the scratch buffer.
    // By now the snapshot-merge thread shouldn't be working on this scratch
    // space, but we take this additional measure to ensure that the buffer is
    // not being used by the merge thread at this point. See b/377819507.
    {
        std::lock_guard<std::mutex> buffer_lock(snapuserd_->GetBufferLock());
        // Copy the data to scratch space
        memcpy(metadata_buffer_, ra_temp_meta_buffer_.get(), snapuserd_->GetBufferMetadataSize());
        memcpy(read_ahead_buffer_, ra_temp_buffer_.get(), total_blocks_merged_ * BLOCK_SZ);

        loff_t offset = 0;
        std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
        read_ahead_buffer_map.clear();

        for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
            void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
            uint64_t new_block = blocks_[block_index];

            read_ahead_buffer_map[new_block] = bufptr;
            offset += BLOCK_SZ;
        }

        total_ra_blocks_completed_ += total_blocks_merged_;
        snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
    }

    // Flush the scratch data - Technically, we should flush only for overlapping
    // blocks; however, since this region is mmap'ed, the dirty pages can still
    // get flushed to disk at any random point in time. Instead, make sure
    // the data in scratch is in the correct state before the merge thread resumes.
    //
    // Notify the merge thread to resume merging this window
    if (!snapuserd_->ReadAheadIOCompleted(true)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        snapuserd_->ReadAheadIOFailed();
        return false;
    }

    return true;
}

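// Set up the io_uring instance used by the async read-ahead path. Returns
// false when io_uring is not supported; in that case read_ahead_async_ is
// never set and ReadAheadIOStart() takes the synchronous path.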
bool ReadAhead::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        SNAP_LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    // For xor ops processing
    bufsink_.Initialize(PAYLOAD_BUFFER_SZ * 2);
    read_ahead_async_ = true;

    SNAP_LOG(INFO) << "Read-ahead: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void ReadAhead::FinalizeIouring() {
    if (read_ahead_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

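// Thread entry point for the read-ahead worker: set up fds, buffers, the COW
// reader and (optionally) io_uring, then stage one read-ahead window per
// iteration until there are no ordered ops left or a window fails.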
bool ReadAhead::RunThread() {
    SNAP_LOG(INFO) << "ReadAhead thread started.";

    pthread_setname_np(pthread_self(), "ReadAhead");

    if (!InitializeFds()) {
        return false;
    }

    InitializeBuffer();

    if (!InitReader()) {
        return false;
    }

    InitializeRAIter();

    InitializeIouring();

    if (!SetThreadPriority(ANDROID_PRIORITY_BACKGROUND)) {
        SNAP_PLOG(ERROR) << "Failed to set thread priority";
    }

    if (!SetProfiles({"CPUSET_SP_BACKGROUND"})) {
        SNAP_PLOG(ERROR) << "Failed to assign task profile to readahead thread";
    }

    SNAP_LOG(INFO) << "ReadAhead processing.";
    while (!RAIterDone()) {
        if (!ReadAheadIOStart()) {
            break;
        }
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << "ReadAhead thread terminating.";
    return true;
}

// Initialization
bool ReadAhead::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    if (cow_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    return true;
}

bool ReadAhead::InitReader() {
    reader_ = snapuserd_->CloneReaderForWorker();

    if (!reader_->InitForMerge(std::move(cow_fd_))) {
        return false;
    }
    header_ = reader_->GetHeader();
    return true;
}

void ReadAhead::InitializeRAIter() {
    cowop_iter_ = reader_->GetOpIter(true);
}

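// The read-ahead iterator only walks the ordered ops (copy/xor) at the front
// of the COW; iteration is treated as done as soon as a non-ordered op is
// reached.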
bool ReadAhead::RAIterDone() {
    if (cowop_iter_->AtEnd()) {
        return true;
    }

    const CowOperation* cow_op = GetRAOpIter();

    if (!IsOrderedOp(*cow_op)) {
        return true;
    }

    return false;
}

void ReadAhead::RAIterNext() {
    cowop_iter_->Next();
}

void ReadAhead::RAResetIter(uint64_t num_blocks) {
    while (num_blocks && !cowop_iter_->AtBegin()) {
        cowop_iter_->Prev();
        num_blocks -= 1;
    }
}

const CowOperation* ReadAhead::GetRAOpIter() {
    return cowop_iter_->Get();
}

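// metadata_buffer_ and read_ahead_buffer_ point into the mmap'ed scratch
// region that is shared with the merge thread; ra_temp_buffer_ and
// ra_temp_meta_buffer_ are private staging buffers that are copied into the
// scratch region only once a window has been fully read (see ReadAheadIOStart).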
void ReadAhead::InitializeBuffer() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    // Map the scratch space region into memory
    metadata_buffer_ =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
    read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    ra_temp_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
    ra_temp_meta_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
}

}  // namespace snapshot
}  // namespace android