[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
Closing this for #2574 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
Actually, I just discovered that the problem is different altogether.

While the KafkaConsumer is polling for new data (with a timeout), it holds 
the consumer lock. If no data arrives from Kafka, the lock is not released 
until the poll timeout expires.
During that time, neither a "commitSync" nor "commitAsync" call can be 
fired off. The `notifyCheckpointComplete` method hence blocks until the poll 
timeout is over and the lock is released.

We can fix this by making sure that the consumer is "woken up" to release 
the lock, and by making sure the lock acquisition is fair, so the committer 
will get it next.

For the sake of releasing the lock fast in the committer method, it should 
still be an asynchronous commit.
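
A minimal sketch of the wake-up plus fair-lock idea described above, written against plain `java.util.concurrent` rather than the real Kafka client. Everything here is an assumption made for illustration: the class `WakeableConsumerSketch`, the `WAKEUP` sentinel, and the in-memory queue that stands in for the broker are hypothetical, and the "commit" merely records an offset. It is not the code of this PR or of #2574.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the consumer thread; none of these names come from Flink or Kafka. */
final class WakeableConsumerSketch {
    private static final String WAKEUP = "__wakeup__"; // sentinel that cuts a blocking poll short

    // 'true' requests a fair lock: waiting threads acquire it in arrival order.
    private final ReentrantLock consumerLock = new ReentrantLock(true);
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
    private final CountDownLatch firstPollStarted = new CountDownLatch(1);
    private volatile boolean running = true;
    private volatile long lastCommittedOffset = -1L;

    /** Poll loop: holds the lock for the whole, possibly long, poll. */
    void pollLoop(long pollTimeoutMillis) {
        try {
            while (running) {
                consumerLock.lock();
                try {
                    firstPollStarted.countDown();
                    // Returns early when a record or the WAKEUP sentinel arrives.
                    incoming.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                } finally {
                    consumerLock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Wakes the poller, takes the lock, and records the offset (stand-in for an async commit). */
    void commit(long offset) {
        try {
            do {
                // Wake the poller so that it releases the lock.
                incoming.offer(WAKEUP);
                // Timed tryLock honors fairness; retry if the poller re-entered poll first.
            } while (!consumerLock.tryLock(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the consumer lock", e);
        }
        try {
            lastCommittedOffset = offset; // a real committer would only hand off the commit here
        } finally {
            consumerLock.unlock();
        }
    }

    /** Runs one commit against a poller blocked in a long poll; returns the commit latency in ms. */
    static long demo(long pollTimeoutMillis) {
        WakeableConsumerSketch consumer = new WakeableConsumerSketch();
        Thread poller = new Thread(() -> consumer.pollLoop(pollTimeoutMillis), "poller");
        poller.start();
        try {
            consumer.firstPollStarted.await(); // the poller now holds (or has held) the lock
            long start = System.nanoTime();
            consumer.commit(42L);
            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            consumer.running = false;
            consumer.incoming.offer(WAKEUP); // let the poller observe the stop flag
            poller.join();
            if (consumer.lastCommittedOffset != 42L) {
                throw new IllegalStateException("commit was lost");
            }
            return elapsedMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("commit latency (ms): " + demo(5_000));
    }
}
```

With a poll timeout of five seconds, the commit in this sketch should return long before the timeout expires, because the poller is woken up and the fair lock hands ownership to the waiting committer. The retry loop is a design choice of the sketch: it covers the case where the poller re-acquires the lock before the committer has queued for it.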




[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-29 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
@tzulitai Thanks for the thorough review!

I don't understand the problem with the `commitSpecificOffsetsToKafka` 
method being designed to commit synchronously. The `FlinkKafkaConsumerBase` has 
the pending checkpoints (I think that is what you refer to). It removes the 
HashMap of "offsets to commit" from the `pendingCheckpoints` Map synchronously, 
before even calling the fetcher to commit.
After that, it looks to me like it does not make a difference how that Map 
"offsets to commit" is used (sync or async)...

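The argument above can be sketched as follows, assuming a much simplified model of the bookkeeping. Only the name `pendingCheckpoints` is taken from the discussion; the class, the method signatures, and the `CompletableFuture` that stands in for the commit call are hypothetical, and the sketch is single-threaded apart from the commit itself.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical, simplified model; only the name pendingCheckpoints is taken from the discussion. */
final class PendingCheckpointsSketch {
    // checkpoint ID -> offsets (partition name -> offset) captured when the checkpoint was taken
    private final Map<Long, Map<String, Long>> pendingCheckpoints = new HashMap<>();

    void snapshotState(long checkpointId, Map<String, Long> offsets) {
        pendingCheckpoints.put(checkpointId, new HashMap<>(offsets));
    }

    CompletableFuture<Map<String, Long>> notifyCheckpointComplete(long checkpointId, Executor committer) {
        // Synchronous step: the entry leaves the bookkeeping map before any commit starts.
        Map<String, Long> offsetsToCommit = pendingCheckpoints.remove(checkpointId);
        if (offsetsToCommit == null) {
            return CompletableFuture.completedFuture(Collections.<String, Long>emptyMap());
        }
        // From here on the removed map is used by the commit alone; the future stands in for it.
        return CompletableFuture.supplyAsync(() -> offsetsToCommit, committer);
    }

    int pendingCount() {
        return pendingCheckpoints.size();
    }

    public static void main(String[] args) {
        PendingCheckpointsSketch consumer = new PendingCheckpointsSketch();
        consumer.snapshotState(7L, Collections.singletonMap("topic-0", 41L));
        CompletableFuture<Map<String, Long>> commit =
                consumer.notifyCheckpointComplete(7L, Runnable::run);
        System.out.println("still pending: " + consumer.pendingCount());
        System.out.println("handed to commit: " + commit.join());
    }
}
```

In this model the bookkeeping map is already empty when `notifyCheckpointComplete` returns, whether the executor runs the commit inline (as `Runnable::run` does here) or on another thread.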




[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
Seems like currently only the Kafka 0.8 connector has tests related to 
offset committing (in `Kafka08ITCase`).

My two cents for testing this for now: an integration test for correct offset 
committing back to Kafka in the 0.9 connector is sufficient (take a look at 
`Kafka08ITCase#testOffsetInZookeeper`, but replace `ZookeeperOffsetHandler` 
with the new `KafkaConsumer` methods). 




[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
@StephanEwen 
On a second look, I think the `commitSpecificOffsetsToKafka` method was 
designed to commit synchronously in the first place. `AbstractFetcher` holds a 
Map of all current pending offsets for committing by checkpointID, and on every 
`notifyCheckpointComplete` the offsets are removed from the Map before 
`commitSpecificOffsetsToKafka` is called.

So, for async committing, I think we need to stop cleaning up the offsets 
in `AbstractFetcher#notifyCheckpointComplete()` and instead clean them up in a 
new, separate callback handler method in `AbstractFetcher`.




[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
Thanks @tzulitai for looking at this. I will leave the offset as it is then 
(fixed via a follow-up).

The Kafka 0.8 connector needs a similar change. This issue was encountered by 
a user, so I wanted to get the 0.9 fix in faster. Will do a follow-up for Kafka 
0.8. Will also correct the issue tag ;-)

I have no good idea how to test this, though, so any thoughts there are 
welcome!




[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
@StephanEwen I think you've tagged the wrong GitHub ID for Robert ;)




[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
Btw, just curious, does the Kafka 0.8 connector have the same issue with sync 
committing? I haven't looked into the code for this, but just wondering if we 
need a ticket for 0.8 too.




[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2559
  
Just had a look at the API of `commitAsync`, and it seems that the offsets 
committed back to Kafka through this API (likewise for `commitSync`) 
need to be `lastProcessedMessageOffset + 1` 
([https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback)](https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitAsync(java.util.Map,%20org.apache.kafka.clients.consumer.OffsetCommitCallback))).

The main effect is that, when starting from group offsets in Kafka, 
`FlinkKafkaConsumer09` currently starts from the wrong offset. There's a 
separate JIRA for this bug: 
[FLINK-4618](https://issues.apache.org/jira/browse/FLINK-4618).
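
To make the `lastProcessedMessageOffset + 1` convention concrete, here is a small sketch that converts "last processed" offsets into the offsets a commit expects. The class `CommitOffsetSketch`, the method `toCommitOffsets`, and the use of plain partition-name strings are assumptions for illustration; a real implementation would presumably work with the Kafka client's own partition and offset types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; partition names and the method name are illustrative only. */
final class CommitOffsetSketch {
    /** Converts "last processed" offsets into the "next to read" offsets that a commit expects. */
    static Map<String, Long> toCommitOffsets(Map<String, Long> lastProcessedOffsets) {
        Map<String, Long> commitOffsets = new HashMap<>();
        for (Map.Entry<String, Long> entry : lastProcessedOffsets.entrySet()) {
            // The committed offset is the position of the next record to consume.
            commitOffsets.put(entry.getKey(), entry.getValue() + 1L);
        }
        return commitOffsets;
    }

    public static void main(String[] args) {
        Map<String, Long> lastProcessed = new HashMap<>();
        lastProcessed.put("topic-0", 41L);
        System.out.println(toCommitOffsets(lastProcessed)); // prints {topic-0=42}
    }
}
```

If the last processed offset itself were committed instead, a consumer restarting from the group offsets would read that record a second time.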

Another contributor had already picked up FLINK-4618, so I'd say it's ok to 
leave this PR as it is. I'll help check on FLINK-4618 progress and make sure it 
gets merged after this PR.

Minus the above, this looks good to me. +1




[GitHub] flink issue #2559: [FLINK-4702] [kafka connector] Commit offets to Kafka asy...

2016-09-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2559
  
@robert and @tzulitai What is your take on this?

