First I'd check whether kafkaRequiresSsl is actually true when the job
is submitted, or just remove the condition entirely and see what happens.
Also: would this supposed SSL-related OOM happen only if the client
actually uses SSL?
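If it helps, a minimal sketch of that sanity check, using the field names
from your snippet below (the LOG field is an assumed SLF4J logger):

    // Log the flag, then set the SSL properties unconditionally so a
    // false kafkaRequiresSsl cannot silently skip them.
    LOG.info("kafkaRequiresSsl = {}", kafkaRequiresSsl);
    sourceBuilder.setProperty("security.protocol", "SSL");
    sourceBuilder.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
    sourceBuilder.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);

If the ConsumerConfig log then shows security.protocol = SSL, the problem
is in how the flag is loaded, not in the KafkaSourceBuilder.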
On 03/02/2022 03:40, Marco Villalobos wrote:
According to the Flink 1.12 documentation
(https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html),
the recommended way to consume from Kafka is FlinkKafkaConsumer.
However, I noticed that the newer API uses KafkaSource, which is
configured through KafkaSourceBuilder and OffsetsInitializer.
Although I am on the Flink 1.12 codebase, I preemptively decided to
use KafkaSource instead in order to take advantage of the more flexible
offset-initialization features. It worked, until I deployed the job to
EMR and had to connect to Amazon MSK (managed Kafka).
The logs show two suspicious things.
1) The ConsumerConfig logs these properties:
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
but I actually passed the following, so the SSL properties are
apparently being dropped before they ever reach the consumer:
security.protocol = SSL
ssl.truststore.location = /etc/alternatives/jre/lib/security/cacerts
ssl.truststore.password = changeit
ssl.truststore.type = JKS
2) The job fails and this exception is thrown:
2022-02-03 00:40:57,239 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: kafka sensor tags -> Sink: s3 sink. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:223) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_312]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_312]
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 9223372036854775807 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_312]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_312]
    ... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
2022-02-03 00:40:57,246 ERROR org.apache.kafka.common.utils.KafkaThread [] - Uncaught exception in thread 'kafka-admin-client-thread | null-enumerator-admin-client':
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_312]
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_312]
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1272) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
According to many sources, this OutOfMemoryError is more indicative of
an SSL misconfiguration than of a genuine memory problem: a client
speaking PLAINTEXT to an SSL listener misreads the broker's TLS bytes
as a message-length prefix and tries to allocate a buffer of that
bogus size.
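To illustrate the assumed failure mode (a sketch, not a trace of this
exact exchange): a TLS record begins with a content-type byte such as
0x15 (alert), so interpreting the first four bytes as a big-endian int
yields an allocation request in the hundreds of megabytes:

    import java.nio.ByteBuffer;

    public class TlsBytesAsLength {
        public static void main(String[] args) {
            // First four bytes of a TLS alert record (0x15 = alert, 0x03 0x03 = TLS 1.2).
            byte[] tlsRecordStart = {0x15, 0x03, 0x03, 0x00};
            // A plaintext Kafka client misreads these as a 4-byte message-length prefix...
            int bogusLength = ByteBuffer.wrap(tlsRecordStart).getInt();
            // ...and tries to allocate ~352 MB (0x15030300 = 352,518,912 bytes),
            // which is where NetworkReceive.readFrom blows up with an OOM.
            System.out.println(bogusLength); // prints 352518912
        }
    }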
My code looks like this:
private KafkaSource<SensorMessage> buildKafkaSource() {
    final KafkaSourceBuilder<SensorMessage> sourceBuilder =
        KafkaSource.<SensorMessage>builder()
            .setTopics(sensorKafkaTopic)
            .setDeserializer(new SensorMessageKafkaRecordDeserializer())
            .setBootstrapServers(String.join(",", kafkaBootstrapServers))
            .setGroupId(kafkaGroupId)
            .setProperty("zookeeper.connect", zookeeperConnect);
    if (kafkaRequiresSsl) {
        sourceBuilder.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
        sourceBuilder.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);
        sourceBuilder.setProperty("security.protocol", "SSL");
    }
    assignOffsetStrategy(sourceBuilder);
    sourceBuilder.setProperty("commit.offsets.on.checkpoint", Boolean.TRUE.toString());
    return sourceBuilder.build();
}
private void assignOffsetStrategy(KafkaSourceBuilder<SensorMessage> sourceBuilder) {
    if (kafkaOffset != null) {
        switch (kafkaOffset) {
            case committed:
                if (kafkaOffsetResetStrategy != null) {
                    switch (kafkaOffsetResetStrategy) {
                        case earliest:
                            sourceBuilder.setStartingOffsets(
                                OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
                            break;
                        case latest:
                            sourceBuilder.setStartingOffsets(
                                OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));
                            break;
                    }
                } else {
                    sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
                }
                break;
            case timestamp:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(kafkaOffsetTimestamp));
                break;
            case earliest:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
                break;
            case latest:
                sourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
                break;
        }
    }
}
public enum OffsetOption { committed, timestamp, earliest, latest }

public enum OffsetResetStrategyOption { earliest, latest }
So, did I use KafkaSource and KafkaSourceBuilder prematurely? Should I
revert to FlinkKafkaConsumer?
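For reference, the legacy consumer I would fall back to looks roughly
like this (a sketch; SensorMessageSchema is a hypothetical
KafkaDeserializationSchema<SensorMessage> standing in for the record
deserializer used above):

    private FlinkKafkaConsumer<SensorMessage> buildLegacyKafkaConsumer() {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", String.join(",", kafkaBootstrapServers));
        props.setProperty("group.id", kafkaGroupId);
        if (kafkaRequiresSsl) {
            // Same SSL settings as in buildKafkaSource() above.
            props.setProperty("security.protocol", "SSL");
            props.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
            props.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);
        }
        final FlinkKafkaConsumer<SensorMessage> consumer =
            new FlinkKafkaConsumer<>(sensorKafkaTopic, new SensorMessageSchema(), props);
        // Roughly corresponds to OffsetsInitializer.committedOffsets();
        // setStartFromEarliest()/setStartFromLatest()/setStartFromTimestamp(...)
        // cover the other OffsetOption cases.
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }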
Any advice or insight would be very helpful.
Thank you.
Marco A. Villalobos