[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16098200#comment-16098200
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai closed the pull request at:

https://github.com/apache/flink/pull/4386


> Bump dependency of Kafka 0.10.x to the latest one
> -
>
> Key: FLINK-7174
> URL: https://issues.apache.org/jira/browse/FLINK-7174
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.2.1, 1.3.1, 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0, 1.3.2
>
>
> We are using a pretty old Kafka version for 0.10. Besides any bug fixes and 
> improvements that were made between 0.10.0.1 and 0.10.2.1, the 0.10.2.1 
> version is more similar to 0.11.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-24 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16098197#comment-16098197
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-7174:


Merged for {{release-1.3}} via 6abd40299040ca646e7e94313dd1e0d25a4c8d82.
Closing this now, thanks a lot for the contribution [~pnowojski]!



[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16098135#comment-16098135
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4386
  
Thanks for the review @pnowojski!

I'm also using this branch to collect some final backports for 
`release-1.3`. Will merge once Travis is green.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16098034#comment-16098034
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4386
  
Looks good to me.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16098024#comment-16098024
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4321
  
Thanks @tzulitai 




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097936#comment-16097936
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/4386

(backport-1.3) [FLINK-7174] [kafka] Bump Kafka 0.10 dependency to 0.10.2.1

Backport of #4321 to `release-1.3`, with the following differences:
1. No need to touch `KafkaConsumerThread`, because in 1.3 the code in 
`KafkaConsumerThread` will only ever be reached if there are partitions to 
subscribe to (it therefore would not run into the changed exception behaviour 
described in #4321).
2. Some touched tests and classes do not exist in 1.3 (e.g. partition 
reassignment tests, `AbstractPartitionDiscoverer`) and therefore are not 
relevant for the backport.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-7174-flink13

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4386


commit 9653b60974f68b14fd4be5ee5b9f0f687b764bdb
Author: Piotr Nowojski 
Date:   2017-07-13T09:07:28Z

[FLINK-7174] [kafka connector] Bump Kafka 0.10 dependency to 0.10.2.1

This closes #4321






[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-23 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097767#comment-16097767
 ] 

Stephan Ewen commented on FLINK-7174:
-

Merged for {{1.4.0}} in 02850545e3143600c7265e737e278663e3264317

Issue is pending backport of the change to the release branch for {{1.3.2}}



[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097708#comment-16097708
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4321




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096294#comment-16096294
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4321
  
Fair enough
 +1 then




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16096010#comment-16096010
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4321
  
Dropping this field would make the code more error prone in the future if anyone 
were to call `reassignPartitions()` from somewhere else. For me, 
`hasAssignedPartitions` is tightly related to the state of the `consumer` field 
(in a perfect world it would be exposed via `consumer`'s API...), and thus 
should also be maintained as class state.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094892#comment-16094892
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4321
  
This patch looks good.

As a minor comment: I would prefer to not have `hasAssignedPartitions` as a 
field, but rather return it from the `reassignPartitions()` method and have it 
only as a local variable in the `run()` method.
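
A minimal sketch of the suggestion above, under loud assumptions: `LocalFlagSketch` is a hypothetical class, partitions are plain strings, and `polls++` merely stands in for a consumer poll. It is not the actual `KafkaConsumerThread` code; it only shows the flag being returned from `reassignPartitions()` and kept as a local variable in `run()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; not the Flink KafkaConsumerThread. */
final class LocalFlagSketch {
    private final List<String> assigned = new ArrayList<>();

    /** Assumed variant: returns whether any partition is assigned after the call. */
    boolean reassignPartitions(List<String> newPartitions) {
        assigned.addAll(newPartitions);
        return !assigned.isEmpty();
    }

    /** Simulated run loop; returns how many iterations would have polled. */
    int run(List<List<String>> batches) {
        boolean hasAssignedPartitions = false; // local variable, not a field
        int polls = 0;
        for (List<String> batch : batches) {
            if (!batch.isEmpty()) {
                hasAssignedPartitions = reassignPartitions(batch);
            }
            if (hasAssignedPartitions) {
                polls++; // stands in for a poll on the real consumer
            }
        }
        return polls;
    }
}
```

The trade-off is that any caller of `reassignPartitions()` outside `run()` has to remember to use the returned value itself.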




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094820#comment-16094820
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4321
  
I think the new pull request description template would have been awesome 
here ;-)




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16094265#comment-16094265
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4321
  
Looks good now, +1 on my side.
Let's also wait a bit for @StephanEwen to see if he has any more comments 
regarding the use of an extra `hasAssignedPartitions` field (since he commented 
on that before).




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092835#comment-16092835
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4321
  
I have also squashed the previous fixups; there is now only the one new fixup commit.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092833#comment-16092833
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4321
  
I have added a unit test for closing. I think it should be triggered/tested 
in one of the `ITCases`, but the test is fairly simple, so it shouldn't hurt us to 
have this tested explicitly.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092827#comment-16092827
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128193424
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
@@ -744,17 +745,21 @@ void reassignPartitions(List newPartit
final OneShotLatch continueAssignmentLatch) {
 
final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+   final AtomicInteger callCounter = new AtomicInteger();
+
when(mockConsumer.assignment()).thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-   if (midAssignmentLatch != null) {
-   midAssignmentLatch.trigger();
-   }
+   // first call is not the one that we want to catch... we all love mocks, don't we?
--- End diff --

This change is no longer needed now that I have dropped the 
`this.hasAssignedPartitions = !consumer.assignment().isEmpty();` assignment 
(that assignment was "the first call" that was causing the problems).




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092769#comment-16092769
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128180555
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -335,6 +356,9 @@ public void setOffsetsToCommit(Map offsetsToC
 */
@VisibleForTesting
void reassignPartitions(List newPartitions) throws Exception {
+   if (newPartitions.size() > 0) {
+   hasAssignedPartitions = true;
+   }
--- End diff --

Should we actually extend this `if` block to wrap the whole code in 
`reassignPartitions`? I.e., we shouldn't be doing the reassignment logic if 
`newPartitions.size() == 0`.
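
One way to read this suggestion is an early return when there is nothing to assign. The sketch below is a hypothetical illustration only: `ReassignGuardSketch`, its string partitions, and the `reassignments` counter are assumptions made for the example and do not mirror the real `reassignPartitions` body.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration; not the Flink KafkaConsumerThread. */
final class ReassignGuardSketch {
    private final List<String> assigned = new ArrayList<>();
    private boolean hasAssignedPartitions = false;
    private int reassignments = 0; // counts how often the reassignment logic ran

    void reassignPartitions(List<String> newPartitions) {
        if (newPartitions.isEmpty()) {
            return; // nothing new: skip the whole reassignment logic
        }
        hasAssignedPartitions = true;
        assigned.addAll(newPartitions);
        reassignments++;
    }

    boolean hasAssignedPartitions() {
        return hasAssignedPartitions;
    }

    int reassignments() {
        return reassignments;
    }
}
```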




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092773#comment-16092773
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128178914
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -210,14 +214,28 @@ public void run() {
}
 
try {
-   newPartitions = unassignedPartitionsQueue.pollBatch();
+   if (hasAssignedPartitions) {
+   newPartitions = unassignedPartitionsQueue.pollBatch();
+   }
+   else {
+   // if no assigned partitions block until we get at least one
+   // instead of hot spinning this loop. We relay on a fact that

nit: typo? relay -> rely
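
For readers following the diff above, here is a rough sketch of the branching it introduces: fetch without blocking when partitions are already assigned, otherwise block for the first one instead of spinning. It uses `java.util.concurrent.BlockingQueue` as an assumed stand-in for the connector's own queue; `NextBatchSketch` and `nextBatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Hypothetical illustration; the real code uses Flink's own queue class. */
final class NextBatchSketch {
    static List<String> nextBatch(BlockingQueue<String> queue, boolean hasAssignedPartitions) {
        List<String> batch = new ArrayList<>();
        if (!hasAssignedPartitions) {
            try {
                // Nothing assigned yet: block until at least one partition arrives.
                batch.add(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return batch;
            }
        }
        // Non-blocking: collect whatever else is currently queued (possibly nothing).
        queue.drainTo(batch);
        return batch;
    }
}
```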




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092771#comment-16092771
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128180697
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
@@ -744,17 +745,21 @@ void reassignPartitions(List newPartit
final OneShotLatch continueAssignmentLatch) {
 
final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+   final AtomicInteger callCounter = new AtomicInteger();
+
when(mockConsumer.assignment()).thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-   if (midAssignmentLatch != null) {
-   midAssignmentLatch.trigger();
-   }
+   // first call is not the one that we want to catch... we all love mocks, don't we?
--- End diff --

Let's remove the last part about loving mocks ;-) I do understand your 
argument on mocking, though.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092772#comment-16092772
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128181037
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
@@ -744,17 +745,21 @@ void reassignPartitions(List newPartit
final OneShotLatch continueAssignmentLatch) {
 
final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
+   final AtomicInteger callCounter = new AtomicInteger();
+
when(mockConsumer.assignment()).thenAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-   if (midAssignmentLatch != null) {
-   midAssignmentLatch.trigger();
-   }
+   // first call is not the one that we want to catch... we all love mocks, don't we?
--- End diff --

Could you explain a bit on "first call is not the one that we want to 
catch"? Which test was failing? I have the feeling that this could have been 
fixed in a different way.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092770#comment-16092770
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/4321#discussion_r128179835
  
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
@@ -151,6 +154,7 @@ public void run() {
// including concurrent 'close()' calls.
try {
this.consumer = getConsumer(kafkaProperties);
+   this.hasAssignedPartitions = !consumer.assignment().isEmpty();
--- End diff --

Can't we just start with `false` here?
We'll only ever get partitions once we enter the main fetch loop.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092746#comment-16092746
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4321
  
-- IMO begin
Mockito tests tend to repeat the implementation. Instead of testing for 
the effect, they tend to do the same thing as the actual code, but backwards. 
In other words, they make about as much sense as writing the same feature/code 
twice and then comparing whether the outcome is the same. That is valuable at first, 
because you make sure that you didn't make any mistakes. But after that, they 
make your life miserable, because changes in the actual code so often break 
them and you have to implement the same thing twice.

Exactly like in this case. I added a `consumer.assignment()` call in the 
production code and then had to spend quite a bit of time understanding why 
some strange test deadlocked. To fix it, I had to implement the same change as 
in the production code in the mock.
-- IMO ends

If you have a different opinion we can leave it as it is :) It's not worth 
arguing about that much.

There is a comment in the code, but sorry that I didn't state it more 
clearly in this PR itself:
```
// Without assigned partitions KafkaConsumer.poll will throw an exception
```
After the version bump (and in Kafka 0.11), `KafkaConsumer.poll()` throws an 
`IllegalStateException` if it doesn't have assigned partitions. Thus we need to 
skip this call in that case.
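
A minimal sketch of that guard, assuming a stand-in consumer. `StubConsumer` is hypothetical and mimics only the behaviour described in this comment (an `IllegalStateException` from `poll()` when nothing is assigned); it is not the Kafka client API, and `pollIfAssigned` is an illustrative name rather than a method from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a consumer; NOT the real KafkaConsumer API. */
final class StubConsumer {
    private final List<String> assigned = new ArrayList<>();

    void assign(List<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }

    /** Mimics the behaviour described above: polling with no assignment fails. */
    int poll() {
        if (assigned.isEmpty()) {
            throw new IllegalStateException("no partitions assigned");
        }
        return assigned.size(); // pretend one record per assigned partition
    }
}

final class PollGuardSketch {
    /** Polls only when partitions are assigned; returns -1 when the call is skipped. */
    static int pollIfAssigned(StubConsumer consumer, boolean hasAssignedPartitions) {
        if (!hasAssignedPartitions) {
            return -1; // skip poll() instead of triggering the exception
        }
        return consumer.poll();
    }
}
```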





[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092619#comment-16092619
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4321
  
One other question: I need a bit more context on why the version bump 
requires that change in the `KafkaConsumerThread`. From what I perceive, that 
should be a separate issue to fix hot looping, no?




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16092616#comment-16092616
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4321
  
@pnowojski we can't just drop that test, IMO. It's crucial that those tests 
exist to guard against incorrect reassignment logic in the 
`KafkaConsumerThread`. Breaking that would mess up the shutdown responsiveness 
of the consumer thread.

I'm not sure why your current fix would be bad, though. Or why do you want 
it to be removed?




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16087618#comment-16087618
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4321
  
@tzulitai could you look at this PR, particularly the last commit 
(fixup)? I'm not a big fan of mocks and Mockito-based tests, and I would really 
be inclined to just drop this test.




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16086104#comment-16086104
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4321
  
Hmm, would a blocking operation be appropriate here? It would prevent 
`shutdown()` from actually breaking the loop. I think we would need some 
timeout here.
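
A minimal sketch of the timeout idea, not the actual `KafkaConsumerThread` 
code. It assumes the queue can be modelled with a plain JDK `BlockingQueue`; 
the class `TimedPollLoop`, the `running` flag, the helper `stopsWithin()` and 
the 100 ms timeout are all hypothetical names and values chosen for 
illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the consumer thread's loop; this is not Flink's code.
class TimedPollLoop implements Runnable {
    // Stand-in for `unassignedPartitionsQueue`, assumed here to be a plain JDK queue.
    final BlockingQueue<String> unassignedPartitionsQueue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private boolean hasAssignedPartitions = false; // only touched by the loop thread

    @Override
    public void run() {
        try {
            while (running) {
                if (!hasAssignedPartitions) {
                    // Bounded wait: no hot spinning, yet `running` is re-checked
                    // at least every 100 ms even if no partition ever arrives.
                    String partition = unassignedPartitionsQueue.poll(100, TimeUnit.MILLISECONDS);
                    hasAssignedPartitions = partition != null;
                } else {
                    Thread.sleep(10); // placeholder for fetching records
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and exit
        }
    }

    void shutdown() {
        running = false;
    }

    // Starts a loop with an empty queue, requests shutdown, and reports whether
    // the thread terminated within the given time.
    static boolean stopsWithin(long millis) {
        TimedPollLoop loop = new TimedPollLoop();
        Thread thread = new Thread(loop);
        thread.start();
        loop.shutdown();
        try {
            thread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("exited after shutdown: " + stopsWithin(2000));
    }
}
```

With an untimed `take()` the same loop would stay parked until a partition 
arrives or the thread is interrupted, so `shutdown()` alone would not end it.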




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16086022#comment-16086022
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4321
  
Good catch with this spinning, I missed that.

Checking for assigned partitions on every iteration is unfortunately 
costly, because there is no cheap `isEmpty()` method. The one that I have found, 
`consumer.assignment()`, is pretty costly (it creates quite a lot of objects and 
takes some locks). I wouldn't want to call it very often.

I could move this variable to the local scope of the `run()` function, but it 
would be a little more error-prone (in case of some refactoring, for example 
calling `reassignPartitions()` from somewhere outside of the `run()` 
method).




[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085894#comment-16085894
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4321
  
I think this pull request will make the Kafka consumer go into a hot 
busy-waiting loop when it has no partitions assigned.

I would suggest doing a blocking `take()` or similar on the 
`unassignedPartitionsQueue`.

Also, it would be great to avoid the instance variable, and simply check 
how many partitions are assigned on the KafkaConsumer, or pass this via a 
return value of the `reassignPartitions()` function.
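
A rough sketch of how these two suggestions could fit together, under stated 
assumptions: the queue is modelled with a plain JDK `BlockingQueue`, partitions 
are plain strings, and `BlockingAssignmentLoop` and `step()` are hypothetical 
names. The `reassignPartitions()` signature here is invented for illustration 
and is not the one in `KafkaConsumerThread`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; names mirror the discussion but this is not Flink's code.
class BlockingAssignmentLoop {
    // Stand-in for `unassignedPartitionsQueue`, assumed to be a plain JDK queue.
    final BlockingQueue<String> unassignedPartitionsQueue = new LinkedBlockingQueue<>();
    private final List<String> assigned = new ArrayList<>();

    // Adds the new partitions and reports whether anything is assigned afterwards,
    // so the caller can keep that state in a local variable instead of a field.
    boolean reassignPartitions(List<String> newPartitions) {
        assigned.addAll(newPartitions);
        return !assigned.isEmpty();
    }

    // One loop iteration; returns the updated "has partitions" state.
    boolean step(boolean hasPartitions) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        if (!hasPartitions) {
            // Blocks instead of spinning while there is nothing to consume.
            batch.add(unassignedPartitionsQueue.take());
        }
        unassignedPartitionsQueue.drainTo(batch); // pick up anything else queued
        return batch.isEmpty() ? hasPartitions : reassignPartitions(batch);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingAssignmentLoop loop = new BlockingAssignmentLoop();
        loop.unassignedPartitionsQueue.add("topic-0");
        boolean hasPartitions = loop.step(false); // take() returns immediately
        System.out.println("has partitions: " + hasPartitions);
    }
}
```

In this form a shutdown would have to interrupt the thread (or enqueue a 
sentinel) to get it out of `take()`.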


> Bump dependency of Kafka 0.10.x to the latest one
> -
>
> Key: FLINK-7174
> URL: https://issues.apache.org/jira/browse/FLINK-7174
> Project: Flink
>  Issue Type: Improvement
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> We are using pretty old Kafka version for 0.10. Besides any bug fixes and 
> improvements that were made between 0.10.0.1 and 0.10.2.1, it 0.10.2.1 
> version is more similar to 0.11.0.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7174) Bump dependency of Kafka 0.10.x to the latest one

2017-07-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16085608#comment-16085608
 ] 

ASF GitHub Bot commented on FLINK-7174:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4321

[FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

This patch also fixes an incompatibility with the latest Kafka 0.10.x and 
0.11.x kafka-clients.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink kafka010

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4321


commit e8aac4d3842c433ffc40e36c696950057e5139b9
Author: Piotr Nowojski 
Date:   2017-07-13T11:58:29Z

[FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1



