[GitHub] [kafka] ncliang commented on a change in pull request #8511: KAFKA-9888: Copy connector configs before passing to REST extensions

2020-04-20 Thread GitBox


ncliang commented on a change in pull request #8511:
URL: https://github.com/apache/kafka/pull/8511#discussion_r411610839



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
##
@@ -87,7 +88,13 @@ public Void answer() {
 }
 });
 EasyMock.replay(herder);
-assertEquals(expectedConfig, connectClusterState.connectorConfig(connName));
+Map actualConfig = connectClusterState.connectorConfig(connName);
+assertEquals(expectedConfig, actualConfig);
+assertNotSame(

Review comment:
   Nice test.
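
For readers following along, the property this test checks (the returned config is equal to the original but is not the same object) can be sketched in plain Java. The class name `ConfigHolderSketch` and its method are hypothetical stand-ins for illustration, not Kafka Connect's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a class that hands configs to external code;
// this is an illustration, not the code under review.
class ConfigHolderSketch {
    private final Map<String, String> config;

    ConfigHolderSketch(Map<String, String> config) {
        this.config = config;
    }

    // Return a fresh copy so callers cannot mutate the internal map.
    Map<String, String> connectorConfig() {
        return new HashMap<>(config);
    }

    public static void main(String[] args) {
        Map<String, String> expected = new HashMap<>();
        expected.put("name", "my-connector");
        ConfigHolderSketch holder = new ConfigHolderSketch(expected);

        Map<String, String> actual = holder.connectorConfig();
        System.out.println("equal contents: " + expected.equals(actual));
        System.out.println("distinct objects: " + (expected != actual));

        // Mutating the copy must not leak back into the holder's map.
        actual.put("injected", "value");
        System.out.println("original untouched: "
            + !holder.connectorConfig().containsKey("injected"));
    }
}
```

The equality check alone would pass even without the copy, which is why the identity check is the part that actually guards against regressions.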





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-20 Thread GitBox


junrao commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-616751692


   @ConcurrencyPractitioner : We now have 
https://github.com/apache/kafka/blob/trunk/.asf.yaml. You can add yourself to 
Jenkins's whitelist by following 
https://cwiki.apache.org/confluence/display/INFRA/.asf.yaml+features+for+git+repositories#id-.asf.yamlfeaturesforgitrepositories-JenkinsPRWhitelisting
 .







[GitHub] [kafka] lbradstreet commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r411544719



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= LATEST_2_3

Review comment:
   Good question, without it the verifiable consumer fails with:
   ```
   usage: verifiable-consumer [-h]
                              --broker-list HOST1:PORT1[,HOST2:PORT2[...]]
                              --topic TOPIC --group-id GROUP_ID
                              --group-instance-id GROUP_INSTANCE_ID
                              [--max-messages MAX-MESSAGES]
                              [--session-timeout TIMEOUT_MS] [--verbose]
                              [--enable-autocommit]
                              [--reset-policy RESETPOLICY]
                              [--assignment-strategy ASSIGNMENTSTRATEGY]
                              [--consumer.config CONFIG_FILE]
   verifiable-consumer: error: argument --group-instance-id is required
   ```
   
   I assumed that this check was added for a reason, but maybe it's being 
too strict?









[GitHub] [kafka] harshitshah4 commented on issue #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2020-04-20 Thread GitBox


harshitshah4 commented on issue #8512:
URL: https://github.com/apache/kafka/pull/8512#issuecomment-616683601


   @omkreddy can you review the changes?







[GitHub] [kafka] guozhangwang commented on issue #8514: MINOR: Further reduce runtime for metrics integration tests

2020-04-20 Thread GitBox


guozhangwang commented on issue #8514:
URL: https://github.com/apache/kafka/pull/8514#issuecomment-616718841


   Merged to trunk.







[GitHub] [kafka] abbccdda commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-20 Thread GitBox


abbccdda commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r411528794



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= LATEST_2_3

Review comment:
   Why does static membership make a difference here?









[GitHub] [kafka] C0urante commented on issue #8511: KAFKA-9888: Copy connector configs before passing to REST extensions

2020-04-20 Thread GitBox


C0urante commented on issue #8511:
URL: https://github.com/apache/kafka/pull/8511#issuecomment-616743204


   Thanks @ncliang! @kkonstantine, @rhauch would one of you mind taking a look 
at this when you get a chance?







[GitHub] [kafka] apovzner commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-20 Thread GitBox


apovzner commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-616658368


   @dajac I did initially add a unit test to `KafkaApisTest` for 
UpdateMetadataRequest. However, since `KafkaApisTest` mocks pretty much every 
other component (like the replica manager, etc.), the only thing it would test is 
that passing a newer broker epoch does not throw 
`IllegalStateException`. Basically, it does not add any coverage beyond 
`BrokerEpochIntegrationTest`, which also tests more of the code path. 
Given this, do you still think we should add it? Or perhaps there are other 
reasons I missed?







[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-20 Thread GitBox


hachikuji commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-61736


   ok to test







[GitHub] [kafka] maseiler commented on issue #8431: MINOR: Rename description of flatMapValues transformation

2020-04-20 Thread GitBox


maseiler commented on issue #8431:
URL: https://github.com/apache/kafka/pull/8431#issuecomment-616676054


   @bbejeck @ijuma







[GitHub] [kafka] ableegoldman commented on a change in pull request #8114: KAFKA-9290: Update IQ related JavaDocs

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8114:
URL: https://github.com/apache/kafka/pull/8114#discussion_r411570528



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -208,13 +208,13 @@
  * streamBuilder.table(topic, Consumed.with(Serde.String(), Serde.String()), Materialized.as(storeName))
  * }
  * 
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
  * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
  * {@code
  * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore> localStore = streams.store(queryableStoreName, QueryableStoreTypes.>timestampedKeyValueStore());

Review comment:
   nit: insert space between generics (here and elsewhere)









[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-20 Thread GitBox


ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-616773372


   ok to test
   







[GitHub] [kafka] lbradstreet commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r411544719



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= LATEST_2_3

Review comment:
   Good question, without it the verifiable consumer fails with:
   ```
   usage: verifiable-consumer [-h]
                              --broker-list HOST1:PORT1[,HOST2:PORT2[...]]
                              --topic TOPIC --group-id GROUP_ID
                              --group-instance-id GROUP_INSTANCE_ID
                              [--max-messages MAX-MESSAGES]
                              [--session-timeout TIMEOUT_MS] [--verbose]
                              [--enable-autocommit]
                              [--reset-policy RESETPOLICY]
                              [--assignment-strategy ASSIGNMENTSTRATEGY]
                              [--consumer.config CONFIG_FILE]
   verifiable-consumer: error: argument --group-instance-id is required
   ```
   
   I assumed that this check was added for a reason, but maybe it's being 
too strict or buggy?









[GitHub] [kafka] guozhangwang commented on issue #8445: KAFKA-9823: Remember the sent generation for the coordinator request

2020-04-20 Thread GitBox


guozhangwang commented on issue #8445:
URL: https://github.com/apache/kafka/pull/8445#issuecomment-616757320


   retest this please







[GitHub] [kafka] mjsax commented on a change in pull request #8519: MINOR: Upgrade gradle plugins and test libraries for Java 14 support

2020-04-20 Thread GitBox


mjsax commented on a change in pull request #8519:
URL: https://github.com/apache/kafka/pull/8519#discussion_r411639992



##
File path: jenkins.sh
##
@@ -29,12 +29,17 @@
 --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed "$@" \
 || { echo 'Test steps failed'; exit 1; }
 
-# Verify that Kafka Streams archetype compiles
 if [ $JAVA_HOME = "/home/jenkins/tools/java/latest11" ] ; then
   echo "Skipping Kafka Streams archetype test for Java 11"
   exit 0
 fi
 
+if [ $JAVA_HOME = "/home/jenkins/tools/java/latest14" ] ; then
+  echo "Skipping Kafka Streams archetype test for Java 14"
+  exit 0
+fi

Review comment:
   LGTM









[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-20 Thread GitBox


ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-616776614


   @junrao Cool. Should I edit `.asf.yaml` as part of 
this PR, or will I need to do it some other way?







[GitHub] [kafka] cmccabe commented on a change in pull request #8396: KAFKA-9754 - Trogdor - Ignore produce errors, better error statistics.

2020-04-20 Thread GitBox


cmccabe commented on a change in pull request #8396:
URL: https://github.com/apache/kafka/pull/8396#discussion_r411654793



##
File path: 
tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
##
@@ -72,6 +75,8 @@
 private final TopicsSpec activeTopics;
 private final TopicsSpec inactiveTopics;
 private final boolean useConfiguredPartitioner;
+private final boolean ignoreProduceErrors;
+private final int topicVerificationRetries;

Review comment:
   It would be better to call this "topicVerificationTries" since 0 is then 
an unreasonable value.  For REtries, 0 is a reasonable value and not something 
that we should assume is equivalent to "use the default."
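
The naming argument can be made concrete with a small sketch. Everything here is hypothetical (the class name, `DEFAULT_TRIES`, and the assumption that an unset integer field arrives as 0); it is not Trogdor's code, only an illustration of why 0 is safe to reinterpret for "tries" but not for "retries":

```java
// Hypothetical helper for illustration only; not part of Trogdor.
class TopicVerificationTriesSketch {
    // Assumed default, chosen arbitrarily for this example.
    static final int DEFAULT_TRIES = 3;

    // With "tries", at least one attempt is always required, so 0 can never be
    // a meaningful request and can be treated as "unset, use the default".
    static int effectiveTries(int configuredTries) {
        if (configuredTries < 0) {
            throw new IllegalArgumentException("tries must not be negative");
        }
        return configuredTries == 0 ? DEFAULT_TRIES : configuredTries;
    }

    // With "retries", 0 is meaningful (attempt once, never retry), so it must
    // be honored as-is and cannot double as a "use the default" marker.
    static int totalAttemptsFromRetries(int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must not be negative");
        }
        return 1 + retries;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTries(0));
        System.out.println(effectiveTries(5));
        System.out.println(totalAttemptsFromRetries(0));
    }
}
```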









[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411672041



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue clientsByTaskLoad;
+private final BiFunction validClientCriteria;
+private final Set uniqueClients = new HashSet<>();
+
+ValidClientsByTaskLoadQueue(final Map clientStates,
+final BiFunction validClientCriteria) {
+clientsByTaskLoad = getClientPriorityQueueByTaskLoad(clientStates);
+this.validClientCriteria = validClientCriteria;
+}
+
+/**
+ * @return the next least loaded client that satisfies the given criteria, or null if none do
+ */
+UUID poll(final TaskId task) {
+final List validClient = poll(task, 1);
+return validClient.isEmpty() ? null : validClient.get(0);
+}
+
+/**
+ * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid
+ * candidates for the given task
+ */
+List poll(final TaskId task, final int numClients) {
+final List nextLeastLoadedValidClients = new LinkedList<>();
+final Set invalidPolledClients = new HashSet<>();
+while (nextLeastLoadedValidClients.size() < numClients) {
+UUID candidateClient;
+while (true) {
+candidateClient = clientsByTaskLoad.poll();
+if (candidateClient == null) {
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+if (validClientCriteria.apply(candidateClient, task)) {
+nextLeastLoadedValidClients.add(candidateClient);
+break;
+} else {
+invalidPolledClients.add(candidateClient);
+}
+}
+}
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+void offerAll(final Collection clients) {
+for (final UUID client : clients) {
+offer(client);
+}
+}
+
+void offer(final UUID client) {
+if (uniqueClients.contains(client)) {

Review comment:
   I think it's just a computer-sciencey matter of principle. 
`clientsByTaskLoad` is a linear collection, so every `offer` would become 
`O(n)` if we did a `contains` call on it every time. Right now, it's only 
`O(n)` when we need to remove the prior record for the same client, and 
`O(log(n))` otherwise.
   
   Does it really matter? I'm not sure.
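
The complexity trade-off described above can be sketched with standard collections. This is a simplified illustration with hypothetical names (`UniquePriorityQueueSketch`), not the class from the PR; in particular, having `poll` also clear the membership set is a choice made for this sketch:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical sketch: a priority queue paired with a hash set so that
// membership checks are O(1) instead of a linear scan of the queue.
class UniquePriorityQueueSketch<T> {
    private final PriorityQueue<T> queue;
    private final Set<T> members = new HashSet<>();

    UniquePriorityQueueSketch(Comparator<T> comparator) {
        this.queue = new PriorityQueue<>(comparator);
    }

    void offer(T item) {
        if (members.contains(item)) {
            // O(n): only paid when a stale entry for the same item exists.
            queue.remove(item);
        } else {
            members.add(item);
        }
        // O(log n): re-inserting picks up the item's current priority.
        queue.offer(item);
    }

    T poll() {
        T head = queue.poll();
        if (head != null) {
            members.remove(head);
        }
        return head;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        Map<String, Integer> load = new HashMap<>();
        load.put("a", 2);
        load.put("b", 1);
        UniquePriorityQueueSketch<String> clients =
            new UniquePriorityQueueSketch<>(Comparator.comparingInt((String c) -> load.get(c)));
        clients.offer("a");
        clients.offer("b");
        load.put("a", 0);   // the load of "a" changes after it was queued
        clients.offer("a"); // replaces the stale entry instead of duplicating it
        System.out.println(clients.size());
        System.out.println(clients.poll());
    }
}
```

Calling `queue.contains(item)` on every offer would make each call linear; the companion set limits the linear cost to the case where an old entry really has to be removed.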









[GitHub] [kafka] abbccdda commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616808689


   @vvcephei @mjsax Call for a review







[GitHub] [kafka] vvcephei commented on a change in pull request #8520: Add explicit grace period to tumbling window example

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8520:
URL: https://github.com/apache/kafka/pull/8520#discussion_r411704050



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3262,12 +3262,15 @@ KTable-KTable Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by definition, an implicit
-// advance interval of 5 minutes).
+// advance interval of 5 minutes). Note the explicit grace period, as the current
+// default value is 24 hours, which may be larger than needed for smaller windows. 
+// Note that this default may change in future major version releases.

Review comment:
   I'd remove the note about the default changing. Either we find a 
non-breaking way to change defaults (see 
https://issues.apache.org/jira/browse/KAFKA-8924?focusedCommentId=17088091=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17088091
 ), or we can't change it.
   
   I'd rather not make people paranoid that we're going to break their app 
semantics from underneath them, because we won't.









[GitHub] [kafka] mjsax commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


mjsax commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616831664


   Retest this please.







[GitHub] [kafka] abbccdda commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616839537


   Ran it 350 times locally with no failures.







[GitHub] [kafka] abbccdda commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616840036


   @mjsax I suppose this should help for all integration tests, as they share 
`EmbeddedBroker`







[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411734468



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -89,95 +88,72 @@ public boolean assign() {
 return false;
 }
 
-final Map> warmupTaskAssignment = 
initializeEmptyTaskAssignmentMap(sortedClients);
-final Map> standbyTaskAssignment = 
initializeEmptyTaskAssignmentMap(sortedClients);
-final Map> statelessActiveTaskAssignment = 
initializeEmptyTaskAssignmentMap(sortedClients);
+final Map tasksToRemainingStandbys =
+statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> 
configs.numStandbyReplicas));
 
-//  Stateful Active Tasks  //
+final boolean followupRebalanceNeeded = 
assignStatefulActiveTasks(tasksToRemainingStandbys);
 
-final Map> statefulActiveTaskAssignment =
-new DefaultStateConstrainedBalancedAssignor().assign(
-statefulTasksToRankedCandidates,
-configs.balanceFactor,
-sortedClients,
-clientsToNumberOfThreads,
-tasksToCaughtUpClients
-);
+assignStandbyReplicaTasks(tasksToRemainingStandbys);
+
+assignStatelessActiveTasks();
 
-//  Warmup Replica Tasks  //
+return followupRebalanceNeeded;
+}
 
-final Map> balancedStatefulActiveTaskAssignment =
+private boolean assignStatefulActiveTasks(final Map 
tasksToRemainingStandbys) {
+final Map> statefulActiveTaskAssignment =
 new DefaultBalancedAssignor().assign(
 sortedClients,
 statefulTasks,
 clientsToNumberOfThreads,
 configs.balanceFactor);
 
-final Map tasksToRemainingStandbys =
-statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> 
configs.numStandbyReplicas));
-
-final List movements = getMovements(
+return assignTaskMovements(
 statefulActiveTaskAssignment,
-balancedStatefulActiveTaskAssignment,
 tasksToCaughtUpClients,
 clientStates,
 tasksToRemainingStandbys,
-configs.maxWarmupReplicas);
-
-for (final TaskMovement movement : movements) {
-warmupTaskAssignment.get(movement.destination).add(movement.task);
-}
-
-//  Standby Replica Tasks  //
-
-final List>> allTaskAssignmentMaps = asList(
-statefulActiveTaskAssignment,
-warmupTaskAssignment,
-standbyTaskAssignment,
-statelessActiveTaskAssignment
+configs.maxWarmupReplicas
 );
+}
 
-final ValidClientsByTaskLoadQueue clientsByStandbyTaskLoad =
-new ValidClientsByTaskLoadQueue<>(
-getClientPriorityQueueByTaskLoad(allTaskAssignmentMaps),
-allTaskAssignmentMaps
+private void assignStandbyReplicaTasks(final Map 
tasksToRemainingStandbys) {
+final ValidClientsByTaskLoadQueue standbyTaskClientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> 
!clientStates.get(client).assignedTasks().contains(task)
 );
+standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
 
 for (final TaskId task : statefulTasksToRankedCandidates.keySet()) {
 final int numRemainingStandbys = 
tasksToRemainingStandbys.get(task);
-final List clients = clientsByStandbyTaskLoad.poll(task, 
numRemainingStandbys);
+final List clients = standbyTaskClientsByTaskLoad.poll(task, 
numRemainingStandbys);
 for (final UUID client : clients) {
-standbyTaskAssignment.get(client).add(task);
+clientStates.get(client).assignStandby(task);
 }
-clientsByStandbyTaskLoad.offer(clients);
+standbyTaskClientsByTaskLoad.offerAll(clients);
+
 final int numStandbysAssigned = clients.size();
-if (numStandbysAssigned < configs.numStandbyReplicas) {
+if (numStandbysAssigned < numRemainingStandbys) {
 log.warn("Unable to assign {} of {} standby tasks for task 
[{}]. " +
  "There is not enough available capacity. You 
should " +
  "increase the number of threads and/or 
application instances " +
  "to maintain the requested number of standby 
replicas.",
-configs.numStandbyReplicas - numStandbysAssigned, 
configs.numStandbyReplicas, task);
+ numRemainingStandbys - numStandbysAssigned, 
configs.numStandbyReplicas, task);

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411738965



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411739644



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] mjsax commented on issue #8114: KAFKA-9290: Update IQ related JavaDocs

2020-04-20 Thread GitBox


mjsax commented on issue #8114:
URL: https://github.com/apache/kafka/pull/8114#issuecomment-616848801


   Ah. Thanks for pointing out the hotfix @highluck. Can you address Sophie's 
comment?







[GitHub] [kafka] junrao commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-20 Thread GitBox


junrao commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-616787194


   @ConcurrencyPractitioner : You can just submit a separate PR to add yourself 
in .asf.yaml.







[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411673115



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ConcurrencyPractitioner opened a new pull request #8523: Adding github whitelist

2020-04-20 Thread GitBox


ConcurrencyPractitioner opened a new pull request #8523:
URL: https://github.com/apache/kafka/pull/8523


   This PR is meant to add ConcurrencyPractitioner to the Jenkins whitelist so 
that this user can trigger tests.
   







[GitHub] [kafka] ConcurrencyPractitioner commented on issue #8523: Adding github whitelist

2020-04-20 Thread GitBox


ConcurrencyPractitioner commented on issue #8523:
URL: https://github.com/apache/kafka/pull/8523#issuecomment-616818171


   @junrao So then we can merge this, right?







[GitHub] [kafka] mjsax commented on a change in pull request #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


mjsax commented on a change in pull request #8522:
URL: https://github.com/apache/kafka/pull/8522#discussion_r411719647



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##
@@ -84,7 +84,7 @@ public EmbeddedKafkaCluster(final int numBrokers,
 /**
  * Creates and starts a Kafka cluster.
  */
-public void start() throws IOException, InterruptedException {
+public void start() throws IOException {

Review comment:
   If we had written `throws Exception` from the beginning, this change 
would not be necessary... (Just to back up my preferred coding style of 
only using `throws Exception` in tests.)
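
A minimal sketch of the point above, not code from this PR: the class and method names are hypothetical. It shows a test-style method declared with `throws Exception`, whose signature needs no edit when the checked exceptions declared by the helper it calls change.

```java
import java.io.IOException;

// Hypothetical names throughout; this is an illustration, not EmbeddedKafkaCluster.
class ThrowsExceptionSketch {

    // Stand-in for a helper whose throws clause was narrowed
    // (it previously also declared InterruptedException).
    static String start() throws IOException {
        return "started";
    }

    // Test-style method with the broad clause: the signature stays the same
    // whether start() declares IOException alone or further checked exceptions.
    static String shouldStartCluster() throws Exception {
        return start();
    }

    public static void main(final String[] args) throws Exception {
        System.out.println(shouldStartCluster());
    }
}
```

The trade-off is that a broad clause hides which exceptions can actually occur, which is usually acceptable in tests, where any exception simply fails the test.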









[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-20 Thread GitBox


ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-616776257


   ok to test







[GitHub] [kafka] vvcephei commented on a change in pull request #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8508:
URL: https://github.com/apache/kafka/pull/8508#discussion_r411667311



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -84,6 +87,10 @@
 
 @Before
 public void init() {
+// When executing on Jenkins, the thread name is set to an unknown 
value,
+// hence, we need to set it explicitly to make our log-assertions pass
+Thread.currentThread().setName(threadName);

Review comment:
   Hmm, this might be surprising. Won't this cause the test executor thread 
to be called "threadName" from now until the end of the build?
   
   Do you think we could instead get the currentThread's name and use that in 
the assertions? Or otherwise make the assertions agnostic to the name of the 
thread?
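
A rough sketch of the suggestion above, under stated assumptions: `captured` and `logWithThreadPrefix` are hypothetical stand-ins for the log capture and the code under test, not the actual `LogCaptureAppender` API. The expected message is built from the current thread's name, so the test never has to rename the executor thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; the real test uses Kafka's own log capture utilities.
class ThreadNameAssertionSketch {

    // Collects "log lines" in place of a real appender.
    static final List<String> captured = new ArrayList<>();

    // Code under test (assumed): prefixes each message with the thread name.
    static void logWithThreadPrefix(final String message) {
        captured.add("[" + Thread.currentThread().getName() + "] " + message);
    }

    // The expectation reads the thread name instead of hard-coding it.
    static String expectedMessage(final String message) {
        return "[" + Thread.currentThread().getName() + "] " + message;
    }

    public static void main(final String[] args) {
        logWithThreadPrefix("topic created");
        if (!captured.contains(expectedMessage("topic created"))) {
            throw new AssertionError("log line not found");
        }
        System.out.println("assertion passed without renaming the thread");
    }
}
```

Because the thread is never renamed, the assertion holds under any runner and leaves no side effect for the rest of the build.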









[GitHub] [kafka] ijuma commented on issue #8519: MINOR: Upgrade gradle plugins and test libraries for Java 14 support

2020-04-20 Thread GitBox


ijuma commented on issue #8519:
URL: https://github.com/apache/kafka/pull/8519#issuecomment-616802690


   2 flaky tests, one in each job:
   * 
org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_beta]
   * 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete







[GitHub] [kafka] mjsax commented on issue #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


mjsax commented on issue #8508:
URL: https://github.com/apache/kafka/pull/8508#issuecomment-616828656


   Updated this PR. Will merge after Jenkins passes.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411779358



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411787822



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   Well, 

[GitHub] [kafka] abbccdda opened a new pull request #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda opened a new pull request #8522:
URL: https://github.com/apache/kafka/pull/8522


   This PR tries to fix the flaky 
EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore
 by making the test's bootstrapping less painful, using fewer partitions for 
the transaction log.
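
As a hedged illustration of the idea, and not the actual change in this PR: the broker setting `transaction.state.log.num.partitions` controls how many partitions the transaction log topic has (the broker default is 50). The class name and the value `1` below are assumptions chosen for the sketch.

```java
import java.util.Properties;

// Hypothetical helper; the PR may wire the override differently.
class EmbeddedBrokerConfigSketch {

    // Builds broker overrides for a test cluster with a small transaction log.
    static Properties brokerOverrides() {
        final Properties props = new Properties();
        // Assumed value: fewer partitions means fewer to create and load
        // before a transactional test can start.
        props.put("transaction.state.log.num.partitions", "1");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(brokerOverrides());
    }
}
```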
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] mjsax commented on a change in pull request #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


mjsax commented on a change in pull request #8508:
URL: https://github.com/apache/kafka/pull/8508#discussion_r411712314



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##
@@ -84,6 +87,10 @@
 
 @Before
 public void init() {
+// When executing on Jenkins, the thread name is set to an unknown 
value,
+// hence, we need to set it explicitly to make our log-assertions pass
+Thread.currentThread().setName(threadName);

Review comment:
   Getting the name is much better! Should have done this from the 
beginning...









[GitHub] [kafka] abbccdda commented on a change in pull request #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on a change in pull request #8522:
URL: https://github.com/apache/kafka/pull/8522#discussion_r411727585



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##
@@ -84,7 +84,7 @@ public EmbeddedKafkaCluster(final int numBrokers,
 /**
  * Creates and starts a Kafka cluster.
  */
-public void start() throws IOException, InterruptedException {
+public void start() throws IOException {

Review comment:
   Lol, makes sense.









[GitHub] [kafka] soenkeliebau commented on issue #8464: KAFKA-9852: Change the max duration that calls to the buffer pool can block from 2000ms to 10ms

2020-04-20 Thread GitBox


soenkeliebau commented on issue #8464:
URL: https://github.com/apache/kafka/pull/8464#issuecomment-616782288


   retest this please
   







[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-20 Thread GitBox


ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-616819297


   @junrao Alright, got it done.







[GitHub] [kafka] ijuma commented on issue #8517: MINOR: avoid unnecessary delay conversion in isDelayed check

2020-04-20 Thread GitBox


ijuma commented on issue #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-616875521


   Unrelated flaky test:
   
   > 
org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_beta]
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411799045



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] cadonna commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


cadonna commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411366111



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -89,95 +88,72 @@ public boolean assign() {
 return false;
 }
 
-final Map> warmupTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map> standbyTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+final Map tasksToRemainingStandbys =
+statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
 
-//  Stateful Active Tasks  //
+final boolean followupRebalanceNeeded = assignStatefulActiveTasks(tasksToRemainingStandbys);

Review comment:
   I love it when a comment gets killed by a meaningful method name!

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -89,95 +88,72 @@ public boolean assign() {
 return false;
 }
 
-final Map> warmupTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map> standbyTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+final Map tasksToRemainingStandbys =
+statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
 
-//  Stateful Active Tasks  //
+final boolean followupRebalanceNeeded = assignStatefulActiveTasks(tasksToRemainingStandbys);
 
-final Map> statefulActiveTaskAssignment =
-new DefaultStateConstrainedBalancedAssignor().assign(
-statefulTasksToRankedCandidates,
-configs.balanceFactor,
-sortedClients,
-clientsToNumberOfThreads,
-tasksToCaughtUpClients
-);
+assignStandbyReplicaTasks(tasksToRemainingStandbys);
+
+assignStatelessActiveTasks();
 
-//  Warmup Replica Tasks  //
+return followupRebalanceNeeded;
+}
 
-final Map> balancedStatefulActiveTaskAssignment =
+private boolean assignStatefulActiveTasks(final Map tasksToRemainingStandbys) {
+final Map> statefulActiveTaskAssignment =
 new DefaultBalancedAssignor().assign(
 sortedClients,
 statefulTasks,
 clientsToNumberOfThreads,
 configs.balanceFactor);

Review comment:
   prop:
   ```suggestion
   final Map> statefulActiveTaskAssignment = new DefaultBalancedAssignor().assign(
   sortedClients,
   statefulTasks,
   clientsToNumberOfThreads,
   configs.balanceFactor
   );
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue clientsByTaskLoad;
+private final BiFunction validClientCriteria;
+private final Set uniqueClients = new HashSet<>();
+
+ValidClientsByTaskLoadQueue(final Map clientStates,
+final BiFunction 
validClientCriteria) {
+ 

[GitHub] [kafka] abbccdda commented on a change in pull request #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on a change in pull request #8522:
URL: https://github.com/apache/kafka/pull/8522#discussion_r411680309



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##
@@ -98,6 +98,7 @@ public void start() throws IOException, InterruptedException {
 putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
 putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) 1);
 putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), 5);
+putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), 5);

Review comment:
   This is the actual fix, other parts are just side cleanups.
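
The fix relies on a put-if-absent default: the embedded broker falls back to 5 transaction-log partitions only when the test has not set a value itself. A minimal sketch of that pattern in plain Java follows. The class `EmbeddedBrokerDefaults` and its helpers are hypothetical, not the actual `EmbeddedKafkaCluster` code, and the key string is assumed to be the broker setting that `TransactionsTopicPartitionsProp` resolves to.

```java
import java.util.Properties;

// Hypothetical illustration only; not the real EmbeddedKafkaCluster.
final class EmbeddedBrokerDefaults {
    // Assumption: the broker config key behind TransactionsTopicPartitionsProp.
    static final String TXN_PARTITIONS = "transaction.state.log.num.partitions";

    private EmbeddedBrokerDefaults() { }

    /** Sets key to value only when the caller has not configured it already. */
    static void putIfAbsent(final Properties props, final String key, final Object value) {
        if (!props.containsKey(key)) {
            props.put(key, value);
        }
    }

    /** Copies the caller's overrides, then fills in the small test-friendly default. */
    static Properties withDefaults(final Properties overrides) {
        final Properties brokerConfig = new Properties();
        brokerConfig.putAll(overrides);
        putIfAbsent(brokerConfig, TXN_PARTITIONS, 5);
        return brokerConfig;
    }
}
```

A test that sets the key explicitly keeps its own value; every other test gets the smaller default.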









[GitHub] [kafka] lbradstreet commented on a change in pull request #8518: MINOR: add support for kafka 2.4 and 2.5 to downgrade test

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8518:
URL: https://github.com/apache/kafka/pull/8518#discussion_r411692621



##
File path: tests/kafkatest/tests/core/downgrade_test.py
##
@@ -67,11 +67,18 @@ def setup_services(self, kafka_version, compression_types, security_protocol):
  version=kafka_version)
 self.producer.start()
 
+static_membership = kafka_version == DEV_BRANCH or kafka_version >= LATEST_2_3

Review comment:
   @abbccdda I changed the code to add a static_membership parameter to the 
test matrix. We simply skip static membership for 2.4, where it is broken due to 
the command line validation in that release.









[GitHub] [kafka] LiamClarkeNZ commented on issue #8520: Add explicit grace period to tumbling window example

2020-04-20 Thread GitBox


LiamClarkeNZ commented on issue #8520:
URL: https://github.com/apache/kafka/pull/8520#issuecomment-616842780


   Yep, fine by me. :)
   
   On Tue, Apr 21, 2020 at 9:32 AM John Roesler wrote:
   
   > *@vvcephei* commented on this pull request.
   >
   > Hey @LiamClarkeNZ, thanks for the docs
   > improvement! Everything looks good to me, except I'd request we remove
   > L3267. What do you think?
   > --
   >
   > In docs/streams/developer-guide/dsl-api.html:
   >
   > > @@ -3262,12 +3262,15 @@ KTable-KTable Foreign-Key
   >  import org.apache.kafka.streams.kstream.TimeWindows;
   >
   >  // A tumbling time window with a size of 5 minutes (and, by definition, an implicit
   > -// advance interval of 5 minutes).
   > +// advance interval of 5 minutes). Note the explicit grace period, as the current
   > +// default value is 24 hours, which may be larger than needed for smaller windows.
   > +// Note that this default may change in future major version releases.
   >
   > I'd remove the note about changing the default out. We can either find a
   > non-breaking way to change defaults (see
   > https://issues.apache.org/jira/browse/KAFKA-8924?focusedCommentId=17088091&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17088091
   > ), or we can't change it.
   >
   > I'd rather not make people paranoid that we're going to break their app
   > semantics from underneath them, because we won't.
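
To make the grace-period discussion concrete, here is a simplified model of a tumbling window with an explicit grace period. It is a sketch in plain Java, not the Kafka Streams API: the class `TumblingWindowSketch` and its methods are hypothetical, and the acceptance rule (a record is kept while stream time is still before window end plus grace) is an assumption about how late records are handled. In Kafka Streams itself the equivalent setup would be along the lines of `TimeWindows.of(Duration.ofMinutes(5)).grace(...)`.

```java
import java.time.Duration;

// Hypothetical, simplified model; not the Kafka Streams implementation.
final class TumblingWindowSketch {
    private final long sizeMs;
    private final long graceMs;

    TumblingWindowSketch(final Duration size, final Duration grace) {
        this.sizeMs = size.toMillis();
        this.graceMs = grace.toMillis();
        if (sizeMs <= 0 || graceMs < 0) {
            throw new IllegalArgumentException("size must be positive and grace non-negative");
        }
    }

    /** Start of the tumbling window containing the timestamp (advance == size). */
    long windowStart(final long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** A late record is still accepted until stream time reaches window end + grace. */
    boolean accepts(final long recordTimestampMs, final long streamTimeMs) {
        final long windowEnd = windowStart(recordTimestampMs) + sizeMs;
        return streamTimeMs < windowEnd + graceMs;
    }
}
```

With a 5-minute window, a 24-hour grace period keeps every window open for a full day after it ends, which is why the docs example sets a smaller value explicitly.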
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411743597



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] highluck commented on a change in pull request #8114: KAFKA-9290: Update IQ related JavaDocs

2020-04-20 Thread GitBox


highluck commented on a change in pull request #8114:
URL: https://github.com/apache/kafka/pull/8114#discussion_r411791026



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -208,13 +208,13 @@
  * streamBuilder.table(topic, Consumed.with(Serde.String(), Serde.String()), Materialized.as(storeName))
  * }
  * 
- * To query the local {@link KeyValueStore} it must be obtained via
+ * To query the local {@link ReadOnlyKeyValueStore} it must be obtained via
  * {@link KafkaStreams#store(StoreQueryParameters) KafkaStreams#store(...)}:
  * {@code
  * KafkaStreams streams = ...
- * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
- * String key = "some-key";
- * Long valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+ * ReadOnlyKeyValueStore> localStore = streams.store(queryableStoreName, QueryableStoreTypes.>timestampedKeyValueStore());

Review comment:
   ok! thanks :)









[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411797135



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411676361



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] mjsax commented on issue #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


mjsax commented on issue #8508:
URL: https://github.com/apache/kafka/pull/8508#issuecomment-616806079


   Java 8 passed.
   Java 11: 
`kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`
   Java 14 failed due to #8519 (already merged).
   
   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: avoid unnecessary delay conversion in isDelayed check

2020-04-20 Thread GitBox


ijuma commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411779855



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -33,7 +33,11 @@ class DelayedItem(val delayMs: Long) extends Delayed with 
Logging {
* The remaining delay time
*/
   def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+unit.convert(getDelayMs, TimeUnit.MILLISECONDS)

Review comment:
   It seems like it's only used in a test. So I suggest we remove it (as 
well as the Delayed implementation).









[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411810700



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411813351



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411828397



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411828565



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@

[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


chia7712 commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411845809



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {

Review comment:
   Is it necessary to accept Delayed type now?

##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0

Review comment:
   How about dueNs > time.nanoseconds ?
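
   A point worth weighing for this suggestion, offered as a sketch and not as a verdict on the patch: the `System.nanoTime` Javadoc recommends comparing two readings by subtraction (`t1 - t0 < 0`) instead of directly (`t1 < t0`), because the counter may overflow. Assuming `time.nanoseconds` is backed by `System.nanoTime` (the diff does not show this), the two forms of the check can disagree near the wrap point. The class and method names below are hypothetical and only illustrate the arithmetic.

```java
// Illustrative only: NanoDeadline and its methods are hypothetical names,
// not part of Kafka. They contrast the two ways of testing a deadline.
final class NanoDeadline {

    // Subtraction form: the signed difference stays meaningful even if
    // dueNs has wrapped past Long.MAX_VALUE.
    static boolean isDelayedBySubtraction(long dueNs, long nowNs) {
        return dueNs - nowNs > 0;
    }

    // Direct comparison: gives the wrong answer once dueNs has wrapped.
    static boolean isDelayedByComparison(long dueNs, long nowNs) {
        return dueNs > nowNs;
    }

    public static void main(String[] args) {
        long nowNs = Long.MAX_VALUE - 10; // assumed clock reading close to the wrap point
        long dueNs = nowNs + 1_000;       // overflows to a negative value

        System.out.println(isDelayedBySubtraction(dueNs, nowNs)); // true
        System.out.println(isDelayedByComparison(dueNs, nowNs));  // false
    }
}
```

   Away from the wrap point both forms return the same result, so the difference only matters if the clock's origin can sit near `Long.MAX_VALUE`.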









[GitHub] [kafka] mjsax commented on issue #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-20 Thread GitBox


mjsax commented on issue #8508:
URL: https://github.com/apache/kafka/pull/8508#issuecomment-616934105


   Java 8: 
`org.apache.kafka.streams.integration.StandbyTaskEOSIntegrationTest.surviveWithOneTaskAsStandby[exactly_once_beta]`
   Java 11: 
`org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication`
   Java 14: 
`org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores`
   
   Retest this please.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411858555



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1610,79 +1610,6 @@ public void 
shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
 )));
 }
 
-@Test
-public void 
shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {

Review comment:
   I don't mean to totally cop out on this, but I think we should do this 
in a followup PR. I'll make a ticket and assign it to myself for later so I 
can't escape, but I don't even think it's worth marking it `@Ignore` for now.
   Tbh we should have removed it a while ago, rather than changing it over time 
to become its useless self today. It's a long history, and I'm mostly 
responsible, but just looking ahead the question now is: what do we even want 
to validate? The task assignor has no knowledge of version probing, and the 
partition assignor is not responsible for the task assignment (whereas it used 
to be with version probing, hence this test). What we should do is validate the 
inputs are being assembled sensibly during version probing.
   Anyways this will be really difficult to do just based on the final 
partition assignment, and even harder to distinguish a real failure from an 
unrelated one. So I'd propose to kick this into the future, when we embed the 
actual assignor class in the configs instead of this flag, and then pass in a 
`VersionProbingClientStatesValidatingAssignor` or whatever...SG?









[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411858808



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1610,79 +1610,6 @@ public void 
shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
 )));
 }
 
-@Test
-public void 
shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {

Review comment:
   Probably a much longer answer than you ever wanted, but this test has 
been haunting me over many PRs.









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411866042



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {
 val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+java.lang.Long.compare(dueNs, other.dueNs)
   }
 
+  override def toString: String = {
+"DelayedItem(delayMs="+(dueNs-time.nanoseconds())+")"

Review comment:
   I changed it to convert to millis since nanoseconds aren't very readable 
and this is closer to the existing behavior.
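
   For reference, a minimal sketch of what converting the remaining delay to milliseconds could look like, written in Java for illustration. The class and method names are hypothetical, and clamping at zero once the deadline has passed is an assumption borrowed from the removed `getDelay`, not something this comment confirms.

```java
import java.util.concurrent.TimeUnit;

// Illustrative only: RemainingDelay is a hypothetical helper, not Kafka code.
final class RemainingDelay {

    // Remaining delay in whole milliseconds. The clamp to zero is an
    // assumption mirroring the old getDelay, which used max(..., 0).
    static long remainingMs(long dueNs, long nowNs) {
        return TimeUnit.NANOSECONDS.toMillis(Math.max(dueNs - nowNs, 0L));
    }

    // Human-readable form, reporting milliseconds instead of raw nanoseconds.
    static String describe(long dueNs, long nowNs) {
        return "DelayedItem(delayMs=" + remainingMs(dueNs, nowNs) + ")";
    }

    public static void main(String[] args) {
        long nowNs = System.nanoTime();
        long dueNs = nowNs + TimeUnit.MILLISECONDS.toNanos(250);
        System.out.println(describe(dueNs, nowNs)); // DelayedItem(delayMs=250)
    }
}
```

   `TimeUnit.NANOSECONDS.toMillis` truncates toward zero, so a remaining delay of 1.5 ms is reported as 1.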

##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {

Review comment:
   Fixed









[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411814466



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411813832



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


chia7712 commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411847662



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)

Review comment:
   Should we handle the overflow?
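
   For context on where an overflow could arise in the line under review, here is a small Java sketch (Java rather than the Scala of the patch; `OverflowProbe` and `additionOverflows` are hypothetical names, not Kafka code). `TimeUnit.MILLISECONDS.toNanos` saturates at `Long.MAX_VALUE` instead of wrapping, so the only step that can wrap is the addition to the current nanosecond reading:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Kafka.
final class OverflowProbe {
    // Returns true when nowNs + delayMs (converted to ns) does not fit in a signed 64-bit long.
    static boolean additionOverflows(long nowNs, long delayMs) {
        // toNanos saturates at Long.MAX_VALUE / Long.MIN_VALUE rather than wrapping.
        long delayNs = TimeUnit.MILLISECONDS.toNanos(delayMs);
        try {
            Math.addExact(nowNs, delayNs); // throws ArithmeticException on overflow
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(additionOverflows(0L, 1_000L));          // false
        System.out.println(additionOverflows(1L, Long.MAX_VALUE));  // true
    }
}
```

   With delays in the range of seconds or minutes the addition only wraps if the clock reading itself is already within that distance of `Long.MAX_VALUE`.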





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411864699



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {

Review comment:
   Good point. I'll remove that. I don't think we use it.









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411864205



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)

Review comment:
   Any idea how to efficiently handle the overflow here? I think it's very 
unlikely we'll ever hit it given our normal delays.
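
   One observation that may help here, sketched in Java under the assumption that the check stays in the subtraction form `dueNs - time.nanoseconds > 0` used by `isDelayed` in this patch: with two's-complement arithmetic the subtraction still gives the right answer when `dueNs` has wrapped, as long as the real distance between the two readings is below 2^63 ns (roughly 292 years). This is the comparison style the `System.nanoTime` Javadoc recommends. `WrapTolerantDeadline` is a hypothetical name used for illustration only:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; mirrors the arithmetic in the patch, not Kafka's actual class.
final class WrapTolerantDeadline {
    // Deadline in nanoseconds; the addition is allowed to wrap around.
    static long dueNs(long nowNs, long delayMs) {
        return nowNs + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    // Subtraction-based check: correct even if dueNs wrapped past Long.MAX_VALUE.
    static boolean isDelayed(long dueNs, long nowNs) {
        return dueNs - nowNs > 0;
    }

    public static void main(String[] args) {
        long now = Long.MAX_VALUE - 10;   // clock reading close to the wrap point
        long due = dueNs(now, 1);         // 1 ms later; wraps to a negative value
        System.out.println(due < 0);                           // true
        System.out.println(isDelayed(due, now));               // true
        System.out.println(isDelayed(due, now + 1_000_000L));  // false
    }
}
```

   By contrast, ordering two deadlines with `java.lang.Long.compare(dueNs, other.dueNs)` compares the raw values and would misorder a wrapped deadline.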









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411863881



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {
 val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+java.lang.Long.compare(dueNs, other.dueNs)
   }
 
+  override def toString: String = {
+"DelayedItem(delayMs="+(dueNs-time.nanoseconds())+")"

Review comment:
   Thanks, I actually forgot the unit conversion here. I was thinking it'd 
still be nicer to print it in ms. What do you think?
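
   A minimal Java sketch of what printing the remaining delay in milliseconds could look like, assuming the deadline is still stored in nanoseconds as `dueNs`. The class name, the `remainingMs` label, and the clamp to zero (borrowed from the removed `getDelay`) are illustrative choices, not the code that was merged:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not part of Kafka.
final class RemainingDelayFormat {
    // Formats the time left until dueNs, converted from nanoseconds to whole milliseconds.
    static String describe(long dueNs, long nowNs) {
        long remainingNs = Math.max(dueNs - nowNs, 0L);                 // never report a negative delay
        long remainingMs = TimeUnit.NANOSECONDS.toMillis(remainingNs);  // truncates toward zero
        return "DelayedItem(remainingMs=" + remainingMs + ")";
    }

    public static void main(String[] args) {
        System.out.println(describe(5_000_000L, 0L)); // DelayedItem(remainingMs=5)
    }
}
```

   Because `toMillis` truncates, a remaining delay below one millisecond prints as 0 even though the item is still delayed.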









[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411812127



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411821268



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r41182



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   Yeah I 

[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


chia7712 commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411846617



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {
 val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+java.lang.Long.compare(dueNs, other.dueNs)
   }
 
+  override def toString: String = {
+"DelayedItem(delayMs="+(dueNs-time.nanoseconds())+")"

Review comment:
   typo: delayMs -> delayNs









[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411821268



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 
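
The quoted diff breaks off inside the `TreeSet` comparator, so its full ordering is not visible here. As a minimal sketch only, assuming the intent is to order movements by the number of caught-up clients per task: a `TreeSet` treats two elements as duplicates whenever the comparator returns 0, so a tie-breaker is needed to keep distinct tasks with the same count. The `Movement` class, the string task ids, and the "fewest caught-up clients first" direction are illustrative assumptions, not the code from the PR.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class MovementOrdering {
    // Hypothetical stand-in for a task movement; only the task id matters here.
    static final class Movement {
        final String task;

        Movement(final String task) {
            this.task = task;
        }
    }

    // Orders movements by how many clients are caught up on the task (assumed: fewest first),
    // then by task id so that two different tasks never compare as equal.
    static SortedSet<Movement> newMovementSet(final Map<String, Integer> caughtUpCounts) {
        final Comparator<Movement> byCaughtUpCount =
            Comparator.comparingInt((Movement m) -> caughtUpCounts.getOrDefault(m.task, 0));
        return new TreeSet<>(byCaughtUpCount.thenComparing(m -> m.task));
    }

    public static void main(final String[] args) {
        final Map<String, Integer> counts = new HashMap<>();
        counts.put("0_1", 2);
        counts.put("0_2", 0);
        counts.put("0_3", 2);

        final SortedSet<Movement> movements = newMovementSet(counts);
        movements.add(new Movement("0_1"));
        movements.add(new Movement("0_2"));
        movements.add(new Movement("0_3"));

        // Without the tie-breaker, "0_3" would be dropped as a duplicate of "0_1".
        for (final Movement m : movements) {
            System.out.println(m.task);
        }
    }
}
```

Without the `thenComparing` step the set would silently keep only one of the two tasks that share a count, which is the usual pitfall when a `TreeSet` comparator is not consistent with equality.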

[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: avoid unnecessary delay conversion in isDelayed check

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411821681



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -33,7 +33,11 @@ class DelayedItem(val delayMs: Long) extends Delayed with Logging {
    * The remaining delay time
    */
   def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+    unit.convert(getDelayMs, TimeUnit.MILLISECONDS)

Review comment:
   Sure thing. I'll drop it.









[GitHub] [kafka] abbccdda commented on issue #8522: KAFKA-9868: Reduce transaction log partitions for embed broker

2020-04-20 Thread GitBox


abbccdda commented on issue #8522:
URL: https://github.com/apache/kafka/pull/8522#issuecomment-616917436


   Got 2/3 green; the one failed test is:
   ```
   kafka.api.SaslSslConsumerTest.testCoordinatorFailover
   
   java.lang.AssertionError: expected: but was:
   ```
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411826785



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411826782



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   Yep, 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411841909



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   

[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411863591



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
    */
-  def getDelay(unit: TimeUnit): Long = {
-    unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+    dueNs - time.nanoseconds > 0

Review comment:
   Thanks, that was a lazy code conversion
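
For context on the `dueNs - time.nanoseconds > 0` check in the diff above, here is a minimal Java sketch of the same idea, not the Kafka implementation. It assumes a hypothetical `MonotonicDeadline` class with an injectable nanosecond clock (`LongSupplier`) standing in for Kafka's `Time`. The point it illustrates is that nanosecond readings should be compared by subtraction rather than with `dueNs > now`, because the counter may wrap around.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class MonotonicDeadline {
    private final LongSupplier nanoClock; // hypothetical injectable clock, e.g. System::nanoTime
    private final long dueNs;

    MonotonicDeadline(final long delayMs, final LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        // May overflow when the clock is near Long.MAX_VALUE; the subtraction below still works.
        this.dueNs = nanoClock.getAsLong() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    MonotonicDeadline(final long delayMs) {
        this(delayMs, System::nanoTime);
    }

    // True while the deadline lies in the future; compares by difference to tolerate wraparound.
    boolean isDelayed() {
        return dueNs - nanoClock.getAsLong() > 0;
    }

    public static void main(final String[] args) {
        final long[] now = {Long.MAX_VALUE - 5}; // fake clock just below the overflow point
        final MonotonicDeadline deadline = new MonotonicDeadline(1, () -> now[0]);

        System.out.println(deadline.isDelayed()); // true: 1 ms remains

        now[0] += TimeUnit.MILLISECONDS.toNanos(1); // advance 1 ms; the fake clock wraps around
        System.out.println(deadline.isDelayed()); // false: the deadline has been reached
    }
}
```

Injecting the clock mirrors the `time: Time` constructor parameter in the diff and makes the deadline testable without sleeping.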









[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411813053



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   I might 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411815724



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   I 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411815738



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411818919



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411818806



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411840027



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -33,7 +33,11 @@ class DelayedItem(val delayMs: Long) extends Delayed with Logging {
* The remaining delay time
*/
   def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+unit.convert(getDelayMs, TimeUnit.MILLISECONDS)

Review comment:
   @ijuma I have removed getDelay and we no longer implement Delayed. In 
doing so I also switched the fetcher to a monotonic clock, as our existing 
implementation is dangerous.
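
As an illustration of why a monotonic clock suits this kind of delay check, here is a minimal Java sketch. It is not the code from this pull request: the class name `MonotonicDelay` and the injectable `LongSupplier` clock are assumptions made for the example. A wall clock such as `System.currentTimeMillis()` can jump when the system time is adjusted, whereas `System.nanoTime()` is intended only for measuring elapsed time, which is what a delay needs.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch, not Kafka's DelayedItem: the class name and the
// LongSupplier-based clock are assumptions made for illustration.
final class MonotonicDelay {
    private final LongSupplier nanoClock;
    private final long dueNs;

    MonotonicDelay(long delayMs, LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        // Record the due time on the same monotonic timeline we later read from.
        this.dueNs = nanoClock.getAsLong() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    MonotonicDelay(long delayMs) {
        this(delayMs, System::nanoTime);
    }

    // Compare by subtraction so the check stays correct if the nanosecond
    // counter overflows between construction and the check.
    boolean isDelayed() {
        return dueNs - nanoClock.getAsLong() > 0;
    }

    public static void main(String[] args) {
        long[] fakeNowNs = {0L};
        MonotonicDelay delay = new MonotonicDelay(5, () -> fakeNowNs[0]);
        System.out.println(delay.isDelayed()); // true: 5 ms have not elapsed yet
        fakeNowNs[0] = TimeUnit.MILLISECONDS.toNanos(5);
        System.out.println(delay.isDelayed()); // false: the delay has elapsed
    }
}
```

Injecting the clock keeps the check testable without sleeping, and comparing via subtraction follows the overflow guidance in the `System.nanoTime()` documentation.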









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411866242



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0

Review comment:
   Fixed









[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411825756



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #8520: Add explicit grace period to tumbling window example

2020-04-21 Thread GitBox


LiamClarkeNZ commented on a change in pull request #8520:
URL: https://github.com/apache/kafka/pull/8520#discussion_r412007274



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3262,12 +3262,15 @@ KTable-KTable Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by definition, an implicit
-// advance interval of 5 minutes).
+// advance interval of 5 minutes). Note the explicit grace period, as the current
+// default value is 24 hours, which may be larger than needed for smaller windows. 
+// Note that this default may change in future major version releases.

Review comment:
   Kia ora @vvcephei I have removed the comment in my latest commit :)
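
To make the grace-period discussion in the diff above concrete, the following Java sketch models a tumbling window with an explicit grace period using only the standard library. It deliberately does not use the Kafka Streams API: `TumblingWindowSketch`, `windowStart`, and `accepts` are hypothetical names, and the acceptance rule (a record is accepted while stream time is still before window end plus grace) is a simplified assumption, not a statement of Kafka's exact boundary handling.

```java
import java.time.Duration;

// Hypothetical model of a tumbling window with a grace period.
// This is an illustrative simplification, not Kafka Streams code.
final class TumblingWindowSketch {

    // Tumbling windows are aligned to multiples of the window size,
    // so each timestamp falls into exactly one window.
    static long windowStart(long timestampMs, Duration size) {
        long sizeMs = size.toMillis();
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Assumed rule: a record is still accepted while the observed stream
    // time has not yet reached the record's window end plus the grace period.
    static boolean accepts(long recordTimestampMs, long streamTimeMs,
                           Duration size, Duration grace) {
        long windowEndMs = windowStart(recordTimestampMs, size) + size.toMillis();
        return windowEndMs + grace.toMillis() > streamTimeMs;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(5);
        long lateRecordTs = 310_000L;  // falls in window [300000, 600000)
        long streamTime = 660_000L;    // one minute past the window end

        // With a one-minute grace period the late record is rejected ...
        System.out.println(accepts(lateRecordTs, streamTime, size, Duration.ofMinutes(1)));
        // ... while a 24-hour grace period would still accept it.
        System.out.println(accepts(lateRecordTs, streamTime, size, Duration.ofHours(24)));
    }
}
```

The sketch shows the trade-off the diff comment points at: a long grace period keeps small windows open for late records far longer than their own size, so setting it explicitly makes the intended lateness tolerance visible.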









[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412049922



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1127,7 +1127,7 @@ public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration()
 }
 
 @Test
-public void shouldNotReInitializeTopologyWhenResuming() throws IOException {
+public void shouldNotReInitializeTopologyWhenResumingWithFalseFlag() throws IOException {

Review comment:
   fixed









[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412020510



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -93,6 +96,29 @@ long partitionTimestamp(final TopicPartition partition) {
 return queue.partitionTime();
 }
 
+// creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions
+void updatePartitions(final Set newInputPartitions, final Function recordQueueCreator) {
+final Set removedPartitions = new HashSet<>();
+final Iterator> queuesIterator = partitionQueues.entrySet().iterator();
+while (queuesIterator.hasNext()) {
+final Map.Entry queueEntry = queuesIterator.next();
+final TopicPartition topicPartition = queueEntry.getKey();
+if (newInputPartitions.contains(topicPartition)) {

Review comment:
   Yes, I can rephrase it as you suggest.
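
The update described in the diff's comment (create queues for new partitions, remove old queues, keep buffered records for partitions that stay assigned) can be sketched generically as below. This is a hedged illustration, not the PR's `PartitionGroup` code: `PartitionQueuesSketch` and its generic `updatePartitions` helper are hypothetical, and partitions are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of reconciling per-partition queues after a rebalance.
final class PartitionQueuesSketch {

    // Removes queues for partitions that are no longer assigned, keeps existing
    // queues (and their buffered records) for partitions that stay assigned,
    // and creates queues for newly assigned partitions.
    // Returns the set of partitions whose queues were removed.
    static <K, Q> Set<K> updatePartitions(Map<K, Q> queues,
                                          Set<K> newPartitions,
                                          Function<K, Q> queueCreator) {
        Set<K> removed = new HashSet<>();
        Iterator<Map.Entry<K, Q>> it = queues.entrySet().iterator();
        while (it.hasNext()) {
            K partition = it.next().getKey();
            if (!newPartitions.contains(partition)) {
                it.remove(); // safe removal while iterating
                removed.add(partition);
            }
        }
        for (K partition : newPartitions) {
            queues.computeIfAbsent(partition, queueCreator);
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> queues = new HashMap<>();
        List<String> buffered = new ArrayList<>();
        buffered.add("record-1");
        queues.put("topic-0", buffered);
        queues.put("topic-1", new ArrayList<>());

        Set<String> newAssignment = new HashSet<>();
        newAssignment.add("topic-0");
        newAssignment.add("topic-2");

        Set<String> removed = updatePartitions(queues, newAssignment, p -> new ArrayList<>());
        System.out.println("removed: " + removed);
        System.out.println("topic-0 still buffers: " + queues.get("topic-0"));
    }
}
```

Removing through the iterator avoids a `ConcurrentModificationException`, and `computeIfAbsent` leaves the queues of retained partitions untouched so their buffered records survive.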









[GitHub] [kafka] tombentley commented on issue #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-04-21 Thread GitBox


tombentley commented on issue #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-617069226


   @dajac thanks for the review, I've addressed all your comments. 







[GitHub] [kafka] dajac opened a new pull request #8525: [WIP] KAFKA-9885; Evict last members of a group when the maximum allowed is reached

2020-04-21 Thread GitBox


dajac opened a new pull request #8525:
URL: https://github.com/apache/kafka/pull/8525


   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






