1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "test/core/end2end/fixtures/proxy.h"
20
21 #include <string.h>
22
23 #include <string>
24 #include <utility>
25
26 #include <grpc/byte_buffer.h>
27 #include <grpc/impl/channel_arg_names.h>
28 #include <grpc/impl/propagation_bits.h>
29 #include <grpc/slice.h>
30 #include <grpc/status.h>
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/log.h>
33 #include <grpc/support/sync.h>
34 #include <grpc/support/time.h>
35
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/gprpp/crash.h"
38 #include "src/core/lib/gprpp/host_port.h"
39 #include "src/core/lib/gprpp/thd.h"
40 #include "src/core/lib/surface/call.h"
41 #include "test/core/util/port.h"
42
43 struct grpc_end2end_proxy {
grpc_end2end_proxygrpc_end2end_proxy44 grpc_end2end_proxy()
45 : cq(nullptr),
46 server(nullptr),
47 client(nullptr),
48 shutdown(false),
49 new_call(nullptr) {
50 memset(&new_call_details, 0, sizeof(new_call_details));
51 memset(&new_call_metadata, 0, sizeof(new_call_metadata));
52 }
53 grpc_core::Thread thd;
54 std::string proxy_port;
55 std::string server_port;
56 grpc_completion_queue* cq;
57 grpc_server* server;
58 grpc_channel* client;
59
60 int shutdown;
61
62 // requested call
63 grpc_call* new_call;
64 grpc_call_details new_call_details;
65 grpc_metadata_array new_call_metadata;
66 };
67
// Callback record used as a completion-queue tag. When the tagged operation
// completes, `func` is invoked with `arg` and the event's success flag.
struct closure {
  void (*func)(void* arg, int success);
  void* arg;
};
72
73 typedef struct {
74 gpr_refcount refs;
75 grpc_end2end_proxy* proxy;
76
77 grpc_call* c2p;
78 grpc_call* p2s;
79
80 grpc_metadata_array c2p_initial_metadata;
81 grpc_metadata_array p2s_initial_metadata;
82
83 grpc_byte_buffer* c2p_msg;
84 grpc_byte_buffer* p2s_msg;
85
86 grpc_metadata_array p2s_trailing_metadata;
87 grpc_status_code p2s_status;
88 grpc_slice p2s_status_details;
89
90 int c2p_server_cancelled;
91 } proxy_call;
92
93 static void thread_main(void* arg);
94 static void request_call(grpc_end2end_proxy* proxy);
95
grpc_end2end_proxy_create(const grpc_end2end_proxy_def * def,const grpc_channel_args * client_args,const grpc_channel_args * server_args)96 grpc_end2end_proxy* grpc_end2end_proxy_create(
97 const grpc_end2end_proxy_def* def, const grpc_channel_args* client_args,
98 const grpc_channel_args* server_args) {
99 int proxy_port = grpc_pick_unused_port_or_die();
100 int server_port = grpc_pick_unused_port_or_die();
101
102 grpc_end2end_proxy* proxy = new grpc_end2end_proxy();
103
104 proxy->proxy_port = grpc_core::JoinHostPort("localhost", proxy_port);
105 proxy->server_port = grpc_core::JoinHostPort("localhost", server_port);
106
107 gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port.c_str(),
108 proxy->server_port.c_str());
109
110 proxy->cq = grpc_completion_queue_create_for_next(nullptr);
111 proxy->server = def->create_server(proxy->proxy_port.c_str(), server_args);
112
113 const char* arg_to_remove = GRPC_ARG_ENABLE_RETRIES;
114 grpc_arg arg_to_add = grpc_channel_arg_integer_create(
115 const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 0);
116 const grpc_channel_args* proxy_client_args =
117 grpc_channel_args_copy_and_add_and_remove(client_args, &arg_to_remove, 1,
118 &arg_to_add, 1);
119 proxy->client =
120 def->create_client(proxy->server_port.c_str(), proxy_client_args);
121 grpc_channel_args_destroy(proxy_client_args);
122
123 grpc_server_register_completion_queue(proxy->server, proxy->cq, nullptr);
124 grpc_server_start(proxy->server);
125
126 grpc_call_details_init(&proxy->new_call_details);
127 proxy->thd = grpc_core::Thread("grpc_end2end_proxy", thread_main, proxy);
128 proxy->thd.Start();
129
130 request_call(proxy);
131
132 return proxy;
133 }
134
new_closure(void (* func)(void * arg,int success),void * arg)135 static closure* new_closure(void (*func)(void* arg, int success), void* arg) {
136 closure* cl = static_cast<closure*>(gpr_malloc(sizeof(*cl)));
137 cl->func = func;
138 cl->arg = arg;
139 return cl;
140 }
141
shutdown_complete(void * arg,int)142 static void shutdown_complete(void* arg, int /*success*/) {
143 grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
144 proxy->shutdown = 1;
145 grpc_completion_queue_shutdown(proxy->cq);
146 }
147
grpc_end2end_proxy_destroy(grpc_end2end_proxy * proxy)148 void grpc_end2end_proxy_destroy(grpc_end2end_proxy* proxy) {
149 grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
150 new_closure(shutdown_complete, proxy));
151 proxy->thd.Join();
152 grpc_server_destroy(proxy->server);
153 grpc_channel_destroy(proxy->client);
154 grpc_completion_queue_destroy(proxy->cq);
155 grpc_call_details_destroy(&proxy->new_call_details);
156 delete proxy;
157 }
158
unrefpc(proxy_call * pc,const char *)159 static void unrefpc(proxy_call* pc, const char* /*reason*/) {
160 if (gpr_unref(&pc->refs)) {
161 grpc_call_unref(pc->c2p);
162 grpc_call_unref(pc->p2s);
163 grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
164 grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
165 grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
166 grpc_slice_unref(pc->p2s_status_details);
167 gpr_free(pc);
168 }
169 }
170
refpc(proxy_call * pc,const char *)171 static void refpc(proxy_call* pc, const char* /*reason*/) {
172 gpr_ref(&pc->refs);
173 }
174
on_c2p_sent_initial_metadata(void * arg,int)175 static void on_c2p_sent_initial_metadata(void* arg, int /*success*/) {
176 proxy_call* pc = static_cast<proxy_call*>(arg);
177 unrefpc(pc, "on_c2p_sent_initial_metadata");
178 }
179
on_p2s_recv_initial_metadata(void * arg,int)180 static void on_p2s_recv_initial_metadata(void* arg, int /*success*/) {
181 proxy_call* pc = static_cast<proxy_call*>(arg);
182 grpc_op op;
183 grpc_call_error err;
184
185 memset(&op, 0, sizeof(op));
186 if (!pc->proxy->shutdown && !grpc_call_is_trailers_only(pc->p2s)) {
187 op.op = GRPC_OP_SEND_INITIAL_METADATA;
188 op.flags = 0;
189 op.reserved = nullptr;
190 op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
191 op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
192 refpc(pc, "on_c2p_sent_initial_metadata");
193 err = grpc_call_start_batch(pc->c2p, &op, 1,
194 new_closure(on_c2p_sent_initial_metadata, pc),
195 nullptr);
196 GPR_ASSERT(err == GRPC_CALL_OK);
197 }
198
199 unrefpc(pc, "on_p2s_recv_initial_metadata");
200 }
201
on_p2s_sent_initial_metadata(void * arg,int)202 static void on_p2s_sent_initial_metadata(void* arg, int /*success*/) {
203 proxy_call* pc = static_cast<proxy_call*>(arg);
204 unrefpc(pc, "on_p2s_sent_initial_metadata");
205 }
206
207 static void on_c2p_recv_msg(void* arg, int success);
208
on_p2s_sent_message(void * arg,int success)209 static void on_p2s_sent_message(void* arg, int success) {
210 proxy_call* pc = static_cast<proxy_call*>(arg);
211 grpc_op op;
212 grpc_call_error err;
213
214 grpc_byte_buffer_destroy(std::exchange(pc->c2p_msg, nullptr));
215 if (!pc->proxy->shutdown && success) {
216 op.op = GRPC_OP_RECV_MESSAGE;
217 op.flags = 0;
218 op.reserved = nullptr;
219 op.data.recv_message.recv_message = &pc->c2p_msg;
220 refpc(pc, "on_c2p_recv_msg");
221 err = grpc_call_start_batch(pc->c2p, &op, 1,
222 new_closure(on_c2p_recv_msg, pc), nullptr);
223 GPR_ASSERT(err == GRPC_CALL_OK);
224 }
225
226 unrefpc(pc, "on_p2s_sent_message");
227 }
228
on_p2s_sent_close(void * arg,int)229 static void on_p2s_sent_close(void* arg, int /*success*/) {
230 proxy_call* pc = static_cast<proxy_call*>(arg);
231 unrefpc(pc, "on_p2s_sent_close");
232 }
233
on_c2p_recv_msg(void * arg,int success)234 static void on_c2p_recv_msg(void* arg, int success) {
235 proxy_call* pc = static_cast<proxy_call*>(arg);
236 grpc_op op;
237 grpc_call_error err;
238
239 if (!pc->proxy->shutdown && success) {
240 if (pc->c2p_msg != nullptr) {
241 op.op = GRPC_OP_SEND_MESSAGE;
242 op.flags = 0;
243 op.reserved = nullptr;
244 op.data.send_message.send_message = pc->c2p_msg;
245 refpc(pc, "on_p2s_sent_message");
246 err = grpc_call_start_batch(
247 pc->p2s, &op, 1, new_closure(on_p2s_sent_message, pc), nullptr);
248 GPR_ASSERT(err == GRPC_CALL_OK);
249 } else {
250 op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
251 op.flags = 0;
252 op.reserved = nullptr;
253 refpc(pc, "on_p2s_sent_close");
254 err = grpc_call_start_batch(pc->p2s, &op, 1,
255 new_closure(on_p2s_sent_close, pc), nullptr);
256 GPR_ASSERT(err == GRPC_CALL_OK);
257 }
258 } else {
259 if (pc->c2p_msg != nullptr) {
260 grpc_byte_buffer_destroy(pc->c2p_msg);
261 }
262 }
263
264 unrefpc(pc, "on_c2p_recv_msg");
265 }
266
267 static void on_p2s_recv_msg(void* arg, int success);
268
on_c2p_sent_message(void * arg,int success)269 static void on_c2p_sent_message(void* arg, int success) {
270 proxy_call* pc = static_cast<proxy_call*>(arg);
271 grpc_op op;
272 grpc_call_error err;
273
274 grpc_byte_buffer_destroy(pc->p2s_msg);
275 if (!pc->proxy->shutdown && success) {
276 op.op = GRPC_OP_RECV_MESSAGE;
277 op.flags = 0;
278 op.reserved = nullptr;
279 op.data.recv_message.recv_message = &pc->p2s_msg;
280 refpc(pc, "on_p2s_recv_msg");
281 err = grpc_call_start_batch(pc->p2s, &op, 1,
282 new_closure(on_p2s_recv_msg, pc), nullptr);
283 GPR_ASSERT(err == GRPC_CALL_OK);
284 }
285
286 unrefpc(pc, "on_c2p_sent_message");
287 }
288
on_p2s_recv_msg(void * arg,int success)289 static void on_p2s_recv_msg(void* arg, int success) {
290 proxy_call* pc = static_cast<proxy_call*>(arg);
291 grpc_op op;
292 grpc_call_error err;
293
294 if (!pc->proxy->shutdown && success && pc->p2s_msg) {
295 op.op = GRPC_OP_SEND_MESSAGE;
296 op.flags = 0;
297 op.reserved = nullptr;
298 op.data.send_message.send_message = pc->p2s_msg;
299 refpc(pc, "on_c2p_sent_message");
300 err = grpc_call_start_batch(pc->c2p, &op, 1,
301 new_closure(on_c2p_sent_message, pc), nullptr);
302 GPR_ASSERT(err == GRPC_CALL_OK);
303 } else {
304 grpc_byte_buffer_destroy(pc->p2s_msg);
305 }
306 unrefpc(pc, "on_p2s_recv_msg");
307 }
308
on_c2p_sent_status(void * arg,int)309 static void on_c2p_sent_status(void* arg, int /*success*/) {
310 proxy_call* pc = static_cast<proxy_call*>(arg);
311 unrefpc(pc, "on_c2p_sent_status");
312 }
313
on_p2s_status(void * arg,int success)314 static void on_p2s_status(void* arg, int success) {
315 proxy_call* pc = static_cast<proxy_call*>(arg);
316 grpc_op op[2]; // Possibly send empty initial metadata also if trailers-only
317 grpc_call_error err;
318
319 memset(op, 0, sizeof(op));
320
321 if (!pc->proxy->shutdown) {
322 GPR_ASSERT(success);
323
324 int op_count = 0;
325 if (grpc_call_is_trailers_only(pc->p2s)) {
326 op[op_count].op = GRPC_OP_SEND_INITIAL_METADATA;
327 op_count++;
328 }
329
330 op[op_count].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
331 op[op_count].flags = 0;
332 op[op_count].reserved = nullptr;
333 op[op_count].data.send_status_from_server.trailing_metadata_count =
334 pc->p2s_trailing_metadata.count;
335 op[op_count].data.send_status_from_server.trailing_metadata =
336 pc->p2s_trailing_metadata.metadata;
337 op[op_count].data.send_status_from_server.status = pc->p2s_status;
338 op[op_count].data.send_status_from_server.status_details =
339 &pc->p2s_status_details;
340 op_count++;
341 refpc(pc, "on_c2p_sent_status");
342 err = grpc_call_start_batch(pc->c2p, op, op_count,
343 new_closure(on_c2p_sent_status, pc), nullptr);
344 GPR_ASSERT(err == GRPC_CALL_OK);
345 }
346
347 unrefpc(pc, "on_p2s_status");
348 }
349
on_c2p_closed(void * arg,int)350 static void on_c2p_closed(void* arg, int /*success*/) {
351 proxy_call* pc = static_cast<proxy_call*>(arg);
352 unrefpc(pc, "on_c2p_closed");
353 }
354
on_new_call(void * arg,int success)355 static void on_new_call(void* arg, int success) {
356 grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
357 grpc_call_error err;
358
359 if (success) {
360 grpc_op op;
361 memset(&op, 0, sizeof(op));
362 proxy_call* pc = static_cast<proxy_call*>(gpr_malloc(sizeof(*pc)));
363 memset(pc, 0, sizeof(*pc));
364 pc->proxy = proxy;
365 std::swap(pc->c2p_initial_metadata, proxy->new_call_metadata);
366 pc->c2p = proxy->new_call;
367 pc->p2s = grpc_channel_create_call(
368 proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
369 proxy->new_call_details.method, &proxy->new_call_details.host,
370 proxy->new_call_details.deadline, nullptr);
371 gpr_ref_init(&pc->refs, 1);
372
373 op.reserved = nullptr;
374
375 op.op = GRPC_OP_RECV_INITIAL_METADATA;
376 op.flags = 0;
377 op.data.recv_initial_metadata.recv_initial_metadata =
378 &pc->p2s_initial_metadata;
379 refpc(pc, "on_p2s_recv_initial_metadata");
380 err = grpc_call_start_batch(pc->p2s, &op, 1,
381 new_closure(on_p2s_recv_initial_metadata, pc),
382 nullptr);
383 GPR_ASSERT(err == GRPC_CALL_OK);
384
385 op.op = GRPC_OP_SEND_INITIAL_METADATA;
386 op.flags = 0;
387 op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
388 op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
389 refpc(pc, "on_p2s_sent_initial_metadata");
390 err = grpc_call_start_batch(pc->p2s, &op, 1,
391 new_closure(on_p2s_sent_initial_metadata, pc),
392 nullptr);
393 GPR_ASSERT(err == GRPC_CALL_OK);
394
395 op.op = GRPC_OP_RECV_MESSAGE;
396 op.flags = 0;
397 op.data.recv_message.recv_message = &pc->c2p_msg;
398 refpc(pc, "on_c2p_recv_msg");
399 err = grpc_call_start_batch(pc->c2p, &op, 1,
400 new_closure(on_c2p_recv_msg, pc), nullptr);
401 GPR_ASSERT(err == GRPC_CALL_OK);
402
403 op.op = GRPC_OP_RECV_MESSAGE;
404 op.flags = 0;
405 op.data.recv_message.recv_message = &pc->p2s_msg;
406 refpc(pc, "on_p2s_recv_msg");
407 err = grpc_call_start_batch(pc->p2s, &op, 1,
408 new_closure(on_p2s_recv_msg, pc), nullptr);
409 GPR_ASSERT(err == GRPC_CALL_OK);
410
411 op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
412 op.flags = 0;
413 op.data.recv_status_on_client.trailing_metadata =
414 &pc->p2s_trailing_metadata;
415 op.data.recv_status_on_client.status = &pc->p2s_status;
416 op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
417 refpc(pc, "on_p2s_status");
418 err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
419 nullptr);
420 GPR_ASSERT(err == GRPC_CALL_OK);
421
422 op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
423 op.flags = 0;
424 op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
425 refpc(pc, "on_c2p_closed");
426 err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
427 nullptr);
428 GPR_ASSERT(err == GRPC_CALL_OK);
429
430 request_call(proxy);
431
432 grpc_call_details_destroy(&proxy->new_call_details);
433 grpc_call_details_init(&proxy->new_call_details);
434
435 unrefpc(pc, "init");
436 } else {
437 GPR_ASSERT(proxy->new_call == nullptr);
438 }
439 }
440
request_call(grpc_end2end_proxy * proxy)441 static void request_call(grpc_end2end_proxy* proxy) {
442 proxy->new_call = nullptr;
443 GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
444 proxy->server, &proxy->new_call,
445 &proxy->new_call_details,
446 &proxy->new_call_metadata, proxy->cq,
447 proxy->cq, new_closure(on_new_call, proxy)));
448 }
449
thread_main(void * arg)450 static void thread_main(void* arg) {
451 grpc_end2end_proxy* proxy = static_cast<grpc_end2end_proxy*>(arg);
452 closure* cl;
453 for (;;) {
454 grpc_event ev = grpc_completion_queue_next(
455 proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), nullptr);
456 switch (ev.type) {
457 case GRPC_QUEUE_TIMEOUT:
458 grpc_core::Crash("Should never reach here");
459 case GRPC_QUEUE_SHUTDOWN:
460 return;
461 case GRPC_OP_COMPLETE:
462 cl = static_cast<closure*>(ev.tag);
463 cl->func(cl->arg, ev.success);
464 gpr_free(cl);
465 break;
466 }
467 }
468 }
469
grpc_end2end_proxy_get_client_target(grpc_end2end_proxy * proxy)470 const char* grpc_end2end_proxy_get_client_target(grpc_end2end_proxy* proxy) {
471 return proxy->proxy_port.c_str();
472 }
473
grpc_end2end_proxy_get_server_port(grpc_end2end_proxy * proxy)474 const char* grpc_end2end_proxy_get_server_port(grpc_end2end_proxy* proxy) {
475 return proxy->server_port.c_str();
476 }
477