1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/transport/test_suite/test.h"
16
17 namespace grpc_core {
18
// A call in which the client sends only initial metadata and then finishes
// sending. The server sees no message, replies with initial metadata, and
// ends the call with GRPC_STATUS_UNIMPLEMENTED in trailing metadata.
TRANSPORT_TEST(MetadataOnlyRequest) {
  SetServerAcceptor();
  auto initiator = CreateCall();
  // Client half of the exchange.
  SpawnTestSeq(
      initiator, "initiator",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        // Nothing else to send: finish the client->server direction now.
        initiator.FinishSends();
        return initiator.PullServerInitialMetadata();
      },
      [&](ValueOrFailure<absl::optional<ServerMetadataHandle>> headers) {
        EXPECT_TRUE(headers.ok());
        EXPECT_TRUE(headers.value().has_value());
        EXPECT_EQ(*headers.value().value()->get_pointer(ContentTypeMetadata()),
                  ContentTypeMetadata::kApplicationGrpc);
        return initiator.PullServerTrailingMetadata();
      },
      [&](ValueOrFailure<ServerMetadataHandle> trailers) {
        EXPECT_TRUE(trailers.ok());
        EXPECT_EQ(*trailers.value()->get_pointer(GrpcStatusMetadata()),
                  GRPC_STATUS_UNIMPLEMENTED);
        return Empty{};
      });
  auto handler = TickUntilServerCall();
  // Server half of the exchange.
  SpawnTestSeq(
      handler, "handler",
      [&] { return handler.PullClientInitialMetadata(); },
      [&](ValueOrFailure<ServerMetadataHandle> client_headers) {
        EXPECT_TRUE(client_headers.ok());
        EXPECT_EQ(client_headers.value()
                      ->get_pointer(HttpPathMetadata())
                      ->as_string_view(),
                  "/foo/bar");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        // The client sent no messages, so the pull must come back empty.
        EXPECT_FALSE(request.has_value());
        auto server_headers = Arena::MakePooled<ServerMetadata>();
        server_headers->Set(ContentTypeMetadata(),
                            ContentTypeMetadata::kApplicationGrpc);
        return handler.PushServerInitialMetadata(std::move(server_headers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        auto server_trailers = Arena::MakePooled<ServerMetadata>();
        server_trailers->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
        return handler.PushServerTrailingMetadata(std::move(server_trailers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  WaitForAllPendingWork();
}
74
// The client sends initial metadata but never finishes sending; the server
// replies with initial metadata and then immediately with trailing metadata.
// Currently skipped (see the TODO below); the remaining body is kept so that
// it continues to compile.
TRANSPORT_TEST(MetadataOnlyRequestServerAbortsAfterInitialMetadata) {
  // TODO(ctiller): Re-enable this test once CallSpine rewrite completes.
  GTEST_SKIP() << "CallSpine has a bug right now that makes this provide the "
                  "wrong status code: we don't care for any cases we're "
                  "rolling out soon, so leaving this disabled.";

  SetServerAcceptor();
  auto initiator = CreateCall();
  // Client half of the exchange.
  SpawnTestSeq(
      initiator, "initiator",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        // Deliberately leave the sending stream open.
        return initiator.PullServerInitialMetadata();
      },
      [&](ValueOrFailure<absl::optional<ServerMetadataHandle>> headers) {
        EXPECT_TRUE(headers.ok());
        EXPECT_TRUE(headers.value().has_value());
        EXPECT_EQ(*headers.value().value()->get_pointer(ContentTypeMetadata()),
                  ContentTypeMetadata::kApplicationGrpc);
        return initiator.PullServerTrailingMetadata();
      },
      [&](ValueOrFailure<ServerMetadataHandle> trailers) {
        EXPECT_TRUE(trailers.ok());
        EXPECT_EQ(*trailers.value()->get_pointer(GrpcStatusMetadata()),
                  GRPC_STATUS_UNIMPLEMENTED);
        return Empty{};
      });
  auto handler = TickUntilServerCall();
  // Server half of the exchange.
  SpawnTestSeq(
      handler, "handler",
      [&] { return handler.PullClientInitialMetadata(); },
      [&](ValueOrFailure<ServerMetadataHandle> client_headers) {
        EXPECT_TRUE(client_headers.ok());
        EXPECT_EQ(client_headers.value()
                      ->get_pointer(HttpPathMetadata())
                      ->as_string_view(),
                  "/foo/bar");
        // Do not wait for the end of the client->server message stream:
        // publish initial metadata, then trailing metadata.
        auto server_headers = Arena::MakePooled<ServerMetadata>();
        server_headers->Set(ContentTypeMetadata(),
                            ContentTypeMetadata::kApplicationGrpc);
        return handler.PushServerInitialMetadata(std::move(server_headers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        auto server_trailers = Arena::MakePooled<ServerMetadata>();
        server_trailers->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
        return handler.PushServerTrailingMetadata(std::move(server_trailers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  WaitForAllPendingWork();
}
134
// The client sends initial metadata but never finishes sending; the server
// skips initial metadata entirely and answers with trailing metadata only.
// Currently skipped (see the TODO below); the remaining body is kept so that
// it continues to compile.
TRANSPORT_TEST(MetadataOnlyRequestServerAbortsImmediately) {
  // TODO(ctiller): Re-enable this test once CallSpine rewrite completes.
  GTEST_SKIP() << "CallSpine has a bug right now that makes this provide the "
                  "wrong status code: we don't care for any cases we're "
                  "rolling out soon, so leaving this disabled.";

  SetServerAcceptor();
  auto initiator = CreateCall();
  // Client half of the exchange.
  SpawnTestSeq(
      initiator, "initiator",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        // Deliberately leave the sending stream open.
        return initiator.PullServerInitialMetadata();
      },
      [&](ValueOrFailure<absl::optional<ServerMetadataHandle>> headers) {
        // The pull itself succeeds, but no initial metadata was ever sent.
        EXPECT_TRUE(headers.ok());
        EXPECT_FALSE(headers.value().has_value());
        return initiator.PullServerTrailingMetadata();
      },
      [&](ValueOrFailure<ServerMetadataHandle> trailers) {
        EXPECT_TRUE(trailers.ok());
        EXPECT_EQ(*trailers.value()->get_pointer(GrpcStatusMetadata()),
                  GRPC_STATUS_UNIMPLEMENTED);
        return Empty{};
      });
  auto handler = TickUntilServerCall();
  // Server half of the exchange.
  SpawnTestSeq(
      handler, "handler",
      [&] { return handler.PullClientInitialMetadata(); },
      [&](ValueOrFailure<ServerMetadataHandle> client_headers) {
        EXPECT_TRUE(client_headers.ok());
        EXPECT_EQ(client_headers.value()
                      ->get_pointer(HttpPathMetadata())
                      ->as_string_view(),
                  "/foo/bar");
        // Do not wait for the end of the client->server message stream, and
        // do not send initial metadata - publish trailing metadata only.
        auto server_trailers = Arena::MakePooled<ServerMetadata>();
        server_trailers->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
        return handler.PushServerTrailingMetadata(std::move(server_trailers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  WaitForAllPendingWork();
}
186
// Starts a call by pushing client initial metadata, waits until the server
// side observes the call, and then cancels it from the client without any
// further traffic.
TRANSPORT_TEST(CanCreateCallThenAbandonIt) {
  SetServerAcceptor();
  auto initiator = CreateCall();
  SpawnTestSeq(
      initiator, "start-call",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  // The returned server-side handle is intentionally left unused; it is held
  // in a local until the end of the test, as in the other tests of this file.
  auto handler = TickUntilServerCall();
  SpawnTestSeq(initiator, "end-call", [&] {
    initiator.Cancel();
    return Empty{};
  });
  WaitForAllPendingWork();
}
208
// One request message, one response message. Both sides explicitly pull once
// more after the payload to verify a clean (non-cancelled) end of stream
// before the status is exchanged.
TRANSPORT_TEST(UnaryRequest) {
  SetServerAcceptor();
  auto initiator = CreateCall();
  // Client half of the exchange.
  SpawnTestSeq(
      initiator, "initiator",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return initiator.PushMessage(Arena::MakePooled<Message>(
            SliceBuffer(Slice::FromCopiedString("hello world")), 0));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        // The single request is out: finish the client->server direction.
        initiator.FinishSends();
        return initiator.PullServerInitialMetadata();
      },
      [&](ValueOrFailure<absl::optional<ServerMetadataHandle>> headers) {
        EXPECT_TRUE(headers.ok());
        EXPECT_TRUE(headers.value().has_value());
        EXPECT_EQ(*headers.value().value()->get_pointer(ContentTypeMetadata()),
                  ContentTypeMetadata::kApplicationGrpc);
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor");
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> end_of_stream) {
        // Second pull: no further message, and not because of cancellation.
        EXPECT_FALSE(end_of_stream.has_value());
        EXPECT_FALSE(end_of_stream.cancelled());
        return initiator.PullServerTrailingMetadata();
      },
      [&](ValueOrFailure<ServerMetadataHandle> trailers) {
        EXPECT_TRUE(trailers.ok());
        EXPECT_EQ(*trailers.value()->get_pointer(GrpcStatusMetadata()),
                  GRPC_STATUS_UNIMPLEMENTED);
        return Empty{};
      });
  auto handler = TickUntilServerCall();
  // Server half of the exchange.
  SpawnTestSeq(
      handler, "handler",
      [&] { return handler.PullClientInitialMetadata(); },
      [&](ValueOrFailure<ServerMetadataHandle> client_headers) {
        EXPECT_TRUE(client_headers.ok());
        EXPECT_EQ(client_headers.value()
                      ->get_pointer(HttpPathMetadata())
                      ->as_string_view(),
                  "/foo/bar");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        EXPECT_TRUE(request.has_value());
        EXPECT_EQ(request.value()->payload()->JoinIntoString(), "hello world");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> end_of_stream) {
        // Second pull: no further message, and not because of cancellation.
        EXPECT_FALSE(end_of_stream.has_value());
        EXPECT_FALSE(end_of_stream.cancelled());
        auto server_headers = Arena::MakePooled<ServerMetadata>();
        server_headers->Set(ContentTypeMetadata(),
                            ContentTypeMetadata::kApplicationGrpc);
        return handler.PushServerInitialMetadata(std::move(server_headers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return handler.PushMessage(Arena::MakePooled<Message>(
            SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        auto server_trailers = Arena::MakePooled<ServerMetadata>();
        server_trailers->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
        return handler.PushServerTrailingMetadata(std::move(server_trailers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  WaitForAllPendingWork();
}
291
// Same message exchange as a unary call, except that neither side performs
// the extra PullMessage() that would observe end of stream: each proceeds
// straight to the next metadata step after receiving its single message.
TRANSPORT_TEST(UnaryRequestOmitCheckEndOfStream) {
  SetServerAcceptor();
  auto initiator = CreateCall();
  // Client half of the exchange.
  SpawnTestSeq(
      initiator, "initiator",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return initiator.PushMessage(Arena::MakePooled<Message>(
            SliceBuffer(Slice::FromCopiedString("hello world")), 0));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        // The single request is out: finish the client->server direction.
        initiator.FinishSends();
        return initiator.PullServerInitialMetadata();
      },
      [&](ValueOrFailure<absl::optional<ServerMetadataHandle>> headers) {
        EXPECT_TRUE(headers.ok());
        EXPECT_TRUE(headers.value().has_value());
        EXPECT_EQ(*headers.value().value()->get_pointer(ContentTypeMetadata()),
                  ContentTypeMetadata::kApplicationGrpc);
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor");
        // No end-of-stream pull here: go directly to trailing metadata.
        return initiator.PullServerTrailingMetadata();
      },
      [&](ValueOrFailure<ServerMetadataHandle> trailers) {
        EXPECT_TRUE(trailers.ok());
        EXPECT_EQ(*trailers.value()->get_pointer(GrpcStatusMetadata()),
                  GRPC_STATUS_UNIMPLEMENTED);
        return Empty{};
      });
  auto handler = TickUntilServerCall();
  // Server half of the exchange.
  SpawnTestSeq(
      handler, "handler",
      [&] { return handler.PullClientInitialMetadata(); },
      [&](ValueOrFailure<ServerMetadataHandle> client_headers) {
        EXPECT_TRUE(client_headers.ok());
        EXPECT_EQ(client_headers.value()
                      ->get_pointer(HttpPathMetadata())
                      ->as_string_view(),
                  "/foo/bar");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        EXPECT_TRUE(request.has_value());
        EXPECT_EQ(request.value()->payload()->JoinIntoString(), "hello world");
        // No end-of-stream pull here: go directly to initial metadata.
        auto server_headers = Arena::MakePooled<ServerMetadata>();
        server_headers->Set(ContentTypeMetadata(),
                            ContentTypeMetadata::kApplicationGrpc);
        return handler.PushServerInitialMetadata(std::move(server_headers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return handler.PushMessage(Arena::MakePooled<Message>(
            SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        auto server_trailers = Arena::MakePooled<ServerMetadata>();
        server_trailers->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
        return handler.PushServerTrailingMetadata(std::move(server_trailers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  WaitForAllPendingWork();
}
364
// Unary exchange with a different ordering: the client pulls the server's
// initial metadata before it pushes its request message, and the server
// pushes its initial metadata before it pulls that message.
TRANSPORT_TEST(UnaryRequestWaitForServerInitialMetadataBeforeSendingPayload) {
  SetServerAcceptor();
  auto initiator = CreateCall();
  // Client half of the exchange.
  SpawnTestSeq(
      initiator, "initiator",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return initiator.PullServerInitialMetadata();
      },
      [&](ValueOrFailure<absl::optional<ServerMetadataHandle>> headers) {
        EXPECT_TRUE(headers.ok());
        EXPECT_TRUE(headers.value().has_value());
        EXPECT_EQ(*headers.value().value()->get_pointer(ContentTypeMetadata()),
                  ContentTypeMetadata::kApplicationGrpc);
        // Only now, with server initial metadata in hand, send the payload.
        return initiator.PushMessage(Arena::MakePooled<Message>(
            SliceBuffer(Slice::FromCopiedString("hello world")), 0));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        initiator.FinishSends();
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor");
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> end_of_stream) {
        // Second pull: no further message, and not because of cancellation.
        EXPECT_FALSE(end_of_stream.has_value());
        EXPECT_FALSE(end_of_stream.cancelled());
        return initiator.PullServerTrailingMetadata();
      },
      [&](ValueOrFailure<ServerMetadataHandle> trailers) {
        EXPECT_TRUE(trailers.ok());
        EXPECT_EQ(*trailers.value()->get_pointer(GrpcStatusMetadata()),
                  GRPC_STATUS_UNIMPLEMENTED);
        return Empty{};
      });
  auto handler = TickUntilServerCall();
  // Server half of the exchange.
  SpawnTestSeq(
      handler, "handler",
      [&] { return handler.PullClientInitialMetadata(); },
      [&](ValueOrFailure<ServerMetadataHandle> client_headers) {
        EXPECT_TRUE(client_headers.ok());
        EXPECT_EQ(client_headers.value()
                      ->get_pointer(HttpPathMetadata())
                      ->as_string_view(),
                  "/foo/bar");
        auto server_headers = Arena::MakePooled<ServerMetadata>();
        server_headers->Set(ContentTypeMetadata(),
                            ContentTypeMetadata::kApplicationGrpc);
        return handler.PushServerInitialMetadata(std::move(server_headers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        EXPECT_TRUE(request.has_value());
        EXPECT_EQ(request.value()->payload()->JoinIntoString(), "hello world");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> end_of_stream) {
        // Second pull: no further message, and not because of cancellation.
        EXPECT_FALSE(end_of_stream.has_value());
        EXPECT_FALSE(end_of_stream.cancelled());
        return handler.PushMessage(Arena::MakePooled<Message>(
            SliceBuffer(Slice::FromCopiedString("why hello neighbor")), 0));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        auto server_trailers = Arena::MakePooled<ServerMetadata>();
        server_trailers->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
        return handler.PushServerTrailingMetadata(std::move(server_trailers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  WaitForAllPendingWork();
}
448
// The client streams five request messages; the server reads them in order,
// sends no response messages, and finishes with GRPC_STATUS_UNIMPLEMENTED.
TRANSPORT_TEST(ClientStreamingRequest) {
  SetServerAcceptor();
  auto initiator = CreateCall();
  // Builds a request message from `text` and pushes it from the client.
  const auto send_request = [&](const char* text) {
    return initiator.PushMessage(Arena::MakePooled<Message>(
        SliceBuffer(Slice::FromCopiedString(text)), 0));
  };
  // Client half of the exchange.
  SpawnTestSeq(
      initiator, "initiator",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return initiator.PullServerInitialMetadata();
      },
      [&](ValueOrFailure<absl::optional<ServerMetadataHandle>> headers) {
        EXPECT_TRUE(headers.ok());
        EXPECT_TRUE(headers.value().has_value());
        EXPECT_EQ(*headers.value().value()->get_pointer(ContentTypeMetadata()),
                  ContentTypeMetadata::kApplicationGrpc);
        return send_request("hello world");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_request("hello world (2)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_request("hello world (3)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_request("hello world (4)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_request("hello world (5)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        // All five requests are out: finish the client->server direction.
        initiator.FinishSends();
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> end_of_stream) {
        // The server sends no messages: a clean, non-cancelled end of stream.
        EXPECT_FALSE(end_of_stream.has_value());
        EXPECT_FALSE(end_of_stream.cancelled());
        return initiator.PullServerTrailingMetadata();
      },
      [&](ValueOrFailure<ServerMetadataHandle> trailers) {
        EXPECT_TRUE(trailers.ok());
        EXPECT_EQ(*trailers.value()->get_pointer(GrpcStatusMetadata()),
                  GRPC_STATUS_UNIMPLEMENTED);
        return Empty{};
      });
  auto handler = TickUntilServerCall();
  // Server half of the exchange.
  SpawnTestSeq(
      handler, "handler",
      [&] { return handler.PullClientInitialMetadata(); },
      [&](ValueOrFailure<ServerMetadataHandle> client_headers) {
        EXPECT_TRUE(client_headers.ok());
        EXPECT_EQ(client_headers.value()
                      ->get_pointer(HttpPathMetadata())
                      ->as_string_view(),
                  "/foo/bar");
        auto server_headers = Arena::MakePooled<ServerMetadata>();
        server_headers->Set(ContentTypeMetadata(),
                            ContentTypeMetadata::kApplicationGrpc);
        return handler.PushServerInitialMetadata(std::move(server_headers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        EXPECT_TRUE(request.has_value());
        EXPECT_EQ(request.value()->payload()->JoinIntoString(), "hello world");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        EXPECT_TRUE(request.has_value());
        EXPECT_EQ(request.value()->payload()->JoinIntoString(),
                  "hello world (2)");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        EXPECT_TRUE(request.has_value());
        EXPECT_EQ(request.value()->payload()->JoinIntoString(),
                  "hello world (3)");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        EXPECT_TRUE(request.has_value());
        EXPECT_EQ(request.value()->payload()->JoinIntoString(),
                  "hello world (4)");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> request) {
        EXPECT_TRUE(request.has_value());
        EXPECT_EQ(request.value()->payload()->JoinIntoString(),
                  "hello world (5)");
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> end_of_stream) {
        // Sixth pull: no further message, and not because of cancellation.
        EXPECT_FALSE(end_of_stream.has_value());
        EXPECT_FALSE(end_of_stream.cancelled());
        auto server_trailers = Arena::MakePooled<ServerMetadata>();
        server_trailers->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
        return handler.PushServerTrailingMetadata(std::move(server_trailers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  WaitForAllPendingWork();
}
561
// The client sends no request messages; the server streams six response
// messages, which the client reads in order before receiving
// GRPC_STATUS_UNIMPLEMENTED in trailing metadata.
TRANSPORT_TEST(ServerStreamingRequest) {
  SetServerAcceptor();
  auto initiator = CreateCall();
  // Client half of the exchange.
  SpawnTestSeq(
      initiator, "initiator",
      [&] {
        auto client_md = Arena::MakePooled<ClientMetadata>();
        client_md->Set(HttpPathMetadata(),
                       Slice::FromExternalString("/foo/bar"));
        return initiator.PushClientInitialMetadata(std::move(client_md));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return initiator.PullServerInitialMetadata();
      },
      [&](ValueOrFailure<absl::optional<ServerMetadataHandle>> headers) {
        EXPECT_TRUE(headers.ok());
        EXPECT_TRUE(headers.value().has_value());
        EXPECT_EQ(*headers.value().value()->get_pointer(ContentTypeMetadata()),
                  ContentTypeMetadata::kApplicationGrpc);
        // Nothing to send: finish the client->server direction, then read.
        initiator.FinishSends();
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor");
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor (2)");
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor (3)");
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor (4)");
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor (5)");
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> reply) {
        EXPECT_TRUE(reply.has_value());
        EXPECT_EQ(reply.value()->payload()->JoinIntoString(),
                  "why hello neighbor (6)");
        return initiator.PullMessage();
      },
      [&](NextResult<MessageHandle> end_of_stream) {
        // Seventh pull: no further message, and not because of cancellation.
        EXPECT_FALSE(end_of_stream.has_value());
        EXPECT_FALSE(end_of_stream.cancelled());
        return initiator.PullServerTrailingMetadata();
      },
      [&](ValueOrFailure<ServerMetadataHandle> trailers) {
        EXPECT_TRUE(trailers.ok());
        EXPECT_EQ(*trailers.value()->get_pointer(GrpcStatusMetadata()),
                  GRPC_STATUS_UNIMPLEMENTED);
        return Empty{};
      });
  auto handler = TickUntilServerCall();
  // Builds a response message from `text` and pushes it from the server.
  const auto send_reply = [&](const char* text) {
    return handler.PushMessage(Arena::MakePooled<Message>(
        SliceBuffer(Slice::FromCopiedString(text)), 0));
  };
  // Server half of the exchange.
  SpawnTestSeq(
      handler, "handler",
      [&] { return handler.PullClientInitialMetadata(); },
      [&](ValueOrFailure<ServerMetadataHandle> client_headers) {
        EXPECT_TRUE(client_headers.ok());
        EXPECT_EQ(client_headers.value()
                      ->get_pointer(HttpPathMetadata())
                      ->as_string_view(),
                  "/foo/bar");
        auto server_headers = Arena::MakePooled<ServerMetadata>();
        server_headers->Set(ContentTypeMetadata(),
                            ContentTypeMetadata::kApplicationGrpc);
        return handler.PushServerInitialMetadata(std::move(server_headers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return handler.PullMessage();
      },
      [&](NextResult<MessageHandle> end_of_stream) {
        // The client sent no messages: a clean, non-cancelled end of stream.
        EXPECT_FALSE(end_of_stream.has_value());
        EXPECT_FALSE(end_of_stream.cancelled());
        return send_reply("why hello neighbor");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_reply("why hello neighbor (2)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_reply("why hello neighbor (3)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_reply("why hello neighbor (4)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_reply("why hello neighbor (5)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return send_reply("why hello neighbor (6)");
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        auto server_trailers = Arena::MakePooled<ServerMetadata>();
        server_trailers->Set(GrpcStatusMetadata(), GRPC_STATUS_UNIMPLEMENTED);
        return handler.PushServerTrailingMetadata(std::move(server_trailers));
      },
      [&](StatusFlag pushed) {
        EXPECT_TRUE(pushed.ok());
        return Empty{};
      });
  WaitForAllPendingWork();
}
690
691 } // namespace grpc_core
692