1 // Copyright (C) 2022 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "icing/index/numeric/integer-index-storage.h"
16
17 #include <algorithm>
18 #include <cstdint>
19 #include <functional>
20 #include <iterator>
21 #include <limits>
22 #include <memory>
23 #include <queue>
24 #include <string>
25 #include <string_view>
26 #include <utility>
27 #include <vector>
28
29 #include "icing/text_classifier/lib3/utils/base/status.h"
30 #include "icing/text_classifier/lib3/utils/base/statusor.h"
31 #include "icing/absl_ports/canonical_errors.h"
32 #include "icing/absl_ports/str_cat.h"
33 #include "icing/file/file-backed-vector.h"
34 #include "icing/file/filesystem.h"
35 #include "icing/file/memory-mapped-file.h"
36 #include "icing/file/posting_list/flash-index-storage.h"
37 #include "icing/file/posting_list/posting-list-identifier.h"
38 #include "icing/index/hit/doc-hit-info.h"
39 #include "icing/index/iterator/doc-hit-info-iterator.h"
40 #include "icing/index/numeric/doc-hit-info-iterator-numeric.h"
41 #include "icing/index/numeric/integer-index-bucket-util.h"
42 #include "icing/index/numeric/integer-index-data.h"
43 #include "icing/index/numeric/numeric-index.h"
44 #include "icing/index/numeric/posting-list-integer-index-accessor.h"
45 #include "icing/index/numeric/posting-list-integer-index-serializer.h"
46 #include "icing/schema/section.h"
47 #include "icing/store/document-id.h"
48 #include "icing/util/crc32.h"
49 #include "icing/util/status-macros.h"
50
51 namespace icing {
52 namespace lib {
53
54 namespace {
55
56 // Helper function to flush data between [it_start, it_end) into posting list(s)
57 // and return posting list id.
58 // Note: it will sort data between [it_start, it_end) by basic hit value, so the
59 // caller should be aware that the data order will be changed after calling this
60 // function.
FlushDataIntoPostingLists(FlashIndexStorage * flash_index_storage,PostingListIntegerIndexSerializer * posting_list_serializer,const std::vector<IntegerIndexData>::iterator & it_start,const std::vector<IntegerIndexData>::iterator & it_end)61 libtextclassifier3::StatusOr<PostingListIdentifier> FlushDataIntoPostingLists(
62 FlashIndexStorage* flash_index_storage,
63 PostingListIntegerIndexSerializer* posting_list_serializer,
64 const std::vector<IntegerIndexData>::iterator& it_start,
65 const std::vector<IntegerIndexData>::iterator& it_end) {
66 if (it_start == it_end) {
67 return PostingListIdentifier::kInvalid;
68 }
69
70 ICING_ASSIGN_OR_RETURN(
71 std::unique_ptr<PostingListIntegerIndexAccessor> new_pl_accessor,
72 PostingListIntegerIndexAccessor::Create(flash_index_storage,
73 posting_list_serializer));
74
75 std::sort(it_start, it_end);
76 for (auto it = it_end - 1; it >= it_start; --it) {
77 ICING_RETURN_IF_ERROR(new_pl_accessor->PrependData(*it));
78 }
79
80 PostingListAccessor::FinalizeResult result =
81 std::move(*new_pl_accessor).Finalize();
82 if (!result.status.ok()) {
83 return result.status;
84 }
85 if (!result.id.is_valid()) {
86 return absl_ports::InternalError("Fail to flush data into posting list(s)");
87 }
88 return result.id;
89 }
90
91 // The following 4 methods are helper functions to get the correct file path of
92 // metadata/sorted_buckets/unsorted_buckets/flash_index_storage, according to
93 // the given working directory.
GetMetadataFilePath(std::string_view working_path)94 std::string GetMetadataFilePath(std::string_view working_path) {
95 return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
96 ".m");
97 }
98
GetSortedBucketsFilePath(std::string_view working_path)99 std::string GetSortedBucketsFilePath(std::string_view working_path) {
100 return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
101 ".s");
102 }
103
GetUnsortedBucketsFilePath(std::string_view working_path)104 std::string GetUnsortedBucketsFilePath(std::string_view working_path) {
105 return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
106 ".u");
107 }
108
GetFlashIndexStorageFilePath(std::string_view working_path)109 std::string GetFlashIndexStorageFilePath(std::string_view working_path) {
110 return absl_ports::StrCat(working_path, "/", IntegerIndexStorage::kFilePrefix,
111 ".f");
112 }
113
114 } // namespace
115
116 // We add (BasicHits, key) into a bucket in DocumentId descending and SectionId
117 // ascending order. When doing range query, we may access buckets and want to
118 // return BasicHits to callers sorted by DocumentId. Therefore, this problem is
119 // actually "merge K sorted lists".
120 // To implement this algorithm via priority_queue, we create this wrapper class
121 // to store PostingListIntegerIndexAccessor for iterating through the posting
122 // list chain.
123 // - Non-relevant (i.e. not in range [key_lower, key_upper]) will be skipped.
124 // - Relevant BasicHits will be returned.
class BucketPostingListIterator {
 public:
  // Orders BucketPostingListIterator* instances for use in a max-heap
  // std::priority_queue.
  class Comparator {
   public:
    // REQUIRES: 2 BucketPostingListIterator* instances (lhs, rhs) should be
    // valid, i.e. the preceding AdvanceAndFilter() succeeded.
    bool operator()(const BucketPostingListIterator* lhs,
                    const BucketPostingListIterator* rhs) const {
      // std::priority_queue is a max heap and we should return BasicHits in
      // DocumentId descending order.
      // - BucketPostingListIterator::operator< should have the same order as
      //   DocumentId.
      // - BasicHit encodes inverted document id and BasicHit::operator<
      //   compares the encoded raw value directly.
      // - Therefore, BucketPostingListIterator::operator< should compare
      //   BasicHit reversely.
      // - This will make priority_queue return buckets in DocumentId
      //   descending and SectionId ascending order.
      // - Whatever direction we sort SectionId by (or pop by priority_queue)
      //   doesn't matter because all hits for the same DocumentId will be
      //   merged into a single DocHitInfo.
      return rhs->GetCurrentBasicHit() < lhs->GetCurrentBasicHit();
    }
  };

  // Takes ownership of one bucket's posting list accessor. The iterator
  // starts in a "needs first batch" state, so callers must invoke
  // AdvanceAndFilter() once (successfully) before GetCurrentBasicHit().
  explicit BucketPostingListIterator(
      std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor)
      : pl_accessor_(std::move(pl_accessor)),
        should_retrieve_next_batch_(true) {}

