[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5634 @juhoautio Your concerns about this fix are quite correct, and they are why this PR was closed in the first place: it introduces a lot of ill-defined semantics. Regarding your thought on relating idleness to empty results returned from Kafka: I think that seems like a good approach, and it should also capture Eron's comment quite well. ---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user juhoautio commented on the issue: https://github.com/apache/flink/pull/5634 @tzulitai did you ever test your code? I tried it and it allowed watermarks to proceed but apparently too aggressively, as it caused a lot of data loss. I'm looking for a quick fix for this issue, as it seems that FLINK-5479 won't be fixed too soon. So I would very much like to hear if you have been able to fix this in some lighter way. My understanding of your PR is that it doesn't work reliably because it just seems to add an internal timeout, that could be surpassed whenever the consumer is for example busy consuming other partitions. Please comment if this perception is wrong. I'm thinking that it should instead get the information that a partition was idle from the kafka client, and only in that case (empty result from client) create a newer watermark for that partition. It shouldn't mark the partition to some idle state, and shouldn't create newer watermarks periodically without any connection to another empty result from the client. New watermarks should be only generated as a callback of the kafka client result..? ---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5634 Just saw a good comment from @EronWright:
> I think the ideal would be that idleness would occur only for tail reads, i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`. In other words, an intermittent connection issue would ideally not trigger idleness.

Let's see if we can get that into the design somehow, without having too specific logic inside the Kafka Consumer (making the Kafka Consumer more complex is my personal pet peeve). ---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 This is a good proposal, it should also survive a general connector refactor that will be necessary to address other code duplication. The Kinesis ticket is https://issues.apache.org/jira/browse/FLINK-5697 and I will add a reference back to this thread. I would be happy to add the watermark support based on the revised generator. Perhaps it would be good to recognize the special "idle" watermark in SourceContext also? ---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5634 @tweise @tzulitai I would suggest solving this the following way, which should be both simple and cover our cases:
- We extend the current periodic watermark generators for idleness. We can do that, for example, by maintaining a record counter and remembering the last counter value and a `System.nanoTime()` timestamp each time the generator is asked whether to emit a watermark. If no record came for too long, return a special watermark object that indicates "idle". Or change the return type to return either 'none', 'idle', or 'watermark'.
- The Kinesis Consumer needs per-shard watermarks, the same way the Kafka Consumer does. That part needs to be added to the Kinesis consumer anyway.

That way, we automatically get per-shard idleness in Kinesis and per-partition idleness in Kafka without doing anything specific for the source connectors. We can then also remove the idleness logic from the source context, since it would be duplicated there. ---
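The counter-and-timestamp idea proposed above could be sketched roughly as follows. This is only an illustration: `WatermarkKind`, `WatermarkResult`, and `IdleAwarePeriodicGenerator` are hypothetical names and not Flink interfaces, the watermark is simply the highest timestamp seen, and the caller is assumed to pass in `System.nanoTime()` on each periodic check.

```java
/** Hypothetical result kinds, mirroring the 'none' / 'idle' / 'watermark' idea. */
enum WatermarkKind { NONE, IDLE, WATERMARK }

/** Hypothetical result holder; timestamp is only meaningful for WATERMARK. */
final class WatermarkResult {
    final WatermarkKind kind;
    final long timestamp;

    WatermarkResult(WatermarkKind kind, long timestamp) {
        this.kind = kind;
        this.timestamp = timestamp;
    }
}

/** Sketch of a periodic generator that can report idleness (not a Flink API). */
final class IdleAwarePeriodicGenerator {
    private final long idleTimeoutNanos;
    private long recordCount;       // incremented for every record
    private long lastSeenCount;     // counter value at the last activity
    private long lastActivityNanos; // time of the last check that saw new records
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwarePeriodicGenerator(long idleTimeoutNanos, long nowNanos) {
        this.idleTimeoutNanos = idleTimeoutNanos;
        this.lastActivityNanos = nowNanos;
    }

    void onRecord(long eventTimestamp) {
        recordCount++;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called periodically; nowNanos is assumed to come from System.nanoTime(). */
    WatermarkResult onPeriodicCheck(long nowNanos) {
        if (recordCount != lastSeenCount) {
            // Records arrived since the last check: remember counter and time.
            lastSeenCount = recordCount;
            lastActivityNanos = nowNanos;
            if (maxTimestamp > lastEmitted) {
                lastEmitted = maxTimestamp;
                return new WatermarkResult(WatermarkKind.WATERMARK, maxTimestamp);
            }
            return new WatermarkResult(WatermarkKind.NONE, 0L);
        }
        if (nowNanos - lastActivityNanos >= idleTimeoutNanos) {
            // No records for too long: report idleness instead of a watermark.
            return new WatermarkResult(WatermarkKind.IDLE, 0L);
        }
        return new WatermarkResult(WatermarkKind.NONE, 0L);
    }
}
```

Note that a purely time-based check like this is subject to the concern raised elsewhere in the thread: a timeout can expire while the consumer is merely busy with other partitions.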
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 @tzulitai @StephanEwen the current idleness detection in the source context isn't a replacement for what is required to deal with an inactive partition (or Kinesis shard). When a connector subtask consumes multiple partitions and one is idle, then it should be possible to still generate a watermark. This can only be solved outside of the connector when the multiple source partitions are visible (like it would be for an operator with multiple input streams). ---
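To illustrate the multi-partition point above, here is a minimal sketch of how a subtask might combine per-partition watermarks while skipping idle partitions. `PartitionWatermarkCombiner` and its methods are hypothetical names chosen for this example, not existing Flink code, and partitions are identified by plain strings.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

/** Sketch: minimum watermark over the non-idle partitions of one subtask. */
final class PartitionWatermarkCombiner {
    private final Map<String, Long> watermarks = new HashMap<>();
    private final Set<String> idle = new HashSet<>();

    PartitionWatermarkCombiner(String... partitions) {
        // Every partition starts active and holds the watermark back.
        for (String p : Arrays.asList(partitions)) {
            watermarks.put(p, Long.MIN_VALUE);
        }
    }

    /** A new watermark for a partition also marks it active again. */
    void onWatermark(String partition, long watermark) {
        watermarks.merge(partition, watermark, Long::max);
        idle.remove(partition);
    }

    void onIdle(String partition) {
        idle.add(partition);
    }

    /** Empty if every partition is idle; otherwise the minimum over active ones. */
    OptionalLong combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> e : watermarks.entrySet()) {
            if (idle.contains(e.getKey())) {
                continue; // idle partitions do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, e.getValue());
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

With one idle partition out of two, `combined()` follows the remaining active partition instead of stalling on the idle one.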
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5634 I'm closing this PR now, as it seems the overall agreement is that we want to implement this differently, or at least not touch the Kafka connector code any more now. ---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5634 A few points to mention:
1. @StephanEwen there is already common idleness detection implemented within `SourceContext`s, see `StreamSourceContexts`. The idleness detection, however, is currently always disabled and we do not allow users to configure it.
2. I agree that we shouldn't add anything more to the connector, and have also discussed this offline with @aljoscha. We should maybe do this only as part of the new connector rework that @tweise and I were talking about, in 1.6.
---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user tweise commented on the issue: https://github.com/apache/flink/pull/5634 There was a related discussion on the mailing list; this and several other features could be provided by a common connector framework. Such an initiative is a much larger effort though, and it is not clear to me that users can wait. The Kinesis consumer has virtually identical requirements and we have already written custom code for it. ---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5634 I would suggest approaching this in a different way.
1. Idleness detection is something that watermark generation benefits from in general, not just in Kafka.
2. Unless there is a very strong reason, I would not want to add anything more to the Kafka Connector. This connector implementation is so big already. We saw multiple issues in the past where the Kafka Connector's complexity was the cause of problems.
---
[GitHub] flink issue #5634: [FLINK-5479] [kafka] Idleness detection for periodic per-...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5634 I think the ideal would be that idleness would occur only for tail reads, i.e. due to a timeout from `kafkaConsumer.poll(pollTimeout)`. In other words, an intermittent connection issue would ideally not trigger idleness. ---
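The tail-read idea above could look roughly like the following sketch, which marks partitions idle only when a poll returns no records at all and leaves the state untouched when a poll fails. `PollBasedIdleTracker` is a hypothetical helper that does not use the Kafka client; the caller is assumed to translate each poll outcome into one of the two callbacks, and whether the real client lets an empty result be told apart from a connection problem is not settled in this thread.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch: idleness derived from poll results rather than from a timer. */
final class PollBasedIdleTracker {
    private final Set<String> assigned;
    private final Set<String> idle = new HashSet<>();

    PollBasedIdleTracker(String... assignedPartitions) {
        this.assigned = new HashSet<>(Arrays.asList(assignedPartitions));
    }

    /** Called after a successful poll with the record count per partition. */
    void onPollReturned(Map<String, Integer> recordsPerPartition) {
        boolean anyRecords = false;
        for (Map.Entry<String, Integer> e : recordsPerPartition.entrySet()) {
            if (e.getValue() > 0) {
                anyRecords = true;
                idle.remove(e.getKey()); // data arrived: partition is active
            }
        }
        if (!anyRecords) {
            // The poll timed out with nothing to read: treat it as a tail read.
            idle.addAll(assigned);
        }
        // A partition missing from a non-empty result keeps its previous state,
        // since the consumer may simply have been busy with other partitions.
    }

    /** Called when a poll fails, e.g. on a connection issue: no state change. */
    void onPollFailed() {
        // Intentionally empty: a failed poll is not evidence of idleness.
    }

    boolean isIdle(String partition) {
        return idle.contains(partition);
    }
}
```

The choice to leave unmentioned partitions unchanged is deliberately conservative; it trades slower idleness detection for a lower risk of advancing watermarks past data that has not been fetched yet.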