Thanks Gordon.

*2017-06-01 20:22:44,400 WARN org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS, not sure if this may be related to not being built with the Cloudera binaries.*

This seems normal when Kafka is down.

*Could you provide info on how exactly you're verifying the lost messages?*

Our use case is pretty simple:

1) Source - Kafka (Flink task id: b93b267f087865c245babeb259c76593)
2) Group by key
3) Apply session window
4) Sink - Kafka

Steps 2, 3 and 4 are assigned task id: b4a5c72b52779ab9b2b093b85b8b20c9. (A rough sketch of this pipeline, and of the Kafka producer settings involved, is at the end of this message.)

We bring down all Kafka brokers once Flink has received the messages from Kafka. Flink tries to send the messages to the Kafka sink, but can't because Kafka is down. This task fails:

*2017-06-01 20:22:44,426 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).*

Both tasks fail and this is communicated to the job manager, which fails the job:

*2017-06-01 20:22:44,500 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RUNNING to FAILING.*

The job manager then restarts the job:

*2017-06-01 20:22:44,530 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job event-filter (510a7a83f509adace6704e7f2caa0b75).*

At this point we expect the Flink task that sends to Kafka to be recovered, because its output wasn't successfully committed. I see some logs along those lines in the job manager logs:

*2017-06-01 20:22:54,536 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for 510a7a83f509adace6704e7f2caa0b75.*

Now, if I bring all Kafka brokers back up, I expect the messages that didn't make it to the Kafka sink to be sent. That's not happening. All these logs are present in the files I attached. I am going to try this on the standalone cluster today.
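For reference, here's a rough sketch of what the pipeline looks like. This is not our actual job code: the class, topic names and key extraction are placeholders, the payload is assumed to be a plain String, and I'm assuming the Kafka 0.10 connector (FlinkKafkaConsumer010 / FlinkKafkaProducer010).

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class EventFilterJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // placeholder checkpoint interval

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
        kafkaProps.setProperty("group.id", "event-filter");          // placeholder

        // 1) Source - Kafka
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer010<>("source.topic", new SimpleStringSchema(), kafkaProps));

        // 2) Group by key, 3) apply a 30s processing-time session window
        // (matches ProcessingTimeSessionWindows(30000) in the task name above)
        DataStream<String> windowed = events
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String event) {
                        return event; // placeholder key extraction
                    }
                })
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
                .apply(new WindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window,
                                      Iterable<String> input, Collector<String> out) {
                        for (String event : input) {
                            out.collect(event); // placeholder window logic
                        }
                    }
                });

        // 4) Sink - Kafka
        windowed.addSink(new FlinkKafkaProducer010<>(
                "topic.http.stream.event.processor", new SimpleStringSchema(), kafkaProps));

        env.execute("event-filter");
    }
}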
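On the NOT_ENOUGH_REPLICAS warning itself: as far as I understand, it's a standard Kafka producer response that the broker returns when the producer uses acks=all and fewer than min.insync.replicas replicas are in sync, which is exactly the case with all brokers down. Illustrative producer settings (not our exact config) that would behave like the log above:

// These go into the Properties handed to the Kafka sink; values are illustrative.
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
// With acks=all the broker answers NOT_ENOUGH_REPLICAS while the in-sync
// replica count is below the topic's min.insync.replicas.
producerProps.setProperty("acks", "all");
// A large retry count is what produces the "retrying (99999 attempts left)" countdown.
producerProps.setProperty("retries", "100000");
producerProps.setProperty("retry.backoff.ms", "100");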
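On the recovery expectation: my understanding (please correct me if I'm wrong) is that for the Kafka 0.10 producer to be at-least-once together with checkpointing, it needs to fail on send errors rather than only log them, and to flush pending records before a checkpoint completes. A sketch of those two knobs, using the writeToKafkaWithTimestamps variant and reusing kafkaProps and the windowed stream from the pipeline sketch above:

// Alternative way to attach the Kafka 0.10 sink (instead of addSink above) that
// exposes the producer's at-least-once related flags.
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> producerConfig =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                windowed,
                "topic.http.stream.event.processor",
                new SimpleStringSchema(),
                kafkaProps);

// Fail the sink task on produce errors instead of only logging them, so a failed
// send actually triggers the job restart / replay path described above.
producerConfig.setLogFailuresOnly(false);

// Wait for in-flight records to be acknowledged by Kafka before a checkpoint is
// considered complete; otherwise records still buffered in the producer are not
// covered by the checkpoint.
producerConfig.setFlushOnCheckpoint(true);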