1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
20 #define GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
21 
22 #include <type_traits>
23 
24 #include "absl/strings/cord.h"
25 
26 #include <grpc/byte_buffer.h>
27 #include <grpc/byte_buffer_reader.h>
28 #include <grpc/impl/grpc_types.h>
29 #include <grpc/slice.h>
30 #include <grpc/support/log.h>
31 #include <grpcpp/impl/codegen/config_protobuf.h>
32 #include <grpcpp/impl/serialization_traits.h>
33 #include <grpcpp/support/byte_buffer.h>
34 #include <grpcpp/support/status.h>
35 
36 /// This header provides an object that reads bytes directly from a
37 /// grpc::ByteBuffer, via the ZeroCopyInputStream interface
38 
39 namespace grpc {
40 
41 /// This is a specialization of the protobuf class ZeroCopyInputStream
42 /// The principle is to get one chunk of data at a time from the proto layer,
43 /// with options to backup (re-see some bytes) or skip (forward past some bytes)
44 ///
45 /// Read more about ZeroCopyInputStream interface here:
46 /// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyInputStream
47 class ProtoBufferReader : public grpc::protobuf::io::ZeroCopyInputStream {
48  public:
49   /// Constructs buffer reader from \a buffer. Will set \a status() to non ok
50   /// if \a buffer is invalid (the internal buffer has not been initialized).
ProtoBufferReader(ByteBuffer * buffer)51   explicit ProtoBufferReader(ByteBuffer* buffer)
52       : byte_count_(0), backup_count_(0), status_() {
53     /// Implemented through a grpc_byte_buffer_reader which iterates
54     /// over the slices that make up a byte buffer
55     if (!buffer->Valid() ||
56         !grpc_byte_buffer_reader_init(&reader_, buffer->c_buffer())) {
57       status_ = Status(StatusCode::INTERNAL,
58                        "Couldn't initialize byte buffer reader");
59     }
60   }
61 
~ProtoBufferReader()62   ~ProtoBufferReader() override {
63     if (status_.ok()) {
64       grpc_byte_buffer_reader_destroy(&reader_);
65     }
66   }
67 
68   /// Give the proto library a chunk of data from the stream. The caller
69   /// may safely read from data[0, size - 1].
Next(const void ** data,int * size)70   bool Next(const void** data, int* size) override {
71     if (!status_.ok()) {
72       return false;
73     }
74     /// If we have backed up previously, we need to return the backed-up slice
75     if (backup_count_ > 0) {
76       *data = GRPC_SLICE_START_PTR(*slice_) + GRPC_SLICE_LENGTH(*slice_) -
77               backup_count_;
78       GPR_ASSERT(backup_count_ <= INT_MAX);
79       *size = static_cast<int>(backup_count_);
80       backup_count_ = 0;
81       return true;
82     }
83     /// Otherwise get the next slice from the byte buffer reader
84     if (!grpc_byte_buffer_reader_peek(&reader_, &slice_)) {
85       return false;
86     }
87     *data = GRPC_SLICE_START_PTR(*slice_);
88     // On win x64, int is only 32bit
89     GPR_ASSERT(GRPC_SLICE_LENGTH(*slice_) <= INT_MAX);
90     byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(*slice_));
91     return true;
92   }
93 
94   /// Returns the status of the buffer reader.
status()95   Status status() const { return status_; }
96 
97   /// The proto library calls this to indicate that we should back up \a count
98   /// bytes that have already been returned by the last call of Next.
99   /// So do the backup and have that ready for a later Next.
BackUp(int count)100   void BackUp(int count) override {
101     GPR_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(*slice_)));
102     backup_count_ = count;
103   }
104 
105   /// The proto library calls this to skip over \a count bytes. Implement this
106   /// using Next and BackUp combined.
Skip(int count)107   bool Skip(int count) override {
108     const void* data;
109     int size;
110     while (Next(&data, &size)) {
111       if (size >= count) {
112         BackUp(size - count);
113         return true;
114       }
115       // size < count;
116       count -= size;
117     }
118     // error or we have too large count;
119     return false;
120   }
121 
122   /// Returns the total number of bytes read since this object was created.
ByteCount()123   int64_t ByteCount() const override { return byte_count_ - backup_count_; }
124 
125 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
126   /// Read the next `count` bytes and append it to the given Cord.
127   // (override is intentionally omitted here to support old Protobuf which
128   //  doesn't have ReadCord method)
129   // NOLINTNEXTLINE(modernize-use-override)
ReadCord(absl::Cord * cord,int count)130   virtual bool ReadCord(absl::Cord* cord, int count) {
131     if (!status().ok()) {
132       return false;
133     }
134     // check for backed up data
135     if (backup_count() > 0) {
136       if (backup_count() <= count) {
137         cord->Append(MakeCordFromSlice(grpc_slice_split_tail(
138             slice(), GRPC_SLICE_LENGTH(*slice()) - backup_count())));
139       } else {
140         cord->Append(MakeCordFromSlice(grpc_slice_sub(
141             *slice(), GRPC_SLICE_LENGTH(*slice()) - backup_count(),
142             GRPC_SLICE_LENGTH(*slice()) - backup_count() + count)));
143       }
144       int64_t take = std::min(backup_count(), static_cast<int64_t>(count));
145       set_backup_count(backup_count() - take);
146       count -= take;
147       if (count == 0) {
148         return true;
149       }
150     }
151     while (count > 0) {
152       if (!grpc_byte_buffer_reader_peek(reader(), mutable_slice_ptr())) {
153         return false;
154       }
155       uint64_t slice_length = GRPC_SLICE_LENGTH(*slice());
156       set_byte_count(ByteCount() + slice_length);
157       if (slice_length <= count) {
158         cord->Append(MakeCordFromSlice(grpc_slice_ref(*slice())));
159         count -= slice_length;
160       } else {
161         cord->Append(MakeCordFromSlice(grpc_slice_split_head(slice(), count)));
162         set_backup_count(slice_length - count);
163         return true;
164       }
165     }
166     GPR_ASSERT(count == 0);
167     return true;
168   }
169 #endif  // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
170 
171   // These protected members are needed to support internal optimizations.
172   // they expose internal bits of grpc core that are NOT stable. If you have
173   // a use case needs to use one of these functions, please send an email to
174   // https://groups.google.com/forum/#!forum/grpc-io.
175  protected:
set_byte_count(int64_t byte_count)176   void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
backup_count()177   int64_t backup_count() { return backup_count_; }
set_backup_count(int64_t backup_count)178   void set_backup_count(int64_t backup_count) { backup_count_ = backup_count; }
reader()179   grpc_byte_buffer_reader* reader() { return &reader_; }
slice()180   grpc_slice* slice() { return slice_; }
mutable_slice_ptr()181   grpc_slice** mutable_slice_ptr() { return &slice_; }
182 
183  private:
184 #ifdef GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
185   // This function takes ownership of slice and return a newly created Cord off
186   // of it.
MakeCordFromSlice(grpc_slice slice)187   static absl::Cord MakeCordFromSlice(grpc_slice slice) {
188     return absl::MakeCordFromExternal(
189         absl::string_view(reinterpret_cast<char*>(GRPC_SLICE_START_PTR(slice)),
190                           GRPC_SLICE_LENGTH(slice)),
191         [slice](absl::string_view view) { grpc_slice_unref(slice); });
192   }
193 #endif  // GRPC_PROTOBUF_CORD_SUPPORT_ENABLED
194 
195   int64_t byte_count_;              ///< total bytes read since object creation
196   int64_t backup_count_;            ///< how far backed up in the stream we are
197   grpc_byte_buffer_reader reader_;  ///< internal object to read \a grpc_slice
198                                     ///< from the \a grpc_byte_buffer
199   grpc_slice* slice_;               ///< current slice passed back to the caller
200   Status status_;                   ///< status of the entire object
201 };
202 
203 }  // namespace grpc
204 
205 #endif  // GRPCPP_SUPPORT_PROTO_BUFFER_READER_H
206