  struct AdvanceAndFilterResult {
    libtextclassifier3::Status status = libtextclassifier3::Status::OK;
    // # of data elements examined (relevant or not) during this call.
    int32_t num_advance_calls = 0;
    // # of posting list batches fetched from storage during this call.
    int32_t num_blocks_inspected = 0;
  };
  // Advances to the next relevant data. The posting list of a bucket contains
  // keys within range [bucket.key_lower, bucket.key_upper], but some of them
  // may be out of [query_key_lower, query_key_upper], so when advancing we have
  // to filter out those non-relevant keys.
  //
  // Returns:
  //   AdvanceAndFilterResult. status will be:
  //   - OK on success
  //   - RESOURCE_EXHAUSTED_ERROR if reaching the end (i.e. no more relevant
  //     data)
  //   - Any other PostingListIntegerIndexAccessor errors
  AdvanceAndFilterResult AdvanceAndFilter(int64_t query_key_lower,
                                          int64_t query_key_upper) {
    AdvanceAndFilterResult result;
    // Move curr_ until reaching a relevant data (i.e. key in range
    // [query_key_lower, query_key_upper])
    do {
      if (!should_retrieve_next_batch_) {
        // Still consuming the cached batch: step to the next element and note
        // whether the cache has been exhausted.
        ++curr_;
        should_retrieve_next_batch_ =
            curr_ >= cached_batch_integer_index_data_.cend();
      }
      if (should_retrieve_next_batch_) {
        // Cache exhausted (or very first call): fetch the next batch from the
        // posting list chain. GetNextDataBatch() resets curr_ to the cache
        // begin, and returns RESOURCE_EXHAUSTED at the end of the chain.
        auto status = GetNextDataBatch();
        if (!status.ok()) {
          result.status = std::move(status);
          return result;
        }
        ++result.num_blocks_inspected;
        should_retrieve_next_batch_ = false;
      }
      ++result.num_advance_calls;
    } while (curr_->key() < query_key_lower || curr_->key() > query_key_upper);

    return result;
  }

  // REQUIRES: the preceding AdvanceAndFilter() succeeded; otherwise curr_ may
  // not point at valid data.
  const BasicHit& GetCurrentBasicHit() const { return curr_->basic_hit(); }

 private:
  // Gets next batch of data from the posting list chain, caches in
  // cached_batch_integer_index_data_, and sets curr_ to the begin of the cache.
  //
  // Returns:
  //   - OK on success
  //   - RESOURCE_EXHAUSTED_ERROR if the fetched batch is empty (i.e. the end
  //     of the posting list chain has been reached)
  //   - Any other PostingListIntegerIndexAccessor errors
  libtextclassifier3::Status GetNextDataBatch() {
    auto cached_batch_integer_index_data_or = pl_accessor_->GetNextDataBatch();
    if (!cached_batch_integer_index_data_or.ok()) {
      ICING_LOG(WARNING)
          << "Fail to get next batch data from posting list due to: "
          << cached_batch_integer_index_data_or.status().error_message();
      return std::move(cached_batch_integer_index_data_or).status();
    }

    cached_batch_integer_index_data_ =
        std::move(cached_batch_integer_index_data_or).ValueOrDie();
    curr_ = cached_batch_integer_index_data_.cbegin();

    if (cached_batch_integer_index_data_.empty()) {
      return absl_ports::ResourceExhaustedError("End of iterator");
    }

    return libtextclassifier3::Status::OK;
  }

  // Accessor for reading the bucket's posting list chain batch by batch.
  std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor_;
  // Most recently fetched batch of data from the posting list chain.
  std::vector<IntegerIndexData> cached_batch_integer_index_data_;
  // Position of the current data within cached_batch_integer_index_data_.
  std::vector<IntegerIndexData>::const_iterator curr_;
  // True when curr_ has run off the cached batch and a new batch must be
  // fetched before more data can be read. Initialized to true so the first
  // AdvanceAndFilter() call fetches the first batch.
  bool should_retrieve_next_batch_;
};
227
228 // Wrapper class to iterate through IntegerIndexStorage to get relevant data.
229 // It uses multiple BucketPostingListIterator instances from different candidate
230 // buckets and merges all relevant BasicHits from these buckets by
231 // std::priority_queue in DocumentId descending order. Also different SectionIds
232 // of the same DocumentId will be merged into SectionIdMask and returned as a
233 // single DocHitInfo.
class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator {
 public:
  // Constructs the iterator from per-bucket posting list iterators. Each
  // bucket iterator is advanced once up front so that the priority queue only
  // ever holds iterators positioned on valid in-range data; iterators with no
  // relevant data at all are discarded here (their stats are still counted).
  explicit IntegerIndexStorageIterator(
      int64_t query_key_lower, int64_t query_key_upper,
      std::vector<std::unique_ptr<BucketPostingListIterator>>&& bucket_pl_iters)
      : NumericIndex<int64_t>::Iterator(query_key_lower, query_key_upper),
        num_advance_calls_(0),
        num_blocks_inspected_(0) {
    std::vector<BucketPostingListIterator*> bucket_pl_iters_raw_ptrs;
    for (std::unique_ptr<BucketPostingListIterator>& bucket_pl_itr :
         bucket_pl_iters) {
      // Before adding BucketPostingListIterator* into the priority queue, we
      // have to advance the bucket iterator to the first valid data since the
      // priority queue needs valid data to compare the order.
      // Note: it is possible that the bucket iterator fails to advance for the
      // first round, because data could be filtered out by [query_key_lower,
      // query_key_upper]. In this case, just discard the iterator.
      BucketPostingListIterator::AdvanceAndFilterResult
          advance_and_filter_result =
              bucket_pl_itr->AdvanceAndFilter(query_key_lower, query_key_upper);
      if (advance_and_filter_result.status.ok()) {
        bucket_pl_iters_raw_ptrs.push_back(bucket_pl_itr.get());
        bucket_pl_iters_.push_back(std::move(bucket_pl_itr));
      }
      // Stats are accumulated even for iterators that get discarded.
      num_advance_calls_ += advance_and_filter_result.num_advance_calls;
      num_blocks_inspected_ += advance_and_filter_result.num_blocks_inspected;
    }

    // Build the priority queue in one shot from the surviving raw pointers.
    pq_ = std::priority_queue<BucketPostingListIterator*,
                              std::vector<BucketPostingListIterator*>,
                              BucketPostingListIterator::Comparator>(
        comparator_, std::move(bucket_pl_iters_raw_ptrs));
  }

  ~IntegerIndexStorageIterator() override = default;

  // Advances to the next DocHitInfo. Note: several BucketPostingListIterator
  // instances may be advanced if they point to data with the same DocumentId.
  //
  // Returns:
  //   - OK on success
  //   - RESOURCE_EXHAUSTED_ERROR if reaching the end (i.e. no more relevant
  //     data)
  //   - Any BucketPostingListIterator errors
  libtextclassifier3::Status Advance() override;

  // Returns the DocHitInfo assembled by the most recent successful Advance().
  DocHitInfo GetDocHitInfo() const override { return doc_hit_info_; }

  int32_t GetNumAdvanceCalls() const override { return num_advance_calls_; }

  int32_t GetNumBlocksInspected() const override {
    return num_blocks_inspected_;
  }

 private:
  BucketPostingListIterator::Comparator comparator_;

  // We have to fetch and pop the top BucketPostingListIterator from
  // std::priority_queue to perform "merge K sorted lists algorithm".
  // - Since std::priority_queue::pop() doesn't return the top element, we have
  //   to call top() and pop() together.
  // - std::move the top() element by const_cast is not an appropriate way
  //   because it introduces transient unstable state for std::priority_queue.
  // - We don't want to copy BucketPostingListIterator, either.
  // - Therefore, add bucket_pl_iters_ for the ownership of all
  //   BucketPostingListIterator instances and std::priority_queue uses the raw
  //   pointer. So when calling top(), we can simply copy the raw pointer via
  //   top() and avoid transient unstable state.
  std::vector<std::unique_ptr<BucketPostingListIterator>> bucket_pl_iters_;
  std::priority_queue<BucketPostingListIterator*,
                      std::vector<BucketPostingListIterator*>,
                      BucketPostingListIterator::Comparator>
      pq_;

  // DocHitInfo produced by the latest Advance() call.
  DocHitInfo doc_hit_info_;

