1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/security/transport/secure_endpoint.h"
22 
23 #include <inttypes.h>
24 
25 #include <algorithm>
26 #include <atomic>
27 #include <memory>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34 
35 #include <grpc/event_engine/memory_allocator.h>
36 #include <grpc/event_engine/memory_request.h>
37 #include <grpc/slice.h>
38 #include <grpc/slice_buffer.h>
39 #include <grpc/support/alloc.h>
40 #include <grpc/support/atm.h>
41 #include <grpc/support/log.h>
42 #include <grpc/support/sync.h>
43 
44 #include "src/core/lib/debug/trace.h"
45 #include "src/core/lib/gpr/string.h"
46 #include "src/core/lib/gprpp/debug_location.h"
47 #include "src/core/lib/gprpp/ref_counted_ptr.h"
48 #include "src/core/lib/gprpp/sync.h"
49 #include "src/core/lib/iomgr/closure.h"
50 #include "src/core/lib/iomgr/error.h"
51 #include "src/core/lib/iomgr/exec_ctx.h"
52 #include "src/core/lib/iomgr/iomgr_fwd.h"
53 #include "src/core/lib/resource_quota/api.h"
54 #include "src/core/lib/resource_quota/memory_quota.h"
55 #include "src/core/lib/resource_quota/resource_quota.h"
56 #include "src/core/lib/resource_quota/trace.h"
57 #include "src/core/lib/security/transport/tsi_error.h"
58 #include "src/core/lib/slice/slice.h"
59 #include "src/core/lib/slice/slice_string_helpers.h"
60 #include "src/core/tsi/transport_security_grpc.h"
61 #include "src/core/tsi/transport_security_interface.h"
62 
// Size passed to grpc_core::MemoryRequest whenever a read or write staging
// slice is allocated for the frame-protector (non-zero-copy) path.
#define STAGING_BUFFER_SIZE 8192

