Hi Ninad,

The exception you’re seeing does not cause data loss. As a matter of fact, it’s 
preventing data loss, given how Flink’s checkpointing and fault tolerance work.

So, a recap of what the problem was when this “uncaught exception leak” issue 
was first reported:
Prior to the fix, on checkpoints the Flink Kafka producer did not check for any 
async produce errors, therefore voiding the at-least-once guarantee of the sink.
In other words, the checkpoint was incorrectly succeeding without respecting 
that some previous data wasn’t sent to Kafka.

The fix included in 1.1.5 / 1.2.1 corrects this by rethrowing any async errors 
that occurred before the checkpoint happened, which fails the checkpoint 
snapshot (this is what you are observing with this exception).
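
To make the mechanism concrete, here is a minimal, self-contained sketch of the 
pattern. It is not the actual connector code: the class AsyncErrorCheckingSink 
and the method onSendCompleted are hypothetical names for illustration, and 
only the method names checkErroneous and snapshotState are borrowed from the 
stack trace quoted below. The idea is simply that the async send callback 
records a failure, and the snapshot rethrows it so the checkpoint cannot 
succeed.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a sink that sends records asynchronously. */
public class AsyncErrorCheckingSink {

    // First asynchronous send failure seen since the last check (null if none).
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    /** Simulates the producer callback invoked when an async send completes. */
    public void onSendCompleted(Exception error) {
        if (error != null) {
            // Keep only the first failure; later ones are ignored until it is rethrown.
            asyncError.compareAndSet(null, error);
        }
    }

    /** Rethrows a recorded async failure (and clears it), otherwise does nothing. */
    private void checkErroneous() throws Exception {
        Exception e = asyncError.getAndSet(null);
        if (e != null) {
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    /** Simulates the snapshot hook: the checkpoint fails if any earlier send failed. */
    public void snapshotState() throws Exception {
        checkErroneous();
    }

    public static void main(String[] args) throws Exception {
        AsyncErrorCheckingSink sink = new AsyncErrorCheckingSink();

        // No pending error: the snapshot completes normally.
        sink.snapshotState();

        // A send fails asynchronously, e.g. because all brokers are down.
        sink.onSendCompleted(new Exception("Batch Expired"));

        try {
            sink.snapshotState();
            System.out.println("checkpoint succeeded");
        } catch (Exception e) {
            System.out.println("checkpoint failed: " + e.getMessage());
        }
    }
}
```

Without the check in snapshotState, the second snapshot in this sketch would 
succeed even though a record never reached Kafka, which is the behavior the 
fix removes.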

When a failure occurs in the job, Flink uses the last completed checkpoint to 
restart the job. In the case of the Flink Kafka producer, this essentially 
makes sure that records which did not make it into Kafka and caused the last 
run to fail are reprocessed and sent to Kafka again.

Hope this helps!

Cheers,
Gordon

On 1 June 2017 at 12:15:47 PM, Kostas Kloudas (k.klou...@data-artisans.com) 
wrote:

Hi Ninad,  

I think that Gordon could shed some more light on this, but I suggest  
you update your Flink version to at least 1.2.  

The reason is that we are already in the process of releasing Flink 1.3  
(which will probably come out today), and a lot of things have been  
changed/fixed/improved since the 1.1 release. In fact, it would help us  
a lot if you could check whether your problem still exists in the upcoming  
1.3 release.  

In addition, I suppose that the 1.1 release will soon no longer be  
supported.  

Cheers,  
Kostas  

> On Jun 1, 2017, at 12:15 AM, ninad <nni...@gmail.com> wrote:  
>  
> Thanks for the fix, guys. I am trying to test this with 1.1.5, but am still  
> seeing data loss. I am not able to get much from the logs except this:  
>  
> Here's our use case:  
>  
> 1) Consume from Kafka  
> 2) Apply session window  
> 3) Send messages of window to Kafka  
>  
> If there's a failure in step 3 because all Kafka brokers are down, we see  
> data loss.  
>  
> Here are relevant logs:  
>  
> java.lang.Exception: Could not perform checkpoint 2 for operator TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th operator in chain.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
>     ... 8 more
> Caused by: java.lang.Exception: Failed to snapshot function state of TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
>     ... 9 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)
>
> --  
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13412.html
>   
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
