
David Mollitor commented on KAFKA-4090:

OK.  I took a stab at this and I added a unit test to simulate the SSL response 
laid out in KIP-498.

I don't think the answer is to use some sort of max buffer size.  That feature 
may be nice as protection from a misconfigured server, but not for this use 

As pointed out int KIP-498, the server identifies that this is a plain-text 
connection, but the client is unable to identify the connection as being 
How is that the case?  Well, the server buffers data as it is being read in 
chunks (8K perhaps).  Once it reads enough data into the buffer, it runs a 
quick scan on the bytes to try to detect SSL v.s. plaintext.

I tried putting the logic into the client, but it was very difficult.  The 
client does not buffer data in chunks in the same way.  It reads 4 bytes from 
the stream, and when those four bytes are read, it parses the size and creates 
the necessary payload buffer.  It's very simple.

There's just not a lot of flexibility in the existing setup.  Once the data is 
consumed from the stream, you can't put it back.  That is, there is no way to 
read 16 bytes, check for SSL, and if it's not SSL, put the bytes back into the 
front of the stream (or just inspect without consuming) for normal processing.  
Once those 16 bytes are consumed, they are consumed... even if the stream has 
two 8 byte packets one-after-the-other.  In this scenario, reading 16 bytes to 
parse one packet just ate a packet.

To detect SSL, Netty (Apache 2.0) requires 5 bytes.  So, I am doing the same.  
I am consuming 4 bytes and not loading a buffer at that time.  If the stream 
produces just 1 more byte, then I check if SSL is present, and if not, I create 
the required buffer.  It is much like a lazy-load implementation.  The payload 
buffer doesn't get created unless it absolutely has to.

Unfortunately, the unit tests bit me hard, because they all assume there will 
be two reads: one 4-byte read to get the payload size (and create the payload 
buffer), then one read to get the payload itself.  So, the mocking all mimics 
two reads.  My code does 3 reads: 1 for the size, 1 to get a more byte to test 
for SSL, then 1 for the payload. For anything that is a "live" test, with a 
test socket actually receiving data, this is brittle but generally works 
because it is very unlikely that the socket won't at least have 4 bytes for the 
size in the first read.  However, I think some of these live test would fail if 
there was only 1 or 2 bytes came from the stream at a time.  There is one unit 
test that is counting the number of reads that take place.  I had to disable it 
because it assumes the payload buffer is created after only 1 read (flaky test 
if the data comes in slowly),... it blocks (for testing purposes) any 
subsequent reads, so my implementation never creates the lazy payload buffer 
because a second read is never triggered).

> JVM runs into OOM if (Java) client uses a SSL port without setting the 
> security protocol
> ----------------------------------------------------------------------------------------
>                 Key: KAFKA-4090
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4090
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions:,, 2.1.0
>            Reporter: Jaikiran Pai
>            Assignee: Alexandre Dupriez
>            Priority: Major
> Quoting from the mail thread that was sent to Kafka mailing list:
> {quote}
> We have been using Kafka (server and Java client libraries). So far 
> we had been using it with plaintext transport but recently have been 
> considering upgrading to using SSL. It mostly works except that a 
> mis-configured producer (and even consumer) causes a hard to relate 
> OutOfMemory exception and thus causing the JVM in which the client is 
> running, to go into a bad state. We can consistently reproduce that OOM very 
> easily. We decided to check if this is something that is fixed in so 
> upgraded one of our test systems to that version (both server and client 
> libraries) but still see the same issue. Here's how it can be easily 
> reproduced
> 1. Enable SSL listener on the broker via server.properties, as per the Kafka 
> documentation
> {code}
> listeners=PLAINTEXT://:9092,SSL://:9093
> ssl.keystore.location=<location-of-keystore>
> ssl.keystore.password=pass
> ssl.key.password=pass
> ssl.truststore.location=<location-of-truststore>
> ssl.truststore.password=pass
> {code}
> 2. Start zookeeper and kafka server
> 3. Create a "oom-test" topic (which will be used for these tests):
> {code}
> kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test  
> --partitions 1 --replication-factor 1
> {code}
> 4. Create a simple producer which sends a single message to the topic via 
> Java (new producer) APIs:
> {code}
> public class OOMTest {
>     public static void main(final String[] args) throws Exception {
>         final Properties kafkaProducerConfigs = new Properties();
>         // NOTE: Intentionally use a SSL port without specifying 
> security.protocol as SSL
> kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> "localhost:9093");
> kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>  StringSerializer.class.getName());
>         try (KafkaProducer<String, String> producer = new 
> KafkaProducer<>(kafkaProducerConfigs)) {
>             System.out.println("Created Kafka producer");
>             final String topicName = "oom-test";
>             final String message = "Hello OOM!";
>             // send a message to the topic
>             final Future<RecordMetadata> recordMetadataFuture = 
> producer.send(new ProducerRecord<>(topicName, message));
>             final RecordMetadata sentRecordMetadata = 
> recordMetadataFuture.get();
>             System.out.println("Sent message '" + message + "' to topic '" + 
> topicName + "'");
>         }
>         System.out.println("Tests complete");
>     }
> }
> {code}
> Notice that the server URL is using a SSL endpoint localhost:9093 but isn't 
> specifying any of the other necessary SSL configs like security.protocol.
> 5. For the sake of easily reproducing this issue run this class with a max 
> heap size of 256MB (-Xmx256M). Running this code throws up the following 
> OutOfMemoryError in one of the Sender threads:
> {code}
> 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in 
> kafka-producer-network-thread | producer-1:
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
>     at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Note that I set it to 256MB as heap size to easily reproduce it but this 
> isn't specific to that size. We have been able to reproduce it at even 516MB 
> and higher too.
> This even happens with the consumer and in fact can be reproduced out of the 
> box with the kafka-consumer-group.sh script. All you have to do is run that 
> tool as follows:
> {code}
> ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9093 
> --new-consumer
> {code}
> {code}
> Error while executing consumer group command Java heap space
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
>     at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>     at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>     at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at 
> kafka.admin.AdminClient.kafka$admin$AdminClient$$send(AdminClient.scala:49)
>     at 
> kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:61)
>     at 
> kafka.admin.AdminClient$$anonfun$sendAnyNode$1.apply(AdminClient.scala:58)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:58)
>     at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:87)
>     at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:96)
>     at kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:117)
>     at 
> kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:121)
>     at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.list(ConsumerGroupCommand.scala:311)
>     at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:63)
>     at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}
> Notice that here again I'm using the new consumer and the SSL port without 
> any additional SSL configs.
> Once this OOM occurs, the producer is useless since it's (background) sender 
> thread is dead. Not just that, since we run these producers/consumers from 
> within our application, this OOM trips the JVM and our whole JVM goes into an 
> unstable state.
> Debugging shows that the NetworkReceive class in its readFromReadableChannel 
> method receives a value of something like 352518912 and then goes ahead to 
> allocate a ByteBuffer of that size. This 352518912 is approximately 300 odd 
> MB and obviously causes allocation issues. I suspect the value being passed 
> over the channel is incorrect.
> Of course this exception is triggered by a user config error but given that 
> ends up in a (almost unclear) OOM and causing the JVM to go in a bad state, 
> is there a way the Kafka Java library can handle this better?
> {quote}

This message was sent by Atlassian Jira

Reply via email to