[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-04-13 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5634
  
@juhoautio

Your concerns about this fix are quite correct, and they are why this PR was 
closed in the first place: it introduces a lot of ill-defined semantics.

Regarding your thought on relating idleness to empty results returned from 
Kafka: I think that seems like a good approach, and should also capture Eron's 
comment quite well.


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-04-13 Thread juhoautio
Github user juhoautio commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tzulitai did you ever test your code? I tried it and it allowed watermarks 
to proceed but apparently too aggressively, as it caused a lot of data loss.

I'm looking for a quick fix for this issue, as it seems that FLINK-5479 
won't be fixed too soon. So I would very much like to hear if you have been 
able to fix this in some lighter way.

My understanding of your PR is that it doesn't work reliably because it 
seems to just add an internal timeout, which could be exceeded whenever the 
consumer is, for example, busy consuming other partitions. Please comment if 
this perception is wrong.

I'm thinking that it should instead get the information that a partition 
was idle from the kafka client, and only in that case (empty result from 
client) create a newer watermark for that partition. It shouldn't mark the 
partition to some idle state – and shouldn't create newer watermarks 
periodically without any connection to another empty result from the client. 
New watermarks should be only generated as a callback of the kafka client 
result..?
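
A minimal sketch of that idea, in plain Java without the Kafka client. All names here (`PollDrivenWatermarks`, `onRecords`, `onEmptyResult`) are hypothetical and not Flink or Kafka APIs. The comment does not say what value the "newer watermark" should take; the sketch assumes it is derived from wall-clock time minus a configured out-of-orderness bound, which only makes sense if event timestamps roughly track wall-clock time.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a partition's watermark advances only in response to a client result. */
final class PollDrivenWatermarks {

    private final Map<String, Long> watermarks = new HashMap<>();
    private final long maxOutOfOrdernessMillis;

    PollDrivenWatermarks(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Non-empty result for a partition: advance from the largest event timestamp seen. */
    void onRecords(String partition, List<Long> eventTimestamps) {
        if (eventTimestamps.isEmpty()) {
            return; // callers report empty results through onEmptyResult
        }
        long max = Long.MIN_VALUE;
        for (long ts : eventTimestamps) {
            max = Math.max(max, ts);
        }
        advance(partition, max - maxOutOfOrdernessMillis);
    }

    /**
     * Empty result for a partition: advance once, from the wall clock (an assumption
     * of this sketch). No idle flag is stored and nothing is emitted periodically.
     */
    void onEmptyResult(String partition, long wallClockMillis) {
        advance(partition, wallClockMillis - maxOutOfOrdernessMillis);
    }

    /** Current watermark, or Long.MIN_VALUE if the partition has not reported yet. */
    long watermarkOf(String partition) {
        return watermarks.getOrDefault(partition, Long.MIN_VALUE);
    }

    // Watermarks never move backwards.
    private void advance(String partition, long candidate) {
        watermarks.merge(partition, candidate, Math::max);
    }
}
```

Because every advance is tied to one concrete client result, a consumer that is busy with other partitions simply produces no callbacks for the quiet one, and its watermark stays where it was.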


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-08 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5634
  
Just saw a good comment from @EronWright 

> I think the ideal would be that idleness would occur only for tail reads, 
> i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`. In other words, 
> an intermittent connection issue would ideally not trigger idleness.

Let's see if we can get that into the design somehow, without having too 
specific logic inside the Kafka Consumer (making the Kafka Consumer more 
complex is my personal Pet Peeve)


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
This is a good proposal, it should also survive a general connector 
refactor that will be necessary to address other code duplication. The Kinesis 
ticket is https://issues.apache.org/jira/browse/FLINK-5697 and I will add a 
reference back to this thread. I would be happy to add the watermark support 
based on the revised generator. Perhaps it would be good to recognize the 
special "idle" watermark in SourceContext also?


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tweise @tzulitai 

I would suggest solving this the following way, which should be both 
simple and cover our cases:

  - We extend the current periodic watermark generators for idleness. We 
can do that for example by maintaining a record counter and remembering the 
last counter and a System.nanoTime() timestamp each time the method that 
decides whether to generate a watermark is called. If no record came for too 
long, return a special watermark object that indicates "idle". Or change the 
return type to return either 'none', 'idle', or 'watermark'.

  - The Kinesis Consumer needs per-shard watermarks, the same way as the Kafka 
Consumer does. That part needs to be added to the Kinesis consumer anyway.

That way, we automatically get per-shard idleness in Kinesis and 
per-partition idleness in Kafka without doing anything specific for the source 
connectors.

We can then also remove the idleness logic from the source context - it 
would be duplicate there.
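
A minimal sketch of the first bullet, assuming one generator instance per partition or shard. The class and method names (`IdleAwareWatermarkGenerator`, `onRecord`, `onPeriodicEmit`) are hypothetical and do not correspond to existing Flink interfaces, and the clock is injected only so the behaviour can be tested deterministically; in production it would be `System::nanoTime`.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of a periodic watermark generator that can also report idleness. */
final class IdleAwareWatermarkGenerator {

    /** The three outcomes proposed above. */
    enum Kind { NONE, IDLE, WATERMARK }

    static final class Result {
        final Kind kind;
        final long timestamp; // only meaningful when kind == WATERMARK

        Result(Kind kind, long timestamp) {
            this.kind = kind;
            this.timestamp = timestamp;
        }
    }

    private final long idleTimeoutNanos;
    private final LongSupplier nanoClock;

    private long recordCount;
    private long maxTimestamp = Long.MIN_VALUE;

    private long countAtLastChange;   // record counter remembered from an earlier call
    private long nanosAtLastChange;   // clock reading when that counter last changed
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarkGenerator(long idleTimeoutNanos, LongSupplier nanoClock) {
        this.idleTimeoutNanos = idleTimeoutNanos;
        this.nanoClock = nanoClock;
        this.nanosAtLastChange = nanoClock.getAsLong();
    }

    /** Called for every record of this partition. */
    void onRecord(long eventTimestamp) {
        recordCount++;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called periodically to decide whether to emit a watermark. */
    Result onPeriodicEmit() {
        long now = nanoClock.getAsLong();
        if (recordCount != countAtLastChange) {
            // Records arrived since the previous call: remember counter and time.
            countAtLastChange = recordCount;
            nanosAtLastChange = now;
            if (maxTimestamp > lastEmitted) {
                lastEmitted = maxTimestamp;
                return new Result(Kind.WATERMARK, maxTimestamp);
            }
            return new Result(Kind.NONE, 0L);
        }
        // No new record; compare elapsed time (subtraction is safe for nanoTime values).
        if (now - nanosAtLastChange >= idleTimeoutNanos) {
            return new Result(Kind.IDLE, 0L);
        }
        return new Result(Kind.NONE, 0L);
    }
}
```

Note that this sketch detects idleness purely from elapsed time without records, so it cannot tell a quiet partition apart from a consumer that is stalled or busy elsewhere.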


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
@tzulitai @StephanEwen the current idleness detection in the source context 
isn't a replacement for what is required to deal with an inactive partition (or 
Kinesis shard). When a connector subtask consumes multiple partitions and one 
is idle, then it should be possible to still generate a watermark. This can 
only be solved outside of the connector when the multiple source partitions are 
visible (like it would be for an operator with multiple input streams).
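
To illustrate the point, here is a minimal sketch of how a subtask that sees all of its partitions could combine their watermarks while skipping idle ones. `PartitionWatermarkCombiner` and its methods are hypothetical names, not existing Flink classes; how a partition gets marked idle is deliberately left open.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical sketch: minimum watermark over the non-idle partitions of one subtask. */
final class PartitionWatermarkCombiner {

    private static final class State {
        long watermark = Long.MIN_VALUE; // no watermark reported yet
        boolean idle;
    }

    private final Map<String, State> partitions = new HashMap<>();

    /** Registers a partition; until it reports or is marked idle, it holds the result back. */
    void register(String partition) {
        partitions.putIfAbsent(partition, new State());
    }

    /** A new watermark for a partition also marks it active again. */
    void onWatermark(String partition, long watermark) {
        State s = partitions.computeIfAbsent(partition, k -> new State());
        s.idle = false;
        s.watermark = Math.max(s.watermark, watermark);
    }

    void markIdle(String partition) {
        partitions.computeIfAbsent(partition, k -> new State()).idle = true;
    }

    /** Minimum over active partitions; empty when every partition is idle. */
    OptionalLong combined() {
        return partitions.values().stream()
                .filter(s -> !s.idle)
                .mapToLong(s -> s.watermark)
                .min();
    }
}
```

When an idle partition becomes active again, its watermark may be lower than the value already combined from the others, so a caller would still need to make sure the watermark it emits downstream never moves backwards; records from the reactivated partition may then be late.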


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-07 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5634
  
I'm closing this PR now, as it seems the overall agreement is that we want 
to implement this differently, or at least not touch the Kafka connector code 
any more now.


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-06 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5634
  
A few points to mention:

1. @StephanEwen there is already common idleness detection implemented 
within `SourceContext`s, see `StreamSourceContexts`. The idleness detection, 
however, is currently always disabled and we do not allow users to configure it.

2. I agree that we shouldn't add anything more to the connector, have also 
discussed this offline with @aljoscha. We should maybe do this only as part of 
the new connector rework that @tweise and I were talking about, in 1.6.


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-06 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5634
  
There was a related discussion on the mailing list; this and several other 
features could be provided by a common connector framework. Such an initiative 
is a much larger effort, though, and it is not clear to me that users can wait. 
The Kinesis consumer has virtually identical requirements and we have already 
written custom code for it.


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5634
  
I would suggest approaching this in a different way.

  1. Idleness detection is something that watermark generation benefits 
from in general, not just in Kafka
  2. Unless there is a very strong reason, I would not want to add anything 
anymore to the Kafka Connector. This connector implementation is so big 
already. We saw multiple issues in the past, where the Kafka Connector's 
complexity was the cause of problems.


---


[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...

2018-03-05 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/5634
  
I think the ideal would be that idleness would occur only for tail reads, 
i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`. In other 
words, an intermittent connection issue would ideally not trigger idleness.


---