Thanks for the fix, guys. I am trying to test this with 1.1.5 but am still
seeing data loss. I am not able to get much from the logs beyond the stack
trace below.

Here's our use case:

1) Consume from Kafka
2) Apply session window
3) Send the window's messages to Kafka

If step 3 fails because all Kafka brokers are down, we see data loss.
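For reference, here is a minimal sketch of how the job is wired up. This is not
our exact code: it assumes the 0.9 Kafka connectors, plain String records, a
pass-through window function, and placeholder topic names and properties.

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class SessionWindowToKafkaJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60000); // checkpoint every 60s

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker1:9092");
        kafkaProps.setProperty("group.id", "session-window-job");

        // 1) Consume from Kafka
        DataStream<String> input = env.addSource(
            new FlinkKafkaConsumer09<>("input-topic", new SimpleStringSchema(), kafkaProps));

        FlinkKafkaProducer09<String> producer =
            new FlinkKafkaProducer09<>("output-topic", new SimpleStringSchema(), kafkaProps);
        // Flush pending records on checkpoint, so the checkpoint fails (instead of
        // records being silently dropped) when the brokers are unreachable.
        producer.setFlushOnCheckpoint(true);

        input
            // session windows need a keyed stream; keying by the record itself here
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value;
                }
            })
            // 2) Apply a processing-time session window with a 30s gap
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window,
                                  Iterable<String> values, Collector<String> out) {
                    for (String value : values) {
                        out.collect(value);
                    }
                }
            })
            // 3) Send the window's messages to Kafka
            .addSink(producer);

        env.execute("session-window-to-kafka");
    }
}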

Here are relevant logs:

java.lang.Exception: Could not perform checkpoint 2 for operator TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
        at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th operator in chain.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
        ... 8 more
Caused by: java.lang.Exception: Failed to snapshot function state of TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) -> Sink: sink.http.sep (2/4).
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
        ... 9 more
Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)
