-
Notifications
You must be signed in to change notification settings - Fork 995
UnpooledByteBufAllocator is always used with Netty and JDK SSL #6843
Description
Describe the bug
When the Netty HTTP client selects SslProvider.JDK, ChannelPipelineInitializer.channelCreated() unconditionally sets ChannelOption.ALLOCATOR to UnpooledByteBufAllocator.DEFAULT.
This overrides Netty's default pooled allocator for the entire channel, causing every buffer in the HTTP/2 pipeline - to be allocated as unpooled heap buffers.
Regression Issue
- Select this option if this issue appears to be a regression.
Expected Behavior
Netty's default pooled allocator is used regardless of SSL provider. The original Netty issue is resolved with a comment that claims memory usage has been decreased.
Current Behavior
There is an EFO consumer reading data from a Kinesis stream with 200 shards. The consumer is running on 8 vCPUs, 18 GB heap.
Due to UnpooledByteBufAllocator.DEFAULT override all channel buffers are unpooled heap byte[] allocations. Under a sustained streaming workload (e.g. Kinesis EFO consumer), this produces ~300 GB of heap allocation per 5 minutes, out of 1.6TB of total allocations.
A large number of allocations (including the ones mentioned in this issue) degrades the performance of a garbage collector, causing overall system degradation.
Reproduction Steps
// Configure the clients.
SdkAsyncHttpClient asyncHttpClient = NettyNioAsyncHttpClient.builder()
.protocol(Protocol.HTTP2)
.maxConcurrency(500)
.connectionAcquisitionTimeout(Duration.ofSeconds(60))
.build();
KinesisAsyncClient asyncClient = KinesisAsyncClient.builder()
.region(Region.US_WEST_2)
.credentialsProvider(DefaultCredentialsProvider.create())
.httpClient(asyncHttpClient)
.build();
String shardId = "shardId-000000000000";
String consumerArn = "efo:consumer:arn";
// Subscribe to a shard.
SubscribeToShardRequest request = SubscribeToShardRequest.builder()
.consumerARN(consumerArn)
.shardId(shardId)
.startingPosition(StartingPosition.builder()
.type(ShardIteratorType.TRIM_HORIZON)
.build())
.build();
Subscriber<SubscribeToShardEventStream> subscriber = new Subscriber<>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
System.out.println("EFO subscription established for " + shardId);
s.request(1);
}
@Override
public void onNext(SubscribeToShardEventStream eventStream) {
if (eventStream instanceof SubscribeToShardEvent event) {
// process record data ...
}
subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.err.println("EFO error on " + shardId + ": " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("EFO subscription completed for " + shardId);
}
};
SubscribeToShardResponseHandler handler = SubscribeToShardResponseHandler.builder()
.subscriber(() -> subscriber)
.onError(t -> System.err.println("Handler error: " + t.getMessage()))
.build();
// Start consumption.
asyncClient.subscribeToShard(request, handler);Possible Solution
Remove the UnpooledByteBufAllocator.DEFAULT override entirely.
Additional Information/Context
No response
AWS Java SDK version used
2.42.21
JDK version used
21.0.10
Operating System and version
Linux (version n/a)