xref: /aosp_15_r20/external/cronet/net/filter/filter_source_stream_unittest.cc (revision 6777b5387eb2ff775bb5750e3f5d96f37fb7352b)
1 // Copyright 2016 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include <algorithm>
6 #include <string>
7 
8 #include "base/functional/bind.h"
9 #include "base/functional/callback.h"
10 #include "base/numerics/safe_conversions.h"
11 #include "base/types/expected.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 #include "net/base/test_completion_callback.h"
15 #include "net/filter/filter_source_stream.h"
16 #include "net/filter/mock_source_stream.h"
17 #include "testing/gtest/include/gtest/gtest.h"
18 
19 namespace net {
20 
21 namespace {
22 
23 const size_t kDefaultBufferSize = 4096;
24 const size_t kSmallBufferSize = 1;
25 
26 class TestFilterSourceStreamBase : public FilterSourceStream {
27  public:
TestFilterSourceStreamBase(std::unique_ptr<SourceStream> upstream)28   explicit TestFilterSourceStreamBase(std::unique_ptr<SourceStream> upstream)
29       : FilterSourceStream(SourceStream::TYPE_NONE, std::move(upstream)) {}
30 
31   TestFilterSourceStreamBase(const TestFilterSourceStreamBase&) = delete;
32   TestFilterSourceStreamBase& operator=(const TestFilterSourceStreamBase&) =
33       delete;
34 
~TestFilterSourceStreamBase()35   ~TestFilterSourceStreamBase() override { DCHECK(buffer_.empty()); }
GetTypeAsString() const36   std::string GetTypeAsString() const override { return type_string_; }
37 
set_type_string(const std::string & type_string)38   void set_type_string(const std::string& type_string) {
39     type_string_ = type_string;
40   }
41 
42  protected:
43   // Writes contents of |buffer_| to |output_buffer| and returns the number of
44   // bytes written or an error code. Additionally removes consumed data from
45   // |buffer_|.
WriteBufferToOutput(IOBuffer * output_buffer,size_t output_buffer_size)46   size_t WriteBufferToOutput(IOBuffer* output_buffer,
47                              size_t output_buffer_size) {
48     size_t bytes_to_filter = std::min(buffer_.length(), output_buffer_size);
49     memcpy(output_buffer->data(), buffer_.data(), bytes_to_filter);
50     buffer_.erase(0, bytes_to_filter);
51     return bytes_to_filter;
52   }
53 
54   // Buffer used by subclasses to hold data that is yet to be passed to the
55   // caller.
56   std::string buffer_;
57 
58  private:
59   std::string type_string_;
60 };
61 
62 // A FilterSourceStream that needs all input data before it can return non-zero
63 // bytes read.
64 class NeedsAllInputFilterSourceStream : public TestFilterSourceStreamBase {
65  public:
NeedsAllInputFilterSourceStream(std::unique_ptr<SourceStream> upstream,size_t expected_input_bytes)66   NeedsAllInputFilterSourceStream(std::unique_ptr<SourceStream> upstream,
67                                   size_t expected_input_bytes)
68       : TestFilterSourceStreamBase(std::move(upstream)),
69         expected_input_bytes_(expected_input_bytes) {}
70 
71   NeedsAllInputFilterSourceStream(const NeedsAllInputFilterSourceStream&) =
72       delete;
73   NeedsAllInputFilterSourceStream& operator=(
74       const NeedsAllInputFilterSourceStream&) = delete;
75 
FilterData(IOBuffer * output_buffer,size_t output_buffer_size,IOBuffer * input_buffer,size_t input_buffer_size,size_t * consumed_bytes,bool upstream_eof_reached)76   base::expected<size_t, Error> FilterData(IOBuffer* output_buffer,
77                                            size_t output_buffer_size,
78                                            IOBuffer* input_buffer,
79                                            size_t input_buffer_size,
80                                            size_t* consumed_bytes,
81                                            bool upstream_eof_reached) override {
82     buffer_.append(input_buffer->data(), input_buffer_size);
83     EXPECT_GE(expected_input_bytes_, input_buffer_size);
84     expected_input_bytes_ -= input_buffer_size;
85     *consumed_bytes = input_buffer_size;
86     if (!upstream_eof_reached) {
87       // Keep returning 0 bytes read until all input has been consumed.
88       return 0;
89     }
90     EXPECT_EQ(0u, expected_input_bytes_);
91     return WriteBufferToOutput(output_buffer, output_buffer_size);
92   }
93 
94  private:
95   // Expected remaining bytes to be received from |upstream|.
96   size_t expected_input_bytes_;
97 };
98 
99 // A FilterSourceStream that repeat every input byte by |multiplier| amount of
100 // times.
101 class MultiplySourceStream : public TestFilterSourceStreamBase {
102  public:
MultiplySourceStream(std::unique_ptr<SourceStream> upstream,int multiplier)103   MultiplySourceStream(std::unique_ptr<SourceStream> upstream, int multiplier)
104       : TestFilterSourceStreamBase(std::move(upstream)),
105         multiplier_(multiplier) {}
106 
107   MultiplySourceStream(const MultiplySourceStream&) = delete;
108   MultiplySourceStream& operator=(const MultiplySourceStream&) = delete;
109 
FilterData(IOBuffer * output_buffer,size_t output_buffer_size,IOBuffer * input_buffer,size_t input_buffer_size,size_t * consumed_bytes,bool)110   base::expected<size_t, Error> FilterData(
111       IOBuffer* output_buffer,
112       size_t output_buffer_size,
113       IOBuffer* input_buffer,
114       size_t input_buffer_size,
115       size_t* consumed_bytes,
116       bool /*upstream_eof_reached*/) override {
117     for (size_t i = 0; i < input_buffer_size; i++) {
118       for (int j = 0; j < multiplier_; j++)
119         buffer_.append(input_buffer->data() + i, 1);
120     }
121     *consumed_bytes = input_buffer_size;
122     return WriteBufferToOutput(output_buffer, output_buffer_size);
123   }
124 
125  private:
126   int multiplier_;
127 };
128 
129 // A FilterSourceStream passes through data unchanged to consumer.
130 class PassThroughFilterSourceStream : public TestFilterSourceStreamBase {
131  public:
PassThroughFilterSourceStream(std::unique_ptr<SourceStream> upstream)132   explicit PassThroughFilterSourceStream(std::unique_ptr<SourceStream> upstream)
133       : TestFilterSourceStreamBase(std::move(upstream)) {}
134 
135   PassThroughFilterSourceStream(const PassThroughFilterSourceStream&) = delete;
136   PassThroughFilterSourceStream& operator=(
137       const PassThroughFilterSourceStream&) = delete;
138 
FilterData(IOBuffer * output_buffer,size_t output_buffer_size,IOBuffer * input_buffer,size_t input_buffer_size,size_t * consumed_bytes,bool)139   base::expected<size_t, Error> FilterData(
140       IOBuffer* output_buffer,
141       size_t output_buffer_size,
142       IOBuffer* input_buffer,
143       size_t input_buffer_size,
144       size_t* consumed_bytes,
145       bool /*upstream_eof_reached*/) override {
146     buffer_.append(input_buffer->data(), input_buffer_size);
147     *consumed_bytes = input_buffer_size;
148     return WriteBufferToOutput(output_buffer, output_buffer_size);
149   }
150 };
151 
152 // A FilterSourceStream passes throttle input data such that it returns them to
153 // caller only one bytes at a time.
154 class ThrottleSourceStream : public TestFilterSourceStreamBase {
155  public:
ThrottleSourceStream(std::unique_ptr<SourceStream> upstream)156   explicit ThrottleSourceStream(std::unique_ptr<SourceStream> upstream)
157       : TestFilterSourceStreamBase(std::move(upstream)) {}
158 
159   ThrottleSourceStream(const ThrottleSourceStream&) = delete;
160   ThrottleSourceStream& operator=(const ThrottleSourceStream&) = delete;
161 
FilterData(IOBuffer * output_buffer,size_t output_buffer_size,IOBuffer * input_buffer,size_t input_buffer_size,size_t * consumed_bytes,bool)162   base::expected<size_t, Error> FilterData(
163       IOBuffer* output_buffer,
164       size_t output_buffer_size,
165       IOBuffer* input_buffer,
166       size_t input_buffer_size,
167       size_t* consumed_bytes,
168       bool /*upstream_eof_reached*/) override {
169     buffer_.append(input_buffer->data(), input_buffer_size);
170     *consumed_bytes = input_buffer_size;
171     size_t bytes_to_read = std::min(size_t{1}, buffer_.size());
172     memcpy(output_buffer->data(), buffer_.data(), bytes_to_read);
173     buffer_.erase(0, bytes_to_read);
174     return bytes_to_read;
175   }
176 };
177 
178 // A FilterSourceStream that consumes all input data but return no output.
179 class NoOutputSourceStream : public TestFilterSourceStreamBase {
180  public:
NoOutputSourceStream(std::unique_ptr<SourceStream> upstream,size_t expected_input_size)181   NoOutputSourceStream(std::unique_ptr<SourceStream> upstream,
182                        size_t expected_input_size)
183       : TestFilterSourceStreamBase(std::move(upstream)),
184         expected_input_size_(expected_input_size) {}
185 
186   NoOutputSourceStream(const NoOutputSourceStream&) = delete;
187   NoOutputSourceStream& operator=(const NoOutputSourceStream&) = delete;
188 
FilterData(IOBuffer * output_buffer,size_t output_buffer_size,IOBuffer * input_buffer,size_t input_buffer_size,size_t * consumed_bytes,bool)189   base::expected<size_t, Error> FilterData(
190       IOBuffer* output_buffer,
191       size_t output_buffer_size,
192       IOBuffer* input_buffer,
193       size_t input_buffer_size,
194       size_t* consumed_bytes,
195       bool /*upstream_eof_reached*/) override {
196     EXPECT_GE(expected_input_size_, input_buffer_size);
197     expected_input_size_ -= input_buffer_size;
198     *consumed_bytes = input_buffer_size;
199     consumed_all_input_ = (expected_input_size_ == 0);
200     return 0;
201   }
202 
consumed_all_input() const203   bool consumed_all_input() const { return consumed_all_input_; }
204 
205  private:
206   // Expected remaining bytes to be received from |upstream|.
207   size_t expected_input_size_;
208   bool consumed_all_input_ = false;
209 };
210 
211 // A FilterSourceStream return an error code in FilterData().
212 class ErrorFilterSourceStream : public FilterSourceStream {
213  public:
ErrorFilterSourceStream(std::unique_ptr<SourceStream> upstream)214   explicit ErrorFilterSourceStream(std::unique_ptr<SourceStream> upstream)
215       : FilterSourceStream(SourceStream::TYPE_NONE, std::move(upstream)) {}
216 
217   ErrorFilterSourceStream(const ErrorFilterSourceStream&) = delete;
218   ErrorFilterSourceStream& operator=(const ErrorFilterSourceStream&) = delete;
219 
FilterData(IOBuffer * output_buffer,size_t output_buffer_size,IOBuffer * input_buffer,size_t input_buffer_size,size_t * consumed_bytes,bool)220   base::expected<size_t, Error> FilterData(
221       IOBuffer* output_buffer,
222       size_t output_buffer_size,
223       IOBuffer* input_buffer,
224       size_t input_buffer_size,
225       size_t* consumed_bytes,
226       bool /*upstream_eof_reached*/) override {
227     return base::unexpected(ERR_CONTENT_DECODING_FAILED);
228   }
GetTypeAsString() const229   std::string GetTypeAsString() const override { return ""; }
230 };
231 
232 }  // namespace
233 
234 class FilterSourceStreamTest
235     : public ::testing::TestWithParam<MockSourceStream::Mode> {
236  protected:
237   // If MockSourceStream::Mode is ASYNC, completes |num_reads| from
238   // |mock_stream| and wait for |callback| to complete. If Mode is not ASYNC,
239   // does nothing and returns |previous_result|.
CompleteReadIfAsync(int previous_result,TestCompletionCallback * callback,MockSourceStream * mock_stream,size_t num_reads)240   int CompleteReadIfAsync(int previous_result,
241                           TestCompletionCallback* callback,
242                           MockSourceStream* mock_stream,
243                           size_t num_reads) {
244     if (GetParam() == MockSourceStream::ASYNC) {
245       EXPECT_EQ(ERR_IO_PENDING, previous_result);
246       while (num_reads > 0) {
247         mock_stream->CompleteNextRead();
248         num_reads--;
249       }
250       return callback->WaitForResult();
251     }
252     return previous_result;
253   }
254 };
255 
256 INSTANTIATE_TEST_SUITE_P(FilterSourceStreamTests,
257                          FilterSourceStreamTest,
258                          ::testing::Values(MockSourceStream::SYNC,
259                                            MockSourceStream::ASYNC));
260 
261 // Tests that a FilterSourceStream subclass (NeedsAllInputFilterSourceStream)
262 // can return 0 bytes for FilterData()s when it has not consumed EOF from the
263 // upstream. In this case, FilterSourceStream should continue reading from
264 // upstream to complete filtering.
TEST_P(FilterSourceStreamTest,FilterDataReturnNoBytesExceptLast)265 TEST_P(FilterSourceStreamTest, FilterDataReturnNoBytesExceptLast) {
266   auto source = std::make_unique<MockSourceStream>();
267   std::string input("hello, world!");
268   size_t read_size = 2;
269   size_t num_reads = 0;
270   // Add a sequence of small reads.
271   for (size_t offset = 0; offset < input.length(); offset += read_size) {
272     source->AddReadResult(input.data() + offset,
273                           std::min(read_size, input.length() - offset), OK,
274                           GetParam());
275     num_reads++;
276   }
277   source->AddReadResult(input.data(), 0, OK, GetParam());  // EOF
278   num_reads++;
279 
280   MockSourceStream* mock_stream = source.get();
281   NeedsAllInputFilterSourceStream stream(std::move(source), input.length());
282   scoped_refptr<IOBufferWithSize> output_buffer =
283       base::MakeRefCounted<IOBufferWithSize>(kDefaultBufferSize);
284   TestCompletionCallback callback;
285   std::string actual_output;
286   while (true) {
287     int rv = stream.Read(output_buffer.get(), output_buffer->size(),
288                          callback.callback());
289     if (rv == ERR_IO_PENDING)
290       rv = CompleteReadIfAsync(rv, &callback, mock_stream, num_reads);
291     if (rv == OK)
292       break;
293     ASSERT_GT(rv, OK);
294     actual_output.append(output_buffer->data(), rv);
295   }
296   EXPECT_EQ(input, actual_output);
297 }
298 
299 // Tests that FilterData() returns 0 byte read because the upstream gives an
300 // EOF.
TEST_P(FilterSourceStreamTest,FilterDataReturnNoByte)301 TEST_P(FilterSourceStreamTest, FilterDataReturnNoByte) {
302   auto source = std::make_unique<MockSourceStream>();
303   std::string input;
304   source->AddReadResult(input.data(), 0, OK, GetParam());
305   MockSourceStream* mock_stream = source.get();
306   PassThroughFilterSourceStream stream(std::move(source));
307   scoped_refptr<IOBufferWithSize> output_buffer =
308       base::MakeRefCounted<IOBufferWithSize>(kDefaultBufferSize);
309   TestCompletionCallback callback;
310   int rv = stream.Read(output_buffer.get(), output_buffer->size(),
311                        callback.callback());
312   rv = CompleteReadIfAsync(rv, &callback, mock_stream, 1);
313   EXPECT_EQ(OK, rv);
314 }
315 
316 // Tests that FilterData() returns 0 byte filtered even though the upstream
317 // produces data.
TEST_P(FilterSourceStreamTest,FilterDataOutputNoData)318 TEST_P(FilterSourceStreamTest, FilterDataOutputNoData) {
319   auto source = std::make_unique<MockSourceStream>();
320   std::string input = "hello, world!";
321   size_t read_size = 2;
322   size_t num_reads = 0;
323   // Add a sequence of small reads.
324   for (size_t offset = 0; offset < input.length(); offset += read_size) {
325     source->AddReadResult(input.data() + offset,
326                           std::min(read_size, input.length() - offset), OK,
327                           GetParam());
328     num_reads++;
329   }
330   // Add a 0 byte read to signal EOF.
331   source->AddReadResult(input.data() + input.length(), 0, OK, GetParam());
332   num_reads++;
333   MockSourceStream* mock_stream = source.get();
334   NoOutputSourceStream stream(std::move(source), input.length());
335   scoped_refptr<IOBufferWithSize> output_buffer =
336       base::MakeRefCounted<IOBufferWithSize>(kDefaultBufferSize);
337   TestCompletionCallback callback;
338   int rv = stream.Read(output_buffer.get(), output_buffer->size(),
339                        callback.callback());
340   rv = CompleteReadIfAsync(rv, &callback, mock_stream, num_reads);
341   EXPECT_EQ(OK, rv);
342   EXPECT_TRUE(stream.consumed_all_input());
343 }
344 
345 // Tests that FilterData() returns non-zero bytes because the upstream
346 // returns data.
TEST_P(FilterSourceStreamTest,FilterDataReturnData)347 TEST_P(FilterSourceStreamTest, FilterDataReturnData) {
348   auto source = std::make_unique<MockSourceStream>();
349   std::string input = "hello, world!";
350   size_t read_size = 2;
351   // Add a sequence of small reads.
352   for (size_t offset = 0; offset < input.length(); offset += read_size) {
353     source->AddReadResult(input.data() + offset,
354                           std::min(read_size, input.length() - offset), OK,
355                           GetParam());
356   }
357   // Add a 0 byte read to signal EOF.
358   source->AddReadResult(input.data() + input.length(), 0, OK, GetParam());
359   MockSourceStream* mock_stream = source.get();
360   PassThroughFilterSourceStream stream(std::move(source));
361   scoped_refptr<IOBufferWithSize> output_buffer =
362       base::MakeRefCounted<IOBufferWithSize>(kDefaultBufferSize);
363   TestCompletionCallback callback;
364   std::string actual_output;
365   while (true) {
366     int rv = stream.Read(output_buffer.get(), output_buffer->size(),
367                          callback.callback());
368     rv = CompleteReadIfAsync(rv, &callback, mock_stream, /*num_reads=*/1);
369     if (rv == OK)
370       break;
371     ASSERT_GE(static_cast<int>(read_size), rv);
372     ASSERT_GT(rv, OK);
373     actual_output.append(output_buffer->data(), rv);
374   }
375   EXPECT_EQ(input, actual_output);
376 }
377 
378 // Tests that FilterData() returns more data than what it consumed.
TEST_P(FilterSourceStreamTest,FilterDataReturnMoreData)379 TEST_P(FilterSourceStreamTest, FilterDataReturnMoreData) {
380   auto source = std::make_unique<MockSourceStream>();
381   std::string input = "hello, world!";
382   size_t read_size = 2;
383   // Add a sequence of small reads.
384   for (size_t offset = 0; offset < input.length(); offset += read_size) {
385     source->AddReadResult(input.data() + offset,
386                           std::min(read_size, input.length() - offset), OK,
387                           GetParam());
388   }
389   // Add a 0 byte read to signal EOF.
390   source->AddReadResult(input.data() + input.length(), 0, OK, GetParam());
391   MockSourceStream* mock_stream = source.get();
392   int multiplier = 2;
393   MultiplySourceStream stream(std::move(source), multiplier);
394   scoped_refptr<IOBufferWithSize> output_buffer =
395       base::MakeRefCounted<IOBufferWithSize>(kDefaultBufferSize);
396   TestCompletionCallback callback;
397   std::string actual_output;
398   while (true) {
399     int rv = stream.Read(output_buffer.get(), output_buffer->size(),
400                          callback.callback());
401     rv = CompleteReadIfAsync(rv, &callback, mock_stream, /*num_reads=*/1);
402     if (rv == OK)
403       break;
404     ASSERT_GE(static_cast<int>(read_size) * multiplier, rv);
405     ASSERT_GT(rv, OK);
406     actual_output.append(output_buffer->data(), rv);
407   }
408   EXPECT_EQ("hheelllloo,,  wwoorrlldd!!", actual_output);
409 }
410 
411 // Tests that FilterData() returns non-zero bytes and output buffer size is
412 // smaller than the number of bytes read from the upstream.
TEST_P(FilterSourceStreamTest,FilterDataOutputSpace)413 TEST_P(FilterSourceStreamTest, FilterDataOutputSpace) {
414   auto source = std::make_unique<MockSourceStream>();
415   std::string input = "hello, world!";
416   size_t read_size = 2;
417   // Add a sequence of small reads.
418   for (size_t offset = 0; offset < input.length(); offset += read_size) {
419     source->AddReadResult(input.data() + offset,
420                           std::min(read_size, input.length() - offset), OK,
421                           GetParam());
422   }
423   // Add a 0 byte read to signal EOF.
424   source->AddReadResult(input.data() + input.length(), 0, OK, GetParam());
425   // Use an extremely small buffer size, so FilterData will need more output
426   // space.
427   scoped_refptr<IOBufferWithSize> output_buffer =
428       base::MakeRefCounted<IOBufferWithSize>(kSmallBufferSize);
429   MockSourceStream* mock_stream = source.get();
430   PassThroughFilterSourceStream stream(std::move(source));
431   TestCompletionCallback callback;
432   std::string actual_output;
433   while (true) {
434     int rv = stream.Read(output_buffer.get(), output_buffer->size(),
435                          callback.callback());
436     if (rv == ERR_IO_PENDING)
437       rv = CompleteReadIfAsync(rv, &callback, mock_stream, /*num_reads=*/1);
438     if (rv == OK)
439       break;
440     ASSERT_GT(rv, OK);
441     ASSERT_GE(kSmallBufferSize, static_cast<size_t>(rv));
442     actual_output.append(output_buffer->data(), rv);
443   }
444   EXPECT_EQ(input, actual_output);
445 }
446 
447 // Tests that FilterData() returns an error code, which is then surfaced as
448 // the result of calling Read().
TEST_P(FilterSourceStreamTest,FilterDataReturnError)449 TEST_P(FilterSourceStreamTest, FilterDataReturnError) {
450   auto source = std::make_unique<MockSourceStream>();
451   std::string input;
452   source->AddReadResult(input.data(), 0, OK, GetParam());
453   scoped_refptr<IOBufferWithSize> output_buffer =
454       base::MakeRefCounted<IOBufferWithSize>(kDefaultBufferSize);
455   MockSourceStream* mock_stream = source.get();
456   ErrorFilterSourceStream stream(std::move(source));
457   TestCompletionCallback callback;
458   int rv = stream.Read(output_buffer.get(), output_buffer->size(),
459                        callback.callback());
460   rv = CompleteReadIfAsync(rv, &callback, mock_stream, /*num_reads=*/1);
461   EXPECT_EQ(ERR_CONTENT_DECODING_FAILED, rv);
462   // Reading from |stream| again should return the same error.
463   rv = stream.Read(output_buffer.get(), output_buffer->size(),
464                    callback.callback());
465   EXPECT_EQ(ERR_CONTENT_DECODING_FAILED, rv);
466 }
467 
TEST_P(FilterSourceStreamTest,FilterChaining)468 TEST_P(FilterSourceStreamTest, FilterChaining) {
469   auto source = std::make_unique<MockSourceStream>();
470   std::string input = "hello, world!";
471   source->AddReadResult(input.data(), input.length(), OK, GetParam());
472   source->AddReadResult(input.data(), 0, OK, GetParam());  // EOF
473 
474   MockSourceStream* mock_stream = source.get();
475   auto pass_through_source =
476       std::make_unique<PassThroughFilterSourceStream>(std::move(source));
477   pass_through_source->set_type_string("FIRST_PASS_THROUGH");
478   auto needs_all_input_source =
479       std::make_unique<NeedsAllInputFilterSourceStream>(
480           std::move(pass_through_source), input.length());
481   needs_all_input_source->set_type_string("NEEDS_ALL");
482   auto second_pass_through_source =
483       std::make_unique<PassThroughFilterSourceStream>(
484           std::move(needs_all_input_source));
485   second_pass_through_source->set_type_string("SECOND_PASS_THROUGH");
486   scoped_refptr<IOBufferWithSize> output_buffer =
487       base::MakeRefCounted<IOBufferWithSize>(kDefaultBufferSize);
488 
489   TestCompletionCallback callback;
490   std::string actual_output;
491   while (true) {
492     int rv = second_pass_through_source->Read(
493         output_buffer.get(), output_buffer->size(), callback.callback());
494     if (rv == ERR_IO_PENDING)
495       rv = CompleteReadIfAsync(rv, &callback, mock_stream, /*num_reads=*/2);
496     if (rv == OK)
497       break;
498     ASSERT_GT(rv, OK);
499     actual_output.append(output_buffer->data(), rv);
500   }
501   EXPECT_EQ(input, actual_output);
502   // Type string (from left to right) should be the order of data flow.
503   EXPECT_EQ("FIRST_PASS_THROUGH,NEEDS_ALL,SECOND_PASS_THROUGH",
504             second_pass_through_source->Description());
505 }
506 
507 // Tests that FilterData() returns multiple times for a single MockStream
508 // read, because there is not enough output space.
TEST_P(FilterSourceStreamTest,OutputSpaceForOneRead)509 TEST_P(FilterSourceStreamTest, OutputSpaceForOneRead) {
510   auto source = std::make_unique<MockSourceStream>();
511   std::string input = "hello, world!";
512   source->AddReadResult(input.data(), input.length(), OK, GetParam());
513   // Add a 0 byte read to signal EOF.
514   source->AddReadResult(input.data() + input.length(), 0, OK, GetParam());
515   // Use an extremely small buffer size (1 byte), so FilterData will need more
516   // output space.
517   scoped_refptr<IOBufferWithSize> output_buffer =
518       base::MakeRefCounted<IOBufferWithSize>(kSmallBufferSize);
519   MockSourceStream* mock_stream = source.get();
520   PassThroughFilterSourceStream stream(std::move(source));
521   TestCompletionCallback callback;
522   std::string actual_output;
523   while (true) {
524     int rv = stream.Read(output_buffer.get(), output_buffer->size(),
525                          callback.callback());
526     if (rv == ERR_IO_PENDING)
527       rv = CompleteReadIfAsync(rv, &callback, mock_stream, /*num_reads=*/1);
528     if (rv == OK)
529       break;
530     ASSERT_GT(rv, OK);
531     ASSERT_GE(kSmallBufferSize, static_cast<size_t>(rv));
532     actual_output.append(output_buffer->data(), rv);
533   }
534   EXPECT_EQ(input, actual_output);
535 }
536 
537 // Tests that FilterData() returns multiple times for a single MockStream
538 // read, because the filter returns one byte at a time.
TEST_P(FilterSourceStreamTest,ThrottleSourceStream)539 TEST_P(FilterSourceStreamTest, ThrottleSourceStream) {
540   auto source = std::make_unique<MockSourceStream>();
541   std::string input = "hello, world!";
542   source->AddReadResult(input.data(), input.length(), OK, GetParam());
543   // Add a 0 byte read to signal EOF.
544   source->AddReadResult(input.data() + input.length(), 0, OK, GetParam());
545   scoped_refptr<IOBufferWithSize> output_buffer =
546       base::MakeRefCounted<IOBufferWithSize>(kDefaultBufferSize);
547   MockSourceStream* mock_stream = source.get();
548   ThrottleSourceStream stream(std::move(source));
549   TestCompletionCallback callback;
550   std::string actual_output;
551   while (true) {
552     int rv = stream.Read(output_buffer.get(), output_buffer->size(),
553                          callback.callback());
554     if (rv == ERR_IO_PENDING)
555       rv = CompleteReadIfAsync(rv, &callback, mock_stream, /*num_reads=*/1);
556     if (rv == OK)
557       break;
558     ASSERT_GT(rv, OK);
559     // ThrottleSourceStream returns 1 byte at a time.
560     ASSERT_GE(1u, static_cast<size_t>(rv));
561     actual_output.append(output_buffer->data(), rv);
562   }
563   EXPECT_EQ(input, actual_output);
564 }
565 
566 }  // namespace net
567