Thanks for the fix, guys. I am testing this with 1.1.5 but am still seeing data loss, and I can't get much out of the logs beyond what is below.
Here's our use case:
1) Consume from Kafka
2) Apply a session window
3) Send the window's messages to Kafka

If step 3 fails because all Kafka brokers are down, we see data loss. Here are the relevant logs:

    java.lang.Exception: Could not perform checkpoint 2 for operator TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th operator in chain.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
        ... 8 more
    Caused by: java.lang.Exception: Failed to snapshot function state of TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
        ... 9 more
    Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)

--
Sent from the Apache Flink User Mailing List archive at Nabble.com.
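In the trace above, the Kafka failure ("Batch Expired") only surfaces at checkpoint time: FlinkKafkaProducerBase.snapshotState calls checkErroneous, which throws and makes checkpoint 2 fail. As a rough illustration of that pattern, here is a simplified, single-threaded Java model. It assumes the sink stores an asynchronous send error and rethrows it on the next record or checkpoint. AsyncSinkModel and its methods are hypothetical names, not Flink's implementation, and the model is not a diagnosis of where the records were lost.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of an asynchronous sink; NOT Flink's code.
// Sends complete via callbacks, so a failure cannot be thrown from invoke()
// directly; it is stored and rethrown later, e.g. when a checkpoint is taken.
class AsyncSinkModel {
    // First failure reported by a send callback, or null if none so far.
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();
    private int pendingRecords = 0;
    private int acknowledged = 0;

    // Hands a value to the (simulated) asynchronous client.
    void invoke(String value) throws Exception {
        checkErroneous(); // fail fast if an earlier send already failed
        pendingRecords++;
    }

    // Simulated client callback for one pending record; error == null means success.
    void onCompletion(Exception error) {
        if (pendingRecords == 0) {
            throw new IllegalStateException("no pending record to complete");
        }
        pendingRecords--;
        if (error == null) {
            acknowledged++;
        } else {
            asyncError.compareAndSet(null, error); // keep only the first failure
        }
    }

    // Checkpoint hook: every send must have completed (standing in for a blocking
    // flush in this single-threaded model); then any stored failure is rethrown
    // so the checkpoint fails instead of being confirmed.
    void snapshotState() throws Exception {
        if (pendingRecords != 0) {
            throw new IllegalStateException("pending records not flushed: " + pendingRecords);
        }
        checkErroneous();
    }

    int acknowledged() {
        return acknowledged;
    }

    private void checkErroneous() throws Exception {
        Exception e = asyncError.get();
        if (e != null) {
            throw new Exception("Failed to send data: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) throws Exception {
        AsyncSinkModel sink = new AsyncSinkModel();
        sink.invoke("a");
        sink.invoke("b");
        sink.onCompletion(null);                           // "a" acknowledged
        sink.onCompletion(new Exception("Batch Expired")); // "b" failed
        try {
            sink.snapshotState();
        } catch (Exception e) {
            System.out.println("checkpoint failed: " + e.getMessage());
        }
    }
}
```

Running main prints "checkpoint failed: Failed to send data: Batch Expired". Keeping only the first error (via compareAndSet) is a choice made in this sketch: during a full broker outage, later send failures are likely consequences of the first one.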