1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5 #include "db/log_writer.h"
6
7 #include <cstdint>
8
9 #include "leveldb/env.h"
10 #include "util/coding.h"
11 #include "util/crc32c.h"
12
13 namespace leveldb {
14 namespace log {
15
InitTypeCrc(uint32_t * type_crc)16 static void InitTypeCrc(uint32_t* type_crc) {
17 for (int i = 0; i <= kMaxRecordType; i++) {
18 char t = static_cast<char>(i);
19 type_crc[i] = crc32c::Value(&t, 1);
20 }
21 }
22
Writer(WritableFile * dest)23 Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0) {
24 InitTypeCrc(type_crc_);
25 }
26
Writer(WritableFile * dest,uint64_t dest_length)27 Writer::Writer(WritableFile* dest, uint64_t dest_length)
28 : dest_(dest), block_offset_(dest_length % kBlockSize) {
29 InitTypeCrc(type_crc_);
30 }
31
32 Writer::~Writer() = default;
33
AddRecord(const Slice & slice)34 Status Writer::AddRecord(const Slice& slice) {
35 const char* ptr = slice.data();
36 size_t left = slice.size();
37
38 // Fragment the record if necessary and emit it. Note that if slice
39 // is empty, we still want to iterate once to emit a single
40 // zero-length record
41 Status s;
42 bool begin = true;
43 do {
44 const int leftover = kBlockSize - block_offset_;
45 assert(leftover >= 0);
46 if (leftover < kHeaderSize) {
47 // Switch to a new block
48 if (leftover > 0) {
49 // Fill the trailer (literal below relies on kHeaderSize being 7)
50 static_assert(kHeaderSize == 7, "");
51 dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
52 }
53 block_offset_ = 0;
54 }
55
56 // Invariant: we never leave < kHeaderSize bytes in a block.
57 assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
58
59 const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
60 const size_t fragment_length = (left < avail) ? left : avail;
61
62 RecordType type;
63 const bool end = (left == fragment_length);
64 if (begin && end) {
65 type = kFullType;
66 } else if (begin) {
67 type = kFirstType;
68 } else if (end) {
69 type = kLastType;
70 } else {
71 type = kMiddleType;
72 }
73
74 s = EmitPhysicalRecord(type, ptr, fragment_length);
75 ptr += fragment_length;
76 left -= fragment_length;
77 begin = false;
78 } while (s.ok() && left > 0);
79 return s;
80 }
81
EmitPhysicalRecord(RecordType t,const char * ptr,size_t length)82 Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
83 size_t length) {
84 assert(length <= 0xffff); // Must fit in two bytes
85 assert(block_offset_ + kHeaderSize + length <= kBlockSize);
86
87 // Format the header
88 char buf[kHeaderSize];
89 buf[4] = static_cast<char>(length & 0xff);
90 buf[5] = static_cast<char>(length >> 8);
91 buf[6] = static_cast<char>(t);
92
93 // Compute the crc of the record type and the payload.
94 uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
95 crc = crc32c::Mask(crc); // Adjust for storage
96 EncodeFixed32(buf, crc);
97
98 // Write the header and the payload
99 Status s = dest_->Append(Slice(buf, kHeaderSize));
100 if (s.ok()) {
101 s = dest_->Append(Slice(ptr, length));
102 if (s.ok()) {
103 s = dest_->Flush();
104 }
105 }
106 block_offset_ += kHeaderSize + length;
107 return s;
108 }
109
110 } // namespace log
111 } // namespace leveldb
112