[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426499189

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ##
@@ -170,6 +170,8 @@ public boolean isValidTransition(final State newState) {
      */
     void closeDirty();

+    void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology);

Review comment:
Yes, added a short description.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
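The reviewer asked for a description of the new `Task.update` method. As a hedged illustration of what such a documented contract might look like, here is a minimal sketch. The names `PartitionUpdatableTask` and `RecordingTask` are hypothetical, and `String` stands in for Kafka's `TopicPartition` and `ProcessorTopology`; this is not the Javadoc or implementation from the PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, trimmed-down task contract used only for illustration. */
interface PartitionUpdatableTask {
    /** Returns the partitions this task currently reads from. */
    Set<String> inputPartitions();

    /**
     * Replaces the task's input partitions after a rebalance changes its assignment.
     * The method in the diff also receives a ProcessorTopology; here it is modeled
     * as a plain string so the sketch stays self-contained.
     */
    void update(Set<String> topicPartitions, String topology);
}

/** Hypothetical implementation that simply records what it was given. */
class RecordingTask implements PartitionUpdatableTask {
    private Set<String> inputPartitions = Collections.emptySet();
    private String topology = "";

    @Override
    public Set<String> inputPartitions() {
        return inputPartitions;
    }

    @Override
    public void update(final Set<String> topicPartitions, final String topology) {
        // Store an unmodifiable copy so later changes to the caller's set do not leak in.
        this.inputPartitions = Collections.unmodifiableSet(new HashSet<>(topicPartitions));
        this.topology = topology;
    }

    String topology() {
        return topology;
    }
}
```

Copying the incoming set is a design choice of this sketch, not something the diff shows.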
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426498877

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ##
@@ -93,6 +96,28 @@ long partitionTimestamp(final TopicPartition partition) {
         return queue.partitionTime();
     }

+    // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions
+    void updatePartitions(final Set<TopicPartition> newInputPartitions, final Function<TopicPartition, RecordQueue> recordQueueCreator) {
+        final Set<TopicPartition> removedPartitions = new HashSet<>();
+        final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator = partitionQueues.entrySet().iterator();
+        while (queuesIterator.hasNext()) {
+            final Map.Entry<TopicPartition, RecordQueue> queueEntry = queuesIterator.next();
+            final TopicPartition topicPartition = queueEntry.getKey();
+            if (!newInputPartitions.contains(topicPartition)) {
+                // if partition is removed should delete it's queue
+                totalBuffered -= queueEntry.getValue().size();
+                queuesIterator.remove();
+                removedPartitions.add(topicPartition);
+            }
+            newInputPartitions.remove(topicPartition);
+        }
+        for (final TopicPartition newInputPartition : newInputPartitions) {
+            partitionQueues.put(newInputPartition, recordQueueCreator.apply(newInputPartition));
+        }
+        nonEmptyQueuesByTime.removeIf(q -> removedPartitions.contains(q.partition()));
+        allBuffered = allBuffered && newInputPartitions.isEmpty();

Review comment:
Fixed.
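The core of `updatePartitions` is a diff between the old and new assignment that keeps buffered records for partitions that stay assigned. A minimal, self-contained sketch of that idea, assuming hypothetical classes `BufferQueue` and `SimplePartitionGroup` with `String` partition names in place of `TopicPartition` and `RecordQueue`; it omits the `nonEmptyQueuesByTime` and `allBuffered` bookkeeping from the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical stand-in for a per-partition record buffer (Kafka's RecordQueue). */
class BufferQueue {
    final String partition;
    final Deque<String> records = new ArrayDeque<>();

    BufferQueue(final String partition) {
        this.partition = partition;
    }

    int size() {
        return records.size();
    }
}

/** Hypothetical, simplified partition group keyed by plain partition names. */
class SimplePartitionGroup {
    final Map<String, BufferQueue> partitionQueues = new HashMap<>();
    int totalBuffered = 0;

    void addRecord(final String partition, final String record) {
        partitionQueues.get(partition).records.add(record);
        totalBuffered++;
    }

    // Keeps queues (with their buffered records) for partitions that stay assigned,
    // drops queues for revoked partitions, and creates empty queues for new ones.
    void updatePartitions(final Set<String> newInputPartitions,
                          final Function<String, BufferQueue> queueCreator) {
        // Work on a copy so the caller's set is not modified.
        final Set<String> toCreate = new HashSet<>(newInputPartitions);
        final Iterator<Map.Entry<String, BufferQueue>> it = partitionQueues.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, BufferQueue> entry = it.next();
            final String partition = entry.getKey();
            if (!newInputPartitions.contains(partition)) {
                // Revoked: discount its buffered records and drop the queue.
                totalBuffered -= entry.getValue().size();
                it.remove();
            }
            // Anything already present does not need a new queue.
            toCreate.remove(partition);
        }
        for (final String partition : toCreate) {
            partitionQueues.put(partition, queueCreator.apply(partition));
        }
    }
}
```

For example, moving from {t-0, t-1} to {t-1, t-2} keeps t-1's buffered records, drops t-0's queue and its count from `totalBuffered`, and creates an empty queue for t-2. The sketch copies the incoming set; the quoted diff calls `newInputPartitions.remove(...)` on the set it receives, which would matter if a caller passes a set it still relies on (the `StreamTask` hunk passes `inputPartitions()`, unless that accessor returns a copy).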
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426498753

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ##
@@ -93,6 +96,28 @@ long partitionTimestamp(final TopicPartition partition) {
         return queue.partitionTime();
     }

+    // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions
+    void updatePartitions(final Set<TopicPartition> newInputPartitions, final Function<TopicPartition, RecordQueue> recordQueueCreator) {
+        final Set<TopicPartition> removedPartitions = new HashSet<>();
+        final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator = partitionQueues.entrySet().iterator();
+        while (queuesIterator.hasNext()) {
+            final Map.Entry<TopicPartition, RecordQueue> queueEntry = queuesIterator.next();
+            final TopicPartition topicPartition = queueEntry.getKey();
+            if (!newInputPartitions.contains(topicPartition)) {
+                // if partition is removed should delete it's queue

Review comment:
Fixed.
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412178952

## File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ##
@@ -179,6 +179,44 @@ public void subscribe(final Pattern topics, final ConsumerRebalanceListener list

         TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);

+        CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+    }
+
+    @Test
+    public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {

Review comment:
Yes, it fails on trunk. I found the reason: here I tried to isolate the tests, so that a test can create a topic, delete it when it is done, and the next test can do the same, and so on. But after each test, the `teardown` method calls `streams.close()`. I guess it tries to do something with the removed topics and gets a `TimeoutException` because it can't. Maybe that's an issue, but it probably happens only on close, and it doesn't seem to be related to this task. @abbccdda What do you think? I fixed it easily (maybe not the best approach): call `streams.close()` in the test method body before deleting the topics.
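The fix described above is purely about ordering: close the streams client inside the test, then delete its topics, and let the teardown's second `close()` do nothing. A minimal sketch of that ordering, assuming hypothetical `FakeCluster` and `FakeStreamsClient` classes (not Kafka's `KafkaStreams` or embedded cluster) whose `close()` is made idempotent by design; it models only the order of operations, not the `TimeoutException` itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical cluster that just records which topics were deleted. */
class FakeCluster {
    final List<String> events = new ArrayList<>();

    void deleteTopicsAndWait(final String... topics) {
        for (final String topic : topics) {
            events.add("delete " + topic);
        }
    }
}

/** Hypothetical client whose close() is idempotent, so teardown may call it again. */
class FakeStreamsClient implements AutoCloseable {
    private final FakeCluster cluster;
    private boolean closed = false;

    FakeStreamsClient(final FakeCluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public void close() {
        if (closed) {
            return; // already closed in the test body; nothing left to do
        }
        closed = true;
        cluster.events.add("close");
    }
}

class RegexTestSketch {
    // Test body: close the client first, then delete the topics it was subscribed to.
    static void runTest(final FakeCluster cluster, final FakeStreamsClient streams) {
        // ... produce records, wait for assignment, assert ...
        streams.close();
        cluster.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
    }

    // Teardown still closes the client; with an idempotent close this is a no-op.
    static void tearDown(final FakeStreamsClient streams) {
        streams.close();
    }
}
```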
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412099165

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ##
@@ -115,7 +115,7 @@ public void suspend() {
     }

     @Override
-    public void resume() {
+    public void resume(final boolean requiresUpdate) {

Review comment:
Agree. Beautified it.
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412055008

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ##
@@ -305,6 +297,12 @@ public void resume() {
             default:
                 throw new IllegalStateException("Illegal state " + state() + " while resuming active task " + id);
         }
+        if (requiresUpdate) {
+            partitionGroup.updatePartitions(inputPartitions(), recordQueueCreator::createQueue);
+            if (state() != State.RESTORING) { // if task is RESTORING then topology will be initialized in completeRestoration

Review comment:
While I was working on this task, a test that ensures initializeTopology is called once here failed. I thought that might be important, so I decided to keep supporting it.
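The `RESTORING` check in this hunk exists so that the topology is initialized exactly once: immediately on resume for a task that is not restoring, otherwise later in `completeRestoration`. A minimal sketch of that rule, assuming a hypothetical `ResumableTask` with only two states and a counter in place of real initialization (the actual `StreamTask` has more states and the `switch` shown in the hunk):

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical subset of task states, only what this sketch needs. */
enum SketchState { RESTORING, RUNNING }

/** Hypothetical task that counts how often its topology is initialized. */
class ResumableTask {
    SketchState state;
    final Set<String> inputPartitions;
    Set<String> queuedPartitions = new HashSet<>();
    int topologyInitializations = 0;

    ResumableTask(final SketchState state, final Set<String> inputPartitions) {
        this.state = state;
        this.inputPartitions = inputPartitions;
    }

    private void initializeTopology() {
        topologyInitializations++;
    }

    // A restoring task initializes its topology here, once restoration is done.
    void completeRestoration() {
        initializeTopology();
        state = SketchState.RUNNING;
    }

    void resume(final boolean requiresUpdate) {
        if (requiresUpdate) {
            // Stands in for partitionGroup.updatePartitions(...): rebuild queues from the new assignment.
            queuedPartitions = new HashSet<>(inputPartitions);
            // Skip initialization while RESTORING so it happens exactly once, in completeRestoration().
            if (state != SketchState.RESTORING) {
                initializeTopology();
            }
        }
    }
}
```

Resuming with `false` leaves the counter untouched, which is the behavior the renamed `shouldNotReInitializeTopologyWhenResumingWithFalseFlag` test name describes.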
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412049922

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ##
@@ -1127,7 +1127,7 @@ public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration()
     }

     @Test
-    public void shouldNotReInitializeTopologyWhenResuming() throws IOException {
+    public void shouldNotReInitializeTopologyWhenResumingWithFalseFlag() throws IOException {

Review comment:
Fixed.
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412020510

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ##
@@ -93,6 +96,29 @@ long partitionTimestamp(final TopicPartition partition) {
         return queue.partitionTime();
     }

+    // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions
+    void updatePartitions(final Set<TopicPartition> newInputPartitions, final Function<TopicPartition, RecordQueue> recordQueueCreator) {
+        final Set<TopicPartition> removedPartitions = new HashSet<>();
+        final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator = partitionQueues.entrySet().iterator();
+        while (queuesIterator.hasNext()) {
+            final Map.Entry<TopicPartition, RecordQueue> queueEntry = queuesIterator.next();
+            final TopicPartition topicPartition = queueEntry.getKey();
+            if (newInputPartitions.contains(topicPartition)) {

Review comment:
Yes, I can rephrase it as you suggest.
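The reviewer's proposed wording is not quoted in this message, so the following is not that suggestion. As an alternative way to express the same split into revoked and newly assigned partitions, plain set operations work; a sketch using `String` partition names and a hypothetical `PartitionDiff` helper:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: computes the partition diff without modifying either input set. */
final class PartitionDiff {
    final Set<String> removed;
    final Set<String> added;

    private PartitionDiff(final Set<String> removed, final Set<String> added) {
        this.removed = removed;
        this.added = added;
    }

    static PartitionDiff between(final Set<String> current, final Set<String> next) {
        final Set<String> removed = new HashSet<>(current);
        removed.removeAll(next);   // assigned before, not assigned now
        final Set<String> added = new HashSet<>(next);
        added.removeAll(current);  // newly assigned
        return new PartitionDiff(removed, added);
    }
}
```

The iterator loop in the diff has one advantage over this form: it can adjust `totalBuffered` and drop each queue in the same pass.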
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412015236

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ##
@@ -50,7 +53,7 @@
  */
 public class PartitionGroup {

-    private final Map<TopicPartition, RecordQueue> partitionQueues;
+    private Map<TopicPartition, RecordQueue> partitionQueues;

Review comment:
Agree.
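The reviewer's comment is not quoted in this message. One fact relevant to dropping `final` here: `final` fixes only the field's reference, not the map's contents, so a `final Map` can still be updated in place with `put` and `remove`. The later `updatePartitions` revision in this thread modifies `partitionQueues` that way, through `queuesIterator.remove()` and `partitionQueues.put(...)`. A minimal sketch with a hypothetical `QueueHolder` class:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder showing that a final map field can still change in place. */
class QueueHolder {
    private final Map<String, Integer> partitionQueues = new HashMap<>();

    void assign(final String partition) {
        partitionQueues.put(partition, 0); // mutates the map; the reference is unchanged
    }

    void revoke(final String partition) {
        partitionQueues.remove(partition);
    }

    int size() {
        return partitionQueues.size();
    }

    // Reassigning the field, e.g. partitionQueues = new HashMap<>(), would not compile.
}
```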