[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426499189



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -170,6 +170,8 @@ public boolean isValidTransition(final State newState) {
      */
     void closeDirty();
 
+    void update(final Set<TopicPartition> topicPartitions, final ProcessorTopology processorTopology);

Review comment:
   Yes, added a short description.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426498877



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -93,6 +96,28 @@ long partitionTimestamp(final TopicPartition partition) {
         return queue.partitionTime();
     }
 
+    // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions
+    void updatePartitions(final Set<TopicPartition> newInputPartitions, final Function<TopicPartition, RecordQueue> recordQueueCreator) {
+        final Set<TopicPartition> removedPartitions = new HashSet<>();
+        final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator = partitionQueues.entrySet().iterator();
+        while (queuesIterator.hasNext()) {
+            final Map.Entry<TopicPartition, RecordQueue> queueEntry = queuesIterator.next();
+            final TopicPartition topicPartition = queueEntry.getKey();
+            if (!newInputPartitions.contains(topicPartition)) {
+                // if partition is removed should delete it's queue
+                totalBuffered -= queueEntry.getValue().size();
+                queuesIterator.remove();
+                removedPartitions.add(topicPartition);
+            }
+            newInputPartitions.remove(topicPartition);
+        }
+        for (final TopicPartition newInputPartition : newInputPartitions) {
+            partitionQueues.put(newInputPartition, recordQueueCreator.apply(newInputPartition));
+        }
+        nonEmptyQueuesByTime.removeIf(q -> removedPartitions.contains(q.partition()));
+        allBuffered = allBuffered && newInputPartitions.isEmpty();

Review comment:
   Fixed.
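   The reconciliation that `updatePartitions` performs can be illustrated with a minimal, self-contained sketch. It is not the PR's code: partitions are modeled as plain `String`s and queues as `ArrayDeque<String>`, the class `PartitionQueuesSketch` and its `addRecord` helper are hypothetical, and the `nonEmptyQueuesByTime`/`allBuffered` bookkeeping is left out. One deliberate difference from the diff: the sketch copies the incoming set before removing retained partitions from it, so the caller's set (in `StreamTask` that argument is `inputPartitions()`, which may be the task's own set) is not mutated.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified stand-in for PartitionGroup: partitions are Strings
// and each queue is an ArrayDeque of buffered records.
class PartitionQueuesSketch {
    final Map<String, ArrayDeque<String>> partitionQueues = new HashMap<>();
    int totalBuffered = 0;

    // Drops queues of revoked partitions, keeps queues (and their buffered
    // records) of retained partitions, and creates queues for new partitions.
    void updatePartitions(final Set<String> inputPartitions,
                          final Function<String, ArrayDeque<String>> queueCreator) {
        // Work on a copy so the caller's set is left untouched.
        final Set<String> addedPartitions = new HashSet<>(inputPartitions);
        final Iterator<Map.Entry<String, ArrayDeque<String>>> queuesIterator =
                partitionQueues.entrySet().iterator();
        while (queuesIterator.hasNext()) {
            final Map.Entry<String, ArrayDeque<String>> queueEntry = queuesIterator.next();
            // Read the key before the entry is removed from the map.
            final String partition = queueEntry.getKey();
            if (!inputPartitions.contains(partition)) {
                totalBuffered -= queueEntry.getValue().size();
                queuesIterator.remove();
            }
            // Whatever is left in addedPartitions after the loop is new.
            addedPartitions.remove(partition);
        }
        for (final String partition : addedPartitions) {
            partitionQueues.put(partition, queueCreator.apply(partition));
        }
    }

    // Test helper: buffers one record for an already assigned partition.
    void addRecord(final String partition, final String record) {
        partitionQueues.get(partition).add(record);
        totalBuffered++;
    }
}
```

   For example, going from {t-0, t-1} to {t-1, t-2} drops t-0's queue and its records from `totalBuffered`, keeps t-1's buffered records, and adds an empty queue for t-2.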









[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-18 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r426498753



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -93,6 +96,28 @@ long partitionTimestamp(final TopicPartition partition) {
         return queue.partitionTime();
     }
 
+    // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions
+    void updatePartitions(final Set<TopicPartition> newInputPartitions, final Function<TopicPartition, RecordQueue> recordQueueCreator) {
+        final Set<TopicPartition> removedPartitions = new HashSet<>();
+        final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator = partitionQueues.entrySet().iterator();
+        while (queuesIterator.hasNext()) {
+            final Map.Entry<TopicPartition, RecordQueue> queueEntry = queuesIterator.next();
+            final TopicPartition topicPartition = queueEntry.getKey();
+            if (!newInputPartitions.contains(topicPartition)) {
+                // if partition is removed should delete it's queue

Review comment:
   Fixed.









[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412178952



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -179,6 +179,44 @@ public void subscribe(final Pattern topics, final ConsumerRebalanceListener list
 
         TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
+        CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+    }
+
+    @Test
+    public void testRegexRecordsAreProcessedAfterReassignment() throws Exception {

Review comment:
   Yes, it fails on trunk.
   I found the reason: I tried to isolate the tests, so that one test creates a topic and deletes it when it is done, the next test can create the same topic again, and so on. But after each test, `streams.close()` is called in the `teardown` method. I guess it tries to do something with the deleted topics and gets a `TimeoutException` because it can't. This may be an issue, but it probably only happens on close, and it seems unrelated to this task. @abbccdda, what do you think? I fixed it easily (maybe not the best approach): call `streams.close()` in the test method body before deleting the topics.
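   The workaround above boils down to a teardown-ordering guarantee: close the Streams client first, then delete the topics. A minimal sketch of that ordering, using hypothetical stand-ins `FakeStreamsClient` (playing the role of `KafkaStreams`) and `FakeCluster` (playing the role of the embedded test cluster); neither is a Kafka API. In Java, try-with-resources closes the resource before any `finally` block runs, which gives exactly this order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins: FakeStreamsClient plays the role of KafkaStreams and
// FakeCluster the role of the embedded test cluster. Neither is a Kafka API.
class TeardownOrderSketch {
    // Records the order in which the test, close, and cleanup steps happen.
    static final List<String> events = new ArrayList<>();

    static class FakeCluster {
        void deleteTopicsAndWait(final String... topics) {
            events.add("delete " + String.join(",", topics));
        }
    }

    static class FakeStreamsClient implements AutoCloseable {
        @Override
        public void close() {
            events.add("close streams");
        }
    }

    // Runs a test body against a fresh client. try-with-resources closes the
    // client before the finally block runs, so the topics are deleted only
    // after nothing is using them any more.
    static void runIsolatedTest(final FakeCluster cluster,
                                final Consumer<FakeStreamsClient> body,
                                final String... topics) {
        try (FakeStreamsClient streams = new FakeStreamsClient()) {
            body.accept(streams);
        } finally {
            cluster.deleteTopicsAndWait(topics);
        }
    }
}
```

   Putting the ordering in one helper keeps each test self-contained, instead of relying on a shared `teardown` method that closes the client after the topics are already gone.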

















[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412099165



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -115,7 +115,7 @@ public void suspend() {
     }
 
     @Override
-    public void resume() {
+    public void resume(final boolean requiresUpdate) {

Review comment:
   Agreed, cleaned it up.









[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412055008



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -305,6 +297,12 @@ public void resume() {
             default:
                 throw new IllegalStateException("Illegal state " + state() + " while resuming active task " + id);
         }
+        if (requiresUpdate) {
+            partitionGroup.updatePartitions(inputPartitions(), recordQueueCreator::createQueue);
+            if (state() != State.RESTORING) { // if task is RESTORING then topology will be initialized in completeRestoration

Review comment:
   While I was working on this task, some tests that ensure `initializeTopology` is called only once here started failing. I thought that might be important, so I decided to keep that guarantee.
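   A simplified model of the guard in this hunk, showing how the `RESTORING` check keeps topology initialization to a single call. `ResumeSketch` and its counters are hypothetical, state transitions inside `resume` are omitted, and the sketch assumes (the diff excerpt does not show it) that the non-`RESTORING` branch initializes the topology.

```java
// Hypothetical, simplified model of the resume(requiresUpdate) guard; not StreamTask.
class ResumeSketch {
    enum State { RESTORING, RUNNING }

    private State state;
    int topologyInitializations = 0;
    int partitionUpdates = 0;

    ResumeSketch(final State initialState) {
        this.state = initialState;
    }

    private void initializeTopology() {
        topologyInitializations++;
    }

    void resume(final boolean requiresUpdate) {
        if (requiresUpdate) {
            // Stands in for partitionGroup.updatePartitions(...).
            partitionUpdates++;
            // A RESTORING task initializes its topology in completeRestoration(),
            // so initializing here as well would run it twice.
            if (state != State.RESTORING) {
                initializeTopology();
            }
        }
    }

    void completeRestoration() {
        if (state == State.RESTORING) {
            initializeTopology();
            state = State.RUNNING;
        }
    }
}
```

   Under these assumptions a `RESTORING` task that is resumed with `requiresUpdate == true` and then finishes restoration initializes its topology exactly once, and resuming with `false` touches neither the partitions nor the topology.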









[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412049922



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1127,7 +1127,7 @@ public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration()
     }
 
     @Test
-    public void shouldNotReInitializeTopologyWhenResuming() throws IOException {
+    public void shouldNotReInitializeTopologyWhenResumingWithFalseFlag() throws IOException {

Review comment:
   Fixed.









[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412020510



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -93,6 +96,29 @@ long partitionTimestamp(final TopicPartition partition) {
         return queue.partitionTime();
     }
 
+    // creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions
+    void updatePartitions(final Set<TopicPartition> newInputPartitions, final Function<TopicPartition, RecordQueue> recordQueueCreator) {
+        final Set<TopicPartition> removedPartitions = new HashSet<>();
+        final Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator = partitionQueues.entrySet().iterator();
+        while (queuesIterator.hasNext()) {
+            final Map.Entry<TopicPartition, RecordQueue> queueEntry = queuesIterator.next();
+            final TopicPartition topicPartition = queueEntry.getKey();
+            if (newInputPartitions.contains(topicPartition)) {

Review comment:
   Yes, I can rephrase it as you suggest.













[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412015236



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -50,7 +53,7 @@
  */
 public class PartitionGroup {
 
-    private final Map<TopicPartition, RecordQueue> partitionQueues;
+    private Map<TopicPartition, RecordQueue> partitionQueues;

Review comment:
   Agreed.




