kkonstantine commented on a change in pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#discussion_r436950305



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation 
mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
       This is related to the leader's internal bookkeeping when it calculates 
a new assignment. It's not related to the tasks that a worker (even the leader) 
is actually running. 
   
   Emptying/clearing the previous assignment might result in some tasks 
shuffling around, because the leader will calculate an assignment from scratch, 
but it doesn't affect running tasks. The new computed assignment will send 
assignment and/or revocations as needed based on a) what tasks each worker has 
reported running in this round and which tasks are configured in the config 
topic. Another way to say this is that the leader won't bother detecting lost 
tasks in this round. Every unassigned task will be treated as a new task. 
   
   You are right on the log message not conveying that meaning exactly. How 
about: 
   ```
   log.debug("Clearing the view of previous assignments due to generation 
mismatch between "
                       + "previous generation ID {} and last completed 
generation ID {}. ",
                       previousGenerationId, lastCompletedGenerationId);
   log.debug("This can happen if the leader fails to sync the assignment within 
a " 
                       + "rebalancing round. The following view of previous 
assignments might be "
                       + "outdated and will be ignored by the leader in the 
current computation of " 
                       + "new assignments. Possibly outdated previous 
assignments: {}", previousAssignment);
   ```
   
   
   

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation 
mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
       Also, given that the previous assignments are printed in `debug`, I 
think it makes sense to keep these log messages in debug as well. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         log.debug("Found the following connectors and tasks missing from 
previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments 
are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       How about: 
   
   ```
   log.debug("The number of workers remained the same between rebalances. The 
missing " 
                       + "assignments that the leader is detecting are probably 
due to some workers " 
                       + "failing to receive the new assignments in the 
previous rebalance. Will "
                       + "reassign missing tasks as new tasks");
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation 
mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
       I agree. The length will be very similar anyways. 
   I'm pushing a commit to address your comments. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +373,16 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         log.debug("Found the following connectors and tasks missing from 
previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {

Review comment:
       I like the idea of checking the member configs. Could probably allow us 
to avoid duplicate tasks, even in this rare scenario of replacement of a 
departing node within the rebalance round itself. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to