Hi Ninad,

After recovery, the job should continue from where the last checkpoint was taken. It should therefore output all remaining messages to Kafka at least once.
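One thing worth double-checking on the producer side: with the Kafka 0.10 connector the sink only gives you at-least-once if it flushes its pending records on checkpoints. A minimal sketch of that configuration, assuming FlinkKafkaProducer010 with a String schema; the topic name is a placeholder and `stream` stands in for the job's windowed output DataStream<String>:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder brokers

    FlinkKafkaProducer010<String> sink = new FlinkKafkaProducer010<String>(
            "topic.http.stream.event.processor", new SimpleStringSchema(), props);

    // Only let a checkpoint complete once every record buffered in the Kafka
    // client has been acknowledged; anything not yet acknowledged is replayed
    // from the last completed checkpoint after recovery (at-least-once).
    sink.setFlushOnCheckpoint(true);

    // Keep send failures fatal, so the task fails and recovery kicks in
    // instead of the error only being logged and the record dropped.
    sink.setLogFailuresOnly(false);

    stream.addSink(sink);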
Could you share the complete JobManager and TaskManager logs with us? They may contain information that helps us get to the bottom of the problem.

Cheers,
Till

On Fri, Jun 2, 2017 at 4:32 PM, ninad <nni...@gmail.com> wrote:

> Thanks Gordon.
>
> *2017-06-01 20:22:44,400 WARN
> org.apache.kafka.clients.producer.internals.Sender - Got error
> produce response with correlation id 4 on topic-partition
> topic.http.stream.event.processor-0, retrying (99999 attempts left).
> Error: NOT_ENOUGH_REPLICAS
>
> , not sure if this may be related to not being built with the Cloudera
> binaries.*
>
> This seems normal when Kafka is down.
>
> *Could you provide info on how exactly you're verifying the lost messages?*
>
> Our use case is pretty simple:
>
> 1) Source - Kafka (Flink task id: b93b267f087865c245babeb259c76593)
> 2) Group by key
> 3) Apply session window
> 4) Sink - Kafka
>
> Steps 2, 3 and 4 are assigned task id: b4a5c72b52779ab9b2b093b85b8b20c9.
>
> We bring down all Kafka brokers once Flink has received the messages from
> Kafka. Flink then tries to send the messages to the Kafka sink, but can't
> because Kafka is down. This task fails:
>
> *2017-06-01 20:22:44,426 INFO org.apache.flink.runtime.taskmanager.Task
> - Attempting to fail task externally
> TriggerWindow(ProcessingTimeSessionWindows(30000),
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293},
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) ->
> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).*
>
> Both tasks fail and this is communicated to the JobManager.
>
> The JobManager fails the job:
>
> *2017-06-01 20:22:44,500 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
> event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RUNNING
> to FAILING.*
>
> The JobManager then restarts the job:
>
> *2017-06-01 20:22:44,530 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting
> the job event-filter (510a7a83f509adace6704e7f2caa0b75).*
>
> At this point we expect the task that sends to Kafka to be recovered,
> because its output wasn't successfully committed. I see some logs along
> those lines in the JobManager logs:
>
> *2017-06-01 20:22:54,536 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore -
> Recovering checkpoints from ZooKeeper.
> 2017-06-01 20:22:54,543 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore -
> Found 1 checkpoints in ZooKeeper.
> 2017-06-01 20:22:54,543 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore -
> Trying to retrieve checkpoint 7.
> 2017-06-01 20:22:54,585 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring
> from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for
> 510a7a83f509adace6704e7f2caa0b75.*
>
> Now, if I bring all Kafka brokers back up, I expect the messages that
> didn't make it to the Kafka sink to be sent.
>
> But that's not happening.
>
> All these logs are present in the files I attached.
>
> I am going to try this on a standalone cluster today.
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13458.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
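For reference, a minimal sketch of the pipeline described above (Kafka source -> key by -> 30 s processing-time session window -> Kafka sink), assuming Flink 1.2/1.3 with the Kafka 0.10 connectors and String records; the topic names, group id, key selector and window function are placeholders, not the actual job:

    import java.util.Properties;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;

    public class EventFilterJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoints are what the restore from ZooKeeper replays from.
            env.enableCheckpointing(10000);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder brokers
            props.setProperty("group.id", "event-filter");          // placeholder group id

            // 1) Source - Kafka
            DataStream<String> source = env.addSource(
                    new FlinkKafkaConsumer010<String>("source-topic", new SimpleStringSchema(), props));

            // 4) Sink - Kafka, flushing pending records on checkpoint (at-least-once)
            FlinkKafkaProducer010<String> sink = new FlinkKafkaProducer010<String>(
                    "topic.http.stream.event.processor", new SimpleStringSchema(), props);
            sink.setFlushOnCheckpoint(true);

            source
                    // 2) Group by key (placeholder key selector)
                    .keyBy(new KeySelector<String, String>() {
                        @Override
                        public String getKey(String value) {
                            return value;
                        }
                    })
                    // 3) Apply a 30 s processing-time session window
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
                    .apply(new WindowFunction<String, String, String, TimeWindow>() {
                        @Override
                        public void apply(String key, TimeWindow window,
                                          Iterable<String> values, Collector<String> out) {
                            // Placeholder: forward every element of the session as-is
                            for (String value : values) {
                                out.collect(value);
                            }
                        }
                    })
                    .addSink(sink);

            env.execute("event-filter");
        }
    }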