1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #ifndef TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ 17 #define TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ 18 19 #include <functional> 20 #include <list> 21 #include <map> 22 #include <memory> 23 #include <string> 24 #include <vector> 25 26 #include "tensorflow/core/platform/cloud/file_block_cache.h" 27 #include "tensorflow/core/platform/env.h" 28 #include "tensorflow/core/platform/mutex.h" 29 #include "tensorflow/core/platform/notification.h" 30 #include "tensorflow/core/platform/status.h" 31 #include "tensorflow/core/platform/stringpiece.h" 32 #include "tensorflow/core/platform/thread_annotations.h" 33 #include "tensorflow/core/platform/types.h" 34 35 namespace tensorflow { 36 37 /// \brief An LRU block cache of file contents, keyed by {filename, offset}. 38 /// 39 /// This class should be shared by read-only random access files on a remote 40 /// filesystem (e.g. GCS). 41 class RamFileBlockCache : public FileBlockCache { 42 public: 43 /// The callback executed when a block is not found in the cache, and needs to 44 /// be fetched from the backing filesystem. This callback is provided when the 45 /// cache is constructed. The returned Status should be OK as long as the 46 /// read from the remote filesystem succeeded (similar to the semantics of the 47 /// read(2) system call). 48 typedef std::function<Status(const string& filename, size_t offset, 49 size_t buffer_size, char* buffer, 50 size_t* bytes_transferred)> 51 BlockFetcher; 52 53 RamFileBlockCache(size_t block_size, size_t max_bytes, uint64 max_staleness, 54 BlockFetcher block_fetcher, Env* env = Env::Default()) block_size_(block_size)55 : block_size_(block_size), 56 max_bytes_(max_bytes), 57 max_staleness_(max_staleness), 58 block_fetcher_(block_fetcher), 59 env_(env) { 60 if (max_staleness_ > 0) { 61 pruning_thread_.reset(env_->StartThread(ThreadOptions(), "TF_prune_FBC", 62 [this] { Prune(); })); 63 } 64 VLOG(1) << "GCS file block cache is " 65 << (IsCacheEnabled() ? "enabled" : "disabled"); 66 } 67 ~RamFileBlockCache()68 ~RamFileBlockCache() override { 69 if (pruning_thread_) { 70 stop_pruning_thread_.Notify(); 71 // Destroying pruning_thread_ will block until Prune() receives the above 72 // notification and returns. 73 pruning_thread_.reset(); 74 } 75 } 76 77 /// Read `n` bytes from `filename` starting at `offset` into `out`. This 78 /// method will return: 79 /// 80 /// 1) The error from the remote filesystem, if the read from the remote 81 /// filesystem failed. 82 /// 2) PRECONDITION_FAILED if the read from the remote filesystem succeeded, 83 /// but the read returned a partial block, and the LRU cache contained a 84 /// block at a higher offset (indicating that the partial block should have 85 /// been a full block). 86 /// 3) OUT_OF_RANGE if the read from the remote filesystem succeeded, but 87 /// the file contents do not extend past `offset` and thus nothing was 88 /// placed in `out`. 89 /// 4) OK otherwise (i.e. the read succeeded, and at least one byte was placed 90 /// in `out`). 91 Status Read(const string& filename, size_t offset, size_t n, char* buffer, 92 size_t* bytes_transferred) override; 93 94 // Validate the given file signature with the existing file signature in the 95 // cache. Returns true if the signature doesn't change or the file doesn't 96 // exist before. If the signature changes, update the existing signature with 97 // the new one and remove the file from cache. 98 bool ValidateAndUpdateFileSignature(const string& filename, 99 int64_t file_signature) override 100 TF_LOCKS_EXCLUDED(mu_); 101 102 /// Remove all cached blocks for `filename`. 103 void RemoveFile(const string& filename) override TF_LOCKS_EXCLUDED(mu_); 104 105 /// Remove all cached data. 106 void Flush() override TF_LOCKS_EXCLUDED(mu_); 107 108 /// Accessors for cache parameters. block_size()109 size_t block_size() const override { return block_size_; } max_bytes()110 size_t max_bytes() const override { return max_bytes_; } max_staleness()111 uint64 max_staleness() const override { return max_staleness_; } 112 113 /// The current size (in bytes) of the cache. 114 size_t CacheSize() const override TF_LOCKS_EXCLUDED(mu_); 115 116 // Returns true if the cache is enabled. If false, the BlockFetcher callback 117 // is always executed during Read. IsCacheEnabled()118 bool IsCacheEnabled() const override { 119 return block_size_ > 0 && max_bytes_ > 0; 120 } 121 122 private: 123 /// The size of the blocks stored in the LRU cache, as well as the size of the 124 /// reads from the underlying filesystem. 125 const size_t block_size_; 126 /// The maximum number of bytes (sum of block sizes) allowed in the LRU cache. 127 const size_t max_bytes_; 128 /// The maximum staleness of any block in the LRU cache, in seconds. 129 const uint64 max_staleness_; 130 /// The callback to read a block from the underlying filesystem. 131 const BlockFetcher block_fetcher_; 132 /// The Env from which we read timestamps. 133 Env* const env_; // not owned 134 135 /// \brief The key type for the file block cache. 136 /// 137 /// The file block cache key is a {filename, offset} pair. 138 typedef std::pair<string, size_t> Key; 139 140 /// \brief The state of a block. 141 /// 142 /// A block begins in the CREATED stage. The first thread will attempt to read 143 /// the block from the filesystem, transitioning the state of the block to 144 /// FETCHING. After completing, if the read was successful the state should 145 /// be FINISHED. Otherwise the state should be ERROR. A subsequent read can 146 /// re-fetch the block if the state is ERROR. 147 enum class FetchState { 148 CREATED, 149 FETCHING, 150 FINISHED, 151 ERROR, 152 }; 153 154 /// \brief A block of a file. 155 /// 156 /// A file block consists of the block data, the block's current position in 157 /// the LRU cache, the timestamp (seconds since epoch) at which the block 158 /// was cached, a coordination lock, and state & condition variables. 159 /// 160 /// Thread safety: 161 /// The iterator and timestamp fields should only be accessed while holding 162 /// the block-cache-wide mu_ instance variable. The state variable should only 163 /// be accessed while holding the Block's mu lock. The data vector should only 164 /// be accessed after state == FINISHED, and it should never be modified. 165 /// 166 /// In order to prevent deadlocks, never grab the block-cache-wide mu_ lock 167 /// AFTER grabbing any block's mu lock. It is safe to grab mu without locking 168 /// mu_. 169 struct Block { 170 /// The block data. 171 std::vector<char> data; 172 /// A list iterator pointing to the block's position in the LRU list. 173 std::list<Key>::iterator lru_iterator; 174 /// A list iterator pointing to the block's position in the LRA list. 175 std::list<Key>::iterator lra_iterator; 176 /// The timestamp (seconds since epoch) at which the block was cached. 177 uint64 timestamp; 178 /// Mutex to guard state variable 179 mutex mu; 180 /// The state of the block. 181 FetchState state TF_GUARDED_BY(mu) = FetchState::CREATED; 182 /// Wait on cond_var if state is FETCHING. 183 condition_variable cond_var; 184 }; 185 186 /// \brief The block map type for the file block cache. 187 /// 188 /// The block map is an ordered map from Key to Block. 189 typedef std::map<Key, std::shared_ptr<Block>> BlockMap; 190 191 /// Prune the cache by removing files with expired blocks. 192 void Prune() TF_LOCKS_EXCLUDED(mu_); 193 194 bool BlockNotStale(const std::shared_ptr<Block>& block) 195 TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); 196 197 /// Look up a Key in the block cache. 198 std::shared_ptr<Block> Lookup(const Key& key) TF_LOCKS_EXCLUDED(mu_); 199 200 Status MaybeFetch(const Key& key, const std::shared_ptr<Block>& block) 201 TF_LOCKS_EXCLUDED(mu_); 202 203 /// Trim the block cache to make room for another entry. 204 void Trim() TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); 205 206 /// Update the LRU iterator for the block at `key`. 207 Status UpdateLRU(const Key& key, const std::shared_ptr<Block>& block) 208 TF_LOCKS_EXCLUDED(mu_); 209 210 /// Remove all blocks of a file, with mu_ already held. 211 void RemoveFile_Locked(const string& filename) 212 TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); 213 214 /// Remove the block `entry` from the block map and LRU list, and update the 215 /// cache size accordingly. 216 void RemoveBlock(BlockMap::iterator entry) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_); 217 218 /// The cache pruning thread that removes files with expired blocks. 219 std::unique_ptr<Thread> pruning_thread_; 220 221 /// Notification for stopping the cache pruning thread. 222 Notification stop_pruning_thread_; 223 224 /// Guards access to the block map, LRU list, and cached byte count. 225 mutable mutex mu_; 226 227 /// The block map (map from Key to Block). 228 BlockMap block_map_ TF_GUARDED_BY(mu_); 229 230 /// The LRU list of block keys. The front of the list identifies the most 231 /// recently accessed block. 232 std::list<Key> lru_list_ TF_GUARDED_BY(mu_); 233 234 /// The LRA (least recently added) list of block keys. The front of the list 235 /// identifies the most recently added block. 236 /// 237 /// Note: blocks are added to lra_list_ only after they have successfully been 238 /// fetched from the underlying block store. 239 std::list<Key> lra_list_ TF_GUARDED_BY(mu_); 240 241 /// The combined number of bytes in all of the cached blocks. 242 size_t cache_size_ TF_GUARDED_BY(mu_) = 0; 243 244 // A filename->file_signature map. 245 std::map<string, int64_t> file_signature_map_ TF_GUARDED_BY(mu_); 246 }; 247 248 } // namespace tensorflow 249 250 #endif // TENSORFLOW_CORE_PLATFORM_CLOUD_RAM_FILE_BLOCK_CACHE_H_ 251