xref: /aosp_15_r20/external/tensorflow/tensorflow/core/platform/cloud/ram_file_block_cache.h (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
18 
19 #include <functional>
20 #include <list>
21 #include <map>
22 #include <memory>
23 #include <string>
24 #include <vector>
25 
26 #include "tensorflow/core/platform/cloud/file_block_cache.h"
27 #include "tensorflow/core/platform/env.h"
28 #include "tensorflow/core/platform/mutex.h"
29 #include "tensorflow/core/platform/notification.h"
30 #include "tensorflow/core/platform/status.h"
31 #include "tensorflow/core/platform/stringpiece.h"
32 #include "tensorflow/core/platform/thread_annotations.h"
33 #include "tensorflow/core/platform/types.h"
34 
35 namespace tensorflow {
36 
37 /// \brief An LRU block cache of file contents, keyed by {filename, offset}.
38 ///
39 /// This class should be shared by read-only random access files on a remote
40 /// filesystem (e.g. GCS).
41 class RamFileBlockCache : public FileBlockCache {
42  public:
43   /// The callback executed when a block is not found in the cache, and needs to
44   /// be fetched from the backing filesystem. This callback is provided when the
45   /// cache is constructed. The returned Status should be OK as long as the
46   /// read from the remote filesystem succeeded (similar to the semantics of the
47   /// read(2) system call).
48   typedef std::function<Status(const string& filename, size_t offset,
49                                size_t buffer_size, char* buffer,
50                                size_t* bytes_transferred)>
51       BlockFetcher;
52 
53   RamFileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness,
54                     BlockFetcher block_fetcher, Env* env = Env::Default())
block_size_(block_size)55       : block_size_(block_size),
56         max_bytes_(max_bytes),
57         max_staleness_(max_staleness),
58         block_fetcher_(block_fetcher),
59         env_(env) {
60     if (max_staleness_ > 0) {
61       pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC",
62                                               [this] { Prune(); }));
63     }
64     VLOG(1) << "GCS file block cache is "
65             << (IsCacheEnabled() ? "enabled" : "disabled");
66   }
67 
~RamFileBlockCache()68   ~RamFileBlockCache() override {
69     if (pruning_thread_) {
70       stop_pruning_thread_.Notify();
71       // Destroying pruning_thread_ will block until Prune() receives the above
72       // notification and returns.
73       pruning_thread_.reset();
74     }
75   }
76 
77   /// Read `n` bytes from `filename` starting at `offset` into `out`. This
78   /// method will return:
79   ///
80   /// 1) The error from the remote filesystem, if the read from the remote
81   ///    filesystem failed.
82   /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded,
83   ///    but the read returned a partial block, and the LRU cache contained a
84   ///    block at a higher offset (indicating that the partial block should have
85   ///    been a full block).
86   /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but
87   ///    the file contents do not extend past `offset` and thus nothing was
88   ///    placed in `out`.
89   /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed
90   ///    in `out`).
91   Status Read(const string& filename, size_t offset, size_t n, char* buffer,
92               size_t* bytes_transferred) override;
93 
94   // Validate the given file signature with the existing file signature in the
95   // cache. Returns true if the signature doesn't change or the file doesn't
96   // exist before. If the signature changes, update the existing signature with
97   // the new one and remove the file from cache.
98   bool ValidateAndUpdateFileSignature(const string& filename,
99                                       int64_t file_signature) override
100       TF_LOCKS_EXCLUDED(mu_);
101 
102   /// Remove all cached blocks for `filename`.
103   void RemoveFile(const string& filename) override TF_LOCKS_EXCLUDED(mu_);
104 
105   /// Remove all cached data.
106   void Flush() override TF_LOCKS_EXCLUDED(mu_);
107 
108   /// Accessors for cache parameters.
block_size()109   size_t block_size() const override { return block_size_; }
max_bytes()110   size_t max_bytes() const override { return max_bytes_; }
max_staleness()111   uint64 max_staleness() const override { return max_staleness_; }
112 
113   /// The current size (in bytes) of the cache.
114   size_t CacheSize() const override TF_LOCKS_EXCLUDED(mu_);
115 
116   // Returns true if the cache is enabled. If false, the BlockFetcher callback
117   // is always executed during Read.
IsCacheEnabled()118   bool IsCacheEnabled() const override {
119     return block_size_ > 0 && max_bytes_ > 0;
120   }
121 
122  private:
123   /// The size of the blocks stored in the LRU cache, as well as the size of the
124   /// reads from the underlying filesystem.
125   const size_t block_size_;
126   /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache.
127   const size_t max_bytes_;
128   /// The maximum staleness of any block in the LRU cache, in seconds.
129   const uint64 max_staleness_;
130   /// The callback to read a block from the underlying filesystem.
131   const BlockFetcher block_fetcher_;
132   /// The Env from which we read timestamps.
133   Env* const env_;  // not owned
134 
135   /// \brief The key type for the file block cache.
136   ///
137   /// The file block cache key is a {filename, offset} pair.
138   typedef std::pair<string, size_t> Key;
139 
140   /// \brief The state of a block.
141   ///
142   /// A block begins in the CREATED stage. The first thread will attempt to read
143   /// the block from the filesystem, transitioning the state of the block to
144   /// FETCHING. After completing, if the read was successful the state should
145   /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can
146   /// re-fetch the block if the state is ERROR.
147   enum class FetchState {
148     CREATED,
149     FETCHING,
150     FINISHED,
151     ERROR,
152   };
153 
154   /// \brief A block of a file.
155   ///
156   /// A file block consists of the block data, the block's current position in
157   /// the LRU cache, the timestamp (seconds since epoch) at which the block
158   /// was cached, a coordination lock, and state & condition variables.
159   ///
160   /// Thread safety:
161   /// The iterator and timestamp fields should only be accessed while holding
162   /// the block-cache-wide mu_ instance variable. The state variable should only
163   /// be accessed while holding the Block's mu lock. The data vector should only
164   /// be accessed after state == FINISHED, and it should never be modified.
165   ///
166   /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock
167   /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking
168   /// mu_.
169   struct Block {
170     /// The block data.
171     std::vector<char> data;
172     /// A list iterator pointing to the block's position in the LRU list.
173     std::list<Key>::iterator lru_iterator;
174     /// A list iterator pointing to the block's position in the LRA list.
175     std::list<Key>::iterator lra_iterator;
176     /// The timestamp (seconds since epoch) at which the block was cached.
177     uint64 timestamp;
178     /// Mutex to guard state variable
179     mutex mu;
180     /// The state of the block.
181     FetchState state TF_GUARDED_BY(mu) = FetchState::CREATED;
182     /// Wait on cond_var if state is FETCHING.
183     condition_variable cond_var;
184   };
185 
186   /// \brief The block map type for the file block cache.
187   ///
188   /// The block map is an ordered map from Key to Block.
189   typedef std::map<Key, std::shared_ptr<Block>> BlockMap;
190 
191   /// Prune the cache by removing files with expired blocks.
192   void Prune() TF_LOCKS_EXCLUDED(mu_);
193 
194   bool BlockNotStale(const std::shared_ptr<Block>& block)
195       TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
196 
197   /// Look up a Key in the block cache.
198   std::shared_ptr<Block> Lookup(const Key& key) TF_LOCKS_EXCLUDED(mu_);
199 
200   Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block)
201       TF_LOCKS_EXCLUDED(mu_);
202 
203   /// Trim the block cache to make room for another entry.
204   void Trim() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
205 
206   /// Update the LRU iterator for the block at `key`.
207   Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block)
208       TF_LOCKS_EXCLUDED(mu_);
209 
210   /// Remove all blocks of a file, with mu_ already held.
211   void RemoveFile_Locked(const string& filename)
212       TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
213 
214   /// Remove the block `entry` from the block map and LRU list, and update the
215   /// cache size accordingly.
216   void RemoveBlock(BlockMap::iterator entry) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
217 
218   /// The cache pruning thread that removes files with expired blocks.
219   std::unique_ptr<Thread> pruning_thread_;
220 
221   /// Notification for stopping the cache pruning thread.
222   Notification stop_pruning_thread_;
223 
224   /// Guards access to the block map, LRU list, and cached byte count.
225   mutable mutex mu_;
226 
227   /// The block map (map from Key to Block).
228   BlockMap block_map_ TF_GUARDED_BY(mu_);
229 
230   /// The LRU list of block keys. The front of the list identifies the most
231   /// recently accessed block.
232   std::list<Key> lru_list_ TF_GUARDED_BY(mu_);
233 
234   /// The LRA (least recently added) list of block keys. The front of the list
235   /// identifies the most recently added block.
236   ///
237   /// Note: blocks are added to lra_list_ only after they have successfully been
238   /// fetched from the underlying block store.
239   std::list<Key> lra_list_ TF_GUARDED_BY(mu_);
240 
241   /// The combined number of bytes in all of the cached blocks.
242   size_t cache_size_ TF_GUARDED_BY(mu_) = 0;
243 
244   // A filename->file_signature map.
245   std::map<string, int64_t> file_signature_map_ TF_GUARDED_BY(mu_);
246 };
247 
248 }  // namespace tensorflow
249 
250 #endif  // TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_
251