xref: /aosp_15_r20/external/grpc-grpc/test/core/end2end/fixtures/proxy.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/core/end2end/fixtures/proxy.h"
20 
21 #include <string.h>
22 
23 #include <string>
24 #include <utility>
25 
26 #include <grpc/byte_buffer.h>
27 #include <grpc/impl/channel_arg_names.h>
28 #include <grpc/impl/propagation_bits.h>
29 #include <grpc/slice.h>
30 #include <grpc/status.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34 #include <grpc/support/time.h>
35 
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/gprpp/crash.h"
38 #include "src/core/lib/gprpp/host_port.h"
39 #include "src/core/lib/gprpp/thd.h"
40 #include "src/core/lib/surface/call.h"
41 #include "test/core/util/port.h"
42 
43 struct grpc_end2end_proxy {
grpc_end2end_proxygrpc_end2end_proxy44   grpc_end2end_proxy()
45       : cq(nullptr),
46         server(nullptr),
47         client(nullptr),
48         shutdown(false),
49         new_call(nullptr) {
50     memset(&new_call_details, 0, sizeof(new_call_details));
51     memset(&new_call_metadata, 0, sizeof(new_call_metadata));
52   }
53   grpc_core::Thread thd;
54   std::string proxy_port;
55   std::string server_port;
56   grpc_completion_queue* cq;
57   grpc_server* server;
58   grpc_channel* client;
59 
60   int shutdown;
61 
62   // requested call
63   grpc_call* new_call;
64   grpc_call_details new_call_details;
65   grpc_metadata_array new_call_metadata;
66 };
67 
// Signature of a completion callback: `arg` is the closure's bound argument,
// `success` is the completion-queue event's success flag.
typedef void (*closure_func)(void* arg, int success);

// Heap-allocated callback + argument pair used as a completion-queue tag.
typedef struct {
  closure_func func;
  void* arg;
} closure;
72 
73 typedef struct {
74   gpr_refcount refs;
75   grpc_end2end_proxy* proxy;
76 
77   grpc_call* c2p;
78   grpc_call* p2s;
79 
80   grpc_metadata_array c2p_initial_metadata;
81   grpc_metadata_array p2s_initial_metadata;
82 
83   grpc_byte_buffer* c2p_msg;
84   grpc_byte_buffer* p2s_msg;
85 
86   grpc_metadata_array p2s_trailing_metadata;
87   grpc_status_code p2s_status;
88   grpc_slice p2s_status_details;
89 
90   int c2p_server_cancelled;
91 } proxy_call;
92 
93 static void thread_main(void* arg);
94 static void request_call(grpc_end2end_proxy* proxy);
95 
grpc_end2end_proxy_create(const grpc_end2end_proxy_def * def,const grpc_channel_args * client_args,const grpc_channel_args * server_args)96 grpc_end2end_proxy* grpc_end2end_proxy_create(
97     const grpc_end2end_proxy_def* def, const grpc_channel_args* client_args,
98     const grpc_channel_args* server_args) {
99   int proxy_port = grpc_pick_unused_port_or_die();
100   int server_port = grpc_pick_unused_port_or_die();
101 
102   grpc_end2end_proxy* proxy = new grpc_end2end_proxy();
103 
104   proxy->proxy_port = grpc_core::JoinHostPort("localhost", proxy_port);
105   proxy->server_port = grpc_core::JoinHostPort("localhost", server_port);
106 
107   gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port.c_str(),
108           proxy->server_port.c_str());
109 
110   proxy->cq = grpc_completion_queue_create_for_next(nullptr);
111   proxy->server = def->create_server(proxy->proxy_port.c_str(), server_args);
112 
113   const char* arg_to_remove = GRPC_ARG_ENABLE_RETRIES;
114   grpc_arg arg_to_add = grpc_channel_arg_integer_create(
115       const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
116   const grpc_channel_args* proxy_client_args =
117       grpc_channel_args_copy_and_add_and_remove(client_args, &arg_to_remove, 1,
118                                                 &arg_to_add, 1);
119   proxy->client =
120       def->create_client(proxy->server_port.c_str(), proxy_client_args);
121   grpc_channel_args_destroy(proxy_client_args);
122 
123   grpc_server_register_completion_queue(proxy->server, proxy->cq, nullptr);
124   grpc_server_start(proxy->server);
125 
126   grpc_call_details_init(&proxy->new_call_details);
127   proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
128   proxy->thd.Start();
129 
130   request_call(proxy);
131 
132   return proxy;
133 }
134 
new_closure(void (* func)(void * arg,int success),void * arg)135 static closure* new_closure(void (*func)(void* arg, int success), void* arg) {
136   closure* cl = static_cast<closure*>(gpr_malloc(sizeof(*cl)));
137   cl->func = func;
138   cl->arg = arg;
139   return cl;
140 }
141 
shutdown_complete(void * arg,int)142 static void shutdown_complete(void* arg, int /*success*/) {
143   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
144   proxy->shutdown = 1;
145   grpc_completion_queue_shutdown(proxy->cq);
146 }
147 
grpc_end2end_proxy_destroy(grpc_end2end_proxy * proxy)148 void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
149   grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
150                                   new_closure(shutdown_complete, proxy));
151   proxy->thd.Join();
152   grpc_server_destroy(proxy->server);
153   grpc_channel_destroy(proxy->client);
154   grpc_completion_queue_destroy(proxy->cq);
155   grpc_call_details_destroy(&proxy->new_call_details);
156   delete proxy;
157 }
158 
unrefpc(proxy_call * pc,const char *)159 static void unrefpc(proxy_call* pc, const char* /*reason*/) {
160   if (gpr_unref(&pc->refs)) {
161     grpc_call_unref(pc->c2p);
162     grpc_call_unref(pc->p2s);
163     grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
164     grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
165     grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
166     grpc_slice_unref(pc->p2s_status_details);
167     gpr_free(pc);
168   }
169 }
170 
refpc(proxy_call * pc,const char *)171 static void refpc(proxy_call* pc, const char* /*reason*/) {
172   gpr_ref(&pc->refs);
173 }
174 
on_c2p_sent_initial_metadata(void * arg,int)175 static void on_c2p_sent_initial_metadata(void* arg, int /*success*/) {
176   proxy_call* pc = static_cast<proxy_call*>(arg);
177   unrefpc(pc, "on_c2p_sent_initial_metadata");
178 }
179 
on_p2s_recv_initial_metadata(void * arg,int)180 static void on_p2s_recv_initial_metadata(void* arg, int /*success*/) {
181   proxy_call* pc = static_cast<proxy_call*>(arg);
182   grpc_op op;
183   grpc_call_error err;
184 
185   memset(&op, 0, sizeof(op));
186   if (!pc->proxy->shutdown && !grpc_call_is_trailers_only(pc->p2s)) {
187     op.op = GRPC_OP_SEND_INITIAL_METADATA;
188     op.flags = 0;
189     op.reserved = nullptr;
190     op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
191     op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
192     refpc(pc, "on_c2p_sent_initial_metadata");
193     err = grpc_call_start_batch(pc->c2p, &op, 1,
194                                 new_closure(on_c2p_sent_initial_metadata, pc),
195                                 nullptr);
196     GPR_ASSERT(err == GRPC_CALL_OK);
197   }
198 
199   unrefpc(pc, "on_p2s_recv_initial_metadata");
200 }
201 
on_p2s_sent_initial_metadata(void * arg,int)202 static void on_p2s_sent_initial_metadata(void* arg, int /*success*/) {
203   proxy_call* pc = static_cast<proxy_call*>(arg);
204   unrefpc(pc, "on_p2s_sent_initial_metadata");
205 }
206 
207 static void on_c2p_recv_msg(void* arg, int success);
208 
on_p2s_sent_message(void * arg,int success)209 static void on_p2s_sent_message(void* arg, int success) {
210   proxy_call* pc = static_cast<proxy_call*>(arg);
211   grpc_op op;
212   grpc_call_error err;
213 
214   grpc_byte_buffer_destroy(std::exchange(pc->c2p_msg, nullptr));
215   if (!pc->proxy->shutdown && success) {
216     op.op = GRPC_OP_RECV_MESSAGE;
217     op.flags = 0;
218     op.reserved = nullptr;
219     op.data.recv_message.recv_message = &pc->c2p_msg;
220     refpc(pc, "on_c2p_recv_msg");
221     err = grpc_call_start_batch(pc->c2p, &op, 1,
222                                 new_closure(on_c2p_recv_msg, pc), nullptr);
223     GPR_ASSERT(err == GRPC_CALL_OK);
224   }
225 
226   unrefpc(pc, "on_p2s_sent_message");
227 }
228 
on_p2s_sent_close(void * arg,int)229 static void on_p2s_sent_close(void* arg, int /*success*/) {
230   proxy_call* pc = static_cast<proxy_call*>(arg);
231   unrefpc(pc, "on_p2s_sent_close");
232 }
233 
on_c2p_recv_msg(void * arg,int success)234 static void on_c2p_recv_msg(void* arg, int success) {
235   proxy_call* pc = static_cast<proxy_call*>(arg);
236   grpc_op op;
237   grpc_call_error err;
238 
239   if (!pc->proxy->shutdown && success) {
240     if (pc->c2p_msg != nullptr) {
241       op.op = GRPC_OP_SEND_MESSAGE;
242       op.flags = 0;
243       op.reserved = nullptr;
244       op.data.send_message.send_message = pc->c2p_msg;
245       refpc(pc, "on_p2s_sent_message");
246       err = grpc_call_start_batch(
247           pc->p2s, &op, 1, new_closure(on_p2s_sent_message, pc), nullptr);
248       GPR_ASSERT(err == GRPC_CALL_OK);
249     } else {
250       op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
251       op.flags = 0;
252       op.reserved = nullptr;
253       refpc(pc, "on_p2s_sent_close");
254       err = grpc_call_start_batch(pc->p2s, &op, 1,
255                                   new_closure(on_p2s_sent_close, pc), nullptr);
256       GPR_ASSERT(err == GRPC_CALL_OK);
257     }
258   } else {
259     if (pc->c2p_msg != nullptr) {
260       grpc_byte_buffer_destroy(pc->c2p_msg);
261     }
262   }
263 
264   unrefpc(pc, "on_c2p_recv_msg");
265 }
266 
267 static void on_p2s_recv_msg(void* arg, int success);
268 
on_c2p_sent_message(void * arg,int success)269 static void on_c2p_sent_message(void* arg, int success) {
270   proxy_call* pc = static_cast<proxy_call*>(arg);
271   grpc_op op;
272   grpc_call_error err;
273 
274   grpc_byte_buffer_destroy(pc->p2s_msg);
275   if (!pc->proxy->shutdown && success) {
276     op.op = GRPC_OP_RECV_MESSAGE;
277     op.flags = 0;
278     op.reserved = nullptr;
279     op.data.recv_message.recv_message = &pc->p2s_msg;
280     refpc(pc, "on_p2s_recv_msg");
281     err = grpc_call_start_batch(pc->p2s, &op, 1,
282                                 new_closure(on_p2s_recv_msg, pc), nullptr);
283     GPR_ASSERT(err == GRPC_CALL_OK);
284   }
285 
286   unrefpc(pc, "on_c2p_sent_message");
287 }
288 
on_p2s_recv_msg(void * arg,int success)289 static void on_p2s_recv_msg(void* arg, int success) {
290   proxy_call* pc = static_cast<proxy_call*>(arg);
291   grpc_op op;
292   grpc_call_error err;
293 
294   if (!pc->proxy->shutdown && success && pc->p2s_msg) {
295     op.op = GRPC_OP_SEND_MESSAGE;
296     op.flags = 0;
297     op.reserved = nullptr;
298     op.data.send_message.send_message = pc->p2s_msg;
299     refpc(pc, "on_c2p_sent_message");
300     err = grpc_call_start_batch(pc->c2p, &op, 1,
301                                 new_closure(on_c2p_sent_message, pc), nullptr);
302     GPR_ASSERT(err == GRPC_CALL_OK);
303   } else {
304     grpc_byte_buffer_destroy(pc->p2s_msg);
305   }
306   unrefpc(pc, "on_p2s_recv_msg");
307 }
308 
on_c2p_sent_status(void * arg,int)309 static void on_c2p_sent_status(void* arg, int /*success*/) {
310   proxy_call* pc = static_cast<proxy_call*>(arg);
311   unrefpc(pc, "on_c2p_sent_status");
312 }
313 
on_p2s_status(void * arg,int success)314 static void on_p2s_status(void* arg, int success) {
315   proxy_call* pc = static_cast<proxy_call*>(arg);
316   grpc_op op[2];  // Possibly send empty initial metadata also if trailers-only
317   grpc_call_error err;
318 
319   memset(op, 0, sizeof(op));
320 
321   if (!pc->proxy->shutdown) {
322     GPR_ASSERT(success);
323 
324     int op_count = 0;
325     if (grpc_call_is_trailers_only(pc->p2s)) {
326       op[op_count].op = GRPC_OP_SEND_INITIAL_METADATA;
327       op_count++;
328     }
329 
330     op[op_count].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
331     op[op_count].flags = 0;
332     op[op_count].reserved = nullptr;
333     op[op_count].data.send_status_from_server.trailing_metadata_count =
334         pc->p2s_trailing_metadata.count;
335     op[op_count].data.send_status_from_server.trailing_metadata =
336         pc->p2s_trailing_metadata.metadata;
337     op[op_count].data.send_status_from_server.status = pc->p2s_status;
338     op[op_count].data.send_status_from_server.status_details =
339         &pc->p2s_status_details;
340     op_count++;
341     refpc(pc, "on_c2p_sent_status");
342     err = grpc_call_start_batch(pc->c2p, op, op_count,
343                                 new_closure(on_c2p_sent_status, pc), nullptr);
344     GPR_ASSERT(err == GRPC_CALL_OK);
345   }
346 
347   unrefpc(pc, "on_p2s_status");
348 }
349 
on_c2p_closed(void * arg,int)350 static void on_c2p_closed(void* arg, int /*success*/) {
351   proxy_call* pc = static_cast<proxy_call*>(arg);
352   unrefpc(pc, "on_c2p_closed");
353 }
354 
on_new_call(void * arg,int success)355 static void on_new_call(void* arg, int success) {
356   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
357   grpc_call_error err;
358 
359   if (success) {
360     grpc_op op;
361     memset(&op, 0, sizeof(op));
362     proxy_call* pc = static_cast<proxy_call*>(gpr_malloc(sizeof(*pc)));
363     memset(pc, 0, sizeof(*pc));
364     pc->proxy = proxy;
365     std::swap(pc->c2p_initial_metadata, proxy->new_call_metadata);
366     pc->c2p = proxy->new_call;
367     pc->p2s = grpc_channel_create_call(
368         proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
369         proxy->new_call_details.method, &proxy->new_call_details.host,
370         proxy->new_call_details.deadline, nullptr);
371     gpr_ref_init(&pc->refs, 1);
372 
373     op.reserved = nullptr;
374 
375     op.op = GRPC_OP_RECV_INITIAL_METADATA;
376     op.flags = 0;
377     op.data.recv_initial_metadata.recv_initial_metadata =
378         &pc->p2s_initial_metadata;
379     refpc(pc, "on_p2s_recv_initial_metadata");
380     err = grpc_call_start_batch(pc->p2s, &op, 1,
381                                 new_closure(on_p2s_recv_initial_metadata, pc),
382                                 nullptr);
383     GPR_ASSERT(err == GRPC_CALL_OK);
384 
385     op.op = GRPC_OP_SEND_INITIAL_METADATA;
386     op.flags = 0;
387     op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
388     op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
389     refpc(pc, "on_p2s_sent_initial_metadata");
390     err = grpc_call_start_batch(pc->p2s, &op, 1,
391                                 new_closure(on_p2s_sent_initial_metadata, pc),
392                                 nullptr);
393     GPR_ASSERT(err == GRPC_CALL_OK);
394 
395     op.op = GRPC_OP_RECV_MESSAGE;
396     op.flags = 0;
397     op.data.recv_message.recv_message = &pc->c2p_msg;
398     refpc(pc, "on_c2p_recv_msg");
399     err = grpc_call_start_batch(pc->c2p, &op, 1,
400                                 new_closure(on_c2p_recv_msg, pc), nullptr);
401     GPR_ASSERT(err == GRPC_CALL_OK);
402 
403     op.op = GRPC_OP_RECV_MESSAGE;
404     op.flags = 0;
405     op.data.recv_message.recv_message = &pc->p2s_msg;
406     refpc(pc, "on_p2s_recv_msg");
407     err = grpc_call_start_batch(pc->p2s, &op, 1,
408                                 new_closure(on_p2s_recv_msg, pc), nullptr);
409     GPR_ASSERT(err == GRPC_CALL_OK);
410 
411     op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
412     op.flags = 0;
413     op.data.recv_status_on_client.trailing_metadata =
414         &pc->p2s_trailing_metadata;
415     op.data.recv_status_on_client.status = &pc->p2s_status;
416     op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
417     refpc(pc, "on_p2s_status");
418     err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
419                                 nullptr);
420     GPR_ASSERT(err == GRPC_CALL_OK);
421 
422     op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
423     op.flags = 0;
424     op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
425     refpc(pc, "on_c2p_closed");
426     err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
427                                 nullptr);
428     GPR_ASSERT(err == GRPC_CALL_OK);
429 
430     request_call(proxy);
431 
432     grpc_call_details_destroy(&proxy->new_call_details);
433     grpc_call_details_init(&proxy->new_call_details);
434 
435     unrefpc(pc, "init");
436   } else {
437     GPR_ASSERT(proxy->new_call == nullptr);
438   }
439 }
440 
request_call(grpc_end2end_proxy * proxy)441 static void request_call(grpc_end2end_proxy* proxy) {
442   proxy->new_call = nullptr;
443   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
444                                  proxy->server, &proxy->new_call,
445                                  &proxy->new_call_details,
446                                  &proxy->new_call_metadata, proxy->cq,
447                                  proxy->cq, new_closure(on_new_call, proxy)));
448 }
449 
thread_main(void * arg)450 static void thread_main(void* arg) {
451   grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
452   closure* cl;
453   for (;;) {
454     grpc_event ev = grpc_completion_queue_next(
455         proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
456     switch (ev.type) {
457       case GRPC_QUEUE_TIMEOUT:
458         grpc_core::Crash("Should never reach here");
459       case GRPC_QUEUE_SHUTDOWN:
460         return;
461       case GRPC_OP_COMPLETE:
462         cl = static_cast<closure*>(ev.tag);
463         cl->func(cl->arg, ev.success);
464         gpr_free(cl);
465         break;
466     }
467   }
468 }
469 
grpc_end2end_proxy_get_client_target(grpc_end2end_proxy * proxy)470 const char* grpc_end2end_proxy_get_client_target(grpc_end2end_proxy* proxy) {
471   return proxy->proxy_port.c_str();
472 }
473 
grpc_end2end_proxy_get_server_port(grpc_end2end_proxy * proxy)474 const char* grpc_end2end_proxy_get_server_port(grpc_end2end_proxy* proxy) {
475   return proxy->server_port.c_str();
476 }
477