[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 Subsumed by the re-opened PR: #5284 ---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 As discussed offline: I will re-open a PR for this that avoids introducing a new interface. I agree that it would make more sense to introduce a new interface when we actually decide to go for a major refactor ;) ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5200 Regarding the race condition: I think we meant the same thing, or maybe I misunderstood part of the code, but yes, this NPE seems like a possible outcome of this race condition. If you have spare cycles to fix this race, feel free to do so. I would like to avoid working on it this week and next, because I am handing over code before a lengthy vacation in my team. ---
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 Regarding the race condition you mentioned: hmm, I can't seem to exactly nail down the "and only first `successfulCommits.inc()` can be omitted because of that" case you mentioned. Could you elaborate on that a bit more? But yes, there certainly seems to be a race condition here, and it can even cause an NPE at: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L496 It seems like `nextOffsetsToCommit` and its callback instance should be bundled into a single `AtomicReference` here ... Would you like to open a PR for that? I wanted to ask because you discovered it in the first place; if not, I'll open a fix for it. ---
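The bundling suggested above could look roughly like the following minimal sketch. It is hypothetical, not the code that was eventually merged: `OffsetCommitHandoff`, `PendingCommit`, `CommitCallback` and `takePendingCommit` are made-up names, and the offsets map is simplified to partition name -> offset instead of Kafka's real offset types. The point it shows is that offsets and callback travel together in one immutable object behind one `AtomicReference`, so the consumer thread can never take one without the other.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: offsets and their callback are published as one immutable
 * unit through a single AtomicReference, so a reader always sees a matching pair.
 */
class OffsetCommitHandoff {

    /** Hypothetical callback type; stands in for the connector's real commit callback. */
    interface CommitCallback {
        void onSuccess();
        void onException(Throwable cause);
    }

    /** Immutable bundle: offsets (simplified to partition name -> offset) plus callback. */
    static final class PendingCommit {
        final Map<String, Long> offsets;
        final CommitCallback callback;

        PendingCommit(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    private final AtomicReference<PendingCommit> nextCommit = new AtomicReference<>();

    /**
     * Called from the checkpointing side. Returns true if an older, not-yet-committed
     * bundle was replaced (the situation the original code logs a warning for).
     */
    boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        return nextCommit.getAndSet(new PendingCommit(offsets, callback)) != null;
    }

    /** Called from the consumer thread; returns null when nothing is pending. */
    PendingCommit takePendingCommit() {
        return nextCommit.getAndSet(null);
    }
}
```

With this shape, the consumer loop would call `takePendingCommit()` once and pass both `offsets` and `callback` from the same bundle to the commit, so the callback it uses is always the one registered together with those offsets.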
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 @pnowojski thanks a lot for your insightful review! Regarding the choice between composition and inheritance: in the end I think we should lean towards neither, and let offset committing and record fetching live as separate components. I've left more detailed comments inline. ---
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5200 Btw, doesn't `org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread#setOffsetsToCommit` have a race condition with `org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread#run` on these lines:

```
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
    log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
            "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
            "This does not compromise Flink's checkpoint integrity.");
}
this.offsetCommitCallback = commitCallback;
```

(a `getAndSet` followed by a separate `volatile` write)? However, it currently doesn't seem very important, since:

1. it's used only for metrics, and
2. only the first `successfulCommits.inc()` can be omitted because of it, since (at least for now) `offsetCommitCallback` is a write-once variable: any subsequent writes overwrite it with the same value. ---
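To make the interleaving described in this comment concrete, here is a simplified, single-threaded replay of it. This is a hypothetical model, not Flink's code: the `Map<String, Long>` stands in for the real offsets map, `Runnable` stands in for the commit callback type, the method names are made up, and the reader step assumes the consumer thread takes the offsets first and reads `offsetCommitCallback` afterwards in a separate step.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical model of the two-step publication (getAndSet, then a separate
 * volatile write). It replays one problematic interleaving deterministically.
 */
class RacyHandoffModel {
    final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();
    volatile Runnable offsetCommitCallback; // stands in for the real callback type

    /** Writer (checkpointing side), step 1: publish the offsets. */
    void writerPublishOffsets(Map<String, Long> offsets) {
        nextOffsetsToCommit.getAndSet(offsets);
    }

    /** Writer, step 2: publish the callback in a separate volatile write. */
    void writerPublishCallback(Runnable callback) {
        this.offsetCommitCallback = callback;
    }

    /** Reader (consumer thread): takes the offsets, then reads the callback. */
    boolean readerSeesOffsetsWithoutCallback() {
        Map<String, Long> offsets = nextOffsetsToCommit.getAndSet(null);
        Runnable callback = offsetCommitCallback;
        return offsets != null && callback == null;
    }

    /** Runs the reader between the two writer steps, for two consecutive checkpoints. */
    static boolean[] replay() {
        RacyHandoffModel m = new RacyHandoffModel();
        Runnable callback = () -> { };

        // First checkpoint: offsets are visible, the callback is still null.
        m.writerPublishOffsets(Map.of("topic-0", 42L));
        boolean first = m.readerSeesOffsetsWithoutCallback();
        m.writerPublishCallback(callback);

        // Second checkpoint, same interleaving: the "stale" callback is the same object.
        m.writerPublishOffsets(Map.of("topic-0", 43L));
        boolean second = m.readerSeesOffsetsWithoutCallback();
        m.writerPublishCallback(callback);

        return new boolean[] {first, second};
    }
}
```

Under these assumptions, `replay()` reports a missing callback only for the first checkpoint; afterwards the reader may read a value written for an earlier commit, but because the callback is write-once it is the same object, which matches the observation that only the first commit is affected.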