xref: /aosp_15_r20/external/icing/icing/result/result-state-manager.cc (revision 8b6cd535a057e39b3b86660c4aa06c99747c2136)
1 // Copyright (C) 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "icing/result/result-state-manager.h"
16 
17 #include <memory>
18 #include <queue>
19 #include <utility>
20 
21 #include "icing/result/page-result.h"
22 #include "icing/result/result-adjustment-info.h"
23 #include "icing/result/result-retriever-v2.h"
24 #include "icing/result/result-state-v2.h"
25 #include "icing/scoring/scored-document-hits-ranker.h"
26 #include "icing/util/clock.h"
27 #include "icing/util/logging.h"
28 #include "icing/util/status-macros.h"
29 
30 namespace icing {
31 namespace lib {
32 
ResultStateManager(int max_total_hits,const DocumentStore & document_store)33 ResultStateManager::ResultStateManager(int max_total_hits,
34                                        const DocumentStore& document_store)
35     : document_store_(document_store),
36       max_total_hits_(max_total_hits),
37       num_total_hits_(0),
38       random_generator_(GetSteadyTimeNanoseconds()) {}
39 
40 libtextclassifier3::StatusOr<std::pair<uint64_t, PageResult>>
CacheAndRetrieveFirstPage(std::unique_ptr<ScoredDocumentHitsRanker> ranker,std::unique_ptr<ResultAdjustmentInfo> parent_adjustment_info,std::unique_ptr<ResultAdjustmentInfo> child_adjustment_info,const ResultSpecProto & result_spec,const DocumentStore & document_store,const ResultRetrieverV2 & result_retriever,int64_t current_time_ms)41 ResultStateManager::CacheAndRetrieveFirstPage(
42     std::unique_ptr<ScoredDocumentHitsRanker> ranker,
43     std::unique_ptr<ResultAdjustmentInfo> parent_adjustment_info,
44     std::unique_ptr<ResultAdjustmentInfo> child_adjustment_info,
45     const ResultSpecProto& result_spec, const DocumentStore& document_store,
46     const ResultRetrieverV2& result_retriever, int64_t current_time_ms) {
47   if (ranker == nullptr) {
48     return absl_ports::InvalidArgumentError("Should not provide null ranker");
49   }
50 
51   // Create shared pointer of ResultState.
52   // ResultState should be created by ResultStateManager only.
53   std::shared_ptr<ResultStateV2> result_state = std::make_shared<ResultStateV2>(
54       std::move(ranker), std::move(parent_adjustment_info),
55       std::move(child_adjustment_info), result_spec, document_store);
56 
57   // Retrieve docs outside of ResultStateManager critical section.
58   // Will enter ResultState critical section inside ResultRetriever.
59   auto [page_result, has_more_results] =
60       result_retriever.RetrieveNextPage(*result_state, current_time_ms);
61   if (!has_more_results) {
62     // No more pages, won't store ResultState, returns directly
63     return std::make_pair(kInvalidNextPageToken, std::move(page_result));
64   }
65 
66   // ResultState has multiple pages, storing it
67   int num_hits_to_add = 0;
68   {
69     // ResultState critical section
70     absl_ports::unique_lock l(&result_state->mutex);
71 
72     result_state->scored_document_hits_ranker->TruncateHitsTo(max_total_hits_);
73     result_state->RegisterNumTotalHits(&num_total_hits_);
74     num_hits_to_add = result_state->scored_document_hits_ranker->size();
75   }
76 
77   // It is fine to exit ResultState critical section, since it is just created
78   // above and only this thread (this call stack) has access to it. Thus, it
79   // won't be changed during the gap before we enter ResultStateManager critical
80   // section.
81   uint64_t next_page_token = kInvalidNextPageToken;
82   {
83     // ResultStateManager critical section
84     absl_ports::unique_lock l(&mutex_);
85 
86     // Remove expired result states first.
87     InternalInvalidateExpiredResultStates(kDefaultResultStateTtlInMs,
88                                           current_time_ms);
89     // Remove states to make room for this new state.
90     RemoveStatesIfNeeded(num_hits_to_add);
91     // Generate a new unique token and add it into result_state_map_.
92     next_page_token = Add(std::move(result_state), current_time_ms);
93   }
94 
95   return std::make_pair(next_page_token, std::move(page_result));
96 }
97 
Add(std::shared_ptr<ResultStateV2> result_state,int64_t current_time_ms)98 uint64_t ResultStateManager::Add(std::shared_ptr<ResultStateV2> result_state,
99                                  int64_t current_time_ms) {
100   uint64_t new_token = GetUniqueToken();
101 
102   result_state_map_.emplace(new_token, std::move(result_state));
103   // Tracks the insertion order
104   token_queue_.push(std::make_pair(new_token, current_time_ms));
105 
106   return new_token;
107 }
108 
109 libtextclassifier3::StatusOr<std::pair<uint64_t, PageResult>>
GetNextPage(uint64_t next_page_token,const ResultRetrieverV2 & result_retriever,int64_t current_time_ms)110 ResultStateManager::GetNextPage(uint64_t next_page_token,
111                                 const ResultRetrieverV2& result_retriever,
112                                 int64_t current_time_ms) {
113   std::shared_ptr<ResultStateV2> result_state = nullptr;
114   {
115     // ResultStateManager critical section
116     absl_ports::unique_lock l(&mutex_);
117 
118     // Remove expired result states before fetching
119     InternalInvalidateExpiredResultStates(kDefaultResultStateTtlInMs,
120                                           current_time_ms);
121 
122     const auto& state_iterator = result_state_map_.find(next_page_token);
123     if (state_iterator == result_state_map_.end()) {
124       return absl_ports::NotFoundError("next_page_token not found");
125     }
126     result_state = state_iterator->second;
127   }
128 
129   // Retrieve docs outside of ResultStateManager critical section.
130   // Will enter ResultState critical section inside ResultRetriever.
131   auto [page_result, has_more_results] =
132       result_retriever.RetrieveNextPage(*result_state, current_time_ms);
133 
134   if (!has_more_results) {
135     {
136       // ResultStateManager critical section
137       absl_ports::unique_lock l(&mutex_);
138 
139       InternalInvalidateResultState(next_page_token);
140     }
141 
142     next_page_token = kInvalidNextPageToken;
143   }
144   return std::make_pair(next_page_token, std::move(page_result));
145 }
146 
InvalidateResultState(uint64_t next_page_token)147 void ResultStateManager::InvalidateResultState(uint64_t next_page_token) {
148   if (next_page_token == kInvalidNextPageToken) {
149     return;
150   }
151 
152   absl_ports::unique_lock l(&mutex_);
153 
154   InternalInvalidateResultState(next_page_token);
155 }
156 
InvalidateAllResultStates()157 void ResultStateManager::InvalidateAllResultStates() {
158   absl_ports::unique_lock l(&mutex_);
159   InternalInvalidateAllResultStates();
160 }
161 
InternalInvalidateAllResultStates()162 void ResultStateManager::InternalInvalidateAllResultStates() {
163   // We don't have to reset num_total_hits_ (to 0) here, since clearing
164   // result_state_map_ will "eventually" invoke the destructor of ResultState
165   // (which decrements num_total_hits_) and num_total_hits_ will become 0.
166   result_state_map_.clear();
167   invalidated_token_set_.clear();
168   token_queue_ = std::queue<std::pair<uint64_t, int64_t>>();
169 }
170 
GetUniqueToken()171 uint64_t ResultStateManager::GetUniqueToken() {
172   uint64_t new_token = random_generator_();
173   // There's a small chance of collision between the random numbers, here we're
174   // trying to avoid any collisions by checking the keys.
175   while (result_state_map_.find(new_token) != result_state_map_.end() ||
176          invalidated_token_set_.find(new_token) !=
177              invalidated_token_set_.end() ||
178          new_token == kInvalidNextPageToken) {
179     new_token = random_generator_();
180   }
181   return new_token;
182 }
183 
RemoveStatesIfNeeded(int num_hits_to_add)184 void ResultStateManager::RemoveStatesIfNeeded(int num_hits_to_add) {
185   if (result_state_map_.empty() || token_queue_.empty()) {
186     return;
187   }
188 
189   // 1. Check if this new result_state would take up the entire result state
190   // manager budget.
191   if (num_hits_to_add > max_total_hits_) {
192     // This single result state will exceed our budget. Drop everything else to
193     // accomodate it.
194     InternalInvalidateAllResultStates();
195     return;
196   }
197 
198   // 2. Remove any tokens that were previously invalidated.
199   while (!token_queue_.empty() &&
200          invalidated_token_set_.find(token_queue_.front().first) !=
201              invalidated_token_set_.end()) {
202     invalidated_token_set_.erase(token_queue_.front().first);
203     token_queue_.pop();
204   }
205 
206   // 3. If we're over budget, remove states from oldest to newest until we fit
207   // into our budget.
208   // Note: num_total_hits_ may not be decremented immediately after invalidating
209   // a result state, since other threads may still hold the shared pointer.
210   // Thus, we have to check if token_queue_ is empty or not, since it is
211   // possible that num_total_hits_ is non-zero and still greater than
212   // max_total_hits_ when token_queue_ is empty. Still "eventually" it will be
213   // decremented after the last thread releases the shared pointer.
214   while (!token_queue_.empty() && num_total_hits_ > max_total_hits_) {
215     InternalInvalidateResultState(token_queue_.front().first);
216     token_queue_.pop();
217   }
218   invalidated_token_set_.clear();
219 }
220 
InternalInvalidateResultState(uint64_t token)221 void ResultStateManager::InternalInvalidateResultState(uint64_t token) {
222   // Removes the entry in result_state_map_ and insert the token into
223   // invalidated_token_set_. The entry in token_queue_ can't be easily removed
224   // right now (may need O(n) time), so we leave it there and later completely
225   // remove the token in RemoveStatesIfNeeded().
226   auto itr = result_state_map_.find(token);
227   if (itr != result_state_map_.end()) {
228     // We don't have to decrement num_total_hits_ here, since erasing the shared
229     // ptr instance will "eventually" invoke the destructor of ResultState and
230     // it will handle this.
231     result_state_map_.erase(itr);
232     invalidated_token_set_.insert(token);
233   }
234 }
235 
InternalInvalidateExpiredResultStates(int64_t result_state_ttl,int64_t current_time_ms)236 void ResultStateManager::InternalInvalidateExpiredResultStates(
237     int64_t result_state_ttl, int64_t current_time_ms) {
238   while (!token_queue_.empty() &&
239          current_time_ms - token_queue_.front().second >= result_state_ttl) {
240     auto itr = result_state_map_.find(token_queue_.front().first);
241     if (itr != result_state_map_.end()) {
242       // We don't have to decrement num_total_hits_ here, since erasing the
243       // shared ptr instance will "eventually" invoke the destructor of
244       // ResultState and it will handle this.
245       result_state_map_.erase(itr);
246     } else {
247       // Since result_state_map_ and invalidated_token_set_ are mutually
248       // exclusive, we remove the token from invalidated_token_set_ only if it
249       // isn't present in result_state_map_.
250       invalidated_token_set_.erase(token_queue_.front().first);
251     }
252     token_queue_.pop();
253   }
254 }
255 
256 }  // namespace lib
257 }  // namespace icing
258