First, I'd check whether kafkaRequiresSsl is actually true at the point where the job is submitted.
(Or, simpler: remove the condition entirely, always set the SSL properties, and see what happens.)

Would this supposed SSL OOM happen only if the client uses SSL?
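If kafkaRequiresSsl is parsed from a string config value (a guess on my part; I haven't seen how it's populated), one common way it silently ends up false is that Boolean.parseBoolean only accepts the literal "true", case-insensitively. A minimal sketch:

```java
public class SslFlagCheck {
    public static void main(String[] args) {
        // Boolean.parseBoolean accepts only the (case-insensitive) literal "true".
        // If the flag is read from config as "1", "yes", or "Y", it parses to
        // false and the SSL branch is silently skipped.
        System.out.println(Boolean.parseBoolean("true"));  // true
        System.out.println(Boolean.parseBoolean("TRUE"));  // true
        System.out.println(Boolean.parseBoolean("yes"));   // false
        System.out.println(Boolean.parseBoolean("1"));     // false
    }
}
```

That would also explain why ConsumerConfig logs security.protocol = PLAINTEXT even though you intended SSL.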

On 03/02/2022 03:40, Marco Villalobos wrote:
According to the Flink 1.12 documentation (https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html), the recommended way to consume from Kafka is FlinkKafkaConsumer.

However, I noticed that the newer API uses KafkaSource, which uses KafkaSourceBuilder and OffsetsInitializer.

Although I am on the Flink 1.12 codebase, I preemptively decided to use KafkaSource instead in order to use the more advanced offsets feature. It worked, until I deployed it to EMR and had to connect to AWS Kafka (MSK).

The logs show a few suspicious things.

1) The ConsumerConfig logs these properties:

security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

but I actually passed the following:

security.protocol = SSL
ssl.truststore.location = /etc/alternatives/jre/lib/security/cacerts
ssl.truststore.password = changeit
ssl.truststore.type = JKS

2) The job fails and this exception is thrown:

2022-02-03 00:40:57,239 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: kafka sensor tags -> Sink: s3 sink. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:223) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist_2.12-1.12.1.jar:1.12.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
	at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_312]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_312]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_312]
	... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 9223372036854775807 after 1 attempt(s)
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_312]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_312]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_312]
	... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.

2022-02-03 00:40:57,246 ERROR org.apache.kafka.common.utils.KafkaThread [] - Uncaught exception in thread 'kafka-admin-client-thread | null-enumerator-admin-client':
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_312]
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_312]
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1272) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]

According to many sources, this OutOfMemoryError is more indicative of an SSL misconfiguration (a plaintext client talking to an SSL listener) than of a genuine memory problem.
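The mechanism behind that symptom: a PLAINTEXT Kafka client interprets the first four bytes of whatever the broker sends back as a big-endian int32 message size. When the broker answers with a TLS record instead of a Kafka response, those bytes decode to a size in the hundreds of megabytes, and the client's attempt to allocate a receive buffer that large surfaces as the OOM above. A minimal illustration (the byte values are just an example TLS record header, not captured traffic):

```java
import java.nio.ByteBuffer;

public class TlsBytesAsKafkaSize {
    public static void main(String[] args) {
        // First four bytes of a TLS handshake record: content type 0x16,
        // protocol version 0x03 0x01, then the high byte of the record length.
        byte[] tlsPrefix = {0x16, 0x03, 0x01, 0x02};

        // A plaintext Kafka client reads these as a big-endian int32 size.
        int bogusSize = ByteBuffer.wrap(tlsPrefix).getInt();

        // ~369 MB "message": the client then tries to allocate a heap buffer
        // that large, which can show up as java.lang.OutOfMemoryError.
        System.out.println(bogusSize);
    }
}
```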

My code looks like this:

private KafkaSource<SensorMessage> buildKafkaSource() {
    final KafkaSourceBuilder<SensorMessage> sourceBuilder = KafkaSource.<SensorMessage>builder()
        .setTopics(sensorKafkaTopic)
        .setDeserializer(new SensorMessageKafkaRecordDeserializer())
        .setBootstrapServers(String.join(",", kafkaBootstrapServers))
        .setGroupId(kafkaGroupId)
        .setProperty("zookeeper.connect", zookeeperConnect);
    if (kafkaRequiresSsl) {
        sourceBuilder.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
        sourceBuilder.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);
        sourceBuilder.setProperty("security.protocol", "SSL");
    }
    assignOffsetStrategy(sourceBuilder);
    sourceBuilder.setProperty("commit.offsets.on.checkpoint", Boolean.TRUE.toString());
    return sourceBuilder.build();
}

private void assignOffsetStrategy(KafkaSourceBuilder<SensorMessage> sourceBuilder) {
    if (kafkaOffset != null) {
        switch (kafkaOffset) {
            case committed:
                if (kafkaOffsetResetStrategy != null) {
                    switch (kafkaOffsetResetStrategy) {
                        case earliest:
                            sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
                            break;
                        case latest:
                            sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));
                            break;
                    }
                } else {
                    sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
                }
                break;
            case timestamp:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(kafkaOffsetTimestamp));
                break;
            case earliest:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
                break;
            case latest:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                break;
        }
    }
}

public enum OffsetOption { committed, timestamp, earliest, latest }

public enum OffsetResetStrategyOption { earliest, latest }
So, did I adopt KafkaSource and KafkaSourceBuilder prematurely? Should I revert to the legacy FlinkKafkaConsumer?
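For comparison, the legacy connector in 1.12 takes its Kafka client settings as a plain Properties object, so the SSL settings travel the same way. A minimal sketch (values are placeholders mirroring the snippet above, not real endpoints; the consumer construction is shown in comments since it needs the Flink runtime):

```java
import java.util.Properties;

public class LegacyConsumerProps {
    public static void main(String[] args) {
        // Same client properties as in buildKafkaSource(), placeholder values.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9094,broker2:9094");
        props.setProperty("group.id", "sensor-group");
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", "/etc/alternatives/jre/lib/security/cacerts");
        props.setProperty("ssl.truststore.password", "changeit");

        // With the legacy connector the source would then be built as:
        // FlinkKafkaConsumer<SensorMessage> consumer =
        //     new FlinkKafkaConsumer<>(sensorKafkaTopic, deserializationSchema, props);
        // consumer.setStartFromGroupOffsets(); // or setStartFromEarliest()/setStartFromTimestamp(ts)
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

Note that FlinkKafkaConsumer's start-position methods (setStartFromGroupOffsets, setStartFromEarliest, setStartFromLatest, setStartFromTimestamp) cover roughly the same ground as your OffsetsInitializer logic, so reverting would not lose that capability.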

Any advice or insight would be very helpful.

Thank you.

Marco A. Villalobos