// Forward declaration: the secure_endpoint constructor binds this function to
// its `on_read` closure before the definition appears further down the file.
static void on_read(void* user_data, grpc_error_handle error);
66 
67 namespace {
68 struct secure_endpoint {
secure_endpoint__anonfcfd34c60111::secure_endpoint69   secure_endpoint(const grpc_endpoint_vtable* vtable,
70                   tsi_frame_protector* protector,
71                   tsi_zero_copy_grpc_protector* zero_copy_protector,
72                   grpc_endpoint* transport, grpc_slice* leftover_slices,
73                   const grpc_channel_args* channel_args,
74                   size_t leftover_nslices)
75       : wrapped_ep(transport),
76         protector(protector),
77         zero_copy_protector(zero_copy_protector) {
78     base.vtable = vtable;
79     gpr_mu_init(&protector_mu);
80     GRPC_CLOSURE_INIT(&on_read, ::on_read, this, grpc_schedule_on_exec_ctx);
81     grpc_slice_buffer_init(&source_buffer);
82     grpc_slice_buffer_init(&leftover_bytes);
83     for (size_t i = 0; i < leftover_nslices; i++) {
84       grpc_slice_buffer_add(&leftover_bytes,
85                             grpc_core::CSliceRef(leftover_slices[i]));
86     }
87     grpc_slice_buffer_init(&output_buffer);
88     memory_owner =
89         grpc_core::ResourceQuotaFromChannelArgs(channel_args)
90             ->memory_quota()
91             ->CreateMemoryOwner(absl::StrCat(grpc_endpoint_get_peer(transport),
92                                              ":secure_endpoint"));
93     self_reservation = memory_owner.MakeReservation(sizeof(*this));
94     if (zero_copy_protector) {
95       read_staging_buffer = grpc_empty_slice();
96       write_staging_buffer = grpc_empty_slice();
97     } else {
98       read_staging_buffer =
99           memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
100       write_staging_buffer =
101           memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
102     }
103     has_posted_reclaimer.store(false, std::memory_order_relaxed);
104     min_progress_size = 1;
105     grpc_slice_buffer_init(&protector_staging_buffer);
106     gpr_ref_init(&ref, 1);
107   }
108 
~secure_endpoint__anonfcfd34c60111::secure_endpoint109   ~secure_endpoint() {
110     grpc_endpoint_destroy(wrapped_ep);
111     tsi_frame_protector_destroy(protector);
112     tsi_zero_copy_grpc_protector_destroy(zero_copy_protector);
113     grpc_slice_buffer_destroy(&source_buffer);
114     grpc_slice_buffer_destroy(&leftover_bytes);
115     grpc_core::CSliceUnref(read_staging_buffer);
116     grpc_core::CSliceUnref(write_staging_buffer);
117     grpc_slice_buffer_destroy(&output_buffer);
118     grpc_slice_buffer_destroy(&protector_staging_buffer);
119     gpr_mu_destroy(&protector_mu);
120   }
121 
122   grpc_endpoint base;
123   grpc_endpoint* wrapped_ep;
124   struct tsi_frame_protector* protector;
125   struct tsi_zero_copy_grpc_protector* zero_copy_protector;
126   gpr_mu protector_mu;
127   grpc_core::Mutex read_mu;
128   grpc_core::Mutex write_mu;
129   // saved upper level callbacks and user_data.
130   grpc_closure* read_cb = nullptr;
131   grpc_closure* write_cb = nullptr;
132   grpc_closure on_read;
133   grpc_slice_buffer* read_buffer = nullptr;
134   grpc_slice_buffer source_buffer;
135   // saved handshaker leftover data to unprotect.
136   grpc_slice_buffer leftover_bytes;
137   // buffers for read and write
138   grpc_slice read_staging_buffer ABSL_GUARDED_BY(read_mu);
139   grpc_slice write_staging_buffer ABSL_GUARDED_BY(write_mu);
140   grpc_slice_buffer output_buffer;
141   grpc_core::MemoryOwner memory_owner;
142   grpc_core::MemoryAllocator::Reservation self_reservation;
143   std::atomic<bool> has_posted_reclaimer;
144   int min_progress_size;
145   grpc_slice_buffer protector_staging_buffer;
146   gpr_refcount ref;
147 };
148 }  // namespace
149 
150 grpc_core::TraceFlag grpc_trace_secure_endpoint(false, "secure_endpoint");
151 
destroy(secure_endpoint * ep)152 static void destroy(secure_endpoint* ep) { delete ep; }
153 
154 #ifndef NDEBUG
155 #define SECURE_ENDPOINT_UNREF(ep, reason) \
156   secure_endpoint_unref((ep), (reason), __FILE__, __LINE__)
157 #define SECURE_ENDPOINT_REF(ep, reason) \
158   secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
secure_endpoint_unref(secure_endpoint * ep,const char * reason,const char * file,int line)159 static void secure_endpoint_unref(secure_endpoint* ep, const char* reason,
160                                   const char* file, int line) {
161   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
162     gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
163     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
164             "SECENDP unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
165             val - 1);
166   }
167   if (gpr_unref(&ep->ref)) {
168     destroy(ep);
169   }
170 }
171 
secure_endpoint_ref(secure_endpoint * ep,const char * reason,const char * file,int line)172 static void secure_endpoint_ref(secure_endpoint* ep, const char* reason,
173                                 const char* file, int line) {
174   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint)) {
175     gpr_atm val = gpr_atm_no_barrier_load(&ep->ref.count);
176     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
177             "SECENDP   ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val,
178             val + 1);
179   }
180   gpr_ref(&ep->ref);
181 }
182 #else
183 #define SECURE_ENDPOINT_UNREF(ep, reason) secure_endpoint_unref((ep))
184 #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
secure_endpoint_unref(secure_endpoint * ep)185 static void secure_endpoint_unref(secure_endpoint* ep) {
186   if (gpr_unref(&ep->ref)) {
187     destroy(ep);
188   }
189 }
190 
secure_endpoint_ref(secure_endpoint * ep)191 static void secure_endpoint_ref(secure_endpoint* ep) { gpr_ref(&ep->ref); }
192 #endif
193 
maybe_post_reclaimer(secure_endpoint * ep)194 static void maybe_post_reclaimer(secure_endpoint* ep) {
195   if (!ep->has_posted_reclaimer) {
196     SECURE_ENDPOINT_REF(ep, "benign_reclaimer");
197     ep->has_posted_reclaimer.exchange(true, std::memory_order_relaxed);
198     ep->memory_owner.PostReclaimer(
199         grpc_core::ReclamationPass::kBenign,
200         [ep](absl::optional<grpc_core::ReclamationSweep> sweep) {
201           if (sweep.has_value()) {
202             if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
203               gpr_log(GPR_INFO,
204                       "secure endpoint: benign reclamation to free memory");
205             }
206             grpc_slice temp_read_slice;
207             grpc_slice temp_write_slice;
208 
209             ep->read_mu.Lock();
210             temp_read_slice = ep->read_staging_buffer;
211             ep->read_staging_buffer = grpc_empty_slice();
212             ep->read_mu.Unlock();
213 
214             ep->write_mu.Lock();
215             temp_write_slice = ep->write_staging_buffer;
216             ep->write_staging_buffer = grpc_empty_slice();
217             ep->write_mu.Unlock();
218 
219             grpc_core::CSliceUnref(temp_read_slice);
220             grpc_core::CSliceUnref(temp_write_slice);
221             ep->has_posted_reclaimer.exchange(false, std::memory_order_relaxed);
222           }
223           SECURE_ENDPOINT_UNREF(ep, "benign_reclaimer");
224         });
225   }
226 }
227 
flush_read_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)228 static void flush_read_staging_buffer(secure_endpoint* ep, uint8_t** cur,
229                                       uint8_t** end)
230     ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep->read_mu) {
231   grpc_slice_buffer_add_indexed(ep->read_buffer, ep->read_staging_buffer);
232   ep->read_staging_buffer =
233       ep->memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
234   *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
235   *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
236 }
237 
call_read_cb(secure_endpoint * ep,grpc_error_handle error)238 static void call_read_cb(secure_endpoint* ep, grpc_error_handle error) {
239   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint) &&
240       gpr_should_log(GPR_LOG_SEVERITY_INFO)) {
241     size_t i;
242     for (i = 0; i < ep->read_buffer->count; i++) {
243       char* data = grpc_dump_slice(ep->read_buffer->slices[i],
244                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
245       gpr_log(GPR_INFO, "READ %p: %s", ep, data);
246       gpr_free(data);
247     }
248   }
249   ep->read_buffer = nullptr;
250   grpc_core::ExecCtx::Run(DEBUG_LOCATION, ep->read_cb, error);
251   SECURE_ENDPOINT_UNREF(ep, "read");
252 }
253 
on_read(void * user_data,grpc_error_handle error)254 static void on_read(void* user_data, grpc_error_handle error) {
255   unsigned i;
256   uint8_t keep_looping = 0;
257   tsi_result result = TSI_OK;
258   secure_endpoint* ep = static_cast<secure_endpoint*>(user_data);
259 
260   {
261     grpc_core::MutexLock l(&ep->read_mu);
262     uint8_t* cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer);
263     uint8_t* end = GRPC_SLICE_END_PTR(ep->read_staging_buffer);
264 
265     if (!error.ok()) {
266       grpc_slice_buffer_reset_and_unref(ep->read_buffer);
267       call_read_cb(
268           ep, GRPC_ERROR_CREATE_REFERENCING("Secure read failed", &error, 1));
269       return;
270     }
271 
272     if (ep->zero_copy_protector != nullptr) {
273       // Use zero-copy grpc protector to unprotect.
274       int min_progress_size = 1;
275       // Get the size of the last frame which is not yet fully decrypted.
276       // This estimated frame size is stored in ep->min_progress_size which is
277       // passed to the TCP layer to indicate the minimum number of
278       // bytes that need to be read to make meaningful progress. This would
279       // avoid reading of small slices from the network.
280       // TODO(vigneshbabu): Set min_progress_size in the regular (non-zero-copy)
281       // frame protector code path as well.
282       result = tsi_zero_copy_grpc_protector_unprotect(
283           ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer,
284           &min_progress_size);
285       min_progress_size = std::max(1, min_progress_size);
286       ep->min_progress_size = result != TSI_OK ? 1 : min_progress_size;
287     } else {
288       // Use frame protector to unprotect.
289       // TODO(yangg) check error, maybe bail out early
290       for (i = 0; i < ep->source_buffer.count; i++) {
291         grpc_slice encrypted = ep->source_buffer.slices[i];
292         uint8_t* message_bytes = GRPC_SLICE_START_PTR(encrypted);
293         size_t message_size = GRPC_SLICE_LENGTH(encrypted);
294 
295         while (message_size > 0 || keep_looping) {
296           size_t unprotected_buffer_size_written =
297               static_cast<size_t>(end - cur);
298           size_t processed_message_size = message_size;
299           gpr_mu_lock(&ep->protector_mu);
300           result = tsi_frame_protector_unprotect(
301               ep->protector, message_bytes, &processed_message_size, cur,
302               &unprotected_buffer_size_written);
303           gpr_mu_unlock(&ep->protector_mu);
304           if (result != TSI_OK) {
305             gpr_log(GPR_ERROR, "Decryption error: %s",
306                     tsi_result_to_string(result));
307             break;
308           }
309           message_bytes += processed_message_size;
310           message_size -= processed_message_size;
311           cur += unprotected_buffer_size_written;
312 
313           if (cur == end) {
314             flush_read_staging_buffer(ep, &cur, &end);
315             // Force to enter the loop again to extract buffered bytes in
316             // protector. The bytes could be buffered because of running out of
317             // staging_buffer. If this happens at the end of all slices, doing
318             // another unprotect avoids leaving data in the protector.
319             keep_looping = 1;
320           } else if (unprotected_buffer_size_written > 0) {
321             keep_looping = 1;
322           } else {
323             keep_looping = 0;
324           }
325         }
326         if (result != TSI_OK) break;
327       }
328 
329       if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
330         grpc_slice_buffer_add(
331             ep->read_buffer,
332             grpc_slice_split_head(
333                 &ep->read_staging_buffer,
334                 static_cast<size_t>(
335                     cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
336       }
337     }
338   }
339 
340   // TODO(yangg) experiment with moving this block after read_cb to see if it
341   // helps latency
342   grpc_slice_buffer_reset_and_unref(&ep->source_buffer);
343 
344   if (result != TSI_OK) {
345     grpc_slice_buffer_reset_and_unref(ep->read_buffer);
346     call_read_cb(ep, grpc_set_tsi_error_result(
347                          GRPC_ERROR_CREATE("Unwrap failed"), result));
348     return;
349   }
350 
351   call_read_cb(ep, absl::OkStatus());
352 }
353 
endpoint_read(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,bool urgent,int)354 static void endpoint_read(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
355                           grpc_closure* cb, bool urgent,
356                           int /*min_progress_size*/) {
357   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
358   ep->read_cb = cb;
359   ep->read_buffer = slices;
360   grpc_slice_buffer_reset_and_unref(ep->read_buffer);
361 
362   SECURE_ENDPOINT_REF(ep, "read");
363   if (ep->leftover_bytes.count) {
364     grpc_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
365     GPR_ASSERT(ep->leftover_bytes.count == 0);
366     on_read(ep, absl::OkStatus());
367     return;
368   }
369 
370   grpc_endpoint_read(ep->wrapped_ep, &ep->source_buffer, &ep->on_read, urgent,
371                      /*min_progress_size=*/ep->min_progress_size);
372 }
373 
flush_write_staging_buffer(secure_endpoint * ep,uint8_t ** cur,uint8_t ** end)374 static void flush_write_staging_buffer(secure_endpoint* ep, uint8_t** cur,
375                                        uint8_t** end)
376     ABSL_EXCLUSIVE_LOCKS_REQUIRED(ep->write_mu) {
377   grpc_slice_buffer_add_indexed(&ep->output_buffer, ep->write_staging_buffer);
378   ep->write_staging_buffer =
379       ep->memory_owner.MakeSlice(grpc_core::MemoryRequest(STAGING_BUFFER_SIZE));
380   *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
381   *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
382   maybe_post_reclaimer(ep);
383 }
384 
endpoint_write(grpc_endpoint * secure_ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg,int max_frame_size)385 static void endpoint_write(grpc_endpoint* secure_ep, grpc_slice_buffer* slices,
386                            grpc_closure* cb, void* arg, int max_frame_size) {
387   unsigned i;
388   tsi_result result = TSI_OK;
389   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
390 
391   {
392     grpc_core::MutexLock l(&ep->write_mu);
393     uint8_t* cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer);
394     uint8_t* end = GRPC_SLICE_END_PTR(ep->write_staging_buffer);
395 
396     grpc_slice_buffer_reset_and_unref(&ep->output_buffer);
397 
398     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_secure_endpoint) &&
399         gpr_should_log(GPR_LOG_SEVERITY_INFO)) {
400       for (i = 0; i < slices->count; i++) {
401         char* data =
402             grpc_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
403         gpr_log(GPR_INFO, "WRITE %p: %s", ep, data);
404         gpr_free(data);
405       }
406     }
407 
408     if (ep->zero_copy_protector != nullptr) {
409       // Use zero-copy grpc protector to protect.
410       result = TSI_OK;
411       // Break the input slices into chunks of size = max_frame_size and call
412       // tsi_zero_copy_grpc_protector_protect on each chunk. This ensures that
413       // the protector cannot create frames larger than the specified
414       // max_frame_size.
415       while (slices->length > static_cast<size_t>(max_frame_size) &&
416              result == TSI_OK) {
417         grpc_slice_buffer_move_first(slices,
418                                      static_cast<size_t>(max_frame_size),
419                                      &ep->protector_staging_buffer);
420         result = tsi_zero_copy_grpc_protector_protect(
421             ep->zero_copy_protector, &ep->protector_staging_buffer,
422             &ep->output_buffer);
423       }
424       if (result == TSI_OK && slices->length > 0) {
425         result = tsi_zero_copy_grpc_protector_protect(
426             ep->zero_copy_protector, slices, &ep->output_buffer);
427       }
428       grpc_slice_buffer_reset_and_unref(&ep->protector_staging_buffer);
429     } else {
430       // Use frame protector to protect.
431       for (i = 0; i < slices->count; i++) {
432         grpc_slice plain = slices->slices[i];
433         uint8_t* message_bytes = GRPC_SLICE_START_PTR(plain);
434         size_t message_size = GRPC_SLICE_LENGTH(plain);
435         while (message_size > 0) {
436           size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
437           size_t processed_message_size = message_size;
438           gpr_mu_lock(&ep->protector_mu);
439           result = tsi_frame_protector_protect(ep->protector, message_bytes,
440                                                &processed_message_size, cur,
441                                                &protected_buffer_size_to_send);
442           gpr_mu_unlock(&ep->protector_mu);
443           if (result != TSI_OK) {
444             gpr_log(GPR_ERROR, "Encryption error: %s",
445                     tsi_result_to_string(result));
446             break;
447           }
448           message_bytes += processed_message_size;
449           message_size -= processed_message_size;
450           cur += protected_buffer_size_to_send;
451 
452           if (cur == end) {
453             flush_write_staging_buffer(ep, &cur, &end);
454           }
455         }
456         if (result != TSI_OK) break;
457       }
458       if (result == TSI_OK) {
459         size_t still_pending_size;
460         do {
461           size_t protected_buffer_size_to_send = static_cast<size_t>(end - cur);
462           gpr_mu_lock(&ep->protector_mu);
463           result = tsi_frame_protector_protect_flush(
464               ep->protector, cur, &protected_buffer_size_to_send,
465               &still_pending_size);
466           gpr_mu_unlock(&ep->protector_mu);
467           if (result != TSI_OK) break;
468           cur += protected_buffer_size_to_send;
469           if (cur == end) {
470             flush_write_staging_buffer(ep, &cur, &end);
471           }
472         } while (still_pending_size > 0);
473         if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
474           grpc_slice_buffer_add(
475               &ep->output_buffer,
476               grpc_slice_split_head(
477                   &ep->write_staging_buffer,
478                   static_cast<size_t>(
479                       cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
480         }
481       }
482     }
483   }
484 
485   if (result != TSI_OK) {
486     // TODO(yangg) do different things according to the error type?
487     grpc_slice_buffer_reset_and_unref(&ep->output_buffer);
488     grpc_core::ExecCtx::Run(
489         DEBUG_LOCATION, cb,
490         grpc_set_tsi_error_result(GRPC_ERROR_CREATE("Wrap failed"), result));
491     return;
492   }
493 
494   grpc_endpoint_write(ep->wrapped_ep, &ep->output_buffer, cb, arg,
495                       max_frame_size);
496 }
497 
endpoint_shutdown(grpc_endpoint * secure_ep,grpc_error_handle why)498 static void endpoint_shutdown(grpc_endpoint* secure_ep, grpc_error_handle why) {
499   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
500   grpc_endpoint_shutdown(ep->wrapped_ep, why);
501 }
502 
endpoint_destroy(grpc_endpoint * secure_ep)503 static void endpoint_destroy(grpc_endpoint* secure_ep) {
504   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
505   ep->memory_owner.Reset();
506   SECURE_ENDPOINT_UNREF(ep, "destroy");
507 }
508 
endpoint_add_to_pollset(grpc_endpoint * secure_ep,grpc_pollset * pollset)509 static void endpoint_add_to_pollset(grpc_endpoint* secure_ep,
510                                     grpc_pollset* pollset) {
511   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
512   grpc_endpoint_add_to_pollset(ep->wrapped_ep, pollset);
513 }
514 
endpoint_add_to_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)515 static void endpoint_add_to_pollset_set(grpc_endpoint* secure_ep,
516                                         grpc_pollset_set* pollset_set) {
517   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
518   grpc_endpoint_add_to_pollset_set(ep->wrapped_ep, pollset_set);
519 }
520 
endpoint_delete_from_pollset_set(grpc_endpoint * secure_ep,grpc_pollset_set * pollset_set)521 static void endpoint_delete_from_pollset_set(grpc_endpoint* secure_ep,
522                                              grpc_pollset_set* pollset_set) {
523   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
524   grpc_endpoint_delete_from_pollset_set(ep->wrapped_ep, pollset_set);
525 }
526 
endpoint_get_peer(grpc_endpoint * secure_ep)527 static absl::string_view endpoint_get_peer(grpc_endpoint* secure_ep) {
528   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
529   return grpc_endpoint_get_peer(ep->wrapped_ep);
530 }
531 
endpoint_get_local_address(grpc_endpoint * secure_ep)532 static absl::string_view endpoint_get_local_address(grpc_endpoint* secure_ep) {
533   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
534   return grpc_endpoint_get_local_address(ep->wrapped_ep);
535 }
536 
endpoint_get_fd(grpc_endpoint * secure_ep)537 static int endpoint_get_fd(grpc_endpoint* secure_ep) {
538   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
539   return grpc_endpoint_get_fd(ep->wrapped_ep);
540 }
541 
endpoint_can_track_err(grpc_endpoint * secure_ep)542 static bool endpoint_can_track_err(grpc_endpoint* secure_ep) {
543   secure_endpoint* ep = reinterpret_cast<secure_endpoint*>(secure_ep);
544   return grpc_endpoint_can_track_err(ep->wrapped_ep);
545 }
546 
547 static const grpc_endpoint_vtable vtable = {endpoint_read,
548                                             endpoint_write,
549                                             endpoint_add_to_pollset,
550                                             endpoint_add_to_pollset_set,
551                                             endpoint_delete_from_pollset_set,
552                                             endpoint_shutdown,
553                                             endpoint_destroy,
554                                             endpoint_get_peer,
555                                             endpoint_get_local_address,
556                                             endpoint_get_fd,
557                                             endpoint_can_track_err};
558 
grpc_secure_endpoint_create(struct tsi_frame_protector * protector,struct tsi_zero_copy_grpc_protector * zero_copy_protector,grpc_endpoint * to_wrap,grpc_slice * leftover_slices,const grpc_channel_args * channel_args,size_t leftover_nslices)559 grpc_endpoint* grpc_secure_endpoint_create(
560     struct tsi_frame_protector* protector,
561     struct tsi_zero_copy_grpc_protector* zero_copy_protector,
562     grpc_endpoint* to_wrap, grpc_slice* leftover_slices,
563     const grpc_channel_args* channel_args, size_t leftover_nslices) {
564   secure_endpoint* ep =
565       new secure_endpoint(&vtable, protector, zero_copy_protector, to_wrap,
566                           leftover_slices, channel_args, leftover_nslices);
567   return &ep->base;
568 }
569