rhauch commented on a change in pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#discussion_r436925315



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation 
mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
       Can we make this a little easier to understand for most users? I think 
it might be sufficient to add some combination of:
   * what this means (e.g., the worker was partitioned and missed at least one 
rebalance rounds, likely due to a network issue), and 
   * what resulted (e.g., the workers gave up its tasks in case the cluster had 
reassigned them to another worker).
   
   And, should this be debug or info or warn? Warn seems wrong, since the user 
shouldn't do anything, but excessive #s of these could signal the need for 
additional tuning. WDYT?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         log.debug("Found the following connectors and tasks missing from 
previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments 
are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       Similar comment to that above.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, 
WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a 
standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = 
coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation 
mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} 
since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, 
previousAssignment);

Review comment:
       Sounds good, though it'd be better to have a single (long) log message 
to prevent them from being separated by other log messages from other threads.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         log.debug("Found the following connectors and tasks missing from 
previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments 
are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       Sounds good.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +373,16 @@ protected void handleLostAssignments(ConnectorsAndTasks 
lostAssignments,
         log.debug("Found the following connectors and tasks missing from 
previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && 
scheduledRebalance <= 0) {

Review comment:
       Is it enough to trust that the # of workers has not changed, or should 
we compare the members, via something like:
   ```
   if (previousMembers.equals(memberConfigs.keySet()) && scheduledRebalance <= 
0) {
   ```
   IOW, what happens if one worker disappeared about the same time that an 
operator added a new worker?
   
   IIUC from the integration tests, this logic actually doesn't care which of 
these is the case -- all of the task assignments that were lost will be 
reassigned anyway, so it doesn't matter if the worker that gets those new 
assignments is the old worker that came back or a new worker. Is that right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to