YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862455239
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -200,283 +229,149 @@ protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long ma
previousAssignment = activeAssignments;
canRevoke = true;
}
- previousRevocation.connectors().clear();
- previousRevocation.tasks().clear();
+ previousRevocation = ConnectorsAndTasks.EMPTY;
}
- // Derived set: The set of deleted connectors-and-tasks is a derived set from the set
- // difference of previous - configured
- ConnectorsAndTasks deleted = diff(previousAssignment, configured);
- log.debug("Deleted assignments: {}", deleted);
-
- // Derived set: The set of remaining active connectors-and-tasks is a derived set from the
- // set difference of active - deleted
- ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
- log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive);
-
- // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
- // the set difference of previous - active - deleted
- ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
- log.debug("Lost assignments: {}", lostAssignments);
-
- // Derived set: The set of new connectors-and-tasks is a derived set from the set
- // difference of configured - previous - active
- ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment, activeAssignments);
- log.debug("New assignments: {}", newSubmissions);
+ // The connectors and tasks that have been deleted since the last rebalance
+ final ConnectorsAndTasks deleted = ConnectorsAndTasks.diff(previousAssignment, configured);
+ log.trace("Deleted assignments: {}", deleted);
- // A collection of the complete assignment
- List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
- log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);
+ // The connectors and tasks that are currently running on more than one worker each
+ final ConnectorsAndTasks duplicated = duplicated(memberAssignments);
+ log.trace("Duplicated assignments: {}", duplicated);
- // Per worker connector assignments without removing deleted connectors yet
- Map<String, Collection<String>> connectorAssignments =
- completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
- log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments);
+ // The connectors and tasks that should already be running on the cluster, but which are not included
+ // in the assignment reported by any workers in the cluster
+ final ConnectorsAndTasks lostAssignments = ConnectorsAndTasks.diff(previousAssignment, activeAssignments, deleted);
Review Comment:
Could you explain what a `lost` assignment means here?
As far as I know, `ConnectorsAndTasks.diff` returns what remains of its first
argument after the other assignments have been subtracted from it.
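
To make the question concrete, here is a minimal sketch of how I read the set-difference semantics in the comments above. It uses plain `Set<String>` rather than the real `ConnectorsAndTasks` class; the `diff` helper, the class name, and the connector/task names are all hypothetical stand-ins, not Kafka's actual code.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

public class LostAssignmentSketch {

    // Hypothetical stand-in for ConnectorsAndTasks.diff:
    // returns base minus every element found in any of the other sets.
    @SafeVarargs
    static Set<String> diff(Set<String> base, Set<String>... toSubtract) {
        Set<String> result = new LinkedHashSet<>(base);
        for (Set<String> s : toSubtract) {
            result.removeAll(s);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed example data: what was assigned in the previous round.
        Set<String> previous = new LinkedHashSet<>(
                Arrays.asList("c1", "c1-0", "c1-1", "c2", "c2-0"));
        // What is still configured (c2 and its task were removed from the config).
        Set<String> configured = new LinkedHashSet<>(
                Arrays.asList("c1", "c1-0", "c1-1"));
        // What the workers currently report as running.
        Set<String> active = new LinkedHashSet<>(Arrays.asList("c1", "c1-0"));

        // deleted = previous - configured
        Set<String> deleted = diff(previous, configured);
        // lost = previous - active - deleted: expected to run, but reported by no worker
        Set<String> lost = diff(previous, active, deleted);

        System.out.println("Deleted: " + deleted); // prints "Deleted: [c2, c2-0]"
        System.out.println("Lost: " + lost);       // prints "Lost: [c1-1]"
    }
}
```

If that reading is right, `lost` would be the items that were previously assigned and are still configured, but that no worker currently reports.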