Thanks Gordon and Kostas. 

Gordon,

"When a failure occurs in the job, Flink uses the last completed checkpoint
to restart the job. In the case of the Flink Kafka producer, this
essentially makes sure that records which did not make it into Kafka and
caused the last run to fail are reprocessed and sent to Kafka again."

This is exactly what we were expecting. Thanks for confirming. However, we
still do not see messages in Kafka.
All the Kafka properties are as expected:

Replication: 3
Min ISR: 2
acks: all
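
For completeness, here is a minimal sketch of the producer-side settings described above, using only `java.util.Properties` (the broker address and retry count are placeholders; replication factor and min ISR are broker/topic settings, not producer properties):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Producer-side settings matching the values listed above. Note that
    // replication factor 3 and min.insync.replicas=2 are configured on the
    // broker/topic, not here.
    static Properties buildProducerConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder address
        props.setProperty("acks", "all");       // wait for all in-sync replicas to ack
        props.setProperty("retries", "100000"); // placeholder; the log shows "99999 attempts left"
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProducerConfig().getProperty("acks")); // prints "all"
    }
}
```

With `min.insync.replicas=2` and `acks=all`, the broker rejects produce requests with `NOT_ENOUGH_REPLICAS` whenever fewer than two replicas are in sync, which matches the warning in the Task Manager 1 log below. On the Flink side, if the job uses `FlinkKafkaProducer09`/`010`, it may also be worth double-checking that `setFlushOnCheckpoint(true)` and `setLogFailuresOnly(false)` are set, since records still buffered in the producer are otherwise not covered by the checkpoint.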

We also tried this with Flink 1.2.1. We haven't yet tested with a standalone configuration; we will try that to see whether the result is different.

That said, we're running this on a Cloudera YARN/Hadoop cluster, but we haven't built Flink against the Cloudera binaries. The logs don't indicate that this is the problem.

Please let us know how we can troubleshoot this. 

I have attached the JobManager and TaskManager log files for reference.

Relevant excerpts from the log files:

*Job Manager*

2017-06-01 20:22:44,499 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.

2017-06-01 20:22:44,530 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Restarting
the job event-filter (510a7a83f509adace6704e7f2caa0b75).
2017-06-01 20:22:44,534 INFO 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestarter  -
Delaying retry of job execution for 10000 ms ...
2017-06-01 20:22:48,233 DEBUG org.apache.flink.runtime.metrics.dump.MetricDumpSerialization  - Failed to serialize gauge.

2017-06-01 20:22:54,535 DEBUG
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Resetting
execution vertex Source: Custom Source (1/1) for new execution.
2017-06-01 20:22:54,535 DEBUG
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Resetting
execution vertex TriggerWindow(ProcessingTimeSessionWindows(30000),
ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293},
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) ->
Sink: sink.http.sep (1/1) for new execution.
2017-06-01 20:22:54,535 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state
RESTARTING to CREATED.
2017-06-01 20:22:54,536 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring
from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for
510a7a83f509adace6704e7f2caa0b75.
2017-06-01 20:22:54,591 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state CREATED
to RUNNING.


*Task Manager 1*

2017-06-01 20:22:44,400 WARN  org.apache.kafka.clients.producer.internals.Sender            - Got error produce response with correlation id 4 on topic-partition topic.http.stream.event.processor-0, retrying (99999 attempts left). Error: NOT_ENOUGH_REPLICAS

2017-06-01 20:22:44,426 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).

2017-06-01 20:22:44,427 INFO  org.apache.flink.runtime.taskmanager.Task                     - TriggerWindow(ProcessingTimeSessionWindows(30000), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293}, ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched from RUNNING to FAILED.
TimerException{java.lang.RuntimeException: Could not forward element to next operator}

Attached log files:
jobManager.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/jobManager.log>
taskManager.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/taskManager.log>

*Task Manager 2*

2017-06-01 20:22:54,741 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition  - Source: Custom Source (1/1) (8ee2c8a628968bc3f8006f0740bb8ad1): Initialized ResultPartition 8d68b9c00d6a329d70ee2bf1ed320318@8ee2c8a628968bc3f8006f0740bb8ad1 [PIPELINED, 1 subpartitions, 1 pending references]
2017-06-01 20:22:54,760 INFO  org.apache.flink.yarn.YarnTaskManager                         - Received task Source: Custom Source (1/1)

2017-06-01 20:27:30,388 WARN  org.apache.kafka.clients.NetworkClient                        - Error while fetching metadata with correlation id 1 : {topic.event.filter=LEADER_NOT_AVAILABLE}



