1 // Copyright (C) 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "icing/result/result-state-manager.h"
16
17 #include <memory>
18 #include <queue>
19 #include <utility>
20
21 #include "icing/result/page-result.h"
22 #include "icing/result/result-adjustment-info.h"
23 #include "icing/result/result-retriever-v2.h"
24 #include "icing/result/result-state-v2.h"
25 #include "icing/scoring/scored-document-hits-ranker.h"
26 #include "icing/util/clock.h"
27 #include "icing/util/logging.h"
28 #include "icing/util/status-macros.h"
29
30 namespace icing {
31 namespace lib {
32
ResultStateManager(int max_total_hits,const DocumentStore & document_store)33 ResultStateManager::ResultStateManager(int max_total_hits,
34 const DocumentStore& document_store)
35 : document_store_(document_store),
36 max_total_hits_(max_total_hits),
37 num_total_hits_(0),
38 random_generator_(GetSteadyTimeNanoseconds()) {}
39
40 libtextclassifier3::StatusOr<std::pair<uint64_t, PageResult>>
CacheAndRetrieveFirstPage(std::unique_ptr<ScoredDocumentHitsRanker> ranker,std::unique_ptr<ResultAdjustmentInfo> parent_adjustment_info,std::unique_ptr<ResultAdjustmentInfo> child_adjustment_info,const ResultSpecProto & result_spec,const DocumentStore & document_store,const ResultRetrieverV2 & result_retriever,int64_t current_time_ms)41 ResultStateManager::CacheAndRetrieveFirstPage(
42 std::unique_ptr<ScoredDocumentHitsRanker> ranker,
43 std::unique_ptr<ResultAdjustmentInfo> parent_adjustment_info,
44 std::unique_ptr<ResultAdjustmentInfo> child_adjustment_info,
45 const ResultSpecProto& result_spec, const DocumentStore& document_store,
46 const ResultRetrieverV2& result_retriever, int64_t current_time_ms) {
47 if (ranker == nullptr) {
48 return absl_ports::InvalidArgumentError("Should not provide null ranker");
49 }
50
51 // Create shared pointer of ResultState.
52 // ResultState should be created by ResultStateManager only.
53 std::shared_ptr<ResultStateV2> result_state = std::make_shared<ResultStateV2>(
54 std::move(ranker), std::move(parent_adjustment_info),
55 std::move(child_adjustment_info), result_spec, document_store);
56
57 // Retrieve docs outside of ResultStateManager critical section.
58 // Will enter ResultState critical section inside ResultRetriever.
59 auto [page_result, has_more_results] =
60 result_retriever.RetrieveNextPage(*result_state, current_time_ms);
61 if (!has_more_results) {
62 // No more pages, won't store ResultState, returns directly
63 return std::make_pair(kInvalidNextPageToken, std::move(page_result));
64 }
65
66 // ResultState has multiple pages, storing it
67 int num_hits_to_add = 0;
68 {
69 // ResultState critical section
70 absl_ports::unique_lock l(&result_state->mutex);
71
72 result_state->scored_document_hits_ranker->TruncateHitsTo(max_total_hits_);
73 result_state->RegisterNumTotalHits(&num_total_hits_);
74 num_hits_to_add = result_state->scored_document_hits_ranker->size();
75 }
76
77 // It is fine to exit ResultState critical section, since it is just created
78 // above and only this thread (this call stack) has access to it. Thus, it
79 // won't be changed during the gap before we enter ResultStateManager critical
80 // section.
81 uint64_t next_page_token = kInvalidNextPageToken;
82 {
83 // ResultStateManager critical section
84 absl_ports::unique_lock l(&mutex_);
85
86 // Remove expired result states first.
87 InternalInvalidateExpiredResultStates(kDefaultResultStateTtlInMs,
88 current_time_ms);
89 // Remove states to make room for this new state.
90 RemoveStatesIfNeeded(num_hits_to_add);
91 // Generate a new unique token and add it into result_state_map_.
92 next_page_token = Add(std::move(result_state), current_time_ms);
93 }
94
95 return std::make_pair(next_page_token, std::move(page_result));
96 }
97
Add(std::shared_ptr<ResultStateV2> result_state,int64_t current_time_ms)98 uint64_t ResultStateManager::Add(std::shared_ptr<ResultStateV2> result_state,
99 int64_t current_time_ms) {
100 uint64_t new_token = GetUniqueToken();
101
102 result_state_map_.emplace(new_token, std::move(result_state));
103 // Tracks the insertion order
104 token_queue_.push(std::make_pair(new_token, current_time_ms));
105
106 return new_token;
107 }
108
109 libtextclassifier3::StatusOr<std::pair<uint64_t, PageResult>>
GetNextPage(uint64_t next_page_token,const ResultRetrieverV2 & result_retriever,int64_t current_time_ms)110 ResultStateManager::GetNextPage(uint64_t next_page_token,
111 const ResultRetrieverV2& result_retriever,
112 int64_t current_time_ms) {
113 std::shared_ptr<ResultStateV2> result_state = nullptr;
114 {
115 // ResultStateManager critical section
116 absl_ports::unique_lock l(&mutex_);
117
118 // Remove expired result states before fetching
119 InternalInvalidateExpiredResultStates(kDefaultResultStateTtlInMs,
120 current_time_ms);
121
122 const auto& state_iterator = result_state_map_.find(next_page_token);
123 if (state_iterator == result_state_map_.end()) {
124 return absl_ports::NotFoundError("next_page_token not found");
125 }
126 result_state = state_iterator->second;
127 }
128
129 // Retrieve docs outside of ResultStateManager critical section.
130 // Will enter ResultState critical section inside ResultRetriever.
131 auto [page_result, has_more_results] =
132 result_retriever.RetrieveNextPage(*result_state, current_time_ms);
133
134 if (!has_more_results) {
135 {
136 // ResultStateManager critical section
137 absl_ports::unique_lock l(&mutex_);
138
139 InternalInvalidateResultState(next_page_token);
140 }
141
142 next_page_token = kInvalidNextPageToken;
143 }
144 return std::make_pair(next_page_token, std::move(page_result));
145 }
146
InvalidateResultState(uint64_t next_page_token)147 void ResultStateManager::InvalidateResultState(uint64_t next_page_token) {
148 if (next_page_token == kInvalidNextPageToken) {
149 return;
150 }
151
152 absl_ports::unique_lock l(&mutex_);
153
154 InternalInvalidateResultState(next_page_token);
155 }
156
InvalidateAllResultStates()157 void ResultStateManager::InvalidateAllResultStates() {
158 absl_ports::unique_lock l(&mutex_);
159 InternalInvalidateAllResultStates();
160 }
161
InternalInvalidateAllResultStates()162 void ResultStateManager::InternalInvalidateAllResultStates() {
163 // We don't have to reset num_total_hits_ (to 0) here, since clearing
164 // result_state_map_ will "eventually" invoke the destructor of ResultState
165 // (which decrements num_total_hits_) and num_total_hits_ will become 0.
166 result_state_map_.clear();
167 invalidated_token_set_.clear();
168 token_queue_ = std::queue<std::pair<uint64_t, int64_t>>();
169 }
170
GetUniqueToken()171 uint64_t ResultStateManager::GetUniqueToken() {
172 uint64_t new_token = random_generator_();
173 // There's a small chance of collision between the random numbers, here we're
174 // trying to avoid any collisions by checking the keys.
175 while (result_state_map_.find(new_token) != result_state_map_.end() ||
176 invalidated_token_set_.find(new_token) !=
177 invalidated_token_set_.end() ||
178 new_token == kInvalidNextPageToken) {
179 new_token = random_generator_();
180 }
181 return new_token;
182 }
183
RemoveStatesIfNeeded(int num_hits_to_add)184 void ResultStateManager::RemoveStatesIfNeeded(int num_hits_to_add) {
185 if (result_state_map_.empty() || token_queue_.empty()) {
186 return;
187 }
188
189 // 1. Check if this new result_state would take up the entire result state
190 // manager budget.
191 if (num_hits_to_add > max_total_hits_) {
192 // This single result state will exceed our budget. Drop everything else to
193 // accomodate it.
194 InternalInvalidateAllResultStates();
195 return;
196 }
197
198 // 2. Remove any tokens that were previously invalidated.
199 while (!token_queue_.empty() &&
200 invalidated_token_set_.find(token_queue_.front().first) !=
201 invalidated_token_set_.end()) {
202 invalidated_token_set_.erase(token_queue_.front().first);
203 token_queue_.pop();
204 }
205
206 // 3. If we're over budget, remove states from oldest to newest until we fit
207 // into our budget.
208 // Note: num_total_hits_ may not be decremented immediately after invalidating
209 // a result state, since other threads may still hold the shared pointer.
210 // Thus, we have to check if token_queue_ is empty or not, since it is
211 // possible that num_total_hits_ is non-zero and still greater than
212 // max_total_hits_ when token_queue_ is empty. Still "eventually" it will be
213 // decremented after the last thread releases the shared pointer.
214 while (!token_queue_.empty() && num_total_hits_ > max_total_hits_) {
215 InternalInvalidateResultState(token_queue_.front().first);
216 token_queue_.pop();
217 }
218 invalidated_token_set_.clear();
219 }
220
InternalInvalidateResultState(uint64_t token)221 void ResultStateManager::InternalInvalidateResultState(uint64_t token) {
222 // Removes the entry in result_state_map_ and insert the token into
223 // invalidated_token_set_. The entry in token_queue_ can't be easily removed
224 // right now (may need O(n) time), so we leave it there and later completely
225 // remove the token in RemoveStatesIfNeeded().
226 auto itr = result_state_map_.find(token);
227 if (itr != result_state_map_.end()) {
228 // We don't have to decrement num_total_hits_ here, since erasing the shared
229 // ptr instance will "eventually" invoke the destructor of ResultState and
230 // it will handle this.
231 result_state_map_.erase(itr);
232 invalidated_token_set_.insert(token);
233 }
234 }
235
InternalInvalidateExpiredResultStates(int64_t result_state_ttl,int64_t current_time_ms)236 void ResultStateManager::InternalInvalidateExpiredResultStates(
237 int64_t result_state_ttl, int64_t current_time_ms) {
238 while (!token_queue_.empty() &&
239 current_time_ms - token_queue_.front().second >= result_state_ttl) {
240 auto itr = result_state_map_.find(token_queue_.front().first);
241 if (itr != result_state_map_.end()) {
242 // We don't have to decrement num_total_hits_ here, since erasing the
243 // shared ptr instance will "eventually" invoke the destructor of
244 // ResultState and it will handle this.
245 result_state_map_.erase(itr);
246 } else {
247 // Since result_state_map_ and invalidated_token_set_ are mutually
248 // exclusive, we remove the token from invalidated_token_set_ only if it
249 // isn't present in result_state_map_.
250 invalidated_token_set_.erase(token_queue_.front().first);
251 }
252 token_queue_.pop();
253 }
254 }
255
256 } // namespace lib
257 } // namespace icing
258