xref: /aosp_15_r20/external/grpc-grpc-java/netty/src/test/java/io/grpc/netty/NettyAdaptiveCumulatorTest.java (revision e07d83d3ffcef9ecfc9f7f475418ec639ff0e5fe)
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.truth.Truth.assertWithMessage;
21 import static com.google.common.truth.TruthJUnit.assume;
22 import static io.netty.util.CharsetUtil.US_ASCII;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertSame;
26 import static org.junit.Assert.assertTrue;
27 import static org.junit.Assert.fail;
28 import static org.mockito.ArgumentMatchers.anyInt;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
31 
32 import com.google.common.base.Strings;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.Lists;
35 import io.netty.buffer.ByteBuf;
36 import io.netty.buffer.ByteBufAllocator;
37 import io.netty.buffer.ByteBufUtil;
38 import io.netty.buffer.CompositeByteBuf;
39 import io.netty.buffer.PooledByteBufAllocator;
40 import io.netty.buffer.UnpooledByteBufAllocator;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.stream.Collectors;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.experimental.runners.Enclosed;
48 import org.junit.runner.RunWith;
49 import org.junit.runners.JUnit4;
50 import org.junit.runners.Parameterized;
51 import org.junit.runners.Parameterized.Parameter;
52 import org.junit.runners.Parameterized.Parameters;
53 
54 @RunWith(Enclosed.class)
55 public class NettyAdaptiveCumulatorTest {
56 
cartesianProductParams(List<?>.... lists)57   private static Collection<Object[]> cartesianProductParams(List<?>... lists) {
58     return Lists.cartesianProduct(lists).stream().map(List::toArray).collect(Collectors.toList());
59   }
60 
61   @RunWith(JUnit4.class)
62   public static class CumulateTests {
63     // Represent data as immutable ASCII Strings for easy and readable ByteBuf equality assertions.
64     private static final String DATA_INITIAL = "0123";
65     private static final String DATA_INCOMING = "456789";
66     private static final String DATA_CUMULATED = "0123456789";
67 
68     private static final ByteBufAllocator alloc = new UnpooledByteBufAllocator(false);
69     private NettyAdaptiveCumulator cumulator;
70     private NettyAdaptiveCumulator throwingCumulator;
71     private final UnsupportedOperationException throwingCumulatorError =
72         new UnsupportedOperationException();
73 
74     // Buffers for testing
75     private final ByteBuf contiguous = ByteBufUtil.writeAscii(alloc, DATA_INITIAL);
76     private final ByteBuf in = ByteBufUtil.writeAscii(alloc, DATA_INCOMING);
77 
78     @Before
setUp()79     public void setUp() {
80       cumulator = new NettyAdaptiveCumulator(0) {
81         @Override
82         void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
83           // To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose
84           composite.addFlattenedComponents(true, in);
85         }
86       };
87 
88       // Throws an error on adding incoming buffer.
89       throwingCumulator = new NettyAdaptiveCumulator(0) {
90         @Override
91         void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
92           throw throwingCumulatorError;
93         }
94       };
95     }
96 
97     @Test
cumulate_notReadableCumulation_replacedWithInputAndReleased()98     public void cumulate_notReadableCumulation_replacedWithInputAndReleased() {
99       contiguous.readerIndex(contiguous.writerIndex());
100       assertFalse(contiguous.isReadable());
101       ByteBuf cumulation = cumulator.cumulate(alloc, contiguous, in);
102       assertEquals(DATA_INCOMING, cumulation.toString(US_ASCII));
103       assertEquals(0, contiguous.refCnt());
104       // In retained by cumulation.
105       assertEquals(1, in.refCnt());
106       assertEquals(1, cumulation.refCnt());
107       cumulation.release();
108     }
109 
110     @Test
cumulate_contiguousCumulation_newCompositeFromContiguousAndInput()111     public void cumulate_contiguousCumulation_newCompositeFromContiguousAndInput() {
112       CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, contiguous, in);
113       assertEquals(DATA_INITIAL, cumulation.component(0).toString(US_ASCII));
114       assertEquals(DATA_INCOMING, cumulation.component(1).toString(US_ASCII));
115       assertEquals(DATA_CUMULATED, cumulation.toString(US_ASCII));
116       // Both in and contiguous are retained by cumulation.
117       assertEquals(1, contiguous.refCnt());
118       assertEquals(1, in.refCnt());
119       assertEquals(1, cumulation.refCnt());
120       cumulation.release();
121     }
122 
123     @Test
cumulate_compositeCumulation_inputAppendedAsANewComponent()124     public void cumulate_compositeCumulation_inputAppendedAsANewComponent() {
125       CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous);
126       assertSame(composite, cumulator.cumulate(alloc, composite, in));
127       assertEquals(DATA_INITIAL, composite.component(0).toString(US_ASCII));
128       assertEquals(DATA_INCOMING, composite.component(1).toString(US_ASCII));
129       assertEquals(DATA_CUMULATED, composite.toString(US_ASCII));
130       // Both in and contiguous are retained by cumulation.
131       assertEquals(1, contiguous.refCnt());
132       assertEquals(1, in.refCnt());
133       assertEquals(1, composite.refCnt());
134       composite.release();
135     }
136 
137     @Test
cumulate_compositeCumulation_inputReleasedOnError()138     public void cumulate_compositeCumulation_inputReleasedOnError() {
139       CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous);
140       try {
141         throwingCumulator.cumulate(alloc, composite, in);
142         fail("Cumulator didn't throw");
143       } catch (UnsupportedOperationException actualError) {
144         assertSame(throwingCumulatorError, actualError);
145         // Input must be released unless its ownership has been to the composite cumulation.
146         assertEquals(0, in.refCnt());
147         // Initial composite cumulation owned by the caller in this case, so it isn't released.
148         assertEquals(1, composite.refCnt());
149         // Contiguous still managed by the cumulation
150         assertEquals(1, contiguous.refCnt());
151       } finally {
152         composite.release();
153       }
154     }
155 
156     @Test
cumulate_contiguousCumulation_inputAndNewCompositeReleasedOnError()157     public void cumulate_contiguousCumulation_inputAndNewCompositeReleasedOnError() {
158       // Return our instance of new composite to ensure it's released.
159       CompositeByteBuf newComposite = alloc.compositeBuffer(Integer.MAX_VALUE);
160       ByteBufAllocator mockAlloc = mock(ByteBufAllocator.class);
161       when(mockAlloc.compositeBuffer(anyInt())).thenReturn(newComposite);
162 
163       try {
164         // Previous cumulation is non-composite, so cumulator will create anew composite and add
165         // both buffers to it.
166         throwingCumulator.cumulate(mockAlloc, contiguous, in);
167         fail("Cumulator didn't throw");
168       } catch (UnsupportedOperationException actualError) {
169         assertSame(throwingCumulatorError, actualError);
170         // Input must be released unless its ownership has been to the composite cumulation.
171         assertEquals(0, in.refCnt());
172         // New composite cumulation hasn't been returned to the caller, so it must be released.
173         assertEquals(0, newComposite.refCnt());
174         // Previous cumulation released because it was owned by the new composite cumulation.
175         assertEquals(0, contiguous.refCnt());
176       }
177     }
178   }
179 
180   @RunWith(Parameterized.class)
181   public static class ShouldComposeTests {
182     // Represent data as immutable ASCII Strings for easy and readable ByteBuf equality assertions.
183     private static final String DATA_INITIAL = "0123";
184     private static final String DATA_INCOMING = "456789";
185 
186     /**
187      * Cartesian product of the test values.
188      */
189     @Parameters(name = "composeMinSize={0}, tailData=\"{1}\", inData=\"{2}\"")
params()190     public static Collection<Object[]> params() {
191       List<?> composeMinSize = ImmutableList.of(0, 9, 10, 11, Integer.MAX_VALUE);
192       List<?> tailData = ImmutableList.of("", DATA_INITIAL);
193       List<?> inData = ImmutableList.of("", DATA_INCOMING);
194       return cartesianProductParams(composeMinSize, tailData, inData);
195     }
196 
197     @Parameter public int composeMinSize;
198     @Parameter(1) public String tailData;
199     @Parameter(2) public String inData;
200 
201     private CompositeByteBuf composite;
202     private ByteBuf tail;
203     private ByteBuf in;
204 
205     @Before
setUp()206     public void setUp() {
207       ByteBufAllocator alloc = new UnpooledByteBufAllocator(false);
208       in = ByteBufUtil.writeAscii(alloc, inData);
209       tail = ByteBufUtil.writeAscii(alloc, tailData);
210       composite = alloc.compositeBuffer(Integer.MAX_VALUE);
211       // Note that addFlattenedComponents() will not add a new component when tail is not readable.
212       composite.addFlattenedComponents(true, tail);
213     }
214 
215     @After
tearDown()216     public void tearDown() {
217       in.release();
218       composite.release();
219     }
220 
221     @Test
shouldCompose_emptyComposite()222     public void shouldCompose_emptyComposite() {
223       assume().that(composite.numComponents()).isEqualTo(0);
224       assertTrue(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize));
225     }
226 
227     @Test
shouldCompose_composeMinSizeReached()228     public void shouldCompose_composeMinSizeReached() {
229       assume().that(composite.numComponents()).isGreaterThan(0);
230       assume().that(tail.readableBytes() + in.readableBytes()).isAtLeast(composeMinSize);
231       assertTrue(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize));
232     }
233 
234     @Test
shouldCompose_composeMinSizeNotReached()235     public void shouldCompose_composeMinSizeNotReached() {
236       assume().that(composite.numComponents()).isGreaterThan(0);
237       assume().that(tail.readableBytes() + in.readableBytes()).isLessThan(composeMinSize);
238       assertFalse(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize));
239     }
240   }
241 
242   @RunWith(Parameterized.class)
243   public static class MergeWithCompositeTailTests {
244     private static final String INCOMING_DATA_READABLE = "+incoming";
245     private static final String INCOMING_DATA_DISCARDABLE = "discard";
246 
247     private static final String TAIL_DATA_DISCARDABLE = "---";
248     private static final String TAIL_DATA_READABLE = "tail";
249     private static final String TAIL_DATA =  TAIL_DATA_DISCARDABLE + TAIL_DATA_READABLE;
250     private static final int TAIL_READER_INDEX = TAIL_DATA_DISCARDABLE.length();
251     private static final int TAIL_MAX_CAPACITY = 128;
252 
253     // DRY sacrificed to improve readability.
254     private static final String EXPECTED_TAIL_DATA = "tail+incoming";
255 
256     /**
257      * Cartesian product of the test values.
258      *
259      * <p>Test cases when the cumulation contains components, other than tail, and could be
260      * partially read. This is needed to verify the correctness if reader and writer indexes of the
261      * composite cumulation after the merge.
262      */
263     @Parameters(name = "compositeHeadData=\"{0}\", compositeReaderIndex={1}")
params()264     public static Collection<Object[]> params() {
265       String headData = "head";
266 
267       List<?> compositeHeadData = ImmutableList.of(
268           // Test without the "head" component. Empty string is equivalent of fully read buffer,
269           // so it's not added to the composite byte buf. The tail is added as the first component.
270           "",
271           // Test with the "head" component, so the tail is added as the second component.
272           headData
273       );
274 
275       // After the tail is added to the composite cumulator, advance the reader index to
276       // cover different cases.
277       // The reader index only looks at what's readable in the composite byte buf, so
278       // discardable bytes of head and tail doesn't count.
279       List<?> compositeReaderIndex = ImmutableList.of(
280           // Reader in the beginning
281           0,
282           // Within the head (when present) or the tail
283           headData.length() - 2,
284           // Within the tail, even if the head is present
285           headData.length() + 2
286       );
287       return cartesianProductParams(compositeHeadData, compositeReaderIndex);
288     }
289 
290     @Parameter public String compositeHeadData;
291     @Parameter(1) public int compositeReaderIndex;
292 
293     // Use pooled allocator to have maxFastWritableBytes() behave differently than writableBytes().
294     private final ByteBufAllocator alloc = new PooledByteBufAllocator();
295 
296     // Composite buffer to be used in tests.
297     private CompositeByteBuf composite;
298     private ByteBuf tail;
299     private ByteBuf in;
300 
301     @Before
setUp()302     public void setUp() {
303       composite = alloc.compositeBuffer();
304 
305       // The "head" component. It represents existing data in the cumulator.
306       // Note that addFlattenedComponents() does not add completely read buffer, which covers
307       // the case when compositeHeadData parameter is an empty string.
308       ByteBuf head = alloc.buffer().writeBytes(compositeHeadData.getBytes(US_ASCII));
309       composite.addFlattenedComponents(true, head);
310 
311       // The "tail" component. It also represents existing data in the cumulator, but it's
312       // not added to the cumulator during setUp() stage. It is to be manipulated by tests to
313       // produce different buffer write scenarios based on different tail's capacity.
314       // After tail is changes for each test scenario, it's added to the composite buffer.
315       //
316       // The default state of the tail before each test: tail is full, but expandable (the data uses
317       // all initial capacity, but not maximum capacity).
318       // Tail data and indexes:
319       // ----tail
320       //     r   w
321       tail = alloc.buffer(TAIL_DATA.length(), TAIL_MAX_CAPACITY)
322           .writeBytes(TAIL_DATA.getBytes(US_ASCII))
323           .readerIndex(TAIL_READER_INDEX);
324 
325       // Incoming data and indexes:
326       // discard+incoming
327       //        r        w
328       in = alloc.buffer()
329           .writeBytes(INCOMING_DATA_DISCARDABLE.getBytes(US_ASCII))
330           .writeBytes(INCOMING_DATA_READABLE.getBytes(US_ASCII))
331           .readerIndex(INCOMING_DATA_DISCARDABLE.length());
332     }
333 
334     @After
tearDown()335     public void tearDown() {
336       composite.release();
337     }
338 
339     @Test
mergeWithCompositeTail_tailExpandable_write()340     public void mergeWithCompositeTail_tailExpandable_write() {
341       // Make incoming data fit into tail capacity.
342       int fitCapacity = tail.capacity() + INCOMING_DATA_READABLE.length();
343       tail.capacity(fitCapacity);
344       // Confirm it fits.
345       assertThat(in.readableBytes()).isAtMost(tail.writableBytes());
346 
347       // All fits, so tail capacity must stay the same.
348       composite.addFlattenedComponents(true, tail);
349       assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity);
350     }
351 
352     @Test
mergeWithCompositeTail_tailExpandable_fastWrite()353     public void mergeWithCompositeTail_tailExpandable_fastWrite() {
354       // Confirm that the tail can be expanded fast to fit the incoming data.
355       assertThat(in.readableBytes()).isAtMost(tail.maxFastWritableBytes());
356 
357       // To avoid undesirable buffer unwrapping, at the moment adaptive cumulator is set not
358       // apply fastWrite technique. Even when fast write is possible, it will fall back to
359       // reallocating a larger buffer.
360       // int tailFastCapacity = tail.writerIndex() + tail.maxFastWritableBytes();
361       int tailFastCapacity =
362           alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE);
363 
364       // Tail capacity is extended to its fast capacity.
365       composite.addFlattenedComponents(true, tail);
366       assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity);
367     }
368 
369     @Test
mergeWithCompositeTail_tailExpandable_reallocateInMemory()370     public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() {
371       int tailFastCapacity = tail.writerIndex() + tail.maxFastWritableBytes();
372       String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1);
373       int newTailSize =  tail.readableBytes() + inSuffixOverFastBytes.length();
374       composite.addFlattenedComponents(true, tail);
375 
376       // Make input larger than tailFastCapacity
377       in.writeCharSequence(inSuffixOverFastBytes, US_ASCII);
378       // Confirm that the tail can only fit incoming data via reallocation.
379       assertThat(in.readableBytes()).isGreaterThan(tail.maxFastWritableBytes());
380       assertThat(in.readableBytes()).isAtMost(tail.maxWritableBytes());
381 
382       // Confirm the assumption that new capacity is produced by alloc.calculateNewCapacity().
383       int expectedTailCapacity = alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE);
384       assertTailExpanded(EXPECTED_TAIL_DATA.concat(inSuffixOverFastBytes), expectedTailCapacity);
385     }
386 
assertTailExpanded(String expectedTailReadableData, int expectedNewTailCapacity)387     private void assertTailExpanded(String expectedTailReadableData, int expectedNewTailCapacity) {
388       int originalNumComponents = composite.numComponents();
389 
390       // Handle the case when reader index is beyond all readable bytes of the cumulation.
391       int compositeReaderIndexBounded = Math.min(compositeReaderIndex, composite.writerIndex());
392       composite.readerIndex(compositeReaderIndexBounded);
393 
394       // Execute the merge logic.
395       NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, composite, in);
396 
397       // Composite component count shouldn't change.
398       assertWithMessage(
399           "When tail is expanded, the number of components in the cumulation must not change")
400           .that(composite.numComponents()).isEqualTo(originalNumComponents);
401 
402       ByteBuf newTail = composite.component(composite.numComponents() - 1);
403 
404       // Verify the readable part of the expanded tail:
405       // 1. Initial readable bytes of the tail not changed
406       // 2. Discardable bytes (0 < discardable < readerIndex) of the incoming buffer are discarded.
407       // 3. Readable bytes of the incoming buffer are fully read and appended to the tail.
408       assertEquals(expectedTailReadableData, newTail.toString(US_ASCII));
409       // Verify expanded capacity.
410       assertEquals(expectedNewTailCapacity, newTail.capacity());
411 
412       // Discardable bytes (0 < discardable < readerIndex) of the tail are kept as is.
413       String newTailDataDiscardable = newTail.toString(0, newTail.readerIndex(), US_ASCII);
414       assertWithMessage("After tail expansion, its discardable bytes should be unchanged")
415           .that(newTailDataDiscardable).isEqualTo(TAIL_DATA_DISCARDABLE);
416 
417       // Reader index must stay where it was
418       assertEquals(TAIL_READER_INDEX, newTail.readerIndex());
419       // Writer index at the end
420       assertEquals(TAIL_READER_INDEX + expectedTailReadableData.length(),
421           newTail.writerIndex());
422 
423       // Verify resulting cumulation.
424       assertExpectedCumulation(newTail, expectedTailReadableData, compositeReaderIndexBounded);
425 
426       // Verify incoming buffer.
427       assertWithMessage("Incoming buffer is fully read").that(in.isReadable()).isFalse();
428       assertWithMessage("Incoming buffer is released").that(in.refCnt()).isEqualTo(0);
429     }
430 
431     @Test
mergeWithCompositeTail_tailNotExpandable_maxCapacityReached()432     public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() {
433       // Fill in tail to the maxCapacity.
434       String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes());
435       tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII);
436       composite.addFlattenedComponents(true, tail);
437       assertTailReplaced();
438     }
439 
440     @Test
mergeWithCompositeTail_tailNotExpandable_shared()441     public void mergeWithCompositeTail_tailNotExpandable_shared() {
442       tail.retain();
443       composite.addFlattenedComponents(true, tail);
444       assertTailReplaced();
445       tail.release();
446     }
447 
448     @Test
mergeWithCompositeTail_tailNotExpandable_readOnly()449     public void mergeWithCompositeTail_tailNotExpandable_readOnly() {
450       composite.addFlattenedComponents(true, tail.asReadOnly());
451       assertTailReplaced();
452     }
453 
assertTailReplaced()454     private void assertTailReplaced() {
455       int cumulationOriginalComponentsNum = composite.numComponents();
456       int taiOriginalRefCount = tail.refCnt();
457       String expectedTailReadable = tail.toString(US_ASCII) + in.toString(US_ASCII);
458       int expectedReallocatedTailCapacity = alloc
459           .calculateNewCapacity(expectedTailReadable.length(), Integer.MAX_VALUE);
460 
461       int compositeReaderIndexBounded = Math.min(compositeReaderIndex, composite.writerIndex());
462       composite.readerIndex(compositeReaderIndexBounded);
463       NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, composite, in);
464 
465       // Composite component count shouldn't change.
466       assertEquals(cumulationOriginalComponentsNum, composite.numComponents());
467       ByteBuf replacedTail = composite.component(composite.numComponents() - 1);
468 
469       // Verify the readable part of the expanded tail:
470       // 1. Discardable bytes (0 < discardable < readerIndex) of the tail are discarded.
471       // 2. Readable bytes of the tail are kept as is
472       // 3. Discardable bytes (0 < discardable < readerIndex) of the incoming buffer are discarded.
473       // 4. Readable bytes of the incoming buffer are fully read and appended to the tail.
474       assertEquals(0, in.readableBytes());
475       assertEquals(expectedTailReadable, replacedTail.toString(US_ASCII));
476 
477       // Since tail discardable bytes are discarded, new reader index must be reset to 0.
478       assertEquals(0, replacedTail.readerIndex());
479       // And new writer index at the new data's length.
480       assertEquals(expectedTailReadable.length(), replacedTail.writerIndex());
481       // Verify the capacity of reallocated tail.
482       assertEquals(expectedReallocatedTailCapacity, replacedTail.capacity());
483 
484       // Verify resulting cumulation.
485       assertExpectedCumulation(replacedTail, expectedTailReadable, compositeReaderIndexBounded);
486 
487       // Verify incoming buffer.
488       assertWithMessage("Incoming buffer is fully read").that(in.isReadable()).isFalse();
489       assertWithMessage("Incoming buffer is released").that(in.refCnt()).isEqualTo(0);
490 
491       // The old tail must be released once (have one less reference).
492       assertWithMessage("Replaced tail released once.")
493           .that(tail.refCnt()).isEqualTo(taiOriginalRefCount - 1);
494     }
495 
assertExpectedCumulation( ByteBuf newTail, String expectedTailReadable, int expectedReaderIndex)496     private void assertExpectedCumulation(
497         ByteBuf newTail, String expectedTailReadable, int expectedReaderIndex) {
498       // Verify the readable part of the cumulation:
499       // 1. Readable composite head (initial) data
500       // 2. Readable part of the tail
501       // 3. Readable part of the incoming data
502       String expectedCumulationData =
503           compositeHeadData.concat(expectedTailReadable).substring(expectedReaderIndex);
504       assertEquals(expectedCumulationData, composite.toString(US_ASCII));
505 
506       // Cumulation capacity includes:
507       // 1. Full composite head, including discardable bytes
508       // 2. Expanded tail readable bytes
509       int expectedCumulationCapacity = compositeHeadData.length() + expectedTailReadable.length();
510       assertEquals(expectedCumulationCapacity, composite.capacity());
511 
512       // Composite Reader index must stay where it was.
513       assertEquals(expectedReaderIndex, composite.readerIndex());
514       // Composite writer index must be at the end.
515       assertEquals(expectedCumulationCapacity, composite.writerIndex());
516 
517       // Composite cumulation is retained and owns the new tail.
518       assertEquals(1, composite.refCnt());
519       assertEquals(1, newTail.refCnt());
520     }
521 
522     @Test
mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow()523     public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() {
524       final UnsupportedOperationException expectedError = new UnsupportedOperationException();
525       CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
526           tail) {
527         @Override
528         public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
529             ByteBuf buffer) {
530           throw expectedError;
531         }
532       };
533 
534       try {
535         NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, compositeThrows, in);
536         fail("Cumulator didn't throw");
537       } catch (UnsupportedOperationException actualError) {
538         assertSame(expectedError, actualError);
539         // Input must be released unless its ownership has been to the composite cumulation.
540         assertEquals(0, in.refCnt());
541         // Tail released
542         assertEquals(0, tail.refCnt());
543         // Composite cumulation is retained
544         assertEquals(1, compositeThrows.refCnt());
545         // Composite cumulation loses the tail
546         assertEquals(0, compositeThrows.numComponents());
547       } finally {
548         compositeThrows.release();
549       }
550     }
551 
552     @Test
mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow()553     public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() {
554       final UnsupportedOperationException expectedError = new UnsupportedOperationException();
555       CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE,
556           tail.asReadOnly()) {
557         @Override
558         public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex,
559             ByteBuf buffer) {
560           throw expectedError;
561         }
562       };
563 
564       // Return our instance of the new buffer to ensure it's released.
565       int newTailSize = tail.readableBytes() + in.readableBytes();
566       ByteBuf newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
567       ByteBufAllocator mockAlloc = mock(ByteBufAllocator.class);
568       when(mockAlloc.buffer(anyInt())).thenReturn(newTail);
569 
570       try {
571         NettyAdaptiveCumulator.mergeWithCompositeTail(mockAlloc, compositeRo, in);
572         fail("Cumulator didn't throw");
573       } catch (UnsupportedOperationException actualError) {
574         assertSame(expectedError, actualError);
575         // Input must be released unless its ownership has been to the composite cumulation.
576         assertEquals(0, in.refCnt());
577         // New buffer released
578         assertEquals(0, newTail.refCnt());
579         // Composite cumulation is retained
580         assertEquals(1, compositeRo.refCnt());
581         // Composite cumulation loses the tail
582         assertEquals(0, compositeRo.numComponents());
583       } finally {
584         compositeRo.release();
585       }
586     }
587   }
588 
589   /**
590    * Miscellaneous tests for {@link NettyAdaptiveCumulator#mergeWithCompositeTail} that don't
591    * fit into {@link MergeWithCompositeTailTests}, and require custom-crafted scenarios.
592    */
593   @RunWith(JUnit4.class)
594   public static class MergeWithCompositeTailMiscTests {
595     private final ByteBufAllocator alloc = new PooledByteBufAllocator();
596 
597     /**
598      * Test the issue with {@link CompositeByteBuf#component(int)} returning a ByteBuf with
599      * the indexes out-of-sync with {@code CompositeByteBuf.Component} offsets.
600      */
601     @Test
mergeWithCompositeTail_outOfSyncComposite()602     public void mergeWithCompositeTail_outOfSyncComposite() {
603       NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024);
604 
605       // Create underlying buffer spacious enough for the test data.
606       ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII));
607 
608       // Start with a regular cumulation and add the buf as the only component.
609       CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf);
610       // Read composite1 buf to the beginning of the numbers.
611       assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---");
612 
613       // Wrap composite1 into another cumulation. This is similar to
614       // what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1.
615       CompositeByteBuf composite2 =
616           alloc.compositeBuffer(8).addFlattenedComponents(true, composite1);
617       assertThat(composite2.toString(US_ASCII)).isEqualTo("01234");
618 
619       // The previous operation does not adjust the read indexes of the underlying buffers,
620       // only the internal Component offsets. When the cumulator attempts to append the input to
621       // the tail buffer, it extracts it from the cumulation, writes to it, and then adds it back.
622       // Because the readerIndex on the tail buffer is not adjusted during the read operation
623       // on the CompositeByteBuf, adding the tail back results in the discarded bytes of the tail
624       // to be added back to the cumulator as if they were never read.
625       //
626       // If the reader index of the tail is not manually corrected, the resulting
627       // cumulation will contain the discarded part of the tail: "---".
628       // If it's corrected, it will only contain the numbers.
629       CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2,
630           ByteBufUtil.writeAscii(alloc, "56789"));
631       assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789");
632 
633       // Correctness check: we still have a single component, and this component is still the
634       // original underlying buffer.
635       assertThat(cumulation.numComponents()).isEqualTo(1);
636       // Replace '2' with '*', and '8' with '$'.
637       buf.setByte(5, '*').setByte(11, '$');
638       assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9");
639     }
640   }
641 }
642