[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

2017-07-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4321


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

2017-07-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128193424
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
@@ -744,17 +745,21 @@ void reassignPartitions(List newPartit
final OneShotLatch continueAssignmentLatch) {

final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+   final AtomicInteger callCounter = new AtomicInteger();
+
when(mockConsumer.assignment()).thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-   if (midAssignmentLatch != null) {
-   midAssignmentLatch.trigger();
-   }
+   // first call is not the one that we want to catch... we all love mocks, don't we?
--- End diff --

This change is no longer needed once I dropped the `this.hasAssignedPartitions = !consumer.assignment().isEmpty();` assignment (it was the first call that was causing the problems).




[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

2017-07-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128178914
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -210,14 +214,28 @@ public void run() {
}

try {
-   newPartitions = unassignedPartitionsQueue.pollBatch();
+   if (hasAssignedPartitions) {
+   newPartitions = unassignedPartitionsQueue.pollBatch();
+   }
+   else {
+   // if no assigned partitions block until we get at least one
+   // instead of hot spinning this loop. We relay on a fact that
--- End diff --

nit: typo? relay -> rely
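
The polling strategy quoted in the diff above can be sketched with the standard library alone. This is a hypothetical, stdlib-only illustration (the real code uses Flink's `ClosableBlockingQueue.pollBatch()` and internal field names), showing the core idea: drain without blocking when partitions are already assigned, otherwise block for the first one instead of hot-spinning the fetch loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Stdlib-only sketch of the blocking-vs-draining poll discussed above (hypothetical names). */
public class PartitionPollSketch {

	/**
	 * If partitions are already assigned, drain whatever is queued without blocking
	 * (an empty batch is fine, the fetch loop has work to do anyway). Otherwise
	 * block until at least one partition arrives, instead of hot-spinning.
	 */
	static <T> List<T> pollBatch(BlockingQueue<T> queue, boolean hasAssignedPartitions) {
		List<T> batch = new ArrayList<>();
		if (hasAssignedPartitions) {
			queue.drainTo(batch); // non-blocking: may return an empty batch
		} else {
			try {
				batch.add(queue.take()); // blocks until at least one element
				queue.drainTo(batch);    // grab anything else already queued
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt(); // let the fetch loop see the interrupt
			}
		}
		return batch;
	}

	public static void main(String[] args) {
		BlockingQueue<String> queue = new LinkedBlockingQueue<>();
		queue.add("partition-0");
		System.out.println(pollBatch(queue, false)); // [partition-0]
		System.out.println(pollBatch(queue, true));  // [] -- empty, does not block
	}
}
```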




[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

2017-07-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128180555
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -335,6 +356,9 @@ public void setOffsetsToCommit(Map offsetsToC
 */
@VisibleForTesting
void reassignPartitions(List newPartitions) throws Exception {
+   if (newPartitions.size() > 0) {
+   hasAssignedPartitions = true;
+   }
--- End diff --

Should we actually extend this `if` block to wrap the whole code in 
`reassignPartitions`? I.e., we shouldn't be doing the reassignment logic if 
`newPartitions.size() == 0`.
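
The early-return guard suggested here can be sketched as follows. This is a hypothetical, simplified stand-in (the real method takes a `List<KafkaTopicPartition>` and drives the Kafka consumer); it only demonstrates the shape of the suggestion, skipping the whole reassignment when there is nothing new to assign.

```java
import java.util.Collections;
import java.util.List;

/** Sketch of the suggested guard: skip reassignment entirely for an empty batch (hypothetical names). */
public class ReassignSketch {

	/**
	 * Returns true if a reassignment was actually performed. With the guard at
	 * the top, an empty batch never touches the consumer's assignment state.
	 */
	static boolean reassignPartitions(List<String> newPartitions) {
		if (newPartitions.isEmpty()) {
			return false; // nothing to do: don't run any of the reassignment logic
		}
		// ... the real consumer.assign(...) / offset-restore logic would run here ...
		return true;
	}

	public static void main(String[] args) {
		System.out.println(reassignPartitions(Collections.emptyList())); // false
		System.out.println(reassignPartitions(List.of("topic-0")));      // true
	}
}
```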




[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

2017-07-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128180697
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
@@ -744,17 +745,21 @@ void reassignPartitions(List newPartit
final OneShotLatch continueAssignmentLatch) {

final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+   final AtomicInteger callCounter = new AtomicInteger();
+
when(mockConsumer.assignment()).thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-   if (midAssignmentLatch != null) {
-   midAssignmentLatch.trigger();
-   }
+   // first call is not the one that we want to catch... we all love mocks, don't we?
--- End diff --

Let's remove the last part about loving mocks ;-) I do understand your argument on mocking, though.




[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

2017-07-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128179835
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -151,6 +154,7 @@ public void run() {
// including concurrent 'close()' calls.
try {
this.consumer = getConsumer(kafkaProperties);
+   this.hasAssignedPartitions = !consumer.assignment().isEmpty();
--- End diff --

Can't we just start with `false` here?
We'll only ever get partitions once we enter the main fetch loop.




[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

2017-07-19 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128181037
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
@@ -744,17 +745,21 @@ void reassignPartitions(List newPartit
final OneShotLatch continueAssignmentLatch) {

final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+   final AtomicInteger callCounter = new AtomicInteger();
+
when(mockConsumer.assignment()).thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-   if (midAssignmentLatch != null) {
-   midAssignmentLatch.trigger();
-   }
+   // first call is not the one that we want to catch... we all love mocks, don't we?
--- End diff --

Could you elaborate a bit on "first call is not the one that we want to catch"? Which test was failing? I have the feeling that this could have been fixed in a different way.
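
The "skip the first call" stubbing pattern under discussion (an `AtomicInteger` counting invocations so that a latch fires only from the second call onwards) can be illustrated without Mockito. This is a hypothetical, stdlib-only sketch; the real test stubs `KafkaConsumer.assignment()` via Mockito's `thenAnswer` and Flink's `OneShotLatch`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Stdlib-only sketch of the invocation-counting stub discussed above (no Mockito). */
public class CountingStubSketch {

	/**
	 * Wraps a stub so the side effect (here, triggering a latch) fires only from
	 * the second invocation onwards; the first call, made during setup, is ignored.
	 */
	static <T> Supplier<T> triggerFromSecondCall(Supplier<T> delegate, Runnable latchTrigger) {
		AtomicInteger callCounter = new AtomicInteger();
		return () -> {
			if (callCounter.incrementAndGet() > 1) {
				latchTrigger.run(); // deliberately skip the first (setup) call
			}
			return delegate.get();
		};
	}

	public static void main(String[] args) {
		CountDownLatch latch = new CountDownLatch(1);
		Supplier<String> assignment = triggerFromSecondCall(() -> "partitions", latch::countDown);

		assignment.get();                     // first call: latch untouched
		System.out.println(latch.getCount()); // 1
		assignment.get();                     // second call: latch triggered
		System.out.println(latch.getCount()); // 0
	}
}
```

This is also why the discussion above suggests the counter becomes unnecessary once the extra setup-time call to `assignment()` is removed: with only one caller left, there is no "first call" to skip.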




[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....

2017-07-13 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4321

[FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

This patch also fixes an incompatibility with the latest Kafka 0.10.x and 0.11.x kafka-clients.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink kafka010

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4321


commit e8aac4d3842c433ffc40e36c696950057e5139b9
Author: Piotr Nowojski 
Date:   2017-07-13T11:58:29Z

[FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1



