Hey Flink users,

We are running Flink 1.7.2 with a job that uses FlinkKafkaProducer with its
write semantic set to Semantic.EXACTLY_ONCE. When the job fails and restarts
(in our case due to a checkpoint timeout), it enters a failure loop that only a
cancellation and resubmission will fix. The expected and desired outcome is
that the job recovers from the failure and restarts successfully. Some digging
revealed an issue where the class loader closes before the connection to Kafka
is fully terminated, resulting in a NoClassDefFoundError. What is happening has
already been described here:
https://heap.io/blog/engineering/missing-scala-class-noclassdeffounderror,
though we are hitting it with Kafka, not Redis:

2019-05-03 15:14:18,780 ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-producer-network-thread | producer-80':
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
    at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
    at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
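For context, the job's checkpointing is configured along these lines; the
interval, timeout, and restart-strategy values below are illustrative rather
than our exact settings:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every minute in exactly-once mode.
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

// Checkpoints exceeding this timeout are declared failed; a timed-out
// checkpoint is what triggers the restart (and then the failure loop) for us.
env.getCheckpointConfig().setCheckpointTimeout(120_000);

// Restart from the last completed checkpoint on failure.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));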

Interestingly, this only happens when we extend FlinkKafkaProducer to set the
write semantic to EXACTLY_ONCE. When running with the default
FlinkKafkaProducer (which uses Semantic.AT_LEAST_ONCE), the class loader has no
trouble disconnecting the Kafka client on job failure, and the job recovers
just fine. As far as I can tell, we are not doing anything particularly strange
in our extended producer:

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class CustomFlinkKafkaProducer<IN> extends FlinkKafkaProducer<IN> {

  // AvroKeyedSerializer is our own KeyedSerializationSchema implementation.
  public CustomFlinkKafkaProducer(Properties properties, String topicId,
      AvroKeyedSerializer<IN> serializationSchema) {
    // Identical to what the default constructor sets up, except the
    // semantic is EXACTLY_ONCE rather than AT_LEAST_ONCE.
    super(
        topicId,
        serializationSchema,
        properties,
        Optional.of(new FlinkFixedPartitioner<>()),
        Semantic.EXACTLY_ONCE,
        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
  }

  public static Properties getPropertiesFromBrokerList(String brokerList) {
    […]
  }
}
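
For completeness, here is roughly how the producer is attached to the stream.
MyEvent, the topic name, and the broker list below are placeholders, and
AvroKeyedSerializer is our own schema class:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch only: MyEvent, the topic, and the broker list are placeholders.
public static void attachKafkaSink(DataStream<MyEvent> events,
    AvroKeyedSerializer<MyEvent> serializer) {
  Properties props =
      CustomFlinkKafkaProducer.getPropertiesFromBrokerList("broker-1:9092,broker-2:9092");
  events.addSink(new CustomFlinkKafkaProducer<>(props, "output-topic", serializer));
}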

