Thanks Gordon.

*2017-06-01 20:22:44,400 WARN
org.apache.kafka.clients.producer.internals.Sender - Got error
produce response with correlation id 4 on topic-partition
topic.http.stream.event.processor-0, retrying (99999 attempts left).
Error: NOT_ENOUGH_REPLICAS*

*Not sure if this may be related to not being built with the Cloudera
binaries.*

This seems normal when Kafka is down.

*Could you provide info on how exactly you’re verifying the lost messages?*

Our use case is pretty simple. 

1) Source - Kafka (Flink task id: b93b267f087865c245babeb259c76593)
2) Group by key
3) Apply session window
4) Sink - Kafka

2, 3, 4 are assigned task id: b4a5c72b52779ab9b2b093b85b8b20c9
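
In code, the job is roughly the following (a condensed sketch against the
Flink 1.2-era Java API; the type, schema, and function names are
placeholders, not our actual classes):

```java
// Rough shape of the job described above (placeholder names).
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Event> source = env.addSource(                   // 1) Kafka source
        new FlinkKafkaConsumer09<>("topic.http.stream.event.processor",
                new EventDeserializationSchema(), consumerProps));

source.keyBy(event -> event.getKey())                       // 2) group by key
      .window(ProcessingTimeSessionWindows.withGap(         // 3) session window
              Time.seconds(30)))                            //    (30000 ms gap, as in the logs)
      .apply(new SessionWindowFunction())
      .addSink(new FlinkKafkaProducer09<>("sink.http.sep",  // 4) Kafka sink
              new EventSerializationSchema(), producerProps));
```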

We bring down all Kafka brokers once Flink has received the messages from
Kafka.
Flink tries to send the messages to the Kafka sink, but isn't able to
because Kafka is down. This task fails:

*2017-06-01 20:22:44,426 INFO
org.apache.flink.runtime.taskmanager.Task - Attempting to fail task
externally TriggerWindow(ProcessingTimeSessionWindows(30000),
ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293},
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) ->
Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).*

Both tasks fail and this is communicated to the job manager.

Job Manager fails the job:
*2017-06-01 20:22:44,500 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RUNNING
to FAILING.*

Job Manager restarts the job again:
*2017-06-01 20:22:44,530 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting
the job event-filter (510a7a83f509adace6704e7f2caa0b75).*

At this point we expect the task that sends to Kafka to be restored from
the last checkpoint, because its output wasn't successfully committed. I do
see some logs to that effect in the job manager logs:

*2017-06-01 20:22:54,536 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring
from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for
510a7a83f509adace6704e7f2caa0b75.*

Now, when I bring all Kafka brokers back up, I expect the messages that
didn't make it to the Kafka sink to be re-sent.

But that's not happening. 
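
One thing I'll double-check on our side is whether the producer is set to
flush on checkpoint. As I understand it, without this the FlinkKafkaProducer
hands records to the Kafka client asynchronously, so a checkpoint can
complete while records are still unsent and they won't be replayed on
restore. A sketch of what I believe the relevant settings are (Flink
1.2-era FlinkKafkaProducer09; variable names are placeholders):

```java
env.enableCheckpointing(10000); // checkpoint every 10 s

FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
        "sink.http.sep", new SimpleStringSchema(), producerProps);
// Wait for in-flight records to be acked before a checkpoint completes;
// otherwise records buffered in the Kafka client can be lost on failure.
producer.setFlushOnCheckpoint(true);
// Fail the task on send errors (triggering recovery) instead of only
// logging them.
producer.setLogFailuresOnly(false);
```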

All these logs are present in the files that I attached. 

I am going to try this on the standalone cluster today. 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-tp11413p13458.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
