[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-11 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 Subsumed be re-opened PR: #5284 ---

[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-10 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 As discussed offline: Will re-open a PR for this that avoids introducing a new interface. I agree that it would make more sense to introduce new interface when we actually decide to go for a

[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-10 Thread pnowojski
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5200 Regarding the race condition. I think we meant the same thing or maybe I misunderstood part of the code, but yes, this NPE seems like a possible outcome of this race condition. If you

[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-09 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 Regarding the race condition you mentioned: hmm, I can't seem to exactly nail down the "and only first `successfulCommits.inc()` can be omitted because of that" case you mentioned, could you

[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-09 Thread tzulitai
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5200 @pnowojski thanks a lot for your insightful review! regarding the choice of composition or inheritance: actually, in the end I think we should be leaning towards neither of both, and let

[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-08 Thread pnowojski
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5200 Btw, doesn't the `org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread#setOffsetsToCommit` have a race condition on those lines: ``` // record the