Re: Fink: KafkaProducer Data Loss

2017-07-31 Thread Tzu-Li (Gordon) Tai
Hi!

Thanks a lot for providing this.
I'll try to find some time this week to look into this using your example code.

Cheers,
Gordon

On 29 July 2017 at 4:46:57 AM, ninad (nni...@gmail.com) wrote:

Hi Gordon, I was able to reproduce the data loss on a standalone Flink cluster 
as well. I have a stripped-down version of our code here: 

Environment: 
Flink standalone 1.3.0 
Kafka 0.9 

*What the code is doing:* 
-consume messages from kafka topic ('event.filter.topic' property in 
application.properties) 
-group them by key 
-analyze the events in a window and filter some messages. 
-send remaining messages to kafka topic ('sep.http.topic' property in 
application.properties) 

To build: 
./gradlew clean assemble 

The jar needs path to 'application.properties' file to run 

Important properties in application.properties: 
window.session.interval.sec 
kafka.brokers 
event.filter.topic --> source topic 
sep.http.topic --> destination topic 

To test: 
-Use 'EventGenerator' class to publish messages to source kafka topic 
The data published won't be filtered by the logic. If you publish 10 
messages to the source topic, 
those 10 messages should be sent to the destination topic. 

-Once we see that flink has received all the messages, bring down all kafka 
brokers 

-Let Flink jobs fail for 2-3 times. 

-Restart kafka brokers. 

Note: Data loss isn't observed frequently. 1/4 times or so. 

Thanks for all your help. 

eventFilter.zip 
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14522/eventFilter.zip>


Re: Fink: KafkaProducer Data Loss

2017-07-28 Thread ninad
Hi Gordon, I was able to reproduce the data loss on a standalone Flink cluster
as well. I have a stripped-down version of our code here:

Environment:
Flink standalone 1.3.0
Kafka 0.9

*What the code is doing:*
-consume messages from kafka topic ('event.filter.topic' property in
application.properties)
-group them by key
-analyze the events in a window and filter some messages.
-send remaining messages to kafka topic ('sep.http.topic' property in
application.properties); a rough sketch of this pipeline is included below
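
A minimal sketch of that pipeline, assuming the Flink 1.3 / Kafka 0.9 APIs; the
broker address, key extraction, and the pass-through "analysis" step are
placeholders, not the actual eventFilter code:

import java.util.Properties;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class EventFilterSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 seconds

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // kafka.brokers
        kafkaProps.setProperty("group.id", "event-filter");

        // consume messages from the source topic (event.filter.topic)
        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer09<>("event.filter.topic", new SimpleStringSchema(), kafkaProps));

        events
            // group them by key (placeholder key extraction)
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String event) {
                    return event.split(",")[0];
                }
            })
            // session window (window.session.interval.sec)
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
            // analyze the events in the window; pass-through here instead of real filtering
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window,
                                  Iterable<String> windowEvents, Collector<String> out) {
                    for (String e : windowEvents) {
                        out.collect(e);
                    }
                }
            })
            // send remaining messages to the destination topic (sep.http.topic)
            .addSink(new FlinkKafkaProducer09<>("sep.http.topic", new SimpleStringSchema(), kafkaProps));

        env.execute("event-filter");
    }
}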

To build: 
./gradlew clean assemble

The jar needs path to 'application.properties' file to run

Important properties in application.properties:
window.session.interval.sec
kafka.brokers
event.filter.topic --> source topic
sep.http.topic --> destination topic

To test:
-Use the 'EventGenerator' class to publish messages to the source kafka topic
(a plain-Kafka alternative is sketched after these steps).
The data published won't be filtered out by the logic, so if you publish 10
messages to the source topic, those 10 messages should be sent to the
destination topic.

-Once we  see that flink has received all the messages, bring down all kafka
brokers

-Let Flink jobs fail for 2-3 times. 

-Restart kafka brokers. 
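
If the attached EventGenerator class isn't handy, the test messages for the
first step can also be published with a plain Kafka 0.9 producer; this is only
an illustrative sketch, with the broker address, keys, and payload format as
placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestMessagePublisher {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            // messages the job's filter logic should let through
            producer.send(new ProducerRecord<>("event.filter.topic", "key-" + i, "event-" + i));
        }
        producer.flush(); // make sure all 10 messages are on the broker before starting the test
        producer.close();
    }
}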

Note: Data loss isn't observed frequently. 1/4 times or so.

Thanks for all your help.

eventFilter.zip
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14522/eventFilter.zip>


Re: Fink: KafkaProducer Data Loss

2017-07-13 Thread Piotr Nowojski
Oops, sorry, I forgot that this issue was relevant to FlinkKafkaProducer010 only.

Piotrek

> On Jul 13, 2017, at 9:33 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> Hi Ninad & Piotr,
> 
> AFAIK, when this issue was reported, Ninad was using 09.
> FLINK-6996 only affects Flink Kafka Producer 010, so I don’t think that’s the 
> cause here.
> 
> @Ninad
> Code to reproduce this would definitely be helpful here, thanks. If you 
> prefer to provide that privately, that would also be fine.
> 
> Cheers,
> Gordon
> 
> On 13 July 2017 at 4:13:07 PM, Piotr Nowojski (pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>) wrote:
> 
>> Hi,
>> 
>> I’m not sure how relevant is this, but recently I have found and fixed a 
>> bug, that in certain conditions was causing data losses for all of the 
>> FlinkKafkaProducers in Flink:
>> 
>> https://issues.apache.org/jira/browse/FLINK-6996 
>> <https://issues.apache.org/jira/browse/FLINK-6996>
>> 
>> Namely on checkpoint “flush” method was not being called. It should be fixed 
>> in Flink 1.3.2 and 1.4 releases.
>> 
>> Piotrek
>> 
>>> On Jul 12, 2017, at 7:32 PM, ninad <nni...@gmail.com 
>>> <mailto:nni...@gmail.com>> wrote:
>>> 
>>> Hey guys, any update on this? If needed I can attach our code.
>>> 
>>> 
>>> 



Re: Fink: KafkaProducer Data Loss

2017-07-13 Thread Tzu-Li (Gordon) Tai
Hi Ninad & Piotr,

AFAIK, when this issue was reported, Ninad was using 09.
FLINK-6996 only affects Flink Kafka Producer 010, so I don’t think that’s the 
cause here.

@Ninad
Code to reproduce this would definitely be helpful here, thanks. If you prefer 
to provide that privately, that would also be fine.

Cheers,
Gordon

On 13 July 2017 at 4:13:07 PM, Piotr Nowojski (pi...@data-artisans.com) wrote:

Hi,

I’m not sure how relevant this is, but recently I have found and fixed a bug 
that in certain conditions was causing data losses for all of the 
FlinkKafkaProducers in Flink: 

https://issues.apache.org/jira/browse/FLINK-6996

Namely, on checkpoint the “flush” method was not being called. It should be fixed 
in the Flink 1.3.2 and 1.4 releases. 

Piotrek

On Jul 12, 2017, at 7:32 PM, ninad <nni...@gmail.com> wrote:

Hey guys, any update on this? If needed I can attach our code.






Re: Fink: KafkaProducer Data Loss

2017-07-13 Thread Piotr Nowojski
Hi,

I’m not sure how relevant this is, but recently I have found and fixed a bug 
that in certain conditions was causing data losses for all of the 
FlinkKafkaProducers in Flink:

https://issues.apache.org/jira/browse/FLINK-6996 
<https://issues.apache.org/jira/browse/FLINK-6996>

Namely, on checkpoint the “flush” method was not being called. It should be fixed 
in the Flink 1.3.2 and 1.4 releases.

Piotrek

> On Jul 12, 2017, at 7:32 PM, ninad <nni...@gmail.com> wrote:
> 
> Hey guys, any update on this? If needed I can attach our code.
> 
> 
> 



Re: Fink: KafkaProducer Data Loss

2017-07-12 Thread ninad
Hey guys, any update on this? If needed I can attach our code.





Re: Fink: KafkaProducer Data Loss

2017-06-16 Thread ninad
Hi Aljoscha,

I gather you guys aren't able to reproduce this. 

Here are the answers to your questions:

How do you ensure that you only shut down the brokers once Flink has read
all the data that you expect it to read

Ninad: I am able to see the number of messages received on the Flink Job UI.

And, how do you ensure that the offset that Flink checkpoints in step 3) is
the offset that corresponds to the end of your test data.

Ninad: I haven't explicitly verified which offsets were checkpointed. When I
say that a checkpoint was successful, I am referring to the Flink logs: as
long as Flink says that my last successful checkpoint was #7, then on
recovery it restores its state from checkpoint #7.


What is the difference between steps 3) and 5)?

Ninad: I didn't realize that windows are merged eagerly. I have a session
window with an interval of 30 secs. Once I see from the UI that all the
messages have been received, I don't see the following logs for 30 secs,
which is why I thought that the windows are merged once the window trigger
is fired.

Ex:

I verified from the UI that all messages were received. 

I then see this checkpoint in the logs:
2017-06-01 20:21:49,012 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask   - Notification
of complete checkpoint for task TriggerWindow(ProcessingTimeSessionWindows
(3),
ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293},
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)
) -> Sink: sink.http.sep (1/1)


I then see the windows being merged after a few seconds:

2017-06-01 20:22:14,300 DEBUG
org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet  -
Merging [TimeWindow{start=1496348534287, end=1496348564287},
TimeWindow{start=1496348534300, end=1496348564300}] into
TimeWindow{start=1496348534287, end=1496348564300}


So, point 3 is referring to these logs "MergingWindowSet - Merging .."
And point 4 is referring to the data in windows being evaluated.

Hope this helps. Thanks.





Re: Fink: KafkaProducer Data Loss

2017-06-15 Thread Aljoscha Krettek
[Only quoted text from ninad's 2017-06-08 message (the restored key-group
offsets, the HDFS checkpoint path, and the jmCloudera583/tmOneCloudera583/
tmTwoCloudera583 log attachments) and mailing-list footer links survived in
this archived copy; the reply text itself was not preserved.]



Re: Fink: KafkaProducer Data Loss

2017-06-11 Thread ninad
[Only quoted text from the 2017-06-08 message (the jmCloudera583/
tmOneCloudera583/tmTwoCloudera583 log attachment links and the note about the
missing Jun 6 post) and mailing-list footer links survived in this archived
copy; the reply text itself was not preserved.]

Re: Fink: KafkaProducer Data Loss

2017-06-11 Thread Tzu-Li (Gordon) Tai
Hi Ninad,

Thanks for the logs!
Just to let you know, I’ll continue to investigate this early next week.

Cheers,
Gordon

On 8 June 2017 at 7:08:23 PM, ninad (nni...@gmail.com) wrote:

I built Flink v1.3.0 with cloudera hadoop and am able to see the data loss.  

Here are the details:  

*tmOneCloudera583.log*  

Received session window task:  
*2017-06-08 15:10:46,131 INFO org.apache.flink.runtime.taskmanager.Task  
- TriggerWindow(ProcessingTimeSessionWindows(3),  
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec},
  
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) ->  
Sink: sink.http.sep (1/1) (3ef2f947096f3b0986b8ad334c7d5d3c) switched from  
CREATED to DEPLOYING.  

Finished checkpoint 2 (Synchronous part)  
2017-06-08 15:15:51,982 DEBUG  
org.apache.flink.streaming.runtime.tasks.StreamTask -  
TriggerWindow(ProcessingTimeSessionWindows(3),  
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec},
  
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) ->  
Sink: sink.http.sep (1/1) - finished synchronous part of checkpoint  
2.Alignment duration: 0 ms, snapshot duration 215 ms  
*  

The task failed before the completed checkpoint could be acknowledged, 
i.e., I don't see the log saying "Notification of complete checkpoint for 
task TriggerWindow" for checkpoint 2. 

*jmCloudera583.log*  

Job Manager received acks for checkpoint 2  

*2017-06-08 15:15:51,898 DEBUG  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received  
acknowledge message for checkpoint 2 from task  
3e980e4d3278ef0d0f5e0fa9591cc53d of job 3f5aef5e15a23bce627c05c94760fb16.  
2017-06-08 15:15:51,982 DEBUG  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received  
acknowledge message for checkpoint 2 from task  
3ef2f947096f3b0986b8ad334c7d5d3c of job 3f5aef5e15a23bce627c05c94760fb16*.  

Job Manager tried to restore from checkpoint 2.  

*2017-06-08 15:16:02,111 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore -  
Found 1 checkpoints in ZooKeeper.  
2017-06-08 15:16:02,111 INFO  
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore -  
Trying to retrieve checkpoint 2.  
2017-06-08 15:16:02,122 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring  
from latest valid checkpoint: Checkpoint 2 @ 1496934766105 for 3f5aef5e15a23bce627c05c94760fb16.* 

*tmTwocloudera583.log*  

Task Manager tried to restore the data and was successful.  

*2017-06-08 15:16:02,258 DEBUG  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Restoring  
snapshot from state handles:  
[KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
  
endKeyGroup=127}, offsets=[13460, 13476, 13492, 13508, 13524, 13540, 13556,  
13572, 13588, 13604, 13620, 13636, 13652, 13668, 13684, 14566, 14582, 14598,  
14614, 14630, 14646, 14662, 14678, 14694, 14710, 14726, 14742, 14758, 14774,  
14790, 14806, 14822, 14838, 14854, 14870, 14886, 14902, 14918, 14934, 14950,  
14966, 14982, 14998, 15014, 15030, 15046, 15062, 15078, 15094, 15110, 15126,  
15142, 15158, 15174, 15190, 15206, 16088, 16104, 16120, 16136, 28818, 28834,  
28850, 28866, 28882, 28898, 28914, 28930, 28946, 28962, 28978, 28994, 29010,  
29026, 29042, 29058, 29074, 29090, 29106, 29122, 29138, 29154, 29170, 40346,  
40362, 40378, 40394, 40410, 40426, 40442, 40458, 40474, 40490, 40506, 40522,  
40538, 41420, 41436, 41452, 41468, 41484, 41500, 41516, 41532, 41548, 41564,  
41580, 41596, 41612, 41628, 41644, 41660, 41676, 41692, 41708, 41724, 41740,  
41756, 41772, 41788, 41804, 41820, 41836, 41852, 41868, 41884, 41900,  
41916]}, data=File State:  
hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/3f5aef5e15a23bce627c05c94760fb16/chk-2/aa076014-120b-4955-ab46-89094358ebeb
  
[41932 bytes]}].*  

But apparently, the restored state didn't have all the messages the window had 
received, because a few messages were not replayed and the kafka sink didn't 
receive all the messages. 

Attaching the files here.  

jmCloudera583.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/jmCloudera583.log>
  
tmOneCloudera583.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/tmOneCloudera583.log>
  
tmTwoCloudera583.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/tmTwoCloudera583.log>
  

BTW, I posted my test results and logs for regular Flink v1.3.0 on Jun 6, 
but don't see that post here. I did receive an email though. Hope you guys 
saw that. 

Thanks for your patience guys.  




Re: Fink: KafkaProducer Data Loss

2017-06-08 Thread ninad
I built Flink v1.3.0 with cloudera hadoop and am able to see the data loss. 

Here are the details:

*tmOneCloudera583.log*

Received session window task:
*2017-06-08 15:10:46,131 INFO  org.apache.flink.runtime.taskmanager.Task

- TriggerWindow(ProcessingTimeSessionWindows(3),
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec},
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) ->
Sink: sink.http.sep (1/1) (3ef2f947096f3b0986b8ad334c7d5d3c) switched from
CREATED to DEPLOYING.

Finished checkpoint 2 (Synchronous part) 
2017-06-08 15:15:51,982 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask   -
TriggerWindow(ProcessingTimeSessionWindows(3),
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@f4361ec},
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:1100)) ->
Sink: sink.http.sep (1/1) - finished synchronous part of checkpoint
2.Alignment duration: 0 ms, snapshot duration 215 ms
*

The task failed before the completed checkpoint could be acknowledged,
i.e., I don't see the log saying "Notification of complete checkpoint for
task TriggerWindow" for checkpoint 2.

*jmCloudera583.log*

Job Manager received acks for checkpoint 2

*2017-06-08 15:15:51,898 DEBUG
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received
acknowledge message for checkpoint 2 from task
3e980e4d3278ef0d0f5e0fa9591cc53d of job 3f5aef5e15a23bce627c05c94760fb16.
2017-06-08 15:15:51,982 DEBUG
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Received
acknowledge message for checkpoint 2 from task
3ef2f947096f3b0986b8ad334c7d5d3c of job 3f5aef5e15a23bce627c05c94760fb16*.

Job Manager tried to restore from checkpoint 2.

*2017-06-08 15:16:02,111 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Found 1 checkpoints in ZooKeeper.
2017-06-08 15:16:02,111 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Trying to retrieve checkpoint 2.
2017-06-08 15:16:02,122 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring
from latest valid checkpoint: Checkpoint 2 @ 1496934766105 for 3f5aef5e15a23bce627c05c94760fb16.*

*tmTwocloudera583.log*

Task Manager tried to restore the data and was successful. 

*2017-06-08 15:16:02,258 DEBUG
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Restoring
snapshot from state handles:
[KeyGroupsStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
endKeyGroup=127}, offsets=[13460, 13476, 13492, 13508, 13524, 13540, 13556,
13572, 13588, 13604, 13620, 13636, 13652, 13668, 13684, 14566, 14582, 14598,
14614, 14630, 14646, 14662, 14678, 14694, 14710, 14726, 14742, 14758, 14774,
14790, 14806, 14822, 14838, 14854, 14870, 14886, 14902, 14918, 14934, 14950,
14966, 14982, 14998, 15014, 15030, 15046, 15062, 15078, 15094, 15110, 15126,
15142, 15158, 15174, 15190, 15206, 16088, 16104, 16120, 16136, 28818, 28834,
28850, 28866, 28882, 28898, 28914, 28930, 28946, 28962, 28978, 28994, 29010,
29026, 29042, 29058, 29074, 29090, 29106, 29122, 29138, 29154, 29170, 40346,
40362, 40378, 40394, 40410, 40426, 40442, 40458, 40474, 40490, 40506, 40522,
40538, 41420, 41436, 41452, 41468, 41484, 41500, 41516, 41532, 41548, 41564,
41580, 41596, 41612, 41628, 41644, 41660, 41676, 41692, 41708, 41724, 41740,
41756, 41772, 41788, 41804, 41820, 41836, 41852, 41868, 41884, 41900,
41916]}, data=File State:
hdfs://dc1udtlhcld005.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/3f5aef5e15a23bce627c05c94760fb16/chk-2/aa076014-120b-4955-ab46-89094358ebeb
[41932 bytes]}].*

But apparently, the restored state didn't have all the messages the window had
received, because a few messages were not replayed and the kafka sink didn't
receive all the messages.

Attaching the files here.

jmCloudera583.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/jmCloudera583.log>
   
tmOneCloudera583.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/tmOneCloudera583.log>
   
tmTwoCloudera583.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13597/tmTwoCloudera583.log>
   

BTW, I posted my test results and logs for regular Flink v1.3.0 on Jun 6,
but don't see that post here. I did receive an email though. Hope you guys
saw that. 

Thanks for your patience guys. 





Re: Fink: KafkaProducer Data Loss

2017-06-06 Thread ninad
I tried to build Flink with Cloudera Hadoop (CDH 5.8.3) and test it, but it was
unable to come up. When I ran the yarn-session script, it exited with something
like "Can't get the url for job manager". 

I didn't spend much time figuring out what's wrong, and went straight to
Flink 1.3.0. I ran several tests, but saw data loss just once. It's very
hard to reproduce with this version, though. With 1.2.1, it was pretty easy
to reproduce. 

Attaching the logs again. Although in this run, I had kafka logs on DEBUG,
so the files are pretty big.

jmV3.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13549/jmV3.log>
   
tmOneV3.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13549/tmOneV3.log>
   
tmTwoV3.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13549/tmTwoV3.log>
   







Re: Fink: KafkaProducer Data Loss

2017-06-06 Thread ninad
Not yet. Planning to do that today.





Re: Fink: KafkaProducer Data Loss

2017-06-06 Thread Tzu-Li (Gordon) Tai
Hi,

From the logs and the description of your test scenarios where data loss is 
observed and not observed, it seems like the differentiating factor here is 
whether or not the session window trigger had first fired when the checkpoint 
occurred.

It doesn’t, however, explain the case where your tests pass on a standalone 
cluster. Have you also re-tested your scenarios on Cloudera, with Flink built 
against the Cloudera binaries?


On 6 June 2017 at 4:53:30 PM, ninad (nni...@gmail.com) wrote:

Hi Till,  
Attaching the logs to this post again.  

Thanks.  

jobManager.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13527/jobManager.log>
  
tmOne.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13527/tmOne.log>
  
tmTwo.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13527/tmTwo.log>
  





Re: Fink: KafkaProducer Data Loss

2017-06-06 Thread ninad
Hi Till,
Attaching the logs to this post again. 

Thanks. 

jobManager.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13527/jobManager.log>
   
tmOne.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13527/tmOne.log>
   
tmTwo.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13527/tmTwo.log>
   





Re: Fink: KafkaProducer Data Loss

2017-06-06 Thread Till Rohrmann
Hi Ninad,

the logs for the data loss case weren't attached to the mails. Maybe you
could attach them again in the same way as you did for the no data loss
logs.

Cheers,
Till

On Sun, Jun 4, 2017 at 2:55 PM, ninad <nni...@gmail.com> wrote:

> Yeah, this seems like a problem with Flink checkpointing: Flink thinks that
> a checkpoint was successful, but in fact it wasn't.
>
> On Jun 4, 2017 7:37 AM, "Tzu-Li (Gordon) Tai [via Apache Flink User
> Mailing List archive.]" <[hidden email]
> <http:///user/SendEmail.jtp?type=node=13480=0>> wrote:
>
>> Thanks for the updates and testing efforts on this!
>>
>> I’m sorry that I currently haven’t found the chance to look closely into
>> the testing scenarios you’ve listed, yet.
>> But please keep us updated on this thread after testing it out also with
>> the Cloudera build.
>>
>> One other suggestion for your test to make sure that some failed record
>> is actually retried: you can add a dummy verifying operator right before
>> the Kafka sink.
>> At least that way you should be able to eliminate the possibility that
>> the Kafka sink is incorrectly ignoring failed records when checkpointing.
>> From another look at the Kafka sink code, I’m pretty sure this shouldn’t be
>> the case.
>>
>> Many thanks,
>> Gordon
>>
>> On 4 June 2017 at 2:14:40 PM, ninad ([hidden email]
>> <http:///user/SendEmail.jtp?type=node=13479=0>) wrote:
>>
>> I tested this with the standalone cluster, and I don't see this problem.
>> So,
>> the problem could be that we haven't built Flink against cloudera Hadoop?
>> I
>> will test it out.
>>
>>
>>
>


Re: Fink: KafkaProducer Data Loss

2017-06-04 Thread ninad
Yeah, this seems like a problem with Flink checkpointing: Flink thinks that
a checkpoint was successful, but in fact it wasn't.

On Jun 4, 2017 7:37 AM, "Tzu-Li (Gordon) Tai [via Apache Flink User Mailing
List archive.]" <ml+s2336050n1347...@n4.nabble.com> wrote:

> Thanks for the updates and testing efforts on this!
>
> I’m sorry that I currently haven’t found the chance to look closely into
> the testing scenarios you’ve listed, yet.
> But please keep us updated on this thread after testing it out also with
> the Cloudera build.
>
> One other suggestion for your test to make sure that some failed record is
> actually retried: you can add a dummy verifying operator right before the
> Kafka sink.
> At least that way you should be able to eliminate the possibility that the
> Kafka sink is incorrectly ignoring failed records when checkpointing. From
> another look at the Kafka sink code, I’m pretty sure this shouldn’t be the
> case.
>
> Many thanks,
> Gordon
>
> On 4 June 2017 at 2:14:40 PM, ninad ([hidden email]
> <http:///user/SendEmail.jtp?type=node=13479=0>) wrote:
>
> I tested this with the standalone cluster, and I don't see this problem.
> So,
> the problem could be that we haven't built Flink against cloudera Hadoop?
> I
> will test it out.
>
>
>

Re: Fink: KafkaProducer Data Loss

2017-06-04 Thread Tzu-Li (Gordon) Tai
Thanks for the updates and testing efforts on this!

I’m sorry that I currently haven’t found the chance to look closely into the 
testing scenarios you’ve listed, yet.
But please keep us updated on this thread after testing it out also with the 
Cloudera build.

One other suggestion for your test to make sure that some failed record is 
actually retried: you can add a dummy verifying operator right before the Kafka 
sink.
At least that way you should be able to eliminate the possibility that the 
Kafka sink is incorrectly ignoring failed records when checkpointing. From 
another look at the Kafka sink code, I’m pretty sure this shouldn’t be the case.

Many thanks,
Gordon

On 4 June 2017 at 2:14:40 PM, ninad (nni...@gmail.com) wrote:

I tested this with the standalone cluster, and I don't see this problem. So,  
the problem could be that we haven't built Flink against cloudera Hadoop? I  
will test it out.  





Re: Fink: KafkaProducer Data Loss

2017-06-04 Thread ninad
I tested this with the standalone cluster, and I don't see this problem. So,
the problem could be that we haven't built Flink against cloudera Hadoop? I
will test it out.





Re: Fink: KafkaProducer Data Loss

2017-06-02 Thread ninad
.stack.qadev.corp:8022/user/harmony_user/flink-checkpoint/ea51350d9d689b2b09ab8fd2fe0f6454/chk-12/8373feed-7494-4b43-98ae-e1c4214b7890
[34820 bytes]}].*


Assuming that I am correctly interpreting the logs, I can think of a few
possible conclusions about why we observed data loss in the first case:

1) I am missing some Flink setting.

2) Flink thought it checkpointed the window's data successfully, but didn't.
(We're using Cloudera Hadoop, but haven't built Flink with the Cloudera Hadoop
binaries.)

3) Flink is not set up to checkpoint the data in session windows before
they are merged?

I have attached the log files for the successful run with this post.

Please let us know what you guys think. Thanks for your patience. 

jobManagerNoDataLoss.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/jobManagerNoDataLoss.log>
   
tmOneNoDataLoss.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/tmOneNoDataLoss.log>
   
tmTwoNoDataLoss.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13467/tmTwoNoDataLoss.log>


Re: Fink: KafkaProducer Data Loss

2017-06-02 Thread ninad
Thanks Till. The log files I have attached are the complete logs. They are
DEBUG level. There are three files:
jobManager.log, tmOne.log and tmTwo.log.





Re: Fink: KafkaProducer Data Loss

2017-06-02 Thread Till Rohrmann
Hi Ninad,

After recovery, the job should continue from where the last checkpoint was
taken. Thus, it should output all remaining messages at least once to Kafka.

Could you share the complete JobManager and TaskManager logs with us? Maybe
they contain some information which could be helpful to get to the bottom
of the problem.

Cheers,
Till

On Fri, Jun 2, 2017 at 4:32 PM, ninad <nni...@gmail.com> wrote:

> Thanks Gordon.
>
> *2017-06-01 20:22:44,400 WARN
> org.apache.kafka.clients.producer.internals.Sender - Got error
> produce response with correlation id 4 on topic-partit
> ion topic.http.stream.event.processor-0, retrying (9 attempts left).
> Error: NOT_ENOUGH_REPLICAS
>
> , not sure if this may be related to not being build with the Cloudera
> binaries.*
>
> This seems normal when kafka is down.
>
> *Could you provide info on how exactly you’re verifying the lost messages?*
>
> Our use case is pretty simple.
>
> 1) Source - Kafka (Flink task id: b93b267f087865c245babeb259c76593)
> 2) Group by key
> 3) Apply session window
> 4) Sink - Kafka
>
> 2, 3, 4 are assigned task id: b4a5c72b52779ab9b2b093b85b8b20c9
>
> We bring down all kafka brokers once flink has received the messages from
> Kafka.
> Flink tries to send the messages to Kafka sink, but isn't able to because
> Kafka is down. This task fails:
>
> *2017-06-01 20:22:44,426 INFO  org.apache.flink.runtime.taskmanager.Task
> - Attempting to fail task externally
> TriggerWindow(ProcessingTimeSessionWindows(3),
> ListStateDescriptor{serializer=org.apache.flink.
> api.java.typeutils.runtime.TupleSerializer@e56b3293},
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) ->
> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).
> *
>
> Both tasks fail and this is communicated to job manager.
>
> Job Manager fails the job:
> *2017-06-01 20:22:44,500 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
> event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state
> RUNNING
> to FAILING.
> *
>
> Job Manager restarts the job again:
> *2017-06-01 20:22:44,530 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Restarting
> the job event-filter (510a7a83f509adace6704e7f2caa0b75).
> *
>
> At this point we're expecting that the Flink task to send to Kafka should
> be
> recovered, because it wasn't successfully committed. I see some similar
> logs
> in job manager logs:
>
> *2017-06-01 20:22:54,536 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
> Recovering checkpoints from ZooKeeper.
> 2017-06-01 20:22:54,543 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
> Found 1 checkpoints in ZooKeeper.
> 2017-06-01 20:22:54,543 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
> Trying to retrieve checkpoint 7.
> 2017-06-01 20:22:54,585 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring
> from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for
> 510a7a83f509adace6704e7f2caa0b75.*
>
> Now, if I bring up all Kafka brokers, I am expecting that the messages
> which
> didn't make it to the Kafka sink should be sent.
>
> But that's not happening.
>
> All these logs are present in the files that I attached.
>
> I am going to try this on the standalone cluster today.
>
>
>


Re: Fink: KafkaProducer Data Loss

2017-06-02 Thread ninad
Thanks Gordon.

*2017-06-01 20:22:44,400 WARN 
org.apache.kafka.clients.producer.internals.Sender - Got error 
produce response with correlation id 4 on topic-partit 
ion topic.http.stream.event.processor-0, retrying (9 attempts left). 
Error: NOT_ENOUGH_REPLICAS 

, not sure if this may be related to not being built with the Cloudera
binaries.*

This seems normal when kafka is down. 

*Could you provide info on how exactly you’re verifying the lost messages?*

Our use case is pretty simple. 

1) Source - Kafka (Flink task id: b93b267f087865c245babeb259c76593)
2) Group by key
3) Apply session window
4) Sink - Kafka

2, 3, 4 are assigned task id: b4a5c72b52779ab9b2b093b85b8b20c9

We bring down all kafka brokers once flink has received the messages from
Kafka. 
Flink tries to send the messages to Kafka sink, but isn't able to because
Kafka is down. This task fails:

*2017-06-01 20:22:44,426 INFO  org.apache.flink.runtime.taskmanager.Task

- Attempting to fail task externally
TriggerWindow(ProcessingTimeSessionWindows(3),
ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293},
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521)) ->
Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9).
*

Both tasks fail and this is communicated to job manager.

Job Manager fails the job:
*2017-06-01 20:22:44,500 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
event-filter (510a7a83f509adace6704e7f2caa0b75) switched from state RUNNING
to FAILING.
*

Job Manager restarts the job again:
*2017-06-01 20:22:44,530 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Restarting
the job event-filter (510a7a83f509adace6704e7f2caa0b75).
*

At this point we're expecting the Flink task that sends to Kafka to be
recovered, because it wasn't successfully committed. I see some similar logs
in the job manager logs:

*2017-06-01 20:22:54,536 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Recovering checkpoints from ZooKeeper.
2017-06-01 20:22:54,543 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Found 1 checkpoints in ZooKeeper.
2017-06-01 20:22:54,543 INFO 
org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
Trying to retrieve checkpoint 7.
2017-06-01 20:22:54,585 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Restoring
from latest valid checkpoint: Checkpoint 7 @ 1496348508893 for
510a7a83f509adace6704e7f2caa0b75.*

Now, if I bring up all Kafka brokers, I am expecting that the messages which
didn't make it to the Kafka sink should be sent. 

But that's not happening. 

All these logs are present in the files that I attached. 

I am going to try this on the standalone cluster today. 





Re: Fink: KafkaProducer Data Loss

2017-06-02 Thread Tzu-Li (Gordon) Tai
 
riptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@e56b3293},
  
ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:521  
)) -> Sink: sink.http.sep (1/1) (b4a5c72b52779ab9b2b093b85b8b20c9) switched  
from RUNNING to FAILED.  
TimerException{java.lang.RuntimeException: Could not forward element to next  
operator}  

*Task Manager 2* jobManager.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/jobManager.log>
  
taskManager.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/taskManager.log>
  
taskManager.log  
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n13443/taskManager.log>
  
2017-06-01 20:22:54,741 DEBUG  
org.apache.flink.runtime.io.network.partition.ResultPartition - Source:  
Custom Source (1/1) (8ee2c8a628968bc3f8006f0740bb8ad1):  
Initialized ResultPartition  
8d68b9c00d6a329d70ee2bf1ed320318@8ee2c8a628968bc3f8006f0740bb8ad1  
[PIPELINED, 1 subpartitions, 1 pending references]  
2017-06-01 20:22:54,760 INFO org.apache.flink.yarn.YarnTaskManager  
- Received task Source: Custom Source (1/1)  

2017-06-01 20:27:30,388 WARN org.apache.kafka.clients.NetworkClient  
- Error while fetching metadata with correlation id 1 :  
{topic.event.filter=LEADER_NOT_AVAILABLE}  






Re: Fink: KafkaProducer Data Loss

2017-06-01 Thread Tzu-Li (Gordon) Tai
Hi Ninad,

This exception you’re seeing does not cause data loss. As a matter of fact, it’s 
preventing data loss, based on how Flink’s checkpoints / fault-tolerance works.

So, a recap of what the problem was when this “uncaught exception leak” issue 
was first reported:
Prior to the fix, on checkpoints the Flink Kafka producer did not check for any 
async produce errors, therefore voiding the at-least-once guarantee of the sink.
In other words, the checkpoint was incorrectly succeeding without respecting 
that some previous data wasn’t sent to Kafka.

The fix included in 1.1.5 / 1.2.1 basically corrects this by rethrowing any 
async errors that occurred before the checkpoint happened, and fails the 
checkpoint snapshot (as what you are observing from this exception).

When a failure occurs in the job, Flink uses the last completed checkpoint to 
restart the job. In the case of the Flink Kafka producer, this essentially 
makes sure that records which did not make it into Kafka and caused the last 
run to fail are reprocessed and sent to Kafka again.

Hope this helps!

Cheers,
Gordon

On 1 June 2017 at 12:15:47 PM, Kostas Kloudas (k.klou...@data-artisans.com) 
wrote:

Hi Ninad,  

I think that Gordon could shed some more light on this but I suggest  
you should update your Flink version to at least the 1.2.  

The reason is that we are already in the process of releasing Flink 1.3  
(which will come probably today) and a lot of things have  
changed/fixed/improved since the 1.1 release. In fact, it would help us  
a lot if you could check if your problem still exists in the upcoming 1.3 
release.  

In addition, I suppose that the 1.1 release will soon be not supported  
anymore.  

Cheers,  
Kostas  

> On Jun 1, 2017, at 12:15 AM, ninad <nni...@gmail.com> wrote:  
>  
> Thanks for the fix guys. I am trying to test this with 1.1.5, but still  
> seeing a data loss. I am not able to get much from logs except this:  
>  
> Here's our use case:  
>  
> 1) Consume from Kafka  
> 2) Apply session window  
> 3) Send messages of window to Kafka  
>  
> If there's a failure in step 3, because all kafka brokers are down, we see a  
> data loss.  
>  
> Here are relevant logs:  
>  
> java.lang.Exception: Could not perform checkpoint 2 for operator  
> TriggerWindow(ProcessingTimeSessionWindows(3),  
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67},
>   
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->  
> Sink: sink.http.sep (2/4).  
> at  
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
>   
> at  
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
>   
> at  
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
>   
> at  
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
>   
> at  
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
>   
> at  
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
>   
> at  
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>   
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)  
> at java.lang.Thread.run(Thread.java:745)  
> Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th  
> operator in chain.  
> at  
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
>   
> at  
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
>   
> ... 8 more  
> Caused by: java.lang.Exception: Failed to snapshot function state of  
> TriggerWindow(ProcessingTimeSessionWindows(3),  
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67},
>   
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->  
> Sink: sink.http.sep (2/4).  
> at  
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
>   
> at  
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
>   
> ... 9 more  
> Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired  
> at  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
>   
> at  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)
>   
>  
>  
>  
>  
>  
>  



Re: Fink: KafkaProducer Data Loss

2017-06-01 Thread Kostas Kloudas
Hi Ninad,

I think that Gordon could shed some more light on this, but I suggest 
you update your Flink version to at least 1.2. 

The reason is that we are already in the process of releasing Flink 1.3 
(which will probably come today), and a lot of things have been 
changed/fixed/improved since the 1.1 release. In fact, it would help us
a lot if you could check if your problem still exists in the upcoming 1.3 
release.

In addition, I suppose that the 1.1 release will soon no longer be 
supported.

Cheers,
Kostas

> On Jun 1, 2017, at 12:15 AM, ninad <nni...@gmail.com> wrote:
> 
> Thanks for the fix guys. I am trying to test this with 1.1.5, but still
> seeing a data loss. I am not able to get much from logs except this:
> 
> Here's our use case:
> 
> 1) Consume from Kafka
> 2) Apply session window
> 3) Send messages of window to Kafka
> 
> If there's a failure in step 3, because all kafka brokers are down, we see a
> data loss. 
> 
> Here are relevant logs:
> 
> java.lang.Exception: Could not perform checkpoint 2 for operator
> TriggerWindow(ProcessingTimeSessionWindows(3),
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67},
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->
> Sink: sink.http.sep (2/4).
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:611)
>   at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:360)
>   at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:272)
>   at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:174)
>   at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:195)
>   at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not perform the checkpoint 2 for 0th
> operator in chain.
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:666)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:603)
>   ... 8 more
> Caused by: java.lang.Exception: Failed to snapshot function state of
> TriggerWindow(ProcessingTimeSessionWindows(3),
> ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@61ef6d67},
> ProcessingTimeTrigger(), WindowedStream.apply(WindowedStream.java:257)) ->
> Sink: sink.http.sep (2/4).
>   at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:139)
>   at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:652)
>   ... 9 more
> Caused by: java.lang.Exception: Failed to send data to Kafka: Batch Expired
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:366)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:335)
> 
> 
> 
> 
> 
> 



Re: Fink: KafkaProducer Data Loss

2017-02-03 Thread ninad
Thanks, Gordon and Till.





Re: Fink: KafkaProducer Data Loss

2017-02-02 Thread Tzu-Li (Gordon) Tai
Hi Ninad and Till,

Thank you for looking into the issue! This is actually a bug.

Till’s suggestion is correct:
The producer holds a `pendingRecords` value that is incremented on each 
invoke() and decremented on each callback, used to check if the producer needs 
to sync on pending callbacks on checkpoints.
On each checkpoint, we should only consider the checkpoint succeeded iff after 
flushing the `pendingRecords == 0` and `asyncException == null` (currently, 
we’re only checking `pendingRecords`).

A quick fix for this is to check and rethrow async exceptions in the 
`snapshotState` method both before the flush and after the flush completes and 
`pendingRecords` becomes 0.
I’ve filed a JIRA for this: https://issues.apache.org/jira/browse/FLINK-5701.
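
To make the pattern concrete, here is a rough, self-contained sketch of the
idea (illustrative only, not the actual FlinkKafkaProducerBase source): every
send increments a pending counter, the callback decrements it and records any
async error, and the checkpoint hook flushes and re-checks the error both
before and after the flush, so a checkpoint can only succeed once everything
has been acknowledged.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class AtLeastOnceKafkaWriter {
    private final KafkaProducer<byte[], byte[]> producer;
    private final Object pendingLock = new Object();
    private long pendingRecords;
    private volatile Exception asyncException;

    AtLeastOnceKafkaWriter(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void write(ProducerRecord<byte[], byte[]> record) {
        checkErroneous(); // surface earlier async failures before sending more
        synchronized (pendingLock) {
            pendingRecords++;
        }
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null && asyncException == null) {
                    asyncException = exception;
                }
                synchronized (pendingLock) {
                    pendingRecords--;
                    pendingLock.notifyAll();
                }
            }
        });
    }

    // called from the checkpoint hook: only let the checkpoint succeed once everything is acked
    void flushOnCheckpoint() {
        checkErroneous();  // rethrow errors recorded before the checkpoint
        producer.flush();  // block until all buffered records have been sent and acknowledged
        synchronized (pendingLock) {
            if (pendingRecords != 0) {
                throw new IllegalStateException("Records still pending after flush: " + pendingRecords);
            }
        }
        checkErroneous();  // errors surfaced by the flush also fail the checkpoint
    }

    private void checkErroneous() {
        if (asyncException != null) {
            throw new RuntimeException("Async Kafka send failed", asyncException);
        }
    }
}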

Cheers,
Gordon

On February 3, 2017 at 6:05:23 AM, Till Rohrmann (trohrm...@apache.org) wrote:

Hi Ninad,

thanks for reporting the issue. For me it also looks as if exceptions might go 
unnoticed under certain circumstances. So, for example, if you have a write 
operation which fails, this will set the asyncException field, which is not 
checked before the next invoke call happens. If a checkpoint operation now 
happens, it will pass and mark all messages up to this point as being 
successfully processed. Only after the checkpoint will the producer fail. And 
this constitutes a data loss imho.

I've looped Robert and Gordon into the conversation which are more familiar 
with the Kafka producer. Maybe they can answer your and my questions.

Cheers,
Till

On Thu, Feb 2, 2017 at 9:58 PM, ninad <nni...@gmail.com> wrote:
Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's
'retry' mechanism doesn't kick in until a message is added to its internal
buffer.

If there's an exception before that, KafkaProducer will throw that
exception, and it seems Flink isn't handling that. In this case there will
be data loss.

Related Flink code (FlinkKafkaProducerBase):

if (logFailuresOnly) {
            callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception
e) {
                    if (e != null) {
                        LOG.error("Error while sending record to Kafka: " +
e.getMessage(), e);
                    }
                    acknowledgeMessage();
                }
            };
        }
        else {
            callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception
exception) {
                    if (exception != null && asyncException == null) {
                        asyncException = exception;
                    }
                    acknowledgeMessage();
                }
            };
        }

Here are the scenarios we've identified that will cause data loss:

All kafka brokers are down.

In this case, before appending a message to its buffer, KafkaProducer tries
to fetch metadata. If the KafkaProducer isn't able to fetch the metadata in
the configured timeout, it throws an exception.
-Memory records not writable (Existing bug in kafka 0.9.0.1 library)
https://issues.apache.org/jira/browse/KAFKA-3594

In both the above cases, KafkaProducer won't retry, and Flink will ignore
the messages. The messages aren't even logged; the exception is, but not the
messages which failed.

Possible workarounds (Kafka settings):

A very high value for metadata timeout (metadata.fetch.timeout.ms)
A very high value for buffer expiry (request.timeout.ms)
We're still investigating the possible side effects of changing the above
kafka settings.

So, is our understanding correct? Or is there a way we can avoid this data
loss by modifying some Flink settings?

Thanks.






Re: Fink: KafkaProducer Data Loss

2017-02-02 Thread Till Rohrmann
Hi Ninad,

thanks for reporting the issue. For me it also looks as if exceptions might
go unnoticed under certain circumstances. So, for example, if you have a write
operation which fails, this will set the asyncException field, which is not
checked before the next invoke call happens. If a checkpoint operation now
happens, it will pass and mark all messages up to this point as being
successfully processed. Only after the checkpoint will the producer fail.
And this constitutes a data loss imho.

I've looped Robert and Gordon into the conversation which are more familiar
with the Kafka producer. Maybe they can answer your and my questions.

Cheers,
Till

On Thu, Feb 2, 2017 at 9:58 PM, ninad <nni...@gmail.com> wrote:

> Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's
> 'retry' mechanism doesn't kick in until a message is added to it's internal
> buffer.
>
> If there's an exception before that, KafkaProducer will throw that
> exception, and seems like Flink isn't handling that. In this case there
> will
> be a data loss.
>
> Related Flink code (FlinkKafkaProducerBase):
>
> if (logFailuresOnly) {
> callback = new Callback() {
> @Override
> public void onCompletion(RecordMetadata metadata, Exception
> e) {
> if (e != null) {
> LOG.error("Error while sending record to Kafka: " +
> e.getMessage(), e);
> }
> acknowledgeMessage();
> }
> };
> }
> else {
> callback = new Callback() {
> @Override
> public void onCompletion(RecordMetadata metadata, Exception
> exception) {
> if (exception != null && asyncException == null) {
> asyncException = exception;
> }
> acknowledgeMessage();
> }
> };
> }
>
> Here are the scenario's we've identified that will cause data loss:
>
> All kafka brokers are down.
>
> In this case, before appending a message to it's buffer, KafkaProducer
> tries
> to fetch metadata. If the KafkaProducer isn't able to fetch the metadata in
> configured timeout, it throws an exception.
> -Memory records not writable (Existing bug in kafka 0.9.0.1 library)
> https://issues.apache.org/jira/browse/KAFKA-3594
>
> In both the above cases, KafkaProducer won't retry, and Flink will ignore
> the messages. the messages aren't even logged. The exception is, but not
> the
> messages which failed.
>
> Possible workarounds (Kafka settings):
>
> A very high value for metadata timeout (metadata.fetch.timeout.ms)
> A very high value for buffer expiry (request.timeout.ms)
> We're still investigating the possible side effects of changing the above
> kafka settings.
>
> So, is our understanding correct? Or is there a way we can avoid this data
> loss by modifying some Flink settings?
>
> Thanks.
>
>
>
>


Fink: KafkaProducer Data Loss

2017-02-02 Thread ninad
Our Flink streaming workflow publishes messages to Kafka. KafkaProducer's
'retry' mechanism doesn't kick in until a message is added to its internal
buffer.

If there's an exception before that, KafkaProducer will throw that
exception, and it seems Flink isn't handling that. In this case there will
be data loss.

Related Flink code (FlinkKafkaProducerBase):

if (logFailuresOnly) {
    callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
            }
            acknowledgeMessage();
        }
    };
}
else {
    callback = new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null && asyncException == null) {
                asyncException = exception;
            }
            acknowledgeMessage();
        }
    };
}

Here are the scenarios we've identified that will cause data loss:

All kafka brokers are down.

In this case, before appending a message to its buffer, KafkaProducer tries
to fetch metadata. If the KafkaProducer isn't able to fetch the metadata in
the configured timeout, it throws an exception.
-Memory records not writable (Existing bug in kafka 0.9.0.1 library)
https://issues.apache.org/jira/browse/KAFKA-3594

In both the above cases, KafkaProducer won't retry, and Flink will ignore
the messages. The messages aren't even logged; the exception is, but not the
messages which failed.

Possible workarounds (Kafka settings):

A very high value for metadata timeout (metadata.fetch.timeout.ms)
A very high value for buffer expiry (request.timeout.ms)
We're still investigating the possible side effects of changing the above
kafka settings.
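
For illustration, those two settings could be raised on the producer properties
passed to the Flink Kafka sink; the timeout values below are placeholders, not
recommendations, and the topic name and broker address are made up:

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaSinkFactory {
    public static FlinkKafkaProducer09<String> createSink() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "localhost:9092");
        // wait longer for partition metadata before a send gives up with an exception
        producerProps.setProperty("metadata.fetch.timeout.ms", "120000");
        // keep buffered batches around longer before they expire with "Batch Expired"
        producerProps.setProperty("request.timeout.ms", "120000");

        FlinkKafkaProducer09<String> sink =
                new FlinkKafkaProducer09<>("output-topic", new SimpleStringSchema(), producerProps);
        sink.setLogFailuresOnly(false); // surface async send failures instead of only logging them
        return sink;
    }
}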

So, is our understanding correct? Or is there a way we can avoid this data
loss by modifying some Flink settings?

Thanks.