  // Accumulated stats across all AdvanceAndFilter() calls (including the ones
  // issued from the constructor).
  int32_t num_advance_calls_;
  int32_t num_blocks_inspected_;
};
313
libtextclassifier3::Status IntegerIndexStorageIterator::Advance() {
  // An empty priority queue means every bucket iterator has been exhausted.
  if (pq_.empty()) {
    return absl_ports::ResourceExhaustedError("End of iterator");
  }

  // The top of the max-heap holds the largest remaining DocumentId; all hits
  // for this DocumentId (across all buckets) are merged into one DocHitInfo.
  DocumentId document_id = pq_.top()->GetCurrentBasicHit().document_id();
  doc_hit_info_ = DocHitInfo(document_id);
  // Merge sections with same document_id into a single DocHitInfo
  while (!pq_.empty() &&
         pq_.top()->GetCurrentBasicHit().document_id() == document_id) {
    BucketPostingListIterator* bucket_itr = pq_.top();
    pq_.pop();

    // Drain every hit with document_id from this bucket iterator, collecting
    // each hit's section id, before moving on to the next bucket.
    libtextclassifier3::Status advance_status;
    do {
      doc_hit_info_.UpdateSection(
          bucket_itr->GetCurrentBasicHit().section_id());
      BucketPostingListIterator::AdvanceAndFilterResult
          advance_and_filter_result =
              bucket_itr->AdvanceAndFilter(key_lower_, key_upper_);
      advance_status = std::move(advance_and_filter_result.status);
      num_advance_calls_ += advance_and_filter_result.num_advance_calls;
      num_blocks_inspected_ += advance_and_filter_result.num_blocks_inspected;
    } while (advance_status.ok() &&
             bucket_itr->GetCurrentBasicHit().document_id() == document_id);
    // Re-insert the iterator only if it still has relevant data; exhausted
    // iterators simply drop out (ownership remains in bucket_pl_iters_).
    if (advance_status.ok()) {
      pq_.push(bucket_itr);
    }
  }

  return libtextclassifier3::Status::OK;
}
346
IsValid() const347 bool IntegerIndexStorage::Options::IsValid() const {
348 if (num_data_threshold_for_bucket_split <=
349 kMinNumDataThresholdForBucketSplit) {
350 return false;
351 }
352
353 if (!HasCustomInitBuckets()) {
354 return true;
355 }
356
357 // Verify if the range of buckets are disjoint and the range union is
358 // [INT64_MIN, INT64_MAX].
359 std::vector<Bucket> buckets;
360 buckets.reserve(custom_init_sorted_buckets.size() +
361 custom_init_unsorted_buckets.size());
362 buckets.insert(buckets.end(), custom_init_sorted_buckets.begin(),
363 custom_init_sorted_buckets.end());
364 buckets.insert(buckets.end(), custom_init_unsorted_buckets.begin(),
365 custom_init_unsorted_buckets.end());
366 if (buckets.empty()) {
367 return false;
368 }
369 std::sort(buckets.begin(), buckets.end());
370 int64_t prev_upper = std::numeric_limits<int64_t>::min();
371 for (int i = 0; i < buckets.size(); ++i) {
372 // key_lower should not be greater than key_upper and init bucket should
373 // have invalid posting list identifier.
374 if (buckets[i].key_lower() > buckets[i].key_upper() ||
375 buckets[i].posting_list_identifier().is_valid()) {
376 return false;
377 }
378
379 // Previous upper bound should not be INT64_MAX since it is not the last
380 // bucket.
381 if (prev_upper == std::numeric_limits<int64_t>::max()) {
382 return false;
383 }
384
385 int64_t expected_lower =
386 (i == 0 ? std::numeric_limits<int64_t>::min() : prev_upper + 1);
387 if (buckets[i].key_lower() != expected_lower) {
388 return false;
389 }
390
391 prev_upper = buckets[i].key_upper();
392 }
393
394 return prev_upper == std::numeric_limits<int64_t>::max();
395 }
396
397 /* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
Create(const Filesystem & filesystem,std::string working_path,Options options,PostingListIntegerIndexSerializer * posting_list_serializer)398 IntegerIndexStorage::Create(
399 const Filesystem& filesystem, std::string working_path, Options options,
400 PostingListIntegerIndexSerializer* posting_list_serializer) {
401 if (!options.IsValid()) {
402 return absl_ports::InvalidArgumentError(
403 "Invalid IntegerIndexStorage options");
404 }
405
406 if (!filesystem.FileExists(GetMetadataFilePath(working_path).c_str()) ||
407 !filesystem.FileExists(GetSortedBucketsFilePath(working_path).c_str()) ||
408 !filesystem.FileExists(
409 GetUnsortedBucketsFilePath(working_path).c_str()) ||
410 !filesystem.FileExists(
411 GetFlashIndexStorageFilePath(working_path).c_str())) {
412 // Discard working_path if any of them is missing, and reinitialize.
413 if (filesystem.DirectoryExists(working_path.c_str())) {
414 ICING_RETURN_IF_ERROR(Discard(filesystem, working_path));
415 }
416 return InitializeNewFiles(filesystem, std::move(working_path),
417 std::move(options), posting_list_serializer);
418 }
419 return InitializeExistingFiles(filesystem, std::move(working_path),
420 std::move(options), posting_list_serializer);
421 }
422
~IntegerIndexStorage()423 IntegerIndexStorage::~IntegerIndexStorage() {
424 if (!PersistToDisk().ok()) {
425 ICING_LOG(WARNING)
426 << "Failed to persist hash map to disk while destructing "
427 << working_path_;
428 }
429 }
430
431 class IntegerIndexStorageComparator {
432 public:
operator ()(const IntegerIndexStorage::Bucket & lhs,int64_t rhs) const433 bool operator()(const IntegerIndexStorage::Bucket& lhs, int64_t rhs) const {
434 return lhs.key_upper() < rhs;
435 }
436 } kComparator;
437
// Adds all new_keys for (document_id, section_id) into this storage.
// new_keys is sorted and deduped in place, then each key is routed to the
// bucket whose [key_lower, key_upper] range contains it; overgrown buckets
// may be split (see AddKeysIntoBucketAndSplitIfNecessary) and the newly
// created buckets are appended to the unsorted bucket array.
//
// Returns:
//   - OK on success
//   - RESOURCE_EXHAUSTED_ERROR if adding would exceed the total data limit
//   - Any FileBackedVector or posting list errors
libtextclassifier3::Status IntegerIndexStorage::AddKeys(
    DocumentId document_id, SectionId section_id,
    std::vector<int64_t>&& new_keys) {
  if (new_keys.empty()) {
    return libtextclassifier3::Status::OK;
  }

  SetDirty();

  std::sort(new_keys.begin(), new_keys.end());

  // Dedupe
  auto last = std::unique(new_keys.begin(), new_keys.end());
  new_keys.erase(last, new_keys.end());

  // Guard against int32_t overflow of the total data count.
  if (static_cast<int32_t>(new_keys.size()) >
      std::numeric_limits<int32_t>::max() - info().num_data) {
    return absl_ports::ResourceExhaustedError(
        "# of keys in this integer index storage exceed the limit");
  }

  // When adding keys into a bucket, we potentially split it into 2 new buckets
  // and one of them will be added into the unsorted bucket array.
  // When handling keys belonging to buckets in the unsorted bucket array, we
  // don't have to (and must not) handle these newly split buckets. Therefore,
  // collect all newly split buckets in another vector and append them into the
  // unsorted bucket array after adding all keys.
  std::vector<Bucket> new_buckets;

  // Binary search range of the sorted bucket array.
  const Bucket* sorted_bucket_arr_begin = sorted_buckets_->array();
  const Bucket* sorted_bucket_arr_end =
      sorted_buckets_->array() + sorted_buckets_->num_elements();

  // Step 1: handle keys belonging to buckets in the sorted bucket array. Skip
  //         keys belonging to the unsorted bucket array and deal with them in
  //         the next step.
  // - Iterate through new_keys by it_start.
  // - Binary search (std::lower_bound comparing key with bucket.key_upper()) to
  //   find the first bucket in the sorted bucket array with key_upper is not
  //   smaller than (>=) the key.
  // - Skip (and advance it_start) all keys smaller than the target bucket's
  //   key_lower. It means these keys belong to buckets in the unsorted bucket
  //   array and we will deal with them later.
  // - Find it_end such that all keys within range [it_start, it_end) belong to
  //   the target bucket.
  // - Batch add keys within range [it_start, it_end) into the target bucket.
  auto it_start = new_keys.cbegin();
  while (it_start != new_keys.cend() &&
         sorted_bucket_arr_begin < sorted_bucket_arr_end) {
    // Use std::lower_bound to find the first bucket in the sorted bucket array
    // with key_upper >= *it_start.
    const Bucket* target_bucket = std::lower_bound(
        sorted_bucket_arr_begin, sorted_bucket_arr_end, *it_start, kComparator);
    if (target_bucket >= sorted_bucket_arr_end) {
      // Keys in range [it_start, new_keys.cend()) are greater than all sorted
      // buckets' key_upper, so we can end step 1. In fact, they belong to
      // buckets in the unsorted bucket array and we will deal with them in
      // step 2.
      break;
    }

    // Sequential instead of binary search to advance it_start and it_end for
    // several reasons:
    // - Eventually we have to iterate through all keys within range [it_start,
    //   it_end) and add them into the posting list, so binary search doesn't
    //   improve the overall time complexity.
    // - Binary search may jump to far-away indices, which potentially
    //   downgrades the cache performance.

    // After binary search, we've ensured *it_start <=
    // target_bucket->key_upper(), but it is still possible that *it_start (and
    // the next several keys) is still smaller than target_bucket->key_lower(),
    // so we have to skip them. In fact, they belong to buckets in the unsorted
    // bucket array.
    //
    // For example:
    // - sorted bucket array: [(INT_MIN, 0), (1, 5), (100, 300), (301, 550)]
    // - unsorted bucket array: [(550, INT_MAX), (6, 99)]
    // - new_keys: [10, 20, 40, 102, 150, 200, 500, 600]
    // std::lower_bound (target = 10) will get target_bucket = (100, 300), but
    // we have to skip 10, 20, 40 because they are smaller than 100 (the
    // bucket's key_lower). We should move it_start pointing to key 102.
    while (it_start != new_keys.cend() &&
           *it_start < target_bucket->key_lower()) {
      ++it_start;
    }

    // Locate it_end such that all keys within range [it_start, it_end) belong
    // to target_bucket and all keys outside this range don't belong to
    // target_bucket.
    //
    // For example (continue above), we should locate it_end to point to key
    // 500.
    auto it_end = it_start;
    while (it_end != new_keys.cend() && *it_end <= target_bucket->key_upper()) {
      ++it_end;
    }

    // Now, keys within range [it_start, it_end) belong to target_bucket, so
    // construct IntegerIndexData and add them into the bucket's posting list.
    if (it_start != it_end) {
      ICING_ASSIGN_OR_RETURN(
          FileBackedVector<Bucket>::MutableView mutable_bucket,
          sorted_buckets_->GetMutable(target_bucket -
                                      sorted_buckets_->array()));
      ICING_ASSIGN_OR_RETURN(
          std::vector<Bucket> round_new_buckets,
          AddKeysIntoBucketAndSplitIfNecessary(
              document_id, section_id, it_start, it_end, mutable_bucket));
      new_buckets.insert(new_buckets.end(), round_new_buckets.begin(),
                         round_new_buckets.end());
    }

    it_start = it_end;
    sorted_bucket_arr_begin = target_bucket + 1;
  }

  // Step 2: handle keys belonging to buckets in the unsorted bucket array. They
  //         were skipped in step 1.
  // For each bucket in the unsorted bucket array, find [it_start, it_end) such
  // that all keys within this range belong to the bucket and add them.
  // - Binary search (std::lower_bound comparing bucket.key_lower() with key) to
  //   find it_start.
  // - Sequential advance (start from it_start) to find it_end. Same reason as
  //   above for choosing sequential advance instead of binary search.
  // - Add keys within range [it_start, it_end) into the bucket.
  for (int32_t i = 0; i < unsorted_buckets_->num_elements(); ++i) {
    ICING_ASSIGN_OR_RETURN(FileBackedVector<Bucket>::MutableView mutable_bucket,
                           unsorted_buckets_->GetMutable(i));
    // Note: this it_start intentionally shadows the step-1 iterator; each
    // unsorted bucket searches the full key range independently.
    auto it_start = std::lower_bound(new_keys.cbegin(), new_keys.cend(),
                                     mutable_bucket.Get().key_lower());
    if (it_start == new_keys.cend()) {
      continue;
    }

    // Sequential advance instead of binary search to find the correct position
    // of it_end for the same reasons mentioned above in step 1.
    auto it_end = it_start;
    while (it_end != new_keys.cend() &&
           *it_end <= mutable_bucket.Get().key_upper()) {
      ++it_end;
    }

    // Now, key within range [it_start, it_end) belong to the bucket, so
    // construct IntegerIndexData and add them into the bucket's posting list.
    if (it_start != it_end) {
      ICING_ASSIGN_OR_RETURN(
          std::vector<Bucket> round_new_buckets,
          AddKeysIntoBucketAndSplitIfNecessary(
              document_id, section_id, it_start, it_end, mutable_bucket));
      new_buckets.insert(new_buckets.end(), round_new_buckets.begin(),
                         round_new_buckets.end());
    }
  }

  // Step 3: append new buckets into the unsorted bucket array.
  if (!new_buckets.empty()) {
    ICING_ASSIGN_OR_RETURN(
        typename FileBackedVector<Bucket>::MutableArrayView mutable_new_arr,
        unsorted_buckets_->Allocate(new_buckets.size()));
    mutable_new_arr.SetArray(/*idx=*/0, new_buckets.data(), new_buckets.size());
  }

  // Step 4: sort and merge the unsorted bucket array into the sorted bucket
  //         array if the length of the unsorted bucket array exceeds the
  //         threshold.
  if (unsorted_buckets_->num_elements() > kUnsortedBucketsLengthThreshold) {
    ICING_RETURN_IF_ERROR(SortBuckets());
  }

  // Count only the deduped keys added in this call.
  info().num_data += new_keys.size();

  return libtextclassifier3::Status::OK;
}
613
// Returns a DocHitInfoIterator over all data whose keys fall within
// [query_key_lower, query_key_upper]. Candidate buckets are gathered from
// both bucket arrays (binary search over the sorted array, linear scan over
// the unsorted array) and merged by IntegerIndexStorageIterator.
//
// Returns:
//   - INVALID_ARGUMENT_ERROR if query_key_lower > query_key_upper
//   - Any posting list accessor errors
libtextclassifier3::StatusOr<std::unique_ptr<DocHitInfoIterator>>
IntegerIndexStorage::GetIterator(int64_t query_key_lower,
                                 int64_t query_key_upper) const {
  if (query_key_lower > query_key_upper) {
    return absl_ports::InvalidArgumentError(
        "key_lower should not be greater than key_upper");
  }

  std::vector<std::unique_ptr<BucketPostingListIterator>> bucket_pl_iters;

  // Sorted bucket array
  // Binary search for the first bucket whose key_upper >= query_key_lower,
  // then walk forward while buckets still overlap the query range.
  const Bucket* sorted_bucket_arr_begin = sorted_buckets_->array();
  const Bucket* sorted_bucket_arr_end =
      sorted_buckets_->array() + sorted_buckets_->num_elements();
  for (const Bucket* bucket =
           std::lower_bound(sorted_bucket_arr_begin, sorted_bucket_arr_end,
                            query_key_lower, kComparator);
       bucket < sorted_bucket_arr_end && bucket->key_lower() <= query_key_upper;
       ++bucket) {
    // Buckets without a posting list have no data to iterate.
    if (!bucket->posting_list_identifier().is_valid()) {
      continue;
    }

    ICING_ASSIGN_OR_RETURN(
        std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
        PostingListIntegerIndexAccessor::CreateFromExisting(
            flash_index_storage_.get(), posting_list_serializer_,
            bucket->posting_list_identifier()));
    bucket_pl_iters.push_back(
        std::make_unique<BucketPostingListIterator>(std::move(pl_accessor)));
  }

  // Unsorted bucket array
  // No ordering to exploit here, so every bucket is checked for overlap.
  for (int32_t i = 0; i < unsorted_buckets_->num_elements(); ++i) {
    ICING_ASSIGN_OR_RETURN(const Bucket* bucket, unsorted_buckets_->Get(i));
    if (query_key_upper < bucket->key_lower() ||
        query_key_lower > bucket->key_upper() ||
        !bucket->posting_list_identifier().is_valid()) {
      // Skip bucket whose range doesn't overlap with [query_key_lower,
      // query_key_upper] or posting_list_identifier is invalid.
      continue;
    }

    ICING_ASSIGN_OR_RETURN(
        std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
        PostingListIntegerIndexAccessor::CreateFromExisting(
            flash_index_storage_.get(), posting_list_serializer_,
            bucket->posting_list_identifier()));
    bucket_pl_iters.push_back(
        std::make_unique<BucketPostingListIterator>(std::move(pl_accessor)));
  }

  return std::make_unique<DocHitInfoIteratorNumeric<int64_t>>(
      std::make_unique<IntegerIndexStorageIterator>(
          query_key_lower, query_key_upper, std::move(bucket_pl_iters)));
}
670
// Transfers all data from this storage into new_storage, remapping each hit's
// DocumentId through document_id_old_to_new; hits whose old id maps to
// kInvalidDocumentId (or falls outside the vector) are dropped. Adjacent
// buckets whose combined data stays under the merge threshold are merged into
// a single new sorted bucket in new_storage.
//
// Returns:
//   - OK on success
//   - Any FileBackedVector, posting list, or flush errors
libtextclassifier3::Status IntegerIndexStorage::TransferIndex(
    const std::vector<DocumentId>& document_id_old_to_new,
    IntegerIndexStorage* new_storage) const {
  // Discard all pre-existing buckets in new_storage since we will append newly
  // merged buckets gradually into new_storage.
  if (new_storage->sorted_buckets_->num_elements() > 0) {
    ICING_RETURN_IF_ERROR(new_storage->sorted_buckets_->TruncateTo(0));
  }
  if (new_storage->unsorted_buckets_->num_elements() > 0) {
    ICING_RETURN_IF_ERROR(new_storage->unsorted_buckets_->TruncateTo(0));
  }

  // "Reference sort" the original storage buckets.
  std::vector<std::reference_wrapper<const Bucket>> temp_buckets;
  temp_buckets.reserve(sorted_buckets_->num_elements() +
                       unsorted_buckets_->num_elements());
  temp_buckets.insert(
      temp_buckets.end(), sorted_buckets_->array(),
      sorted_buckets_->array() + sorted_buckets_->num_elements());
  temp_buckets.insert(
      temp_buckets.end(), unsorted_buckets_->array(),
      unsorted_buckets_->array() + unsorted_buckets_->num_elements());
  std::sort(temp_buckets.begin(), temp_buckets.end(),
            [](const std::reference_wrapper<const Bucket>& lhs,
               const std::reference_wrapper<const Bucket>& rhs) -> bool {
              return lhs.get() < rhs.get();
            });

  // Buckets are merged until their combined data would exceed this threshold.
  const int32_t num_data_threshold_for_bucket_merge =
      kNumDataThresholdRatioForBucketMerge *
      new_storage->options_.num_data_threshold_for_bucket_split;
  int64_t curr_key_lower = std::numeric_limits<int64_t>::min();
  int64_t curr_key_upper = std::numeric_limits<int64_t>::min();
  std::vector<IntegerIndexData> accumulated_data;
  for (const std::reference_wrapper<const Bucket>& bucket_ref : temp_buckets) {
    // Read all data from the bucket.
    std::vector<IntegerIndexData> new_data;
    if (bucket_ref.get().posting_list_identifier().is_valid()) {
      ICING_ASSIGN_OR_RETURN(
          std::unique_ptr<PostingListIntegerIndexAccessor> old_pl_accessor,
          PostingListIntegerIndexAccessor::CreateFromExisting(
              flash_index_storage_.get(), posting_list_serializer_,
              bucket_ref.get().posting_list_identifier()));

      ICING_ASSIGN_OR_RETURN(std::vector<IntegerIndexData> batch_old_data,
                             old_pl_accessor->GetNextDataBatch());
      while (!batch_old_data.empty()) {
        for (const IntegerIndexData& old_data : batch_old_data) {
          DocumentId old_document_id = old_data.basic_hit().document_id();
          // NOTE(review): the >= 0 guard implies DocumentId is signed; the
          // comparison against size() mixes signed/unsigned — confirm
          // DocumentId's type if this is revisited.
          DocumentId new_document_id =
              old_document_id >= 0 &&
                      old_document_id < document_id_old_to_new.size()
                  ? document_id_old_to_new[old_document_id]
                  : kInvalidDocumentId;
          // Transfer the document id of the hit if the document is not deleted
          // or outdated.
          if (new_document_id != kInvalidDocumentId) {
            new_data.push_back(
                IntegerIndexData(old_data.basic_hit().section_id(),
                                 new_document_id, old_data.key()));
          }
        }
        ICING_ASSIGN_OR_RETURN(batch_old_data,
                               old_pl_accessor->GetNextDataBatch());
      }
    }

    // Decide whether:
    // - Flush accumulated_data and create a new bucket for them.
    // - OR merge new_data into accumulated_data and go to the next round.
    if (!accumulated_data.empty() && accumulated_data.size() + new_data.size() >
                                         num_data_threshold_for_bucket_merge) {
      // TODO(b/259743562): [Optimization 3] adjust upper bound to fit more data
      // from new_data to accumulated_data.
      ICING_RETURN_IF_ERROR(FlushDataIntoNewSortedBucket(
          curr_key_lower, curr_key_upper, std::move(accumulated_data),
          new_storage));

      curr_key_lower = bucket_ref.get().key_lower();
      accumulated_data = std::move(new_data);
    } else {
      // We can just append to accumulated data because
      // FlushDataIntoNewSortedBucket will take care of sorting the contents.
      std::move(new_data.begin(), new_data.end(),
                std::back_inserter(accumulated_data));
    }
    curr_key_upper = bucket_ref.get().key_upper();
  }

  // Add the last round of bucket.
  ICING_RETURN_IF_ERROR(
      FlushDataIntoNewSortedBucket(curr_key_lower, curr_key_upper,
                                   std::move(accumulated_data), new_storage));

  return libtextclassifier3::Status::OK;
}
767
768 /* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
InitializeNewFiles(const Filesystem & filesystem,std::string && working_path,Options && options,PostingListIntegerIndexSerializer * posting_list_serializer)769 IntegerIndexStorage::InitializeNewFiles(
770 const Filesystem& filesystem, std::string&& working_path, Options&& options,
771 PostingListIntegerIndexSerializer* posting_list_serializer) {
772 // IntegerIndexStorage uses working_path as working directory path.
773 // Create working directory.
774 if (!filesystem.CreateDirectory(working_path.c_str())) {
775 return absl_ports::InternalError(
776 absl_ports::StrCat("Failed to create directory: ", working_path));
777 }
778
779 // Initialize sorted_buckets
780 int32_t pre_mapping_mmap_size = sizeof(Bucket) * (1 << 10);
781 ICING_ASSIGN_OR_RETURN(
782 std::unique_ptr<FileBackedVector<Bucket>> sorted_buckets,
783 FileBackedVector<Bucket>::Create(
784 filesystem, GetSortedBucketsFilePath(working_path),
785 MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
786 FileBackedVector<Bucket>::kMaxFileSize,
787 options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));
788
789 // Initialize unsorted_buckets
790 pre_mapping_mmap_size = sizeof(Bucket) * kUnsortedBucketsLengthThreshold;
791 ICING_ASSIGN_OR_RETURN(
792 std::unique_ptr<FileBackedVector<Bucket>> unsorted_buckets,
793 FileBackedVector<Bucket>::Create(
794 filesystem, GetUnsortedBucketsFilePath(working_path),
795 MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
796 FileBackedVector<Bucket>::kMaxFileSize,
797 options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));
798
799 // Initialize flash_index_storage
800 ICING_ASSIGN_OR_RETURN(
801 FlashIndexStorage flash_index_storage,
802 FlashIndexStorage::Create(GetFlashIndexStorageFilePath(working_path),
803 &filesystem, posting_list_serializer));
804
805 if (options.HasCustomInitBuckets()) {
806 // Insert custom init buckets.
807 std::sort(options.custom_init_sorted_buckets.begin(),
808 options.custom_init_sorted_buckets.end());
809 ICING_ASSIGN_OR_RETURN(
810 typename FileBackedVector<Bucket>::MutableArrayView
811 mutable_new_sorted_bucket_arr,
812 sorted_buckets->Allocate(options.custom_init_sorted_buckets.size()));
813 mutable_new_sorted_bucket_arr.SetArray(
814 /*idx=*/0, options.custom_init_sorted_buckets.data(),
815 options.custom_init_sorted_buckets.size());
816
817 ICING_ASSIGN_OR_RETURN(typename FileBackedVector<Bucket>::MutableArrayView
818 mutable_new_unsorted_bucket_arr,
819 unsorted_buckets->Allocate(
820 options.custom_init_unsorted_buckets.size()));
821 mutable_new_unsorted_bucket_arr.SetArray(
822 /*idx=*/0, options.custom_init_unsorted_buckets.data(),
823 options.custom_init_unsorted_buckets.size());
824
825 // After inserting buckets, we can clear vectors since there is no need to
826 // cache them.
827 options.custom_init_sorted_buckets.clear();
828 options.custom_init_unsorted_buckets.clear();
829 } else {
830 // Insert one bucket with range [INT64_MIN, INT64_MAX].
831 ICING_RETURN_IF_ERROR(sorted_buckets->Append(Bucket(
832 /*key_lower=*/std::numeric_limits<int64_t>::min(),
833 /*key_upper=*/std::numeric_limits<int64_t>::max())));
834 }
835 ICING_RETURN_IF_ERROR(sorted_buckets->PersistToDisk());
836
837 // Initialize metadata file. Create MemoryMappedFile with pre-mapping, and
838 // call GrowAndRemapIfNecessary to grow the underlying file.
839 ICING_ASSIGN_OR_RETURN(
840 MemoryMappedFile metadata_mmapped_file,
841 MemoryMappedFile::Create(filesystem, GetMetadataFilePath(working_path),
842 MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
843 /*max_file_size=*/kMetadataFileSize,
844 /*pre_mapping_file_offset=*/0,
845 /*pre_mapping_mmap_size=*/kMetadataFileSize));
846 ICING_RETURN_IF_ERROR(metadata_mmapped_file.GrowAndRemapIfNecessary(
847 /*file_offset=*/0, /*mmap_size=*/kMetadataFileSize));
848
849 // Create instance.
850 auto new_integer_index_storage =
851 std::unique_ptr<IntegerIndexStorage>(new IntegerIndexStorage(
852 filesystem, std::move(working_path), std::move(options),
853 posting_list_serializer,
854 std::make_unique<MemoryMappedFile>(std::move(metadata_mmapped_file)),
855 std::move(sorted_buckets), std::move(unsorted_buckets),
856 std::make_unique<FlashIndexStorage>(std::move(flash_index_storage))));
857 // Initialize info content by writing mapped memory directly.
858 Info& info_ref = new_integer_index_storage->info();
859 info_ref.magic = Info::kMagic;
860 info_ref.num_data = 0;
861
862 // Initialize new PersistentStorage. The initial checksums will be computed
863 // and set via InitializeNewStorage.
864 ICING_RETURN_IF_ERROR(new_integer_index_storage->InitializeNewStorage());
865
866 return new_integer_index_storage;
867 }
868
869 /* static */ libtextclassifier3::StatusOr<std::unique_ptr<IntegerIndexStorage>>
InitializeExistingFiles(const Filesystem & filesystem,std::string && working_path,Options && options,PostingListIntegerIndexSerializer * posting_list_serializer)870 IntegerIndexStorage::InitializeExistingFiles(
871 const Filesystem& filesystem, std::string&& working_path, Options&& options,
872 PostingListIntegerIndexSerializer* posting_list_serializer) {
873 // Mmap the content of the crcs and info.
874 ICING_ASSIGN_OR_RETURN(
875 MemoryMappedFile metadata_mmapped_file,
876 MemoryMappedFile::Create(filesystem, GetMetadataFilePath(working_path),
877 MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
878 /*max_file_size=*/kMetadataFileSize,
879 /*pre_mapping_file_offset=*/0,
880 /*pre_mapping_mmap_size=*/kMetadataFileSize));
881 if (metadata_mmapped_file.available_size() != kMetadataFileSize) {
882 return absl_ports::FailedPreconditionError("Incorrect metadata file size");
883 }
884
885 // Initialize sorted_buckets
886 int32_t pre_mapping_mmap_size = sizeof(Bucket) * (1 << 10);
887 ICING_ASSIGN_OR_RETURN(
888 std::unique_ptr<FileBackedVector<Bucket>> sorted_buckets,
889 FileBackedVector<Bucket>::Create(
890 filesystem, GetSortedBucketsFilePath(working_path),
891 MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
892 FileBackedVector<Bucket>::kMaxFileSize,
893 options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));
894
895 // Initialize unsorted_buckets
896 pre_mapping_mmap_size = sizeof(Bucket) * kUnsortedBucketsLengthThreshold;
897 ICING_ASSIGN_OR_RETURN(
898 std::unique_ptr<FileBackedVector<Bucket>> unsorted_buckets,
899 FileBackedVector<Bucket>::Create(
900 filesystem, GetUnsortedBucketsFilePath(working_path),
901 MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC,
902 FileBackedVector<Bucket>::kMaxFileSize,
903 options.pre_mapping_fbv ? pre_mapping_mmap_size : 0));
904
905 // Initialize flash_index_storage
906 ICING_ASSIGN_OR_RETURN(
907 FlashIndexStorage flash_index_storage,
908 FlashIndexStorage::Create(GetFlashIndexStorageFilePath(working_path),
909 &filesystem, posting_list_serializer));
910
911 // Create instance.
912 auto integer_index_storage =
913 std::unique_ptr<IntegerIndexStorage>(new IntegerIndexStorage(
914 filesystem, std::move(working_path), std::move(options),
915 posting_list_serializer,
916 std::make_unique<MemoryMappedFile>(std::move(metadata_mmapped_file)),
917 std::move(sorted_buckets), std::move(unsorted_buckets),
918 std::make_unique<FlashIndexStorage>(std::move(flash_index_storage))));
919
920 // Initialize existing PersistentStorage. Checksums will be validated.
921 ICING_RETURN_IF_ERROR(integer_index_storage->InitializeExistingStorage());
922
923 // Validate other values of info and options.
924 // Magic should be consistent with the codebase.
925 if (integer_index_storage->info().magic != Info::kMagic) {
926 return absl_ports::FailedPreconditionError("Incorrect magic value");
927 }
928
929 return integer_index_storage;
930 }
931
932 /* static */ libtextclassifier3::Status
FlushDataIntoNewSortedBucket(int64_t key_lower,int64_t key_upper,std::vector<IntegerIndexData> && data,IntegerIndexStorage * storage)933 IntegerIndexStorage::FlushDataIntoNewSortedBucket(
934 int64_t key_lower, int64_t key_upper, std::vector<IntegerIndexData>&& data,
935 IntegerIndexStorage* storage) {
936 storage->SetDirty();
937
938 if (data.empty()) {
939 return storage->sorted_buckets_->Append(Bucket(
940 key_lower, key_upper, PostingListIdentifier::kInvalid, /*num_data=*/0));
941 }
942
943 ICING_ASSIGN_OR_RETURN(
944 PostingListIdentifier pl_id,
945 FlushDataIntoPostingLists(storage->flash_index_storage_.get(),
946 storage->posting_list_serializer_, data.begin(),
947 data.end()));
948
949 storage->info().num_data += data.size();
950 return storage->sorted_buckets_->Append(
951 Bucket(key_lower, key_upper, pl_id, data.size()));
952 }
953
PersistStoragesToDisk()954 libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk() {
955 if (is_initialized_ && !is_storage_dirty()) {
956 return libtextclassifier3::Status::OK;
957 }
958
959 ICING_RETURN_IF_ERROR(sorted_buckets_->PersistToDisk());
960 ICING_RETURN_IF_ERROR(unsorted_buckets_->PersistToDisk());
961 if (!flash_index_storage_->PersistToDisk()) {
962 return absl_ports::InternalError(
963 "Fail to persist FlashIndexStorage to disk");
964 }
965 is_storage_dirty_ = false;
966 return libtextclassifier3::Status::OK;
967 }
968
PersistMetadataToDisk()969 libtextclassifier3::Status IntegerIndexStorage::PersistMetadataToDisk() {
970 // We can skip persisting metadata to disk only if both info and storage are
971 // clean.
972 if (is_initialized_ && !is_info_dirty() && !is_storage_dirty()) {
973 return libtextclassifier3::Status::OK;
974 }
975
976 // Changes should have been applied to the underlying file when using
977 // MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC, but call msync() as an
978 // extra safety step to ensure they are written out.
979 ICING_RETURN_IF_ERROR(metadata_mmapped_file_->PersistToDisk());
980 is_info_dirty_ = false;
981 return libtextclassifier3::Status::OK;
982 }
983
984 libtextclassifier3::StatusOr<Crc32>
UpdateStoragesChecksum()985 IntegerIndexStorage::UpdateStoragesChecksum() {
986 if (is_initialized_ && !is_storage_dirty()) {
987 return Crc32(crcs().component_crcs.storages_crc);
988 }
989
990 // Compute crcs
991 ICING_ASSIGN_OR_RETURN(Crc32 sorted_buckets_crc,
992 sorted_buckets_->UpdateChecksum());
993 ICING_ASSIGN_OR_RETURN(Crc32 unsorted_buckets_crc,
994 unsorted_buckets_->UpdateChecksum());
995
996 // TODO(b/259744228): implement and include flash_index_storage checksum
997 return Crc32(sorted_buckets_crc.Get() ^ unsorted_buckets_crc.Get());
998 }
999
GetInfoChecksum() const1000 libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::GetInfoChecksum()
1001 const {
1002 if (is_initialized_ && !is_info_dirty()) {
1003 return Crc32(crcs().component_crcs.info_crc);
1004 }
1005
1006 return info().GetChecksum();
1007 }
1008
GetStoragesChecksum() const1009 libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::GetStoragesChecksum()
1010 const {
1011 if (is_initialized_ && !is_storage_dirty()) {
1012 return Crc32(crcs().component_crcs.storages_crc);
1013 }
1014
1015 // Compute crcs
1016 Crc32 sorted_buckets_crc = sorted_buckets_->GetChecksum();
1017 Crc32 unsorted_buckets_crc = unsorted_buckets_->GetChecksum();
1018 // TODO(b/259744228): implement and include flash_index_storage checksum
1019 return Crc32(sorted_buckets_crc.Get() ^ unsorted_buckets_crc.Get());
1020 }
1021
1022 libtextclassifier3::StatusOr<std::vector<IntegerIndexStorage::Bucket>>
AddKeysIntoBucketAndSplitIfNecessary(DocumentId document_id,SectionId section_id,const std::vector<int64_t>::const_iterator & it_start,const std::vector<int64_t>::const_iterator & it_end,FileBackedVector<Bucket>::MutableView & mutable_bucket)1023 IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary(
1024 DocumentId document_id, SectionId section_id,
1025 const std::vector<int64_t>::const_iterator& it_start,
1026 const std::vector<int64_t>::const_iterator& it_end,
1027 FileBackedVector<Bucket>::MutableView& mutable_bucket) {
1028 int32_t num_data_in_bucket = mutable_bucket.Get().num_data();
1029 int32_t num_new_data = std::distance(it_start, it_end);
1030 if (mutable_bucket.Get().key_lower() < mutable_bucket.Get().key_upper() &&
1031 num_new_data + num_data_in_bucket >
1032 options_.num_data_threshold_for_bucket_split) {
1033 // Split bucket.
1034
1035 // 1. Read all data and free all posting lists.
1036 std::vector<IntegerIndexData> all_data;
1037 if (mutable_bucket.Get().posting_list_identifier().is_valid()) {
1038 ICING_ASSIGN_OR_RETURN(
1039 std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
1040 PostingListIntegerIndexAccessor::CreateFromExisting(
1041 flash_index_storage_.get(), posting_list_serializer_,
1042 mutable_bucket.Get().posting_list_identifier()));
1043 ICING_ASSIGN_OR_RETURN(all_data, pl_accessor->GetAllDataAndFree());
1044 }
1045
1046 // 2. Append all new data.
1047 all_data.reserve(all_data.size() + num_new_data);
1048 for (auto it = it_start; it != it_end; ++it) {
1049 all_data.push_back(IntegerIndexData(section_id, document_id, *it));
1050 }
1051
1052 // 3. Run bucket splitting algorithm to decide new buckets and dispatch
1053 // data.
1054 // - # of data in a full bucket =
1055 // options_.num_data_threshold_for_bucket_split.
1056 // - Bucket splitting logic will be invoked if adding new data
1057 // (num_new_data >= 1) into a full bucket.
1058 // - In order to achieve good (amortized) time complexity, we want # of
1059 // data in new buckets to be around half_of_threshold (i.e.
1060 // options_.num_data_threshold_for_bucket_split / 2).
1061 // - Using half_of_threshold as the cutoff threshold will cause splitting
1062 // buckets with [half_of_threshold, half_of_threshold, num_new_data]
1063 // data, which is not ideal because num_new_data is usually small.
1064 // - Thus, we pick (half_of_threshold + kNumDataAfterSplitAdjustment) as
1065 // the cutoff threshold to avoid over-splitting. It can tolerate
1066 // num_new_data up to (2 * kNumDataAfterSplitAdjustment) and
1067 // split only 2 buckets (instead of 3) with
1068 // [half_of_threshold + kNumDataAfterSplitAdjustment,
1069 // half_of_threshold + (kNumDataAfterSplitAdjustment - num_new_data)].
1070 int32_t cutoff_threshold =
1071 options_.num_data_threshold_for_bucket_split / 2 +
1072 kNumDataAfterSplitAdjustment;
1073 std::vector<integer_index_bucket_util::DataRangeAndBucketInfo>
1074 new_bucket_infos = integer_index_bucket_util::Split(
1075 all_data, mutable_bucket.Get().key_lower(),
1076 mutable_bucket.Get().key_upper(), cutoff_threshold);
1077 if (new_bucket_infos.empty()) {
1078 ICING_LOG(WARNING)
1079 << "No buckets after splitting. This should not happen.";
1080 return absl_ports::InternalError("Split error");
1081 }
1082
1083 // 4. Flush data and create new buckets.
1084 std::vector<Bucket> new_buckets;
1085 for (int i = 0; i < new_bucket_infos.size(); ++i) {
1086 int32_t num_data_in_new_bucket =
1087 std::distance(new_bucket_infos[i].start, new_bucket_infos[i].end);
1088 ICING_ASSIGN_OR_RETURN(
1089 PostingListIdentifier pl_id,
1090 FlushDataIntoPostingLists(
1091 flash_index_storage_.get(), posting_list_serializer_,
1092 new_bucket_infos[i].start, new_bucket_infos[i].end));
1093 if (i == 0) {
1094 // Reuse mutable_bucket
1095 mutable_bucket.Get().set_key_lower(new_bucket_infos[i].key_lower);
1096 mutable_bucket.Get().set_key_upper(new_bucket_infos[i].key_upper);
1097 mutable_bucket.Get().set_posting_list_identifier(pl_id);
1098 mutable_bucket.Get().set_num_data(num_data_in_new_bucket);
1099 } else {
1100 new_buckets.push_back(Bucket(new_bucket_infos[i].key_lower,
1101 new_bucket_infos[i].key_upper, pl_id,
1102 num_data_in_new_bucket));
1103 }
1104 }
1105
1106 return new_buckets;
1107 }
1108
1109 // Otherwise, we don't need to split bucket. Just simply add all new data into
1110 // the bucket.
1111 std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor;
1112 if (mutable_bucket.Get().posting_list_identifier().is_valid()) {
1113 ICING_ASSIGN_OR_RETURN(
1114 pl_accessor, PostingListIntegerIndexAccessor::CreateFromExisting(
1115 flash_index_storage_.get(), posting_list_serializer_,
1116 mutable_bucket.Get().posting_list_identifier()));
1117 } else {
1118 ICING_ASSIGN_OR_RETURN(
1119 pl_accessor, PostingListIntegerIndexAccessor::Create(
1120 flash_index_storage_.get(), posting_list_serializer_));
1121 }
1122
1123 for (auto it = it_start; it != it_end; ++it) {
1124 ICING_RETURN_IF_ERROR(pl_accessor->PrependData(
1125 IntegerIndexData(section_id, document_id, *it)));
1126 }
1127
1128 PostingListAccessor::FinalizeResult result =
1129 std::move(*pl_accessor).Finalize();
1130 if (!result.status.ok()) {
1131 return result.status;
1132 }
1133 if (!result.id.is_valid()) {
1134 return absl_ports::InternalError("Fail to flush data into posting list(s)");
1135 }
1136
1137 mutable_bucket.Get().set_posting_list_identifier(result.id);
1138 // We've already verified num_new_data won't exceed the limit of the entire
1139 // storage, so it is safe to add to the counter of the bucket.
1140 mutable_bucket.Get().set_num_data(num_data_in_bucket + num_new_data);
1141
1142 return std::vector<Bucket>();
1143 }
1144
SortBuckets()1145 libtextclassifier3::Status IntegerIndexStorage::SortBuckets() {
1146 if (unsorted_buckets_->num_elements() == 0) {
1147 return libtextclassifier3::Status::OK;
1148 }
1149
1150 int32_t sorted_len = sorted_buckets_->num_elements();
1151 int32_t unsorted_len = unsorted_buckets_->num_elements();
1152 if (sorted_len > FileBackedVector<Bucket>::kMaxNumElements - unsorted_len) {
1153 return absl_ports::OutOfRangeError(
1154 "Sorted buckets length exceeds the limit after merging");
1155 }
1156
1157 ICING_RETURN_IF_ERROR(sorted_buckets_->Allocate(unsorted_len));
1158
1159 // Sort unsorted_buckets_.
1160 ICING_RETURN_IF_ERROR(
1161 unsorted_buckets_->Sort(/*begin_idx=*/0, /*end_idx=*/unsorted_len));
1162
1163 // Merge unsorted_buckets_ into sorted_buckets_ and clear unsorted_buckets_.
1164 // Note that we could have used std::sort + std::inplace_merge, but it is more
1165 // complicated to deal with FileBackedVector SetDirty logic, so implement our
1166 // own merging with FileBackedVector methods.
1167 //
1168 // Merge buckets from back. This could save some iterations and avoid setting
1169 // dirty for unchanged elements of the original sorted segments.
1170 // For example, we can avoid setting dirty for elements [1, 2, 3, 5] for the
1171 // following sorted/unsorted data:
1172 // - sorted: [1, 2, 3, 5, 8, 13, _, _, _, _)]
1173 // - unsorted: [6, 10, 14, 15]
1174 int32_t sorted_write_idx = sorted_len + unsorted_len - 1;
1175 int32_t sorted_curr_idx = sorted_len - 1;
1176 int32_t unsorted_curr_idx = unsorted_len - 1;
1177 while (unsorted_curr_idx >= 0) {
1178 if (sorted_curr_idx >= 0 && unsorted_buckets_->array()[unsorted_curr_idx] <
1179 sorted_buckets_->array()[sorted_curr_idx]) {
1180 ICING_RETURN_IF_ERROR(sorted_buckets_->Set(
1181 sorted_write_idx, sorted_buckets_->array()[sorted_curr_idx]));
1182 --sorted_curr_idx;
1183
1184 } else {
1185 ICING_RETURN_IF_ERROR(sorted_buckets_->Set(
1186 sorted_write_idx, unsorted_buckets_->array()[unsorted_curr_idx]));
1187 --unsorted_curr_idx;
1188 }
1189 --sorted_write_idx;
1190 }
1191
1192 ICING_RETURN_IF_ERROR(unsorted_buckets_->TruncateTo(0));
1193
1194 return libtextclassifier3::Status::OK;
1195 }
1196
1197 } // namespace lib
1198 } // namespace icing
1199