1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.truth.Truth.assertWithMessage; 21 import static com.google.common.truth.TruthJUnit.assume; 22 import static io.netty.util.CharsetUtil.US_ASCII; 23 import static org.junit.Assert.assertEquals; 24 import static org.junit.Assert.assertFalse; 25 import static org.junit.Assert.assertSame; 26 import static org.junit.Assert.assertTrue; 27 import static org.junit.Assert.fail; 28 import static org.mockito.ArgumentMatchers.anyInt; 29 import static org.mockito.Mockito.mock; 30 import static org.mockito.Mockito.when; 31 32 import com.google.common.base.Strings; 33 import com.google.common.collect.ImmutableList; 34 import com.google.common.collect.Lists; 35 import io.netty.buffer.ByteBuf; 36 import io.netty.buffer.ByteBufAllocator; 37 import io.netty.buffer.ByteBufUtil; 38 import io.netty.buffer.CompositeByteBuf; 39 import io.netty.buffer.PooledByteBufAllocator; 40 import io.netty.buffer.UnpooledByteBufAllocator; 41 import java.util.Collection; 42 import java.util.List; 43 import java.util.stream.Collectors; 44 import org.junit.After; 45 import org.junit.Before; 46 import org.junit.Test; 47 import org.junit.experimental.runners.Enclosed; 48 import org.junit.runner.RunWith; 49 import org.junit.runners.JUnit4; 50 import org.junit.runners.Parameterized; 51 import org.junit.runners.Parameterized.Parameter; 52 import org.junit.runners.Parameterized.Parameters; 53 54 @RunWith(Enclosed.class) 55 public class NettyAdaptiveCumulatorTest { 56 cartesianProductParams(List<?>.... lists)57 private static Collection<Object[]> cartesianProductParams(List<?>... lists) { 58 return Lists.cartesianProduct(lists).stream().map(List::toArray).collect(Collectors.toList()); 59 } 60 61 @RunWith(JUnit4.class) 62 public static class CumulateTests { 63 // Represent data as immutable ASCII Strings for easy and readable ByteBuf equality assertions. 64 private static final String DATA_INITIAL = "0123"; 65 private static final String DATA_INCOMING = "456789"; 66 private static final String DATA_CUMULATED = "0123456789"; 67 68 private static final ByteBufAllocator alloc = new UnpooledByteBufAllocator(false); 69 private NettyAdaptiveCumulator cumulator; 70 private NettyAdaptiveCumulator throwingCumulator; 71 private final UnsupportedOperationException throwingCumulatorError = 72 new UnsupportedOperationException(); 73 74 // Buffers for testing 75 private final ByteBuf contiguous = ByteBufUtil.writeAscii(alloc, DATA_INITIAL); 76 private final ByteBuf in = ByteBufUtil.writeAscii(alloc, DATA_INCOMING); 77 78 @Before setUp()79 public void setUp() { 80 cumulator = new NettyAdaptiveCumulator(0) { 81 @Override 82 void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { 83 // To limit the testing scope to NettyAdaptiveCumulator.cumulate(), always compose 84 composite.addFlattenedComponents(true, in); 85 } 86 }; 87 88 // Throws an error on adding incoming buffer. 89 throwingCumulator = new NettyAdaptiveCumulator(0) { 90 @Override 91 void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { 92 throw throwingCumulatorError; 93 } 94 }; 95 } 96 97 @Test cumulate_notReadableCumulation_replacedWithInputAndReleased()98 public void cumulate_notReadableCumulation_replacedWithInputAndReleased() { 99 contiguous.readerIndex(contiguous.writerIndex()); 100 assertFalse(contiguous.isReadable()); 101 ByteBuf cumulation = cumulator.cumulate(alloc, contiguous, in); 102 assertEquals(DATA_INCOMING, cumulation.toString(US_ASCII)); 103 assertEquals(0, contiguous.refCnt()); 104 // In retained by cumulation. 105 assertEquals(1, in.refCnt()); 106 assertEquals(1, cumulation.refCnt()); 107 cumulation.release(); 108 } 109 110 @Test cumulate_contiguousCumulation_newCompositeFromContiguousAndInput()111 public void cumulate_contiguousCumulation_newCompositeFromContiguousAndInput() { 112 CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, contiguous, in); 113 assertEquals(DATA_INITIAL, cumulation.component(0).toString(US_ASCII)); 114 assertEquals(DATA_INCOMING, cumulation.component(1).toString(US_ASCII)); 115 assertEquals(DATA_CUMULATED, cumulation.toString(US_ASCII)); 116 // Both in and contiguous are retained by cumulation. 117 assertEquals(1, contiguous.refCnt()); 118 assertEquals(1, in.refCnt()); 119 assertEquals(1, cumulation.refCnt()); 120 cumulation.release(); 121 } 122 123 @Test cumulate_compositeCumulation_inputAppendedAsANewComponent()124 public void cumulate_compositeCumulation_inputAppendedAsANewComponent() { 125 CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous); 126 assertSame(composite, cumulator.cumulate(alloc, composite, in)); 127 assertEquals(DATA_INITIAL, composite.component(0).toString(US_ASCII)); 128 assertEquals(DATA_INCOMING, composite.component(1).toString(US_ASCII)); 129 assertEquals(DATA_CUMULATED, composite.toString(US_ASCII)); 130 // Both in and contiguous are retained by cumulation. 131 assertEquals(1, contiguous.refCnt()); 132 assertEquals(1, in.refCnt()); 133 assertEquals(1, composite.refCnt()); 134 composite.release(); 135 } 136 137 @Test cumulate_compositeCumulation_inputReleasedOnError()138 public void cumulate_compositeCumulation_inputReleasedOnError() { 139 CompositeByteBuf composite = alloc.compositeBuffer().addComponent(true, contiguous); 140 try { 141 throwingCumulator.cumulate(alloc, composite, in); 142 fail("Cumulator didn't throw"); 143 } catch (UnsupportedOperationException actualError) { 144 assertSame(throwingCumulatorError, actualError); 145 // Input must be released unless its ownership has been to the composite cumulation. 146 assertEquals(0, in.refCnt()); 147 // Initial composite cumulation owned by the caller in this case, so it isn't released. 148 assertEquals(1, composite.refCnt()); 149 // Contiguous still managed by the cumulation 150 assertEquals(1, contiguous.refCnt()); 151 } finally { 152 composite.release(); 153 } 154 } 155 156 @Test cumulate_contiguousCumulation_inputAndNewCompositeReleasedOnError()157 public void cumulate_contiguousCumulation_inputAndNewCompositeReleasedOnError() { 158 // Return our instance of new composite to ensure it's released. 159 CompositeByteBuf newComposite = alloc.compositeBuffer(Integer.MAX_VALUE); 160 ByteBufAllocator mockAlloc = mock(ByteBufAllocator.class); 161 when(mockAlloc.compositeBuffer(anyInt())).thenReturn(newComposite); 162 163 try { 164 // Previous cumulation is non-composite, so cumulator will create anew composite and add 165 // both buffers to it. 166 throwingCumulator.cumulate(mockAlloc, contiguous, in); 167 fail("Cumulator didn't throw"); 168 } catch (UnsupportedOperationException actualError) { 169 assertSame(throwingCumulatorError, actualError); 170 // Input must be released unless its ownership has been to the composite cumulation. 171 assertEquals(0, in.refCnt()); 172 // New composite cumulation hasn't been returned to the caller, so it must be released. 173 assertEquals(0, newComposite.refCnt()); 174 // Previous cumulation released because it was owned by the new composite cumulation. 175 assertEquals(0, contiguous.refCnt()); 176 } 177 } 178 } 179 180 @RunWith(Parameterized.class) 181 public static class ShouldComposeTests { 182 // Represent data as immutable ASCII Strings for easy and readable ByteBuf equality assertions. 183 private static final String DATA_INITIAL = "0123"; 184 private static final String DATA_INCOMING = "456789"; 185 186 /** 187 * Cartesian product of the test values. 188 */ 189 @Parameters(name = "composeMinSize={0}, tailData=\"{1}\", inData=\"{2}\"") params()190 public static Collection<Object[]> params() { 191 List<?> composeMinSize = ImmutableList.of(0, 9, 10, 11, Integer.MAX_VALUE); 192 List<?> tailData = ImmutableList.of("", DATA_INITIAL); 193 List<?> inData = ImmutableList.of("", DATA_INCOMING); 194 return cartesianProductParams(composeMinSize, tailData, inData); 195 } 196 197 @Parameter public int composeMinSize; 198 @Parameter(1) public String tailData; 199 @Parameter(2) public String inData; 200 201 private CompositeByteBuf composite; 202 private ByteBuf tail; 203 private ByteBuf in; 204 205 @Before setUp()206 public void setUp() { 207 ByteBufAllocator alloc = new UnpooledByteBufAllocator(false); 208 in = ByteBufUtil.writeAscii(alloc, inData); 209 tail = ByteBufUtil.writeAscii(alloc, tailData); 210 composite = alloc.compositeBuffer(Integer.MAX_VALUE); 211 // Note that addFlattenedComponents() will not add a new component when tail is not readable. 212 composite.addFlattenedComponents(true, tail); 213 } 214 215 @After tearDown()216 public void tearDown() { 217 in.release(); 218 composite.release(); 219 } 220 221 @Test shouldCompose_emptyComposite()222 public void shouldCompose_emptyComposite() { 223 assume().that(composite.numComponents()).isEqualTo(0); 224 assertTrue(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize)); 225 } 226 227 @Test shouldCompose_composeMinSizeReached()228 public void shouldCompose_composeMinSizeReached() { 229 assume().that(composite.numComponents()).isGreaterThan(0); 230 assume().that(tail.readableBytes() + in.readableBytes()).isAtLeast(composeMinSize); 231 assertTrue(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize)); 232 } 233 234 @Test shouldCompose_composeMinSizeNotReached()235 public void shouldCompose_composeMinSizeNotReached() { 236 assume().that(composite.numComponents()).isGreaterThan(0); 237 assume().that(tail.readableBytes() + in.readableBytes()).isLessThan(composeMinSize); 238 assertFalse(NettyAdaptiveCumulator.shouldCompose(composite, in, composeMinSize)); 239 } 240 } 241 242 @RunWith(Parameterized.class) 243 public static class MergeWithCompositeTailTests { 244 private static final String INCOMING_DATA_READABLE = "+incoming"; 245 private static final String INCOMING_DATA_DISCARDABLE = "discard"; 246 247 private static final String TAIL_DATA_DISCARDABLE = "---"; 248 private static final String TAIL_DATA_READABLE = "tail"; 249 private static final String TAIL_DATA = TAIL_DATA_DISCARDABLE + TAIL_DATA_READABLE; 250 private static final int TAIL_READER_INDEX = TAIL_DATA_DISCARDABLE.length(); 251 private static final int TAIL_MAX_CAPACITY = 128; 252 253 // DRY sacrificed to improve readability. 254 private static final String EXPECTED_TAIL_DATA = "tail+incoming"; 255 256 /** 257 * Cartesian product of the test values. 258 * 259 * <p>Test cases when the cumulation contains components, other than tail, and could be 260 * partially read. This is needed to verify the correctness if reader and writer indexes of the 261 * composite cumulation after the merge. 262 */ 263 @Parameters(name = "compositeHeadData=\"{0}\", compositeReaderIndex={1}") params()264 public static Collection<Object[]> params() { 265 String headData = "head"; 266 267 List<?> compositeHeadData = ImmutableList.of( 268 // Test without the "head" component. Empty string is equivalent of fully read buffer, 269 // so it's not added to the composite byte buf. The tail is added as the first component. 270 "", 271 // Test with the "head" component, so the tail is added as the second component. 272 headData 273 ); 274 275 // After the tail is added to the composite cumulator, advance the reader index to 276 // cover different cases. 277 // The reader index only looks at what's readable in the composite byte buf, so 278 // discardable bytes of head and tail doesn't count. 279 List<?> compositeReaderIndex = ImmutableList.of( 280 // Reader in the beginning 281 0, 282 // Within the head (when present) or the tail 283 headData.length() - 2, 284 // Within the tail, even if the head is present 285 headData.length() + 2 286 ); 287 return cartesianProductParams(compositeHeadData, compositeReaderIndex); 288 } 289 290 @Parameter public String compositeHeadData; 291 @Parameter(1) public int compositeReaderIndex; 292 293 // Use pooled allocator to have maxFastWritableBytes() behave differently than writableBytes(). 294 private final ByteBufAllocator alloc = new PooledByteBufAllocator(); 295 296 // Composite buffer to be used in tests. 297 private CompositeByteBuf composite; 298 private ByteBuf tail; 299 private ByteBuf in; 300 301 @Before setUp()302 public void setUp() { 303 composite = alloc.compositeBuffer(); 304 305 // The "head" component. It represents existing data in the cumulator. 306 // Note that addFlattenedComponents() does not add completely read buffer, which covers 307 // the case when compositeHeadData parameter is an empty string. 308 ByteBuf head = alloc.buffer().writeBytes(compositeHeadData.getBytes(US_ASCII)); 309 composite.addFlattenedComponents(true, head); 310 311 // The "tail" component. It also represents existing data in the cumulator, but it's 312 // not added to the cumulator during setUp() stage. It is to be manipulated by tests to 313 // produce different buffer write scenarios based on different tail's capacity. 314 // After tail is changes for each test scenario, it's added to the composite buffer. 315 // 316 // The default state of the tail before each test: tail is full, but expandable (the data uses 317 // all initial capacity, but not maximum capacity). 318 // Tail data and indexes: 319 // ----tail 320 // r w 321 tail = alloc.buffer(TAIL_DATA.length(), TAIL_MAX_CAPACITY) 322 .writeBytes(TAIL_DATA.getBytes(US_ASCII)) 323 .readerIndex(TAIL_READER_INDEX); 324 325 // Incoming data and indexes: 326 // discard+incoming 327 // r w 328 in = alloc.buffer() 329 .writeBytes(INCOMING_DATA_DISCARDABLE.getBytes(US_ASCII)) 330 .writeBytes(INCOMING_DATA_READABLE.getBytes(US_ASCII)) 331 .readerIndex(INCOMING_DATA_DISCARDABLE.length()); 332 } 333 334 @After tearDown()335 public void tearDown() { 336 composite.release(); 337 } 338 339 @Test mergeWithCompositeTail_tailExpandable_write()340 public void mergeWithCompositeTail_tailExpandable_write() { 341 // Make incoming data fit into tail capacity. 342 int fitCapacity = tail.capacity() + INCOMING_DATA_READABLE.length(); 343 tail.capacity(fitCapacity); 344 // Confirm it fits. 345 assertThat(in.readableBytes()).isAtMost(tail.writableBytes()); 346 347 // All fits, so tail capacity must stay the same. 348 composite.addFlattenedComponents(true, tail); 349 assertTailExpanded(EXPECTED_TAIL_DATA, fitCapacity); 350 } 351 352 @Test mergeWithCompositeTail_tailExpandable_fastWrite()353 public void mergeWithCompositeTail_tailExpandable_fastWrite() { 354 // Confirm that the tail can be expanded fast to fit the incoming data. 355 assertThat(in.readableBytes()).isAtMost(tail.maxFastWritableBytes()); 356 357 // To avoid undesirable buffer unwrapping, at the moment adaptive cumulator is set not 358 // apply fastWrite technique. Even when fast write is possible, it will fall back to 359 // reallocating a larger buffer. 360 // int tailFastCapacity = tail.writerIndex() + tail.maxFastWritableBytes(); 361 int tailFastCapacity = 362 alloc.calculateNewCapacity(EXPECTED_TAIL_DATA.length(), Integer.MAX_VALUE); 363 364 // Tail capacity is extended to its fast capacity. 365 composite.addFlattenedComponents(true, tail); 366 assertTailExpanded(EXPECTED_TAIL_DATA, tailFastCapacity); 367 } 368 369 @Test mergeWithCompositeTail_tailExpandable_reallocateInMemory()370 public void mergeWithCompositeTail_tailExpandable_reallocateInMemory() { 371 int tailFastCapacity = tail.writerIndex() + tail.maxFastWritableBytes(); 372 String inSuffixOverFastBytes = Strings.repeat("a", tailFastCapacity + 1); 373 int newTailSize = tail.readableBytes() + inSuffixOverFastBytes.length(); 374 composite.addFlattenedComponents(true, tail); 375 376 // Make input larger than tailFastCapacity 377 in.writeCharSequence(inSuffixOverFastBytes, US_ASCII); 378 // Confirm that the tail can only fit incoming data via reallocation. 379 assertThat(in.readableBytes()).isGreaterThan(tail.maxFastWritableBytes()); 380 assertThat(in.readableBytes()).isAtMost(tail.maxWritableBytes()); 381 382 // Confirm the assumption that new capacity is produced by alloc.calculateNewCapacity(). 383 int expectedTailCapacity = alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE); 384 assertTailExpanded(EXPECTED_TAIL_DATA.concat(inSuffixOverFastBytes), expectedTailCapacity); 385 } 386 assertTailExpanded(String expectedTailReadableData, int expectedNewTailCapacity)387 private void assertTailExpanded(String expectedTailReadableData, int expectedNewTailCapacity) { 388 int originalNumComponents = composite.numComponents(); 389 390 // Handle the case when reader index is beyond all readable bytes of the cumulation. 391 int compositeReaderIndexBounded = Math.min(compositeReaderIndex, composite.writerIndex()); 392 composite.readerIndex(compositeReaderIndexBounded); 393 394 // Execute the merge logic. 395 NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, composite, in); 396 397 // Composite component count shouldn't change. 398 assertWithMessage( 399 "When tail is expanded, the number of components in the cumulation must not change") 400 .that(composite.numComponents()).isEqualTo(originalNumComponents); 401 402 ByteBuf newTail = composite.component(composite.numComponents() - 1); 403 404 // Verify the readable part of the expanded tail: 405 // 1. Initial readable bytes of the tail not changed 406 // 2. Discardable bytes (0 < discardable < readerIndex) of the incoming buffer are discarded. 407 // 3. Readable bytes of the incoming buffer are fully read and appended to the tail. 408 assertEquals(expectedTailReadableData, newTail.toString(US_ASCII)); 409 // Verify expanded capacity. 410 assertEquals(expectedNewTailCapacity, newTail.capacity()); 411 412 // Discardable bytes (0 < discardable < readerIndex) of the tail are kept as is. 413 String newTailDataDiscardable = newTail.toString(0, newTail.readerIndex(), US_ASCII); 414 assertWithMessage("After tail expansion, its discardable bytes should be unchanged") 415 .that(newTailDataDiscardable).isEqualTo(TAIL_DATA_DISCARDABLE); 416 417 // Reader index must stay where it was 418 assertEquals(TAIL_READER_INDEX, newTail.readerIndex()); 419 // Writer index at the end 420 assertEquals(TAIL_READER_INDEX + expectedTailReadableData.length(), 421 newTail.writerIndex()); 422 423 // Verify resulting cumulation. 424 assertExpectedCumulation(newTail, expectedTailReadableData, compositeReaderIndexBounded); 425 426 // Verify incoming buffer. 427 assertWithMessage("Incoming buffer is fully read").that(in.isReadable()).isFalse(); 428 assertWithMessage("Incoming buffer is released").that(in.refCnt()).isEqualTo(0); 429 } 430 431 @Test mergeWithCompositeTail_tailNotExpandable_maxCapacityReached()432 public void mergeWithCompositeTail_tailNotExpandable_maxCapacityReached() { 433 // Fill in tail to the maxCapacity. 434 String tailSuffixFullCapacity = Strings.repeat("a", tail.maxWritableBytes()); 435 tail.writeCharSequence(tailSuffixFullCapacity, US_ASCII); 436 composite.addFlattenedComponents(true, tail); 437 assertTailReplaced(); 438 } 439 440 @Test mergeWithCompositeTail_tailNotExpandable_shared()441 public void mergeWithCompositeTail_tailNotExpandable_shared() { 442 tail.retain(); 443 composite.addFlattenedComponents(true, tail); 444 assertTailReplaced(); 445 tail.release(); 446 } 447 448 @Test mergeWithCompositeTail_tailNotExpandable_readOnly()449 public void mergeWithCompositeTail_tailNotExpandable_readOnly() { 450 composite.addFlattenedComponents(true, tail.asReadOnly()); 451 assertTailReplaced(); 452 } 453 assertTailReplaced()454 private void assertTailReplaced() { 455 int cumulationOriginalComponentsNum = composite.numComponents(); 456 int taiOriginalRefCount = tail.refCnt(); 457 String expectedTailReadable = tail.toString(US_ASCII) + in.toString(US_ASCII); 458 int expectedReallocatedTailCapacity = alloc 459 .calculateNewCapacity(expectedTailReadable.length(), Integer.MAX_VALUE); 460 461 int compositeReaderIndexBounded = Math.min(compositeReaderIndex, composite.writerIndex()); 462 composite.readerIndex(compositeReaderIndexBounded); 463 NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, composite, in); 464 465 // Composite component count shouldn't change. 466 assertEquals(cumulationOriginalComponentsNum, composite.numComponents()); 467 ByteBuf replacedTail = composite.component(composite.numComponents() - 1); 468 469 // Verify the readable part of the expanded tail: 470 // 1. Discardable bytes (0 < discardable < readerIndex) of the tail are discarded. 471 // 2. Readable bytes of the tail are kept as is 472 // 3. Discardable bytes (0 < discardable < readerIndex) of the incoming buffer are discarded. 473 // 4. Readable bytes of the incoming buffer are fully read and appended to the tail. 474 assertEquals(0, in.readableBytes()); 475 assertEquals(expectedTailReadable, replacedTail.toString(US_ASCII)); 476 477 // Since tail discardable bytes are discarded, new reader index must be reset to 0. 478 assertEquals(0, replacedTail.readerIndex()); 479 // And new writer index at the new data's length. 480 assertEquals(expectedTailReadable.length(), replacedTail.writerIndex()); 481 // Verify the capacity of reallocated tail. 482 assertEquals(expectedReallocatedTailCapacity, replacedTail.capacity()); 483 484 // Verify resulting cumulation. 485 assertExpectedCumulation(replacedTail, expectedTailReadable, compositeReaderIndexBounded); 486 487 // Verify incoming buffer. 488 assertWithMessage("Incoming buffer is fully read").that(in.isReadable()).isFalse(); 489 assertWithMessage("Incoming buffer is released").that(in.refCnt()).isEqualTo(0); 490 491 // The old tail must be released once (have one less reference). 492 assertWithMessage("Replaced tail released once.") 493 .that(tail.refCnt()).isEqualTo(taiOriginalRefCount - 1); 494 } 495 assertExpectedCumulation( ByteBuf newTail, String expectedTailReadable, int expectedReaderIndex)496 private void assertExpectedCumulation( 497 ByteBuf newTail, String expectedTailReadable, int expectedReaderIndex) { 498 // Verify the readable part of the cumulation: 499 // 1. Readable composite head (initial) data 500 // 2. Readable part of the tail 501 // 3. Readable part of the incoming data 502 String expectedCumulationData = 503 compositeHeadData.concat(expectedTailReadable).substring(expectedReaderIndex); 504 assertEquals(expectedCumulationData, composite.toString(US_ASCII)); 505 506 // Cumulation capacity includes: 507 // 1. Full composite head, including discardable bytes 508 // 2. Expanded tail readable bytes 509 int expectedCumulationCapacity = compositeHeadData.length() + expectedTailReadable.length(); 510 assertEquals(expectedCumulationCapacity, composite.capacity()); 511 512 // Composite Reader index must stay where it was. 513 assertEquals(expectedReaderIndex, composite.readerIndex()); 514 // Composite writer index must be at the end. 515 assertEquals(expectedCumulationCapacity, composite.writerIndex()); 516 517 // Composite cumulation is retained and owns the new tail. 518 assertEquals(1, composite.refCnt()); 519 assertEquals(1, newTail.refCnt()); 520 } 521 522 @Test mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow()523 public void mergeWithCompositeTail_tailExpandable_mergedReleaseOnThrow() { 524 final UnsupportedOperationException expectedError = new UnsupportedOperationException(); 525 CompositeByteBuf compositeThrows = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE, 526 tail) { 527 @Override 528 public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, 529 ByteBuf buffer) { 530 throw expectedError; 531 } 532 }; 533 534 try { 535 NettyAdaptiveCumulator.mergeWithCompositeTail(alloc, compositeThrows, in); 536 fail("Cumulator didn't throw"); 537 } catch (UnsupportedOperationException actualError) { 538 assertSame(expectedError, actualError); 539 // Input must be released unless its ownership has been to the composite cumulation. 540 assertEquals(0, in.refCnt()); 541 // Tail released 542 assertEquals(0, tail.refCnt()); 543 // Composite cumulation is retained 544 assertEquals(1, compositeThrows.refCnt()); 545 // Composite cumulation loses the tail 546 assertEquals(0, compositeThrows.numComponents()); 547 } finally { 548 compositeThrows.release(); 549 } 550 } 551 552 @Test mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow()553 public void mergeWithCompositeTail_tailNotExpandable_mergedReleaseOnThrow() { 554 final UnsupportedOperationException expectedError = new UnsupportedOperationException(); 555 CompositeByteBuf compositeRo = new CompositeByteBuf(alloc, false, Integer.MAX_VALUE, 556 tail.asReadOnly()) { 557 @Override 558 public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, 559 ByteBuf buffer) { 560 throw expectedError; 561 } 562 }; 563 564 // Return our instance of the new buffer to ensure it's released. 565 int newTailSize = tail.readableBytes() + in.readableBytes(); 566 ByteBuf newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE)); 567 ByteBufAllocator mockAlloc = mock(ByteBufAllocator.class); 568 when(mockAlloc.buffer(anyInt())).thenReturn(newTail); 569 570 try { 571 NettyAdaptiveCumulator.mergeWithCompositeTail(mockAlloc, compositeRo, in); 572 fail("Cumulator didn't throw"); 573 } catch (UnsupportedOperationException actualError) { 574 assertSame(expectedError, actualError); 575 // Input must be released unless its ownership has been to the composite cumulation. 576 assertEquals(0, in.refCnt()); 577 // New buffer released 578 assertEquals(0, newTail.refCnt()); 579 // Composite cumulation is retained 580 assertEquals(1, compositeRo.refCnt()); 581 // Composite cumulation loses the tail 582 assertEquals(0, compositeRo.numComponents()); 583 } finally { 584 compositeRo.release(); 585 } 586 } 587 } 588 589 /** 590 * Miscellaneous tests for {@link NettyAdaptiveCumulator#mergeWithCompositeTail} that don't 591 * fit into {@link MergeWithCompositeTailTests}, and require custom-crafted scenarios. 592 */ 593 @RunWith(JUnit4.class) 594 public static class MergeWithCompositeTailMiscTests { 595 private final ByteBufAllocator alloc = new PooledByteBufAllocator(); 596 597 /** 598 * Test the issue with {@link CompositeByteBuf#component(int)} returning a ByteBuf with 599 * the indexes out-of-sync with {@code CompositeByteBuf.Component} offsets. 600 */ 601 @Test mergeWithCompositeTail_outOfSyncComposite()602 public void mergeWithCompositeTail_outOfSyncComposite() { 603 NettyAdaptiveCumulator cumulator = new NettyAdaptiveCumulator(1024); 604 605 // Create underlying buffer spacious enough for the test data. 606 ByteBuf buf = alloc.buffer(32).writeBytes("---01234".getBytes(US_ASCII)); 607 608 // Start with a regular cumulation and add the buf as the only component. 609 CompositeByteBuf composite1 = alloc.compositeBuffer(8).addFlattenedComponents(true, buf); 610 // Read composite1 buf to the beginning of the numbers. 611 assertThat(composite1.readCharSequence(3, US_ASCII).toString()).isEqualTo("---"); 612 613 // Wrap composite1 into another cumulation. This is similar to 614 // what NettyAdaptiveCumulator.cumulate() does in the case the cumulation has refCnt != 1. 615 CompositeByteBuf composite2 = 616 alloc.compositeBuffer(8).addFlattenedComponents(true, composite1); 617 assertThat(composite2.toString(US_ASCII)).isEqualTo("01234"); 618 619 // The previous operation does not adjust the read indexes of the underlying buffers, 620 // only the internal Component offsets. When the cumulator attempts to append the input to 621 // the tail buffer, it extracts it from the cumulation, writes to it, and then adds it back. 622 // Because the readerIndex on the tail buffer is not adjusted during the read operation 623 // on the CompositeByteBuf, adding the tail back results in the discarded bytes of the tail 624 // to be added back to the cumulator as if they were never read. 625 // 626 // If the reader index of the tail is not manually corrected, the resulting 627 // cumulation will contain the discarded part of the tail: "---". 628 // If it's corrected, it will only contain the numbers. 629 CompositeByteBuf cumulation = (CompositeByteBuf) cumulator.cumulate(alloc, composite2, 630 ByteBufUtil.writeAscii(alloc, "56789")); 631 assertThat(cumulation.toString(US_ASCII)).isEqualTo("0123456789"); 632 633 // Correctness check: we still have a single component, and this component is still the 634 // original underlying buffer. 635 assertThat(cumulation.numComponents()).isEqualTo(1); 636 // Replace '2' with '*', and '8' with '$'. 637 buf.setByte(5, '*').setByte(11, '$'); 638 assertThat(cumulation.toString(US_ASCII)).isEqualTo("01*34567$9"); 639 } 640 } 641 } 642