[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-11 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
Subsumed by re-opened PR: #5284


---


[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-10 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
As discussed offline:
Will re-open a PR for this that avoids introducing a new interface. I agree 
that it would make more sense to introduce a new interface when we actually 
decide to go for a major refactor ;)


---


[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-10 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5200
  
Regarding the race condition: I think we meant the same thing, or maybe I 
misunderstood part of the code, but yes, this NPE seems like a possible outcome 
of this race condition.

If you have spare cycles to fix this race, feel free to do so. I would like 
to avoid working on it this week and next, because of a code handover 
before a lengthy vacation in my team.


---


[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-09 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
Regarding the race condition you mentioned:
hmm, I can't seem to exactly nail down the "and only first 
`successfulCommits.inc()` can be omitted because of that" case you mentioned, 
could you elaborate on that a bit more?

But yes, it seems like there certainly is a race condition here, and it can 
even cause an NPE at:

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java#L496

Seems like the `nextOffsetsToCommit` and its callback instance should be 
bundled as a single `AtomicReference` here ...
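
A minimal sketch of that bundling idea, not the actual Flink code: `BundledCommitSketch`, `PendingCommit`, `CommitCallback`, and `pollPendingCommit` are hypothetical names, and a `Map<String, Long>` stands in for the real offsets map. The point is only that a reader which takes the pending commit always gets the offsets and the callback from the same `setOffsetsToCommit` call.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch; names are illustrative and not Flink's actual classes. */
final class BundledCommitSketch {

    /** Stand-in for the commit callback type (an assumption for this sketch). */
    interface CommitCallback {
        void onSuccess();
    }

    /** Immutable pair: offsets and callback are published together or not at all. */
    static final class PendingCommit {
        final Map<String, Long> offsets;
        final CommitCallback callback;

        PendingCommit(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    private final AtomicReference<PendingCommit> nextCommit = new AtomicReference<>();

    /** Checkpointing side: returns true if an older, still pending commit was replaced. */
    boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        return nextCommit.getAndSet(new PendingCommit(offsets, callback)) != null;
    }

    /** Consumer-thread side: takes the pending commit, or returns null if there is none. */
    PendingCommit pollPendingCommit() {
        return nextCommit.getAndSet(null);
    }
}
```

With a single reference there is no window in which the offsets are visible but the callback is not, so the reader never has to null-check the callback separately.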

Would you like to open a PR for that? I wanted to ask because you 
discovered it in the first place; if not I'll open a fix for it.


---


[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-09 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5200
  
@pnowojski thanks a lot for your insightful review!

Regarding the choice of composition or inheritance: actually, in the end I 
think we should be leaning towards neither, and let offset committing 
and record fetching live as separate components. I've left more detailed 
comments inline.


---


[GitHub] flink issue #5200: [FLINK-8306] [kafka] Introduce KafkaOffsetCommitter inter...

2018-01-08 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5200
  
Btw, doesn't 
`org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread#setOffsetsToCommit`
have a race condition on these lines:


```
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
    log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
            "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
            "This does not compromise Flink's checkpoint integrity.");
}
this.offsetCommitCallback = commitCallback;
```

(`getAndSet` followed by a `volatile` write) with 
`org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread#run`? 
However, currently it doesn't seem to be that important, since:

1. it's used only for metrics
2. and only first `successfulCommits.inc()` can be omitted because of 
that, because (at least now) `offsetCommitCallback` is a write-once 
variable, and any subsequent overwrites set it to the same value.
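
To make the interleaving concrete, here is a simplified single-threaded replay of it. This is a model under assumptions, not the real `KafkaConsumerThread`: the class and method names are hypothetical, a `String` stands in for the offsets map, and a `Runnable` stands in for the callback. It replays the order "writer publishes offsets, reader takes them and reads the callback, writer assigns the callback".

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the two-step publish; not the actual Flink class. */
final class CallbackRaceReplay {
    private final AtomicReference<String> nextOffsetsToCommit = new AtomicReference<>();
    private volatile Runnable offsetCommitCallback;

    /** Writer step 1: publish the offsets (the getAndSet in the snippet above). */
    void publishOffsets(String offsets) {
        nextOffsetsToCommit.getAndSet(offsets);
    }

    /** Writer step 2: assign the callback (the volatile write in the snippet above). */
    void publishCallback(Runnable callback) {
        this.offsetCommitCallback = callback;
    }

    /** Reader: returns true if offsets were taken while no callback was visible yet. */
    boolean takeOffsetsAndFindCallbackMissing() {
        String offsets = nextOffsetsToCommit.getAndSet(null);
        return offsets != null && offsetCommitCallback == null;
    }

    /** Replays the problematic order for the very first commit. */
    static boolean firstCommitSeesNullCallback() {
        CallbackRaceReplay state = new CallbackRaceReplay();
        state.publishOffsets("partition-0@42");
        boolean missing = state.takeOffsetsAndFindCallbackMissing(); // reader runs between the two writes
        state.publishCallback(() -> { });
        return missing;
    }

    /** Same order on a later commit: the callback from the first commit is still set. */
    static boolean laterCommitSeesNullCallback() {
        CallbackRaceReplay state = new CallbackRaceReplay();
        Runnable sameCallback = () -> { };
        state.publishOffsets("partition-0@42");
        state.publishCallback(sameCallback);
        state.takeOffsetsAndFindCallbackMissing();
        state.publishOffsets("partition-0@43");
        boolean missing = state.takeOffsetsAndFindCallbackMissing();
        state.publishCallback(sameCallback);
        return missing;
    }
}
```

In this model only the first commit can observe a missing callback, which matches point 2: later writes assign the same callback again, so a reader that runs early still sees a usable one.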


---