//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include <grpc/support/port_platform.h>

#include "src/core/lib/security/transport/secure_endpoint.h"

#include <inttypes.h>

#include <algorithm>
#include <atomic>
#include <memory>

#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"

#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/memory_request.h>
#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/resource_quota/api.h"
#include "src/core/lib/resource_quota/memory_quota.h"
#include "src/core/lib/resource_quota/resource_quota.h"
#include "src/core/lib/resource_quota/trace.h"
#include "src/core/lib/security/transport/tsi_error.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/tsi/transport_security_grpc.h"
#include "src/core/tsi/transport_security_interface.h"

#define STAGING_BUFFER_SIZE 8192
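// The 8 KiB staging slices sized above are used only on the non-zero-copy
// protector path: protector output accumulates in them and is flushed into
// the read or write slice buffer whenever a staging slice fills up.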

static void on_read(void* user_data, grpc_error_handle error);

namespace {
struct secure_endpoint {
  secure_endpoint(const grpc_endpoint_vtable* vtable,
                  tsi_frame_protector* protector,
                  tsi_zero_copy_grpc_protector* zero_copy_protector,
                  grpc_endpoint* transport, grpc_slice* leftover_slices,
                  const grpc_channel_args* channel_args,
                  size_t leftover_nslices)
      : wrapped_ep(transport),
        protector(protector),
        zero_copy_protector(zero_copy_protector) {
    base.vtable = vtable;
    gpr_mu_init(&protector_mu);
    GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
    grpc_slice_buffer_init(&source_buffer);
    grpc_slice_buffer_init(&leftover_bytes);
    for (size_t i = 0; i < leftover_nslices; i++) {
      grpc_slice_buffer_add(&leftover_bytes,
                            grpc_core::CSliceRef(leftover_slices[i]));
    }
    grpc_slice_buffer_init(&output_buffer);
    memory_owner =
        grpc_core::ResourceQuotaFromChannelArgs(channel_args)
            ->memory_quota()
            ->CreateMemoryOwner(absl::StrCat(grpc_endpoint_get_peer(transport),
                                             ":secure_endpoint"));
    self_reservation = memory_owner.MakeReservation(sizeof(*this));
    if (zero_copy_protector) {
      read_staging_buffer = grpc_empty_slice();
      write_staging_buffer = grpc_empty_slice();
    } else {
      read_staging_buffer =
          memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
      write_staging_buffer =
          memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
    }
    has_posted_reclaimer.store(false, std::memory_order_relaxed);
    min_progress_size = 1;
    grpc_slice_buffer_init(&protector_staging_buffer);
    gpr_ref_init(&ref, 1);
  }

  ~secure_endpoint() {
    grpc_endpoint_destroy(wrapped_ep);
    tsi_frame_protector_destroy(protector);
    tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
    grpc_slice_buffer_destroy(&source_buffer);
    grpc_slice_buffer_destroy(&leftover_bytes);
    grpc_core::CSliceUnref(read_staging_buffer);
    grpc_core::CSliceUnref(write_staging_buffer);
    grpc_slice_buffer_destroy(&output_buffer);
    grpc_slice_buffer_destroy(&protector_staging_buffer);
    gpr_mu_destroy(&protector_mu);
  }

  grpc_endpoint base;
  grpc_endpoint* wrapped_ep;
  struct tsi_frame_protector* protector;
  struct tsi_zero_copy_grpc_protector* zero_copy_protector;
  // Serializes calls into the non-zero-copy frame protector.
  gpr_mu protector_mu;
  grpc_core::Mutex read_mu;
  grpc_core::Mutex write_mu;
  // Saved upper-level callbacks and user_data.
  grpc_closure* read_cb = nullptr;
  grpc_closure* write_cb = nullptr;
  grpc_closure on_read;
  grpc_slice_buffer* read_buffer = nullptr;
  grpc_slice_buffer source_buffer;
  // Saved handshaker leftover data to unprotect.
  grpc_slice_buffer leftover_bytes;
  // Staging buffers for read and write; may be dropped by the benign
  // reclaimer under memory pressure and recreated on the next flush.
  grpc_slice read_staging_buffer ABSL_GUARDED_BY(read_mu);
  grpc_slice write_staging_buffer ABSL_GUARDED_BY(write_mu);
  grpc_slice_buffer output_buffer;
  grpc_core::MemoryOwner memory_owner;
  grpc_core::MemoryAllocator::Reservation self_reservation;
  std::atomic<bool> has_posted_reclaimer;
  int min_progress_size;
  grpc_slice_buffer protector_staging_buffer;
  gpr_refcount ref;
};
}  // namespace

grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint");

static void destroy(secure_endpoint* ep) { delete ep; }

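// In debug builds (!NDEBUG), ref/unref operations log the refcount transition
// with the caller's file and line when the "secure_endpoint" trace flag is
// enabled; release builds compile down to plain atomic ref counting.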
#ifndef NDEBUG
#define SECURE_ENDPOINT_UNREF(ep, reason) \
  secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
#define SECURE_ENDPOINT_REF(ep, reason) \
  secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
                                  const char* file, int line) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
    gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "SECENDP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
            val - 1);
  }
  if (gpr_unref(&ep->ref)) {
    destroy(ep);
  }
}

static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
                                const char* file, int line) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
    gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "SECENDP ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
            val + 1);
  }
  gpr_ref(&ep->ref);
}
#else
#define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
#define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
static void secure_endpoint_unref(secure_endpoint* ep) {
  if (gpr_unref(&ep->ref)) {
    destroy(ep);
  }
}

static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
#endif

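// Posts a benign reclaimer that frees the idle staging slices under memory
// pressure; the flush_*_staging_buffer helpers reallocate them afterwards.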
static void maybe_post_reclaimer(secure_endpoint* ep) {
  if (!ep->has_posted_reclaimer) {
    SECURE_ENDPOINT_REF(ep, "benign_reclaimer");
    ep->has_posted_reclaimer.exchange(true, std::memory_order_relaxed);
    ep->memory_owner.PostReclaimer(
        grpc_core::ReclamationPass::kBenign,
        [ep](absl::optional<grpc_core::ReclamationSweep> sweep) {
          if (sweep.has_value()) {
            if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
              gpr_log(GPR_INFO,
                      "secure endpoint: benign reclamation to free memory");
            }
            grpc_slice temp_read_slice;
            grpc_slice temp_write_slice;

            ep->read_mu.Lock();
            temp_read_slice = ep->read_staging_buffer;
            ep->read_staging_buffer = grpc_empty_slice();
            ep->read_mu.Unlock();

            ep->write_mu.Lock();
            temp_write_slice = ep->write_staging_buffer;
            ep->write_staging_buffer = grpc_empty_slice();
            ep->write_mu.Unlock();

            grpc_core::CSliceUnref(temp_read_slice);
            grpc_core::CSliceUnref(temp_write_slice);
            ep->has_posted_reclaimer.exchange(false, std::memory_order_relaxed);
          }
          SECURE_ENDPOINT_UNREF(ep, "benign_reclaimer");
        });
  }
}

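// Moves the (full) read staging slice into the output read_buffer and
// allocates a fresh staging slice, updating the cur/end write cursors.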
static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
                                      uint8_t** end)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep->read_mu) {
  grpc_slice_buffer_add_indexed(ep->read_buffer, ep->read_staging_buffer);
  ep->read_staging_buffer =
      ep->memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
  *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
  *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
}

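// Completes a pending read: optionally dumps the unprotected bytes when
// tracing, then schedules the saved upper-layer read callback and drops the
// "read" ref taken in endpoint_read.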
static void call_read_cb(secure_endpoint* ep, grpc_error_handle error) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint) &&
      gpr_should_log(GPR_LOG_SEVERITY_INFO)) {
    size_t i;
    for (i = 0; i < ep->read_buffer->count; i++) {
      char* data = grpc_dump_slice(ep->read_buffer->slices[i],
                                   GPR_DUMP_HEX | GPR_DUMP_ASCII);
      gpr_log(GPR_INFO, "READ %p: %s", ep, data);
      gpr_free(data);
    }
  }
  ep->read_buffer = nullptr;
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error);
  SECURE_ENDPOINT_UNREF(ep, "read");
}

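// Read completion for the wrapped endpoint: unprotects the ciphertext that
// arrived in source_buffer into read_buffer, using the zero-copy protector
// when one is available and the copying frame protector otherwise.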
static void on_read(void* user_data, grpc_error_handle error) {
  unsigned i;
  uint8_t keep_looping = 0;
  tsi_result result = TSI_OK;
  secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);

  {
    grpc_core::MutexLock l(&ep->read_mu);
    uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
    uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);

    if (!error.ok()) {
      grpc_slice_buffer_reset_and_unref(ep->read_buffer);
      call_read_cb(
          ep, GRPC_ERROR_CREATE_REFERENCING("Secure read failed", &error, 1));
      return;
    }

    if (ep->zero_copy_protector != nullptr) {
      // Use zero-copy grpc protector to unprotect.
      int min_progress_size = 1;
      // Get the size of the last frame, which is not yet fully decrypted.
      // This estimated frame size is stored in ep->min_progress_size, which
      // is passed to the TCP layer to indicate the minimum number of bytes
      // that need to be read to make meaningful progress. This avoids
      // reading small slices from the network.
      // TODO(vigneshbabu): Set min_progress_size in the regular
      // (non-zero-copy) frame protector code path as well.
      result = tsi_zero_copy_grpc_protector_unprotect(
          ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer,
          &min_progress_size);
      min_progress_size = std::max(1, min_progress_size);
      ep->min_progress_size = result != TSI_OK ? 1 : min_progress_size;
    } else {
      // Use frame protector to unprotect.
      // TODO(yangg) check error, maybe bail out early
      for (i = 0; i < ep->source_buffer.count; i++) {
        grpc_slice encrypted = ep->source_buffer.slices[i];
        uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
        size_t message_size = GRPC_SLICE_LENGTH(encrypted);

        while (message_size > 0 || keep_looping) {
          size_t unprotected_buffer_size_written =
              static_cast<size_t>(end - cur);
          size_t processed_message_size = message_size;
          gpr_mu_lock(&ep->protector_mu);
          result = tsi_frame_protector_unprotect(
              ep->protector, message_bytes, &processed_message_size, cur,
              &unprotected_buffer_size_written);
          gpr_mu_unlock(&ep->protector_mu);
          if (result != TSI_OK) {
            gpr_log(GPR_ERROR, "Decryption error: %s",
                    tsi_result_to_string(result));
            break;
          }
          message_bytes += processed_message_size;
          message_size -= processed_message_size;
          cur += unprotected_buffer_size_written;

          if (cur == end) {
            flush_read_staging_buffer(ep, &cur, &end);
            // Force another loop iteration to extract any bytes still
            // buffered in the protector. Bytes can be buffered when the
            // staging buffer runs out of space; if that happens at the end
            // of all slices, doing another unprotect avoids leaving data in
            // the protector.
            keep_looping = 1;
          } else if (unprotected_buffer_size_written > 0) {
            keep_looping = 1;
          } else {
            keep_looping = 0;
          }
        }
        if (result != TSI_OK) break;
      }

      if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
        grpc_slice_buffer_add(
            ep->read_buffer,
            grpc_slice_split_head(
                &ep->read_staging_buffer,
                static_cast<size_t>(
                    cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
      }
    }
  }

  // TODO(yangg) experiment with moving this block after read_cb to see if it
  // helps latency
  grpc_slice_buffer_reset_and_unref(&ep->source_buffer);

  if (result != TSI_OK) {
    grpc_slice_buffer_reset_and_unref(ep->read_buffer);
    call_read_cb(ep, grpc_set_tsi_error_result(
                         GRPC_ERROR_CREATE("Unwrap failed"), result));
    return;
  }

  call_read_cb(ep, absl::OkStatus());
}

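// vtable read: registers the caller's callback and target buffer, then either
// consumes handshaker leftovers synchronously or issues a read on the wrapped
// endpoint with the current min_progress_size hint.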
static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
                          grpc_closure* cb, bool urgent,
                          int /*min_progress_size*/) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  ep->read_cb = cb;
  ep->read_buffer = slices;
  grpc_slice_buffer_reset_and_unref(ep->read_buffer);

  SECURE_ENDPOINT_REF(ep, "read");
  if (ep->leftover_bytes.count) {
    grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
    GPR_ASSERT(ep->leftover_bytes.count == 0);
    on_read(ep, absl::OkStatus());
    return;
  }

  grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent,
                     /*min_progress_size=*/ep->min_progress_size);
}

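// Write-side counterpart of flush_read_staging_buffer; additionally makes
// sure a benign reclaimer is posted for the newly allocated staging slice.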
static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
                                       uint8_t** end)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep->write_mu) {
  grpc_slice_buffer_add_indexed(&ep->output_buffer, ep->write_staging_buffer);
  ep->write_staging_buffer =
      ep->memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
  *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
  *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
  maybe_post_reclaimer(ep);
}

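// vtable write: protects the caller's plaintext slices into output_buffer and
// forwards the protected bytes to the wrapped endpoint. On the zero-copy path
// the input is chunked so no protected frame exceeds max_frame_size.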
static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
                           grpc_closure* cb, void* arg, int max_frame_size) {
  unsigned i;
  tsi_result result = TSI_OK;
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);

  {
    grpc_core::MutexLock l(&ep->write_mu);
    uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
    uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);

    grpc_slice_buffer_reset_and_unref(&ep->output_buffer);

    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint) &&
        gpr_should_log(GPR_LOG_SEVERITY_INFO)) {
      for (i = 0; i < slices->count; i++) {
        char* data =
            grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
        gpr_log(GPR_INFO, "WRITE %p: %s", ep, data);
        gpr_free(data);
      }
    }

    if (ep->zero_copy_protector != nullptr) {
      // Use zero-copy grpc protector to protect.
      result = TSI_OK;
      // Break the input slices into chunks of size = max_frame_size and call
      // tsi_zero_copy_grpc_protector_protect on each chunk. This ensures that
      // the protector cannot create frames larger than the specified
      // max_frame_size.
      while (slices->length > static_cast<size_t>(max_frame_size) &&
             result == TSI_OK) {
        grpc_slice_buffer_move_first(slices,
                                     static_cast<size_t>(max_frame_size),
                                     &ep->protector_staging_buffer);
        result = tsi_zero_copy_grpc_protector_protect(
            ep->zero_copy_protector, &ep->protector_staging_buffer,
            &ep->output_buffer);
      }
      if (result == TSI_OK && slices->length > 0) {
        result = tsi_zero_copy_grpc_protector_protect(
            ep->zero_copy_protector, slices, &ep->output_buffer);
      }
      grpc_slice_buffer_reset_and_unref(&ep->protector_staging_buffer);
    } else {
      // Use frame protector to protect.
      for (i = 0; i < slices->count; i++) {
        grpc_slice plain = slices->slices[i];
        uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
        size_t message_size = GRPC_SLICE_LENGTH(plain);
        while (message_size > 0) {
          size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
          size_t processed_message_size = message_size;
          gpr_mu_lock(&ep->protector_mu);
          result = tsi_frame_protector_protect(ep->protector, message_bytes,
                                               &processed_message_size, cur,
                                               &protected_buffer_size_to_send);
          gpr_mu_unlock(&ep->protector_mu);
          if (result != TSI_OK) {
            gpr_log(GPR_ERROR, "Encryption error: %s",
                    tsi_result_to_string(result));
            break;
          }
          message_bytes += processed_message_size;
          message_size -= processed_message_size;
          cur += protected_buffer_size_to_send;

          if (cur == end) {
            flush_write_staging_buffer(ep, &cur, &end);
          }
        }
        if (result != TSI_OK) break;
      }
      if (result == TSI_OK) {
        size_t still_pending_size;
        do {
          size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
          gpr_mu_lock(&ep->protector_mu);
          result = tsi_frame_protector_protect_flush(
              ep->protector, cur, &protected_buffer_size_to_send,
              &still_pending_size);
          gpr_mu_unlock(&ep->protector_mu);
          if (result != TSI_OK) break;
          cur += protected_buffer_size_to_send;
          if (cur == end) {
            flush_write_staging_buffer(ep, &cur, &end);
          }
        } while (still_pending_size > 0);
        if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
          grpc_slice_buffer_add(
              &ep->output_buffer,
              grpc_slice_split_head(
                  &ep->write_staging_buffer,
                  static_cast<size_t>(
                      cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
        }
      }
    }
  }

  if (result != TSI_OK) {
    // TODO(yangg) do different things according to the error type?
    grpc_slice_buffer_reset_and_unref(&ep->output_buffer);
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, cb,
        grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result));
    return;
  }

  grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg,
                      max_frame_size);
}

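// The remaining vtable entries delegate directly to the wrapped endpoint;
// endpoint_destroy additionally releases the memory owner and drops the
// initial ref taken at construction.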
static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error_handle why) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  grpc_endpoint_shutdown(ep->wrapped_ep, why);
}

static void endpoint_destroy(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  ep->memory_owner.Reset();
  SECURE_ENDPOINT_UNREF(ep, "destroy");
}

static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
                                    grpc_pollset* pollset) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
}

static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
                                        grpc_pollset_set* pollset_set) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
}

static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
                                             grpc_pollset_set* pollset_set) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
}

static absl::string_view endpoint_get_peer(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  return grpc_endpoint_get_peer(ep->wrapped_ep);
}

static absl::string_view endpoint_get_local_address(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  return grpc_endpoint_get_local_address(ep->wrapped_ep);
}

static int endpoint_get_fd(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  return grpc_endpoint_get_fd(ep->wrapped_ep);
}

static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
  secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
  return grpc_endpoint_can_track_err(ep->wrapped_ep);
}

static const grpc_endpoint_vtable vtable = {endpoint_read,
                                            endpoint_write,
                                            endpoint_add_to_pollset,
                                            endpoint_add_to_pollset_set,
                                            endpoint_delete_from_pollset_set,
                                            endpoint_shutdown,
                                            endpoint_destroy,
                                            endpoint_get_peer,
                                            endpoint_get_local_address,
                                            endpoint_get_fd,
                                            endpoint_can_track_err};

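// Wraps `to_wrap` in a secure endpoint. Takes ownership of both protectors
// and of the wrapped endpoint; the leftover slices are ref'd, so the caller
// retains ownership of the array itself. When `zero_copy_protector` is
// non-null it is preferred over `protector` on both read and write paths.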
grpc_endpoint* grpc_secure_endpoint_create(
    struct tsi_frame_protector* protector,
    struct tsi_zero_copy_grpc_protector* zero_copy_protector,
    grpc_endpoint* to_wrap, grpc_slice* leftover_slices,
    const grpc_channel_args* channel_args, size_t leftover_nslices) {
  secure_endpoint* ep =
      new secure_endpoint(&vtable, protector, zero_copy_protector, to_wrap,
                          leftover_slices, channel_args, leftover_nslices);
  return &ep->base;
}