vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429352759



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -334,6 +334,7 @@ public String toString() {
             ") prevStandbyTasks: (" + prevStandbyTasks +
             ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() +
             ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
+            ") taskLagTotals: (" + taskLagTotals.entrySet() +

Review comment:
       I found this useful while debugging the system test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
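       This is where we had forgotten to copy the new assignment configs (acceptable recovery lag, max warmup replicas, and probing rebalance interval) over into the consumer properties, so the assignor could not see them.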
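   A minimal, self-contained sketch of why the missing lines mattered, assuming a plain map stands in for both the Streams and the consumer properties: anything not forwarded is invisible to the assignor, which silently falls back to a default. The class `ConfigForwardingSketch`, its methods, and the default value in the usage example are hypothetical; only the three config key strings are the real Streams config names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the Kafka Streams implementation: models how only
// explicitly forwarded keys reach the assignor's view of the configuration.
final class ConfigForwardingSketch {

    // Config names of the three settings this change starts forwarding.
    static final List<String> ASSIGNOR_KEYS = List.of(
        "acceptable.recovery.lag",
        "max.warmup.replicas",
        "probing.rebalance.interval.ms");

    // Copies the listed keys (when present) from the Streams-level properties
    // into a fresh map standing in for the consumer properties.
    static Map<String, Object> forward(final Map<String, Object> streamsProps,
                                       final List<String> keysToForward) {
        final Map<String, Object> consumerProps = new HashMap<>();
        for (final String key : keysToForward) {
            if (streamsProps.containsKey(key)) {
                consumerProps.put(key, streamsProps.get(key));
            }
        }
        return consumerProps;
    }

    // Assignor side: a key that was never forwarded resolves to the default,
    // so a user-supplied value is silently ignored.
    static Object readOrDefault(final Map<String, Object> consumerProps,
                                final String key,
                                final Object defaultValue) {
        return consumerProps.getOrDefault(key, defaultValue);
    }
}
```

   Forwarding `ASSIGNOR_KEYS` preserves a user's `max.warmup.replicas`; forwarding an empty key list (the bug) makes the assignor read the default instead.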

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
+            if (maxWarmupReplicas < 1) {
+                throw new IllegalArgumentException("must configure at least one warmup replica");
+            }
+

Review comment:
       I added this constraint to mirror the one we already apply in StreamsConfig. It's not critical, but I was disappointed to find that I had written a bunch of tests that used a technically invalid configuration.
   
   I'll write a test for this...
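   A minimal sketch of the check and the kind of test it calls for, assuming a trimmed-down stand-in for `AssignmentConfigs`: the class name `AssignmentConfigsSketch` and the `rejects` helper are hypothetical, the parameter order follows the `new AssignmentConfigs(0L, 1, 0, 0L)` calls in the tests, and primitives replace the boxed `Integer`/`Long` parameters for brevity.

```java
// Hypothetical stand-in for AssignmentConfigs; only the warmup validation
// mirrors the diff above.
final class AssignmentConfigsSketch {
    final long acceptableRecoveryLag;
    final int maxWarmupReplicas;
    final int numStandbyReplicas;
    final long probingRebalanceIntervalMs;

    AssignmentConfigsSketch(final long acceptableRecoveryLag,
                            final int maxWarmupReplicas,
                            final int numStandbyReplicas,
                            final long probingRebalanceIntervalMs) {
        // Mirrors the StreamsConfig constraint: at least one warmup replica.
        if (maxWarmupReplicas < 1) {
            throw new IllegalArgumentException("must configure at least one warmup replica");
        }
        this.acceptableRecoveryLag = acceptableRecoveryLag;
        this.maxWarmupReplicas = maxWarmupReplicas;
        this.numStandbyReplicas = numStandbyReplicas;
        this.probingRebalanceIntervalMs = probingRebalanceIntervalMs;
    }

    // Hypothetical test helper: true if construction rejects this warmup count.
    static boolean rejects(final int maxWarmupReplicas) {
        try {
            new AssignmentConfigsSketch(0L, maxWarmupReplicas, 0, 0L);
            return false;
        } catch (final IllegalArgumentException e) {
            return true;
        }
    }
}
```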

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -90,15 +91,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
-        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
-        // due to being configured for no warmups.
-        final boolean probingRebalanceNeeded =
-            configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+        final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;
 
         log.info("Decided on assignment: " +
                      clientStates +
-                     " with " +
-                     (probingRebalanceNeeded ? "" : "no") +
+                     " with" +
+                     (probingRebalanceNeeded ? "" : " no") +

Review comment:
       This fixes a double space that we printed when there was a followup: the message read `with  followup`.
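   A small sketch of the two concatenations, assuming (since the diff is cut off here) that the next fragment appended to the message is `" followup"`; `LogMessageSketch` and its methods are illustrative names only.

```java
// Illustrative only: reproduces the string concatenation from the log
// statement, assuming the next appended fragment is " followup".
final class LogMessageSketch {

    // Before: the trailing space lives inside " with ", so an empty middle
    // part leaves two spaces before "followup".
    static String before(final boolean probingRebalanceNeeded) {
        return " with " + (probingRebalanceNeeded ? "" : "no") + " followup";
    }

    // After: the optional word carries its own leading space.
    static String after(final boolean probingRebalanceNeeded) {
        return " with" + (probingRebalanceNeeded ? "" : " no") + " followup";
    }
}
```

   With a followup planned, `before(true)` yields two spaces between `with` and `followup`, while `after(true)` yields one; the no-followup text is the same in both versions.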

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -90,15 +91,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
-        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
-        // due to being configured for no warmups.
-        final boolean probingRebalanceNeeded =
-            configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+        final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;

Review comment:
       Since zero warmups is no longer a valid configuration, we don't need this condition (which I added in https://github.com/apache/kafka/pull/8696).
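   A brute-force check of that reasoning, as a hypothetical sketch with plain ints in place of the assignor's fields: once `maxWarmupReplicas >= 1` is guaranteed, the old and new conditions agree on every input tried, and they differ only when warmups are zero, which is now rejected up front.

```java
// Hypothetical sketch comparing the removed and the simplified
// probing-rebalance conditions.
final class ProbingConditionSketch {

    static boolean oldCondition(final int maxWarmupReplicas,
                                final int neededActiveTaskMovements,
                                final int neededStandbyTaskMovements) {
        return maxWarmupReplicas > 0
            && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
    }

    static boolean newCondition(final int neededActiveTaskMovements,
                                final int neededStandbyTaskMovements) {
        return neededActiveTaskMovements + neededStandbyTaskMovements > 0;
    }

    // Exhaustively compares both forms for 1 <= warmups <= bound and
    // 0 <= movements <= bound.
    static boolean agreeWhenWarmupsAllowed(final int bound) {
        for (int warmups = 1; warmups <= bound; warmups++) {
            for (int active = 0; active <= bound; active++) {
                for (int standby = 0; standby <= bound; standby++) {
                    if (oldCondition(warmups, active, standby) != newCondition(active, standby)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
```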

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();

Review comment:
       A short-lived, empty TreeSet costs practically nothing, and I found the 
other logic (with null meaning empty) a bit confusing during debugging.
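   A minimal sketch contrasting the two shapes, with `String` standing in for `TaskId` and the client `UUID`s, and assuming the new code stores each task's set even when it is empty (the diff does not show where the set ends up). `CaughtUpMapSketch`, `lazily`, and `eagerly` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.BiPredicate;

// Hypothetical sketch of two ways to build the task -> caught-up clients map.
final class CaughtUpMapSketch {

    // Old shape: an entry appears only once some client is caught up, so a
    // null lookup has to be read as "no caught-up clients".
    static Map<String, SortedSet<String>> lazily(final List<String> tasks,
                                                 final List<String> clients,
                                                 final BiPredicate<String, String> isCaughtUp) {
        final Map<String, SortedSet<String>> result = new HashMap<>();
        for (final String task : tasks) {
            for (final String client : clients) {
                if (isCaughtUp.test(task, client)) {
                    result.computeIfAbsent(task, ignored -> new TreeSet<>()).add(client);
                }
            }
        }
        return result;
    }

    // New shape (assumed): every task gets a possibly empty set, so lookups
    // for known tasks never return null.
    static Map<String, SortedSet<String>> eagerly(final List<String> tasks,
                                                  final List<String> clients,
                                                  final BiPredicate<String, String> isCaughtUp) {
        final Map<String, SortedSet<String>> result = new HashMap<>();
        for (final String task : tasks) {
            final TreeSet<String> caughtUpClients = new TreeSet<>();
            for (final String client : clients) {
                if (isCaughtUp.test(task, client)) {
                    caughtUpClients.add(client);
                }
            }
            result.put(task, caughtUpClients);
        }
        return result;
    }
}
```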

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -56,14 +56,17 @@ private int numCaughtUpClients() {
         return caughtUpClients.size();
     }
 
-    /**
-     * @return true if this client is caught-up for this task, or the task has no caught-up clients
-     */
+    private static boolean taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist(final TaskId task,
+                                                                                 final UUID client,
+                                                                                 final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
+        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+    }

Review comment:
       Expanding De Morgan's law into an explicitly named negated method, at @cadonna's request (which I also appreciated).
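   A hypothetical sketch of the two helpers, using `String` for `TaskId` and `UUID`. Only the first two method names come from the diff; all bodies, and the third method `expandedForm`, are illustrative. Written out by De Morgan's law, `!(noneCaughtUp || caughtUpOnClient)` becomes `someoneCaughtUp && !caughtUpOnClient`, which agrees with simply negating the positive helper.

```java
import java.util.Map;
import java.util.SortedSet;

// Hypothetical sketch; method bodies are illustrative, not the PR's code.
final class DeMorganSketch {

    // True if no client is caught up on the task, or this client is.
    static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(
            final String task, final String client, final Map<String, SortedSet<String>> tasksToCaughtUpClients) {
        final SortedSet<String> caughtUp = tasksToCaughtUpClients.get(task);
        return caughtUp == null || caughtUp.isEmpty() || caughtUp.contains(client);
    }

    // The negation as a plain "!" of the positive helper.
    static boolean taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist(
            final String task, final String client, final Map<String, SortedSet<String>> tasksToCaughtUpClients) {
        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
    }

    // The same negation expanded with De Morgan's law, !(A || B) == !A && !B:
    // someone is caught up, and it is not this client.
    static boolean expandedForm(
            final String task, final String client, final Map<String, SortedSet<String>> tasksToCaughtUpClients) {
        final SortedSet<String> caughtUp = tasksToCaughtUpClients.get(task);
        final boolean someoneCaughtUp = caughtUp != null && !caughtUp.isEmpty();
        return someoneCaughtUp && !caughtUp.contains(client);
    }
}
```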

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();
             for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
                 final UUID client = clientEntry.getKey();
                 final long taskLag = clientEntry.getValue().lagFor(task);
-                if (taskLag == Task.LATEST_OFFSET || taskLag <= acceptableRecoveryLag) {
-                    taskToCaughtUpClients.computeIfAbsent(task, ignored -> new TreeSet<>()).add(client);
+                if (active(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag)) {

Review comment:
       I realized that our condition was actually wrong here. In addition to all the zero-or-greater lags, there are two negative sentinel lags: -1, meaning "unknown", and -2, meaning "latest". Because any negative lag is less than a non-negative `acceptableRecoveryLag`, the check `taskLag <= acceptableRecoveryLag` unintentionally matched the sentinel values as well. Even if we want a sentinel to count as "caught up" (as with "latest"), we should say so explicitly, not rely on a mathematical coincidence.
   
   I also added a special case for when `acceptableRecoveryLag` is set to `Long.MAX_VALUE`: it indicates that all tasks, regardless of their lag (even a sentinel), are to be considered caught up.
   
   I also found the boolean expression with all the conditionals a little hard to read, so I pulled the checks out into semantically named methods (`active`, `unbounded`, and `acceptable`).
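   A minimal sketch of the three predicates next to the old check. The method names `active`, `unbounded`, and `acceptable` come from the diff, but their bodies here are a reconstruction from the description above, and `UNKNOWN_OFFSET` is an assumed name for the -1 sentinel (`LATEST_OFFSET` mirrors `Task.LATEST_OFFSET`, -2).

```java
// Hypothetical sketch of the caught-up check; bodies are reconstructed from
// the review comment, not copied from HighAvailabilityTaskAssignor.
final class CaughtUpCheckSketch {

    static final long UNKNOWN_OFFSET = -1L; // sentinel lag "unknown" (assumed name)
    static final long LATEST_OFFSET = -2L;  // sentinel lag "latest" (cf. Task.LATEST_OFFSET)

    // The "latest" sentinel is treated as caught up, explicitly.
    static boolean active(final long taskLag) {
        return taskLag == LATEST_OFFSET;
    }

    // Long.MAX_VALUE means every lag counts as caught up, sentinels included.
    static boolean unbounded(final long acceptableRecoveryLag) {
        return acceptableRecoveryLag == Long.MAX_VALUE;
    }

    // Only real (non-negative) lags are compared against the threshold.
    static boolean acceptable(final long acceptableRecoveryLag, final long taskLag) {
        return taskLag >= 0 && taskLag <= acceptableRecoveryLag;
    }

    static boolean caughtUp(final long taskLag, final long acceptableRecoveryLag) {
        return active(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag);
    }

    // The previous check: any negative sentinel also passes "taskLag <= acceptableRecoveryLag".
    static boolean oldCaughtUp(final long taskLag, final long acceptableRecoveryLag) {
        return taskLag == LATEST_OFFSET || taskLag <= acceptableRecoveryLag;
    }
}
```

   With `acceptableRecoveryLag = 100`, the old check treats an unknown lag (-1) as caught up and the new one does not; with `Long.MAX_VALUE`, both sentinels and any real lag count as caught up.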

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -150,7 +147,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClients
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 0, 0, 0L)
+            new AssignmentConfigs(0L, 1, 0, 0L)

Review comment:
       All these tests erroneously set "max warmups" to zero.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -81,7 +81,7 @@
     );
 
     @Test
-    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {

Review comment:
       The diff is misaligned; this is not a rename. I removed `shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups` and added `shouldSkipWarmupsWhenAcceptableLagIsMax`.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -44,6 +44,9 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     CLEAN_NODE_ENABLED = True
 
     logs = {
+        "streams_config": {
+            "path": CONFIG_FILE,
+            "collect_default": True},

Review comment:
       It was handy to be able to see the config file actually in use while debugging.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -465,6 +468,9 @@ def prop_file(self):
         properties['reduce.topic'] = self.REDUCE_TOPIC
         properties['join.topic'] = self.JOIN_TOPIC
 
+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+

Review comment:
       Added this configuration to fix the flaky `StreamsOptimizedTest.test_upgrade_optimized_topology` system test.
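   For reference, a tiny Java sketch of the same override using `java.util.Properties` (the helper name `RecoveryLagOverrideSketch` is hypothetical): the magic string in the Python test is exactly `Long.toString(Long.MAX_VALUE)`, which, with the `MAX_VALUE` special case in the assignor, makes every client count as caught up so no warmup is needed.

```java
import java.util.Properties;

// Hypothetical helper: copies a base config and sets acceptable.recovery.lag
// to Long.MAX_VALUE, as the system test's prop_file does.
final class RecoveryLagOverrideSketch {

    static Properties withoutWarmups(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        // Long.MAX_VALUE renders as "9223372036854775807".
        props.setProperty("acceptable.recovery.lag", Long.toString(Long.MAX_VALUE));
        return props;
    }
}
```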




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

