[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4321 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128193424

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -744,17 +745,21 @@ void reassignPartitions(ListnewPartit
             final OneShotLatch continueAssignmentLatch) {
         final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
    +    final AtomicInteger callCounter = new AtomicInteger();
    +
         when(mockConsumer.assignment()).thenAnswer(new Answer() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    -            if (midAssignmentLatch != null) {
    -                midAssignmentLatch.trigger();
    -            }
    +            // first call is not the one that we want to catch... we all love mocks, don't we?
    --- End diff --

    This change is no longer needed now that I have dropped the `this.hasAssignedPartitions = !consumer.assignment().isEmpty();` assignment (that was "the first call" that was causing the problems).
[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128178914

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -210,14 +214,28 @@ public void run() {
         }

         try {
    -        newPartitions = unassignedPartitionsQueue.pollBatch();
    +        if (hasAssignedPartitions) {
    +            newPartitions = unassignedPartitionsQueue.pollBatch();
    +        }
    +        else {
    +            // if no assigned partitions block until we get at least one
    +            // instead of hot spinning this loop. We relay on a fact that
    --- End diff --

    nit: typo? relay -> rely
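For readers following the thread, the idea in this hunk (block for the first batch of partitions instead of hot-spinning, then switch to non-blocking polls) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the Flink implementation: `PartitionBatchFetcher`, `offer`, and `nextBatch` are hypothetical names, partitions are plain strings, and a `java.util.concurrent.LinkedBlockingQueue` stands in for the connector's own queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the consumer thread's partition hand-over; not Flink code.
class PartitionBatchFetcher {
    // Assumption: partitions are modelled as plain strings for brevity.
    private final BlockingQueue<String> unassignedPartitionsQueue = new LinkedBlockingQueue<>();
    private boolean hasAssignedPartitions = false;

    void offer(String partition) {
        unassignedPartitionsQueue.add(partition);
    }

    List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        if (!hasAssignedPartitions) {
            // Nothing is assigned yet: block until at least one partition
            // arrives instead of spinning through an empty loop iteration.
            try {
                batch.add(unassignedPartitionsQueue.take());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the caller decide what to do.
                Thread.currentThread().interrupt();
                return batch;
            }
        }
        // Non-blocking: pick up whatever else is queued right now.
        unassignedPartitionsQueue.drainTo(batch);
        if (!batch.isEmpty()) {
            hasAssignedPartitions = true;
        }
        return batch;
    }
}
```

Once the flag is set, `nextBatch()` never blocks again in this sketch, which mirrors the `if (hasAssignedPartitions)` branch in the diff.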
[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128180555

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -335,6 +356,9 @@ public void setOffsetsToCommit(MapoffsetsToC
          */
         @VisibleForTesting
         void reassignPartitions(List newPartitions) throws Exception {
    +        if (newPartitions.size() > 0) {
    +            hasAssignedPartitions = true;
    +        }
    --- End diff --

    Should we actually extend this `if` block to wrap the whole code in `reassignPartitions`? I.e., we shouldn't be doing the reassignment logic if `newPartitions.size() == 0`.
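The suggestion above, skipping the reassignment work entirely when there is nothing new to assign, could look roughly like the following. This is only a sketch under assumptions: `ReassignmentGuard` and its accessors are hypothetical, partitions are plain strings, and the real `reassignPartitions` does considerably more (it talks to the Kafka consumer), which is reduced here to updating a list and a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of an early-return guard; not the Flink class.
class ReassignmentGuard {
    private final List<String> assignedPartitions = new ArrayList<>();
    private boolean hasAssignedPartitions = false;
    private int reassignmentCount = 0;

    void reassignPartitions(List<String> newPartitions) {
        if (newPartitions.isEmpty()) {
            // Nothing to add: skip the reassignment logic altogether.
            return;
        }
        hasAssignedPartitions = true;
        // Placeholder for the actual reassignment work.
        assignedPartitions.addAll(newPartitions);
        reassignmentCount++;
    }

    boolean hasAssignedPartitions() {
        return hasAssignedPartitions;
    }

    int reassignmentCount() {
        return reassignmentCount;
    }
}
```

An early return keeps the flag update and the reassignment under the same condition, so an empty batch can never trigger a reassignment by accident.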
[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128180697

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -744,17 +745,21 @@ void reassignPartitions(ListnewPartit
             final OneShotLatch continueAssignmentLatch) {
         final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
    +    final AtomicInteger callCounter = new AtomicInteger();
    +
         when(mockConsumer.assignment()).thenAnswer(new Answer() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    -            if (midAssignmentLatch != null) {
    -                midAssignmentLatch.trigger();
    -            }
    +            // first call is not the one that we want to catch... we all love mocks, don't we?
    --- End diff --

    Let's remove the last part about loving mocks ;-)
    I do understand your argument on mocking, though.
[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128179835

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java ---
    @@ -151,6 +154,7 @@ public void run() {
         // including concurrent 'close()' calls.
         try {
             this.consumer = getConsumer(kafkaProperties);
    +        this.hasAssignedPartitions = !consumer.assignment().isEmpty();
    --- End diff --

    Can't we just start with `false` here? We'll only ever get partitions once we enter the main fetch loop.
[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4321#discussion_r128181037

    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java ---
    @@ -744,17 +745,21 @@ void reassignPartitions(ListnewPartit
             final OneShotLatch continueAssignmentLatch) {
         final KafkaConsumer mockConsumer = mock(KafkaConsumer.class);
    +    final AtomicInteger callCounter = new AtomicInteger();
    +
         when(mockConsumer.assignment()).thenAnswer(new Answer() {
             @Override
             public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    -            if (midAssignmentLatch != null) {
    -                midAssignmentLatch.trigger();
    -            }
    +            // first call is not the one that we want to catch... we all love mocks, don't we?
    --- End diff --

    Could you explain a bit more what "first call is not the one that we want to catch" means? Which test was failing? I have the feeling that this could have been fixed in a different way.
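As background for the question above, the pattern the diff appears to be reaching for is a stub that counts its invocations and reacts only from the second call onward. The hunk is truncated, so the following is a guess at the general shape rather than the PR's actual test code: it is a hand-rolled stub without Mockito, `CountingAssignmentStub` is a hypothetical name, and a `java.util.concurrent.CountDownLatch` stands in for Flink's `OneShotLatch`.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical hand-rolled stub; the PR itself uses a Mockito Answer.
class CountingAssignmentStub {
    private final AtomicInteger callCounter = new AtomicInteger();
    private final CountDownLatch midAssignmentLatch;

    CountingAssignmentStub(CountDownLatch midAssignmentLatch) {
        this.midAssignmentLatch = midAssignmentLatch;
    }

    Set<String> assignment() {
        // Ignore the first call (assumed here to be the initial emptiness
        // check) and signal the latch only on later calls.
        if (callCounter.incrementAndGet() > 1) {
            midAssignmentLatch.countDown();
        }
        return Collections.emptySet();
    }

    int calls() {
        return callCounter.get();
    }
}
```

A stub like this couples the test to the exact number of `assignment()` calls made by the code under test, which is presumably why the reviewers preferred removing the extra call over counting it.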
[GitHub] flink pull request #4321: [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2....
GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4321

    [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1

    This patch also fixes an incompatibility with the latest Kafka 0.10.x and 0.11.x kafka-clients.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink kafka010

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4321

commit e8aac4d3842c433ffc40e36c696950057e5139b9
Author: Piotr Nowojski
Date:   2017-07-13T11:58:29Z

    [FLINK-7174] Bump Kafka 0.10 dependency to 0.10.2.1