1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/frame_data.h"
22
23 #include <stdlib.h>
24
25 #include <initializer_list>
26
27 #include "absl/status/status.h"
28 #include "absl/strings/str_format.h"
29
30 #include <grpc/slice_buffer.h>
31 #include <grpc/support/log.h>
32
33 #include "src/core/ext/transport/chttp2/transport/internal.h"
34 #include "src/core/lib/gprpp/status_helper.h"
35 #include "src/core/lib/slice/slice.h"
36 #include "src/core/lib/slice/slice_buffer.h"
37 #include "src/core/lib/transport/transport.h"
38
grpc_chttp2_data_parser_begin_frame(uint8_t flags,uint32_t stream_id,grpc_chttp2_stream * s)39 absl::Status grpc_chttp2_data_parser_begin_frame(uint8_t flags,
40 uint32_t stream_id,
41 grpc_chttp2_stream* s) {
42 if (flags & ~GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
43 return absl::InternalError(absl::StrFormat(
44 "unsupported data flags: 0x%02x stream: %d", flags, stream_id));
45 }
46
47 if (flags & GRPC_CHTTP2_DATA_FLAG_END_STREAM) {
48 s->received_last_frame = true;
49 s->eos_received = true;
50 } else {
51 s->received_last_frame = false;
52 }
53
54 return absl::OkStatus();
55 }
56
grpc_chttp2_encode_data(uint32_t id,grpc_slice_buffer * inbuf,uint32_t write_bytes,int is_eof,grpc_transport_one_way_stats * stats,grpc_slice_buffer * outbuf)57 void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer* inbuf,
58 uint32_t write_bytes, int is_eof,
59 grpc_transport_one_way_stats* stats,
60 grpc_slice_buffer* outbuf) {
61 grpc_slice hdr;
62 uint8_t* p;
63 static const size_t header_size = 9;
64
65 hdr = GRPC_SLICE_MALLOC(header_size);
66 p = GRPC_SLICE_START_PTR(hdr);
67 GPR_ASSERT(write_bytes < (1 << 24));
68 *p++ = static_cast<uint8_t>(write_bytes >> 16);
69 *p++ = static_cast<uint8_t>(write_bytes >> 8);
70 *p++ = static_cast<uint8_t>(write_bytes);
71 *p++ = GRPC_CHTTP2_FRAME_DATA;
72 *p++ = is_eof ? GRPC_CHTTP2_DATA_FLAG_END_STREAM : 0;
73 *p++ = static_cast<uint8_t>(id >> 24);
74 *p++ = static_cast<uint8_t>(id >> 16);
75 *p++ = static_cast<uint8_t>(id >> 8);
76 *p++ = static_cast<uint8_t>(id);
77 grpc_slice_buffer_add(outbuf, hdr);
78
79 grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf);
80
81 stats->framing_bytes += header_size;
82 stats->data_bytes += write_bytes;
83 }
84
grpc_deframe_unprocessed_incoming_frames(grpc_chttp2_stream * s,int64_t * min_progress_size,grpc_core::SliceBuffer * stream_out,uint32_t * message_flags)85 grpc_core::Poll<grpc_error_handle> grpc_deframe_unprocessed_incoming_frames(
86 grpc_chttp2_stream* s, int64_t* min_progress_size,
87 grpc_core::SliceBuffer* stream_out, uint32_t* message_flags) {
88 grpc_slice_buffer* slices = &s->frame_storage;
89 grpc_error_handle error;
90
91 if (slices->length < 5) {
92 if (min_progress_size != nullptr) *min_progress_size = 5 - slices->length;
93 return grpc_core::Pending{};
94 }
95
96 uint8_t header[5];
97 grpc_slice_buffer_copy_first_into_buffer(slices, 5, header);
98
99 switch (header[0]) {
100 case 0:
101 if (message_flags != nullptr) *message_flags = 0;
102 break;
103 case 1:
104 if (message_flags != nullptr) {
105 *message_flags = GRPC_WRITE_INTERNAL_COMPRESS;
106 }
107 break;
108 default:
109 error = GRPC_ERROR_CREATE(
110 absl::StrFormat("Bad GRPC frame type 0x%02x", header[0]));
111 error = grpc_error_set_int(error, grpc_core::StatusIntProperty::kStreamId,
112 static_cast<intptr_t>(s->id));
113 return error;
114 }
115
116 size_t length = (static_cast<uint32_t>(header[1]) << 24) |
117 (static_cast<uint32_t>(header[2]) << 16) |
118 (static_cast<uint32_t>(header[3]) << 8) |
119 static_cast<uint32_t>(header[4]);
120
121 if (slices->length < length + 5) {
122 if (min_progress_size != nullptr) {
123 *min_progress_size = length + 5 - slices->length;
124 }
125 return grpc_core::Pending{};
126 }
127
128 if (min_progress_size != nullptr) *min_progress_size = 0;
129
130 if (stream_out != nullptr) {
131 s->stats.incoming.framing_bytes += 5;
132 s->stats.incoming.data_bytes += length;
133 grpc_slice_buffer_move_first_into_buffer(slices, 5, header);
134 grpc_slice_buffer_move_first(slices, length, stream_out->c_slice_buffer());
135 }
136
137 return absl::OkStatus();
138 }
139
grpc_chttp2_data_parser_parse(void *,grpc_chttp2_transport * t,grpc_chttp2_stream * s,const grpc_slice & slice,int is_last)140 grpc_error_handle grpc_chttp2_data_parser_parse(void* /*parser*/,
141 grpc_chttp2_transport* t,
142 grpc_chttp2_stream* s,
143 const grpc_slice& slice,
144 int is_last) {
145 grpc_core::CSliceRef(slice);
146 grpc_slice_buffer_add(&s->frame_storage, slice);
147 grpc_chttp2_maybe_complete_recv_message(t, s);
148
149 if (is_last && s->received_last_frame) {
150 grpc_chttp2_mark_stream_closed(
151 t, s, true, false,
152 t->is_client
153 ? GRPC_ERROR_CREATE("Data frame with END_STREAM flag received")
154 : absl::OkStatus());
155 }
156
157 return absl::OkStatus();
158 }
159