The Flink 1.12 documentation (https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html) says to use FlinkKafkaConsumer when consuming from Kafka.
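
For reference, the legacy API described there is used roughly like this (a minimal sketch; the topic, schema, and property values are illustrative, not my actual configuration):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
props.setProperty("group.id", "sensor-group");

// Legacy source: constructed directly and attached with addSource().
FlinkKafkaConsumer<String> consumer =
      new FlinkKafkaConsumer<>("sensor-topic", new SimpleStringSchema(), props);
consumer.setStartFromGroupOffsets();   // offset control is method-based here

DataStream<String> stream = env.addSource(consumer);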

However, I noticed that the newer API is KafkaSource, which is configured through KafkaSourceBuilder and OffsetsInitializer.

Although I am on the Flink 1.12 codebase, I decided to adopt KafkaSource early in order to take advantage of its more flexible offset initialization. It worked until I deployed it to EMR and had to connect to AWS Kafka (MSK).

The logs show a few suspicious things.

1) The ConsumerConfig logs these properties:

security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

but I actually passed the following:

security.protocol = SSL
ssl.truststore.location = /etc/alternatives/jre/lib/security/cacerts
ssl.truststore.password = changeit
ssl.truststore.type = JKS

2) The job fails and this exception is thrown:

2022-02-03 00:40:57,239 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext [] - Exception while handling result from async call in SourceCoordinator-Source: kafka sensor tags -> Sink: s3 sink. Triggering job failover.
org.apache.flink.util.FlinkRuntimeException: Failed to handle partition splits change due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.handlePartitionSplitChanges(KafkaSourceEnumerator.java:223) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:86) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
Caused by: java.lang.RuntimeException: Failed to get topic metadata.
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:59) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_312]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_312]
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 9223372036854775807 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getPartitionChanges(TopicListSubscriber.java:57) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.discoverAndInitializePartitionSplit(KafkaSourceEnumerator.java:196) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:83) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_312]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_312]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_312]
    ... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1643848916823) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited.
2022-02-03 00:40:57,246 ERROR org.apache.kafka.common.utils.KafkaThread [] - Uncaught exception in thread 'kafka-admin-client-thread | null-enumerator-admin-client':
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[?:1.8.0_312]
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[?:1.8.0_312]
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:448) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:398) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1272) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203) ~[feature-LUM-5531-offset--b7c3bfee.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]

According to many sources, that OutOfMemoryError is more indicative of an SSL misconfiguration than of a genuine memory problem: a client speaking plaintext to a TLS listener misreads the TLS handshake bytes as a record length and tries to allocate an absurdly large buffer.
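
The same properties can be sanity-checked outside of Flink with a bare Kafka AdminClient (a minimal, self-contained sketch; the broker address is hypothetical, and the truststore values mirror the configuration above):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class MskConnectivityCheck {
   public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      // Hypothetical MSK TLS listener; 9094 is MSK's default TLS port.
      props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            "b-1.example.kafka.us-east-1.amazonaws.com:9094");
      props.put("security.protocol", "SSL");
      props.put("ssl.truststore.location", "/etc/alternatives/jre/lib/security/cacerts");
      props.put("ssl.truststore.password", "changeit");
      props.put("ssl.truststore.type", "JKS");

      try (AdminClient admin = AdminClient.create(props)) {
         // The enumerator times out on topic metadata, so exercise a
         // metadata call directly and fail fast on a bad handshake.
         System.out.println(admin.describeCluster().nodes().get(30, TimeUnit.SECONDS));
      }
   }
}

If this times out against one listener but succeeds against another, that would point at a listener/port mismatch rather than at the Flink connector.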

My code looks like this:

private KafkaSource<SensorMessage> buildKafkaSource() {
   final KafkaSourceBuilder<SensorMessage> sourceBuilder = KafkaSource.<SensorMessage>builder()
         .setTopics(sensorKafkaTopic)
         .setDeserializer(new SensorMessageKafkaRecordDeserializer())
         .setBootstrapServers(String.join(",", kafkaBootstrapServers))
         .setGroupId(kafkaGroupId)
         .setProperty("zookeeper.connect", zookeeperConnect);

   // SSL settings are only applied in environments that require them (e.g. MSK).
   if (kafkaRequiresSsl) {
      sourceBuilder.setProperty("ssl.truststore.location", kafkaSslTruststoreLocation);
      sourceBuilder.setProperty("ssl.truststore.password", kafkaSslTruststorePassword);
      sourceBuilder.setProperty("security.protocol", "SSL");
   }

   assignOffsetStrategy(sourceBuilder);
   sourceBuilder.setProperty("commit.offsets.on.checkpoint", Boolean.TRUE.toString());

   return sourceBuilder.build();
}

// Maps the configured OffsetOption (plus an optional reset strategy) onto an
// OffsetsInitializer for the builder.
private void assignOffsetStrategy(KafkaSourceBuilder<SensorMessage> sourceBuilder) {
   if (kafkaOffset != null) {
      switch (kafkaOffset) {
         case committed:
            if (kafkaOffsetResetStrategy != null) {
               switch (kafkaOffsetResetStrategy) {
                  case earliest:
                     sourceBuilder.setStartingOffsets(
                           OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
                     break;
                  case latest:
                     sourceBuilder.setStartingOffsets(
                           OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));
                     break;
               }
            } else {
               sourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets());
            }
            break;
         case timestamp:
            sourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(kafkaOffsetTimestamp));
            break;
         case earliest:
            sourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
            break;
         case latest:
            sourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
            break;
      }
   }
}

public enum OffsetOption {
   committed,
   timestamp,
   earliest,
   latest
}

public enum OffsetResetStrategyOption {
   earliest,
   latest
}
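
For completeness, the source is attached with the new fromSource API, roughly like this (a sketch; the watermark strategy is illustrative, and the source name matches the one in the log above):

DataStream<SensorMessage> sensorStream = env.fromSource(
      buildKafkaSource(),
      WatermarkStrategy.noWatermarks(),
      "kafka sensor tags");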

So, did I adopt KafkaSource and KafkaSourceBuilder prematurely? Should I revert to FlinkKafkaConsumer?

Any advice or insight would be very helpful.

Thank you.

Marco A. Villalobos
