1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
22 
23 #include <stdlib.h>
24 
25 #include <initializer_list>
26 
27 #include "absl/status/status.h"
28 #include "absl/strings/str_format.h"
29 
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/log.h>
32 
33 #include "src/core/ext/transport/chttp2/transport/internal.h"
34 #include "src/core/lib/gprpp/status_helper.h"
35 #include "src/core/lib/slice/slice.h"
36 #include "src/core/lib/slice/slice_buffer.h"
37 #include "src/core/lib/transport/transport.h"
38 
grpc_chttp2_data_parser_begin_frame(uint8_t flags,uint32_t stream_id,grpc_chttp2_stream * s)39 absl::Status grpc_chttp2_data_parser_begin_frame(uint8_t flags,
40                                                  uint32_t stream_id,
41                                                  grpc_chttp2_stream* s) {
42   if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
43     return absl::InternalError(absl::StrFormat(
44         "unsupported data flags: 0x%02x stream: %d", flags, stream_id));
45   }
46 
47   if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
48     s->received_last_frame = true;
49     s->eos_received = true;
50   } else {
51     s->received_last_frame = false;
52   }
53 
54   return absl::OkStatus();
55 }
56 
grpc_chttp2_encode_data(uint32_t id,grpc_slice_buffer * inbuf,uint32_t write_bytes,int is_eof,grpc_transport_one_way_stats * stats,grpc_slice_buffer * outbuf)57 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
58                              uint32_t write_bytes, int is_eof,
59                              grpc_transport_one_way_stats* stats,
60                              grpc_slice_buffer* outbuf) {
61   grpc_slice hdr;
62   uint8_t* p;
63   static const size_t header_size = 9;
64 
65   hdr = GRPC_SLICE_MALLOC(header_size);
66   p = GRPC_SLICE_START_PTR(hdr);
67   GPR_ASSERT(write_bytes < (1 << 24));
68   *p++ = static_cast<uint8_t>(write_bytes >> 16);
69   *p++ = static_cast<uint8_t>(write_bytes >> 8);
70   *p++ = static_cast<uint8_t>(write_bytes);
71   *p++ = GRPC_CHTTP2_FRAME_DATA;
72   *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
73   *p++ = static_cast<uint8_t>(id >> 24);
74   *p++ = static_cast<uint8_t>(id >> 16);
75   *p++ = static_cast<uint8_t>(id >> 8);
76   *p++ = static_cast<uint8_t>(id);
77   grpc_slice_buffer_add(outbuf, hdr);
78 
79   grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
80 
81   stats->framing_bytes += header_size;
82   stats->data_bytes += write_bytes;
83 }
84 
grpc_deframe_unprocessed_incoming_frames(grpc_chttp2_stream * s,int64_t * min_progress_size,grpc_core::SliceBuffer * stream_out,uint32_t * message_flags)85 grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(
86     grpc_chttp2_stream* s, int64_t* min_progress_size,
87     grpc_core::SliceBuffer* stream_out, uint32_t* message_flags) {
88   grpc_slice_buffer* slices = &s->frame_storage;
89   grpc_error_handle error;
90 
91   if (slices->length < 5) {
92     if (min_progress_size != nullptr) *min_progress_size = 5 - slices->length;
93     return grpc_core::Pending{};
94   }
95 
96   uint8_t header[5];
97   grpc_slice_buffer_copy_first_into_buffer(slices, 5, header);
98 
99   switch (header[0]) {
100     case 0:
101       if (message_flags != nullptr) *message_flags = 0;
102       break;
103     case 1:
104       if (message_flags != nullptr) {
105         *message_flags = GRPC_WRITE_INTERNAL_COMPRESS;
106       }
107       break;
108     default:
109       error = GRPC_ERROR_CREATE(
110           absl::StrFormat("Bad GRPC frame type 0x%02x", header[0]));
111       error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kStreamId,
112                                  static_cast<intptr_t>(s->id));
113       return error;
114   }
115 
116   size_t length = (static_cast<uint32_t>(header[1]) << 24) |
117                   (static_cast<uint32_t>(header[2]) << 16) |
118                   (static_cast<uint32_t>(header[3]) << 8) |
119                   static_cast<uint32_t>(header[4]);
120 
121   if (slices->length < length + 5) {
122     if (min_progress_size != nullptr) {
123       *min_progress_size = length + 5 - slices->length;
124     }
125     return grpc_core::Pending{};
126   }
127 
128   if (min_progress_size != nullptr) *min_progress_size = 0;
129 
130   if (stream_out != nullptr) {
131     s->stats.incoming.framing_bytes += 5;
132     s->stats.incoming.data_bytes += length;
133     grpc_slice_buffer_move_first_into_buffer(slices, 5, header);
134     grpc_slice_buffer_move_first(slices, length, stream_out->c_slice_buffer());
135   }
136 
137   return absl::OkStatus();
138 }
139 
grpc_chttp2_data_parser_parse(void *,grpc_chttp2_transport * t,grpc_chttp2_stream * s,const grpc_slice & slice,int is_last)140 grpc_error_handle grpc_chttp2_data_parser_parse(void* /*parser*/,
141                                                 grpc_chttp2_transport* t,
142                                                 grpc_chttp2_stream* s,
143                                                 const grpc_slice& slice,
144                                                 int is_last) {
145   grpc_core::CSliceRef(slice);
146   grpc_slice_buffer_add(&s->frame_storage, slice);
147   grpc_chttp2_maybe_complete_recv_message(t, s);
148 
149   if (is_last && s->received_last_frame) {
150     grpc_chttp2_mark_stream_closed(
151         t, s, true, false,
152         t->is_client
153             ? GRPC_ERROR_CREATE("Data frame with END_STREAM flag received")
154             : absl::OkStatus());
155   }
156 
157   return absl::OkStatus();
158 }
159