1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include <dirent.h>
6 #include <fcntl.h>
7 #include <pthread.h>
8 #include <sys/mman.h>
9 #include <sys/resource.h>
10 #include <sys/stat.h>
11 #include <sys/time.h>
12 #include <sys/types.h>
13 #include <unistd.h>
14
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <queue>
#include <set>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
29
30 #include "leveldb/env.h"
31 #include "leveldb/slice.h"
32 #include "leveldb/status.h"
33 #include "port/port.h"
34 #include "port/thread_annotations.h"
35 #include "util/env_posix_test_helper.h"
36 #include "util/posix_logger.h"
37
38 namespace leveldb {
39
40 namespace {
41
// Maximum number of read-only files kept open. A negative value means the
// limit has not been determined yet. Written by
// EnvPosixTestHelper::SetReadOnlyFDLimit() and, lazily, by MaxOpenFiles().
int g_open_read_only_file_limit = -1;

// Default mmap budget: 1000 regions when pointers are at least 8 bytes wide
// (64-bit binaries), and no mmap usage at all otherwise.
constexpr int kDefaultMmapLimit = (sizeof(void*) >= 8) ? 1000 : 0;

// Maximum number of mmap regions. Can be overridden through
// EnvPosixTestHelper::SetReadOnlyMMapLimit().
int g_mmap_limit = kDefaultMmapLimit;

// Flags OR-ed into every open() call made in this file.
#ifdef HAVE_O_CLOEXEC
constexpr int kOpenBaseFlags = O_CLOEXEC;
#else
constexpr int kOpenBaseFlags = 0;
#endif  // HAVE_O_CLOEXEC

// Capacity, in bytes, of the in-memory buffer used by PosixWritableFile.
constexpr size_t kWritableFileBufferSize = 65536;
59
PosixError(const std::string & context,int error_number)60 Status PosixError(const std::string& context, int error_number) {
61 if (error_number == ENOENT) {
62 return Status::NotFound(context, std::strerror(error_number));
63 } else {
64 return Status::IOError(context, std::strerror(error_number));
65 }
66 }
67
// Helper class to limit resource usage to avoid exhaustion.
// Currently used to limit read-only file descriptors and mmap file usage
// so that we do not run out of file descriptors or virtual memory, or run into
// kernel performance problems for very large databases.
class Limiter {
 public:
  // Limit maximum number of resources to |max_acquires|.
  //
  // Explicit so that a plain int is never silently converted into a Limiter.
  explicit Limiter(int max_acquires) : acquires_allowed_(max_acquires) {}

  // Not copyable: each instance is the single authority for its budget.
  Limiter(const Limiter&) = delete;
  Limiter& operator=(const Limiter&) = delete;

  // If another resource is available, acquire it and return true.
  // Else return false.
  bool Acquire() {
    // Optimistically take a slot; fetch_sub() returns the value held before
    // the decrement.
    int old_acquires_allowed =
        acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);

    if (old_acquires_allowed > 0) return true;

    // No slot was available, so undo the decrement.
    acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
    return false;
  }

  // Release a resource acquired by a previous call to Acquire() that returned
  // true.
  void Release() { acquires_allowed_.fetch_add(1, std::memory_order_relaxed); }

 private:
  // The number of available resources.
  //
  // This is a counter and is not tied to the invariants of any other class, so
  // it can be operated on safely using std::memory_order_relaxed.
  std::atomic<int> acquires_allowed_;
};
103
104 // Implements sequential read access in a file using read().
105 //
106 // Instances of this class are thread-friendly but not thread-safe, as required
107 // by the SequentialFile API.
108 class PosixSequentialFile final : public SequentialFile {
109 public:
PosixSequentialFile(std::string filename,int fd)110 PosixSequentialFile(std::string filename, int fd)
111 : fd_(fd), filename_(filename) {}
~PosixSequentialFile()112 ~PosixSequentialFile() override { close(fd_); }
113
Read(size_t n,Slice * result,char * scratch)114 Status Read(size_t n, Slice* result, char* scratch) override {
115 Status status;
116 while (true) {
117 ::ssize_t read_size = ::read(fd_, scratch, n);
118 if (read_size < 0) { // Read error.
119 if (errno == EINTR) {
120 continue; // Retry
121 }
122 status = PosixError(filename_, errno);
123 break;
124 }
125 *result = Slice(scratch, read_size);
126 break;
127 }
128 return status;
129 }
130
Skip(uint64_t n)131 Status Skip(uint64_t n) override {
132 if (::lseek(fd_, n, SEEK_CUR) == static_cast<off_t>(-1)) {
133 return PosixError(filename_, errno);
134 }
135 return Status::OK();
136 }
137
138 private:
139 const int fd_;
140 const std::string filename_;
141 };
142
143 // Implements random read access in a file using pread().
144 //
145 // Instances of this class are thread-safe, as required by the RandomAccessFile
146 // API. Instances are immutable and Read() only calls thread-safe library
147 // functions.
148 class PosixRandomAccessFile final : public RandomAccessFile {
149 public:
150 // The new instance takes ownership of |fd|. |fd_limiter| must outlive this
151 // instance, and will be used to determine if .
PosixRandomAccessFile(std::string filename,int fd,Limiter * fd_limiter)152 PosixRandomAccessFile(std::string filename, int fd, Limiter* fd_limiter)
153 : has_permanent_fd_(fd_limiter->Acquire()),
154 fd_(has_permanent_fd_ ? fd : -1),
155 fd_limiter_(fd_limiter),
156 filename_(std::move(filename)) {
157 if (!has_permanent_fd_) {
158 assert(fd_ == -1);
159 ::close(fd); // The file will be opened on every read.
160 }
161 }
162
~PosixRandomAccessFile()163 ~PosixRandomAccessFile() override {
164 if (has_permanent_fd_) {
165 assert(fd_ != -1);
166 ::close(fd_);
167 fd_limiter_->Release();
168 }
169 }
170
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const171 Status Read(uint64_t offset, size_t n, Slice* result,
172 char* scratch) const override {
173 int fd = fd_;
174 if (!has_permanent_fd_) {
175 fd = ::open(filename_.c_str(), O_RDONLY | kOpenBaseFlags);
176 if (fd < 0) {
177 return PosixError(filename_, errno);
178 }
179 }
180
181 assert(fd != -1);
182
183 Status status;
184 ssize_t read_size = ::pread(fd, scratch, n, static_cast<off_t>(offset));
185 *result = Slice(scratch, (read_size < 0) ? 0 : read_size);
186 if (read_size < 0) {
187 // An error: return a non-ok status.
188 status = PosixError(filename_, errno);
189 }
190 if (!has_permanent_fd_) {
191 // Close the temporary file descriptor opened earlier.
192 assert(fd != fd_);
193 ::close(fd);
194 }
195 return status;
196 }
197
198 private:
199 const bool has_permanent_fd_; // If false, the file is opened on every read.
200 const int fd_; // -1 if has_permanent_fd_ is false.
201 Limiter* const fd_limiter_;
202 const std::string filename_;
203 };
204
205 // Implements random read access in a file using mmap().
206 //
207 // Instances of this class are thread-safe, as required by the RandomAccessFile
208 // API. Instances are immutable and Read() only calls thread-safe library
209 // functions.
210 class PosixMmapReadableFile final : public RandomAccessFile {
211 public:
212 // mmap_base[0, length-1] points to the memory-mapped contents of the file. It
213 // must be the result of a successful call to mmap(). This instances takes
214 // over the ownership of the region.
215 //
216 // |mmap_limiter| must outlive this instance. The caller must have already
217 // aquired the right to use one mmap region, which will be released when this
218 // instance is destroyed.
PosixMmapReadableFile(std::string filename,char * mmap_base,size_t length,Limiter * mmap_limiter)219 PosixMmapReadableFile(std::string filename, char* mmap_base, size_t length,
220 Limiter* mmap_limiter)
221 : mmap_base_(mmap_base),
222 length_(length),
223 mmap_limiter_(mmap_limiter),
224 filename_(std::move(filename)) {}
225
~PosixMmapReadableFile()226 ~PosixMmapReadableFile() override {
227 ::munmap(static_cast<void*>(mmap_base_), length_);
228 mmap_limiter_->Release();
229 }
230
Read(uint64_t offset,size_t n,Slice * result,char * scratch) const231 Status Read(uint64_t offset, size_t n, Slice* result,
232 char* scratch) const override {
233 if (offset + n > length_) {
234 *result = Slice();
235 return PosixError(filename_, EINVAL);
236 }
237
238 *result = Slice(mmap_base_ + offset, n);
239 return Status::OK();
240 }
241
242 private:
243 char* const mmap_base_;
244 const size_t length_;
245 Limiter* const mmap_limiter_;
246 const std::string filename_;
247 };
248
249 class PosixWritableFile final : public WritableFile {
250 public:
PosixWritableFile(std::string filename,int fd)251 PosixWritableFile(std::string filename, int fd)
252 : pos_(0),
253 fd_(fd),
254 is_manifest_(IsManifest(filename)),
255 filename_(std::move(filename)),
256 dirname_(Dirname(filename_)) {}
257
~PosixWritableFile()258 ~PosixWritableFile() override {
259 if (fd_ >= 0) {
260 // Ignoring any potential errors
261 Close();
262 }
263 }
264
Append(const Slice & data)265 Status Append(const Slice& data) override {
266 size_t write_size = data.size();
267 const char* write_data = data.data();
268
269 // Fit as much as possible into buffer.
270 size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_);
271 std::memcpy(buf_ + pos_, write_data, copy_size);
272 write_data += copy_size;
273 write_size -= copy_size;
274 pos_ += copy_size;
275 if (write_size == 0) {
276 return Status::OK();
277 }
278
279 // Can't fit in buffer, so need to do at least one write.
280 Status status = FlushBuffer();
281 if (!status.ok()) {
282 return status;
283 }
284
285 // Small writes go to buffer, large writes are written directly.
286 if (write_size < kWritableFileBufferSize) {
287 std::memcpy(buf_, write_data, write_size);
288 pos_ = write_size;
289 return Status::OK();
290 }
291 return WriteUnbuffered(write_data, write_size);
292 }
293
Close()294 Status Close() override {
295 Status status = FlushBuffer();
296 const int close_result = ::close(fd_);
297 if (close_result < 0 && status.ok()) {
298 status = PosixError(filename_, errno);
299 }
300 fd_ = -1;
301 return status;
302 }
303
Flush()304 Status Flush() override { return FlushBuffer(); }
305
Sync()306 Status Sync() override {
307 // Ensure new files referred to by the manifest are in the filesystem.
308 //
309 // This needs to happen before the manifest file is flushed to disk, to
310 // avoid crashing in a state where the manifest refers to files that are not
311 // yet on disk.
312 Status status = SyncDirIfManifest();
313 if (!status.ok()) {
314 return status;
315 }
316
317 status = FlushBuffer();
318 if (!status.ok()) {
319 return status;
320 }
321
322 return SyncFd(fd_, filename_);
323 }
324
325 private:
FlushBuffer()326 Status FlushBuffer() {
327 Status status = WriteUnbuffered(buf_, pos_);
328 pos_ = 0;
329 return status;
330 }
331
WriteUnbuffered(const char * data,size_t size)332 Status WriteUnbuffered(const char* data, size_t size) {
333 while (size > 0) {
334 ssize_t write_result = ::write(fd_, data, size);
335 if (write_result < 0) {
336 if (errno == EINTR) {
337 continue; // Retry
338 }
339 return PosixError(filename_, errno);
340 }
341 data += write_result;
342 size -= write_result;
343 }
344 return Status::OK();
345 }
346
SyncDirIfManifest()347 Status SyncDirIfManifest() {
348 Status status;
349 if (!is_manifest_) {
350 return status;
351 }
352
353 int fd = ::open(dirname_.c_str(), O_RDONLY | kOpenBaseFlags);
354 if (fd < 0) {
355 status = PosixError(dirname_, errno);
356 } else {
357 status = SyncFd(fd, dirname_);
358 ::close(fd);
359 }
360 return status;
361 }
362
363 // Ensures that all the caches associated with the given file descriptor's
364 // data are flushed all the way to durable media, and can withstand power
365 // failures.
366 //
367 // The path argument is only used to populate the description string in the
368 // returned Status if an error occurs.
SyncFd(int fd,const std::string & fd_path)369 static Status SyncFd(int fd, const std::string& fd_path) {
370 #if HAVE_FULLFSYNC
371 // On macOS and iOS, fsync() doesn't guarantee durability past power
372 // failures. fcntl(F_FULLFSYNC) is required for that purpose. Some
373 // filesystems don't support fcntl(F_FULLFSYNC), and require a fallback to
374 // fsync().
375 if (::fcntl(fd, F_FULLFSYNC) == 0) {
376 return Status::OK();
377 }
378 #endif // HAVE_FULLFSYNC
379
380 #if HAVE_FDATASYNC
381 bool sync_success = ::fdatasync(fd) == 0;
382 #else
383 bool sync_success = ::fsync(fd) == 0;
384 #endif // HAVE_FDATASYNC
385
386 if (sync_success) {
387 return Status::OK();
388 }
389 return PosixError(fd_path, errno);
390 }
391
392 // Returns the directory name in a path pointing to a file.
393 //
394 // Returns "." if the path does not contain any directory separator.
Dirname(const std::string & filename)395 static std::string Dirname(const std::string& filename) {
396 std::string::size_type separator_pos = filename.rfind('/');
397 if (separator_pos == std::string::npos) {
398 return std::string(".");
399 }
400 // The filename component should not contain a path separator. If it does,
401 // the splitting was done incorrectly.
402 assert(filename.find('/', separator_pos + 1) == std::string::npos);
403
404 return filename.substr(0, separator_pos);
405 }
406
407 // Extracts the file name from a path pointing to a file.
408 //
409 // The returned Slice points to |filename|'s data buffer, so it is only valid
410 // while |filename| is alive and unchanged.
Basename(const std::string & filename)411 static Slice Basename(const std::string& filename) {
412 std::string::size_type separator_pos = filename.rfind('/');
413 if (separator_pos == std::string::npos) {
414 return Slice(filename);
415 }
416 // The filename component should not contain a path separator. If it does,
417 // the splitting was done incorrectly.
418 assert(filename.find('/', separator_pos + 1) == std::string::npos);
419
420 return Slice(filename.data() + separator_pos + 1,
421 filename.length() - separator_pos - 1);
422 }
423
424 // True if the given file is a manifest file.
IsManifest(const std::string & filename)425 static bool IsManifest(const std::string& filename) {
426 return Basename(filename).starts_with("MANIFEST");
427 }
428
429 // buf_[0, pos_ - 1] contains data to be written to fd_.
430 char buf_[kWritableFileBufferSize];
431 size_t pos_;
432 int fd_;
433
434 const bool is_manifest_; // True if the file's name starts with MANIFEST.
435 const std::string filename_;
436 const std::string dirname_; // The directory of filename_.
437 };
438
// Places (|lock| is true) or removes (|lock| is false) a write lock covering
// the whole file referred to by |fd|, using fcntl(F_SETLK).
//
// Returns the result of the fcntl() call. errno is reset to 0 before the call
// is made.
int LockOrUnlock(int fd, bool lock) {
  errno = 0;

  struct ::flock request;
  std::memset(&request, 0, sizeof(request));
  if (lock) {
    request.l_type = F_WRLCK;
  } else {
    request.l_type = F_UNLCK;
  }
  // A zero-length region starting at offset 0 of the file stands for the
  // entire file.
  request.l_whence = SEEK_SET;
  request.l_start = 0;
  request.l_len = 0;

  return ::fcntl(fd, F_SETLK, &request);
}
449
// Handle for a file lock: records the descriptor and the name of the locked
// file.
//
// Instances are thread-safe because they are immutable.
class PosixFileLock : public FileLock {
 public:
  // Stores |fd| and |filename| as given. This class never closes |fd| itself.
  PosixFileLock(int fd, std::string filename)
      : fd_(fd), filename_(std::move(filename)) {}

  // The descriptor passed to the constructor.
  int fd() const { return fd_; }
  // The file name passed to the constructor.
  const std::string& filename() const { return filename_; }

 private:
  const int fd_;
  const std::string filename_;
};
463
464 // Tracks the files locked by PosixEnv::LockFile().
465 //
466 // We maintain a separate set instead of relying on fcntl(F_SETLK) because
467 // fcntl(F_SETLK) does not provide any protection against multiple uses from the
468 // same process.
469 //
470 // Instances are thread-safe because all member data is guarded by a mutex.
471 class PosixLockTable {
472 public:
Insert(const std::string & fname)473 bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) {
474 mu_.Lock();
475 bool succeeded = locked_files_.insert(fname).second;
476 mu_.Unlock();
477 return succeeded;
478 }
Remove(const std::string & fname)479 void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) {
480 mu_.Lock();
481 locked_files_.erase(fname);
482 mu_.Unlock();
483 }
484
485 private:
486 port::Mutex mu_;
487 std::set<std::string> locked_files_ GUARDED_BY(mu_);
488 };
489
490 class PosixEnv : public Env {
491 public:
492 PosixEnv();
~PosixEnv()493 ~PosixEnv() override {
494 static const char msg[] =
495 "PosixEnv singleton destroyed. Unsupported behavior!\n";
496 std::fwrite(msg, 1, sizeof(msg), stderr);
497 std::abort();
498 }
499
NewSequentialFile(const std::string & filename,SequentialFile ** result)500 Status NewSequentialFile(const std::string& filename,
501 SequentialFile** result) override {
502 int fd = ::open(filename.c_str(), O_RDONLY | kOpenBaseFlags);
503 if (fd < 0) {
504 *result = nullptr;
505 return PosixError(filename, errno);
506 }
507
508 *result = new PosixSequentialFile(filename, fd);
509 return Status::OK();
510 }
511
NewRandomAccessFile(const std::string & filename,RandomAccessFile ** result)512 Status NewRandomAccessFile(const std::string& filename,
513 RandomAccessFile** result) override {
514 *result = nullptr;
515 int fd = ::open(filename.c_str(), O_RDONLY | kOpenBaseFlags);
516 if (fd < 0) {
517 return PosixError(filename, errno);
518 }
519
520 if (!mmap_limiter_.Acquire()) {
521 *result = new PosixRandomAccessFile(filename, fd, &fd_limiter_);
522 return Status::OK();
523 }
524
525 uint64_t file_size;
526 Status status = GetFileSize(filename, &file_size);
527 if (status.ok()) {
528 void* mmap_base =
529 ::mmap(/*addr=*/nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
530 if (mmap_base != MAP_FAILED) {
531 *result = new PosixMmapReadableFile(filename,
532 reinterpret_cast<char*>(mmap_base),
533 file_size, &mmap_limiter_);
534 } else {
535 status = PosixError(filename, errno);
536 }
537 }
538 ::close(fd);
539 if (!status.ok()) {
540 mmap_limiter_.Release();
541 }
542 return status;
543 }
544
NewWritableFile(const std::string & filename,WritableFile ** result)545 Status NewWritableFile(const std::string& filename,
546 WritableFile** result) override {
547 int fd = ::open(filename.c_str(),
548 O_TRUNC | O_WRONLY | O_CREAT | kOpenBaseFlags, 0644);
549 if (fd < 0) {
550 *result = nullptr;
551 return PosixError(filename, errno);
552 }
553
554 *result = new PosixWritableFile(filename, fd);
555 return Status::OK();
556 }
557
NewAppendableFile(const std::string & filename,WritableFile ** result)558 Status NewAppendableFile(const std::string& filename,
559 WritableFile** result) override {
560 int fd = ::open(filename.c_str(),
561 O_APPEND | O_WRONLY | O_CREAT | kOpenBaseFlags, 0644);
562 if (fd < 0) {
563 *result = nullptr;
564 return PosixError(filename, errno);
565 }
566
567 *result = new PosixWritableFile(filename, fd);
568 return Status::OK();
569 }
570
FileExists(const std::string & filename)571 bool FileExists(const std::string& filename) override {
572 return ::access(filename.c_str(), F_OK) == 0;
573 }
574
GetChildren(const std::string & directory_path,std::vector<std::string> * result)575 Status GetChildren(const std::string& directory_path,
576 std::vector<std::string>* result) override {
577 result->clear();
578 ::DIR* dir = ::opendir(directory_path.c_str());
579 if (dir == nullptr) {
580 return PosixError(directory_path, errno);
581 }
582 struct ::dirent* entry;
583 while ((entry = ::readdir(dir)) != nullptr) {
584 result->emplace_back(entry->d_name);
585 }
586 ::closedir(dir);
587 return Status::OK();
588 }
589
RemoveFile(const std::string & filename)590 Status RemoveFile(const std::string& filename) override {
591 if (::unlink(filename.c_str()) != 0) {
592 return PosixError(filename, errno);
593 }
594 return Status::OK();
595 }
596
CreateDir(const std::string & dirname)597 Status CreateDir(const std::string& dirname) override {
598 if (::mkdir(dirname.c_str(), 0755) != 0) {
599 return PosixError(dirname, errno);
600 }
601 return Status::OK();
602 }
603
RemoveDir(const std::string & dirname)604 Status RemoveDir(const std::string& dirname) override {
605 if (::rmdir(dirname.c_str()) != 0) {
606 return PosixError(dirname, errno);
607 }
608 return Status::OK();
609 }
610
GetFileSize(const std::string & filename,uint64_t * size)611 Status GetFileSize(const std::string& filename, uint64_t* size) override {
612 struct ::stat file_stat;
613 if (::stat(filename.c_str(), &file_stat) != 0) {
614 *size = 0;
615 return PosixError(filename, errno);
616 }
617 *size = file_stat.st_size;
618 return Status::OK();
619 }
620
RenameFile(const std::string & from,const std::string & to)621 Status RenameFile(const std::string& from, const std::string& to) override {
622 if (std::rename(from.c_str(), to.c_str()) != 0) {
623 return PosixError(from, errno);
624 }
625 return Status::OK();
626 }
627
LockFile(const std::string & filename,FileLock ** lock)628 Status LockFile(const std::string& filename, FileLock** lock) override {
629 *lock = nullptr;
630
631 int fd = ::open(filename.c_str(), O_RDWR | O_CREAT | kOpenBaseFlags, 0644);
632 if (fd < 0) {
633 return PosixError(filename, errno);
634 }
635
636 if (!locks_.Insert(filename)) {
637 ::close(fd);
638 return Status::IOError("lock " + filename, "already held by process");
639 }
640
641 if (LockOrUnlock(fd, true) == -1) {
642 int lock_errno = errno;
643 ::close(fd);
644 locks_.Remove(filename);
645 return PosixError("lock " + filename, lock_errno);
646 }
647
648 *lock = new PosixFileLock(fd, filename);
649 return Status::OK();
650 }
651
UnlockFile(FileLock * lock)652 Status UnlockFile(FileLock* lock) override {
653 PosixFileLock* posix_file_lock = static_cast<PosixFileLock*>(lock);
654 if (LockOrUnlock(posix_file_lock->fd(), false) == -1) {
655 return PosixError("unlock " + posix_file_lock->filename(), errno);
656 }
657 locks_.Remove(posix_file_lock->filename());
658 ::close(posix_file_lock->fd());
659 delete posix_file_lock;
660 return Status::OK();
661 }
662
663 void Schedule(void (*background_work_function)(void* background_work_arg),
664 void* background_work_arg) override;
665
StartThread(void (* thread_main)(void * thread_main_arg),void * thread_main_arg)666 void StartThread(void (*thread_main)(void* thread_main_arg),
667 void* thread_main_arg) override {
668 std::thread new_thread(thread_main, thread_main_arg);
669 new_thread.detach();
670 }
671
GetTestDirectory(std::string * result)672 Status GetTestDirectory(std::string* result) override {
673 const char* env = std::getenv("TEST_TMPDIR");
674 if (env && env[0] != '\0') {
675 *result = env;
676 } else {
677 char buf[100];
678 std::snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d",
679 static_cast<int>(::geteuid()));
680 *result = buf;
681 }
682
683 // The CreateDir status is ignored because the directory may already exist.
684 CreateDir(*result);
685
686 return Status::OK();
687 }
688
NewLogger(const std::string & filename,Logger ** result)689 Status NewLogger(const std::string& filename, Logger** result) override {
690 int fd = ::open(filename.c_str(),
691 O_APPEND | O_WRONLY | O_CREAT | kOpenBaseFlags, 0644);
692 if (fd < 0) {
693 *result = nullptr;
694 return PosixError(filename, errno);
695 }
696
697 std::FILE* fp = ::fdopen(fd, "w");
698 if (fp == nullptr) {
699 ::close(fd);
700 *result = nullptr;
701 return PosixError(filename, errno);
702 } else {
703 *result = new PosixLogger(fp);
704 return Status::OK();
705 }
706 }
707
NowMicros()708 uint64_t NowMicros() override {
709 static constexpr uint64_t kUsecondsPerSecond = 1000000;
710 struct ::timeval tv;
711 ::gettimeofday(&tv, nullptr);
712 return static_cast<uint64_t>(tv.tv_sec) * kUsecondsPerSecond + tv.tv_usec;
713 }
714
SleepForMicroseconds(int micros)715 void SleepForMicroseconds(int micros) override {
716 std::this_thread::sleep_for(std::chrono::microseconds(micros));
717 }
718
719 private:
720 void BackgroundThreadMain();
721
BackgroundThreadEntryPoint(PosixEnv * env)722 static void BackgroundThreadEntryPoint(PosixEnv* env) {
723 env->BackgroundThreadMain();
724 }
725
726 // Stores the work item data in a Schedule() call.
727 //
728 // Instances are constructed on the thread calling Schedule() and used on the
729 // background thread.
730 //
731 // This structure is thread-safe beacuse it is immutable.
732 struct BackgroundWorkItem {
BackgroundWorkItemleveldb::__anon233a18300111::PosixEnv::BackgroundWorkItem733 explicit BackgroundWorkItem(void (*function)(void* arg), void* arg)
734 : function(function), arg(arg) {}
735
736 void (*const function)(void*);
737 void* const arg;
738 };
739
740 port::Mutex background_work_mutex_;
741 port::CondVar background_work_cv_ GUARDED_BY(background_work_mutex_);
742 bool started_background_thread_ GUARDED_BY(background_work_mutex_);
743
744 std::queue<BackgroundWorkItem> background_work_queue_
745 GUARDED_BY(background_work_mutex_);
746
747 PosixLockTable locks_; // Thread-safe.
748 Limiter mmap_limiter_; // Thread-safe.
749 Limiter fd_limiter_; // Thread-safe.
750 };
751
// Return the maximum number of concurrent mmaps.
//
// This is the current value of g_mmap_limit.
int MaxMmaps() { return g_mmap_limit; }
754
755 // Return the maximum number of read-only files to keep open.
MaxOpenFiles()756 int MaxOpenFiles() {
757 if (g_open_read_only_file_limit >= 0) {
758 return g_open_read_only_file_limit;
759 }
760 struct ::rlimit rlim;
761 if (::getrlimit(RLIMIT_NOFILE, &rlim)) {
762 // getrlimit failed, fallback to hard-coded default.
763 g_open_read_only_file_limit = 50;
764 } else if (rlim.rlim_cur == RLIM_INFINITY) {
765 g_open_read_only_file_limit = std::numeric_limits<int>::max();
766 } else {
767 // Allow use of 20% of available file descriptors for read-only files.
768 g_open_read_only_file_limit = rlim.rlim_cur / 5;
769 }
770 return g_open_read_only_file_limit;
771 }
772
773 } // namespace
774
// Binds the condition variable to background_work_mutex_, marks the background
// thread as not started, and sizes the two limiters from MaxMmaps() and
// MaxOpenFiles(). No thread is started here.
PosixEnv::PosixEnv()
    : background_work_cv_(&background_work_mutex_),
      started_background_thread_(false),
      mmap_limiter_(MaxMmaps()),
      fd_limiter_(MaxOpenFiles()) {}
780
Schedule(void (* background_work_function)(void * background_work_arg),void * background_work_arg)781 void PosixEnv::Schedule(
782 void (*background_work_function)(void* background_work_arg),
783 void* background_work_arg) {
784 background_work_mutex_.Lock();
785
786 // Start the background thread, if we haven't done so already.
787 if (!started_background_thread_) {
788 started_background_thread_ = true;
789 std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
790 background_thread.detach();
791 }
792
793 // If the queue is empty, the background thread may be waiting for work.
794 if (background_work_queue_.empty()) {
795 background_work_cv_.Signal();
796 }
797
798 background_work_queue_.emplace(background_work_function, background_work_arg);
799 background_work_mutex_.Unlock();
800 }
801
BackgroundThreadMain()802 void PosixEnv::BackgroundThreadMain() {
803 while (true) {
804 background_work_mutex_.Lock();
805
806 // Wait until there is work to be done.
807 while (background_work_queue_.empty()) {
808 background_work_cv_.Wait();
809 }
810
811 assert(!background_work_queue_.empty());
812 auto background_work_function = background_work_queue_.front().function;
813 void* background_work_arg = background_work_queue_.front().arg;
814 background_work_queue_.pop();
815
816 background_work_mutex_.Unlock();
817 background_work_function(background_work_arg);
818 }
819 }
820
821 namespace {
822
823 // Wraps an Env instance whose destructor is never created.
824 //
825 // Intended usage:
826 // using PlatformSingletonEnv = SingletonEnv<PlatformEnv>;
827 // void ConfigurePosixEnv(int param) {
828 // PlatformSingletonEnv::AssertEnvNotInitialized();
829 // // set global configuration flags.
830 // }
831 // Env* Env::Default() {
832 // static PlatformSingletonEnv default_env;
833 // return default_env.env();
834 // }
835 template <typename EnvType>
836 class SingletonEnv {
837 public:
SingletonEnv()838 SingletonEnv() {
839 #if !defined(NDEBUG)
840 env_initialized_.store(true, std::memory_order::memory_order_relaxed);
841 #endif // !defined(NDEBUG)
842 static_assert(sizeof(env_storage_) >= sizeof(EnvType),
843 "env_storage_ will not fit the Env");
844 static_assert(alignof(decltype(env_storage_)) >= alignof(EnvType),
845 "env_storage_ does not meet the Env's alignment needs");
846 new (&env_storage_) EnvType();
847 }
848 ~SingletonEnv() = default;
849
850 SingletonEnv(const SingletonEnv&) = delete;
851 SingletonEnv& operator=(const SingletonEnv&) = delete;
852
env()853 Env* env() { return reinterpret_cast<Env*>(&env_storage_); }
854
AssertEnvNotInitialized()855 static void AssertEnvNotInitialized() {
856 #if !defined(NDEBUG)
857 assert(!env_initialized_.load(std::memory_order::memory_order_relaxed));
858 #endif // !defined(NDEBUG)
859 }
860
861 private:
862 typename std::aligned_storage<sizeof(EnvType), alignof(EnvType)>::type
863 env_storage_;
864 #if !defined(NDEBUG)
865 static std::atomic<bool> env_initialized_;
866 #endif // !defined(NDEBUG)
867 };
868
869 #if !defined(NDEBUG)
870 template <typename EnvType>
871 std::atomic<bool> SingletonEnv<EnvType>::env_initialized_;
872 #endif // !defined(NDEBUG)
873
874 using PosixDefaultEnv = SingletonEnv<PosixEnv>;
875
876 } // namespace
877
// Overrides the maximum number of read-only files kept open by storing |limit|
// in g_open_read_only_file_limit.
//
// Must be called before the default environment is created; debug builds
// assert this.
void EnvPosixTestHelper::SetReadOnlyFDLimit(int limit) {
  PosixDefaultEnv::AssertEnvNotInitialized();
  g_open_read_only_file_limit = limit;
}
882
// Overrides the maximum number of mmap regions by storing |limit| in
// g_mmap_limit.
//
// Must be called before the default environment is created; debug builds
// assert this.
void EnvPosixTestHelper::SetReadOnlyMMapLimit(int limit) {
  PosixDefaultEnv::AssertEnvNotInitialized();
  g_mmap_limit = limit;
}
887
// Returns the process-wide default environment.
//
// The environment lives in a function-local static, so it is constructed the
// first time this function is called and the same pointer is returned by every
// call.
Env* Env::Default() {
  static PosixDefaultEnv env_container;
  return env_container.env();
}
892
893 } // namespace leveldb
894