YeonCheolGit commented on code in PR #12019:
URL: https://github.com/apache/kafka/pull/12019#discussion_r862493447
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -526,165 +421,174 @@ private List<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> com
     }
 
     /**
-     * Task revocation is based on an rough estimation of the lower average number of tasks before
-     * and after new workers join the group. If no new workers join, no revocation takes place.
-     * Based on this estimation, tasks are revoked until the new floor average is reached for
-     * each existing worker. The revoked tasks, once assigned to the new workers will maintain
-     * a balanced load among the group.
-     *
-     * @param activeAssignments
-     * @param completeWorkerAssignment
-     * @return
+     * Revoke connectors and tasks from each worker in the cluster until no worker is running more than it would be if:
+     * <ul>
+     *     <li>The allocation of connectors and tasks across the cluster were as balanced as possible (i.e., the difference in allocation size between any two workers is at most one)</li>
+     *     <li>Any workers that left the group within the scheduled rebalance delay permanently left the group</li>
+     *     <li>All currently-configured connectors and tasks were allocated (including instances that may be revoked in this round because they are duplicated across workers)</li>
+     * </ul>
+     * @param configured the set of configured connectors and tasks across the entire cluster
+     * @param workers the workers in the cluster, whose assignments should not include any deleted or duplicated connectors or tasks
+     *                that are already due to be revoked from the worker in this rebalance
+     * @return which connectors and tasks should be revoked from which workers; never null, but may be empty
+     *         if no load-balancing revocations are necessary or possible
      */
-    private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks activeAssignments,
-                                                                  Collection<WorkerLoad> completeWorkerAssignment) {
-        int totalActiveConnectorsNum = activeAssignments.connectors().size();
-        int totalActiveTasksNum = activeAssignments.tasks().size();
-        Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
-                .filter(wl -> wl.size() > 0)
-                .collect(Collectors.toList());
-        int existingWorkersNum = existingWorkers.size();
-        int totalWorkersNum = completeWorkerAssignment.size();
-        int newWorkersNum = totalWorkersNum - existingWorkersNum;
-
-        if (log.isDebugEnabled()) {
-            completeWorkerAssignment.forEach(wl -> log.debug(
+    private Map<String, ConnectorsAndTasks> performLoadBalancingRevocations(
+            final ConnectorsAndTasks configured,
+            final Collection<WorkerLoad> workers
+    ) {
+        if (log.isTraceEnabled()) {
+            workers.forEach(wl -> log.trace(
                     "Per worker current load size; worker: {} connectors: {} tasks: {}",
                     wl.worker(), wl.connectorsSize(), wl.tasksSize()));
         }
-        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
-        // If there are no new workers, or no existing workers to revoke tasks from return early
-        // after logging the status
-        if (!(newWorkersNum > 0 && existingWorkersNum > 0)) {
-            log.debug("No task revocation required; workers with existing load: {} workers with "
-                    + "no load {} total workers {}",
-                    existingWorkersNum, newWorkersNum, totalWorkersNum);
-            // This is intentionally empty but mutable, because the map is used to include deleted
-            // connectors and tasks as well
-            return revoking;
+        if (workers.stream().allMatch(WorkerLoad::isEmpty)) {
+            log.trace("No load-balancing revocations required; all workers are either new "
+                    + "or will have all currently-assigned connectors and tasks revoked during this round"
+            );
+            return Collections.emptyMap();
+        }
+        if (configured.isEmpty()) {
+            log.trace("No load-balancing revocations required; no connectors are currently configured on this cluster");
+            return Collections.emptyMap();
         }
-        log.debug("Task revocation is required; workers with existing load: {} workers with "
-                + "no load {} total workers {}",
-                existingWorkersNum, newWorkersNum, totalWorkersNum);
-
-        // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
-        log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
-        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
-        int ceilConnectors = floorConnectors + ((totalActiveConnectorsNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of connectors per worker rounded down (floor) {} and rounded up (ceil) {}", floorConnectors, ceilConnectors);
-
-
-        log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
-        int floorTasks = totalActiveTasksNum / totalWorkersNum;
-        int ceilTasks = floorTasks + ((totalActiveTasksNum % totalWorkersNum == 0) ? 0 : 1);
-        log.debug("New average number of tasks per worker rounded down (floor) {} and rounded up (ceil) {}", floorTasks, ceilTasks);
-        int numToRevoke;
-
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<String> connectors = existing.connectors().iterator();
-            numToRevoke = existing.connectorsSize() - ceilConnectors;
-            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                        existing.worker(),
-                        w -> new ConnectorsAndTasks.Builder().build());
-                resources.connectors().add(connectors.next());
-            }
+        final Map<String, ConnectorsAndTasks.Builder> result = new HashMap<>();
+
+        Map<String, Set<String>> connectorRevocations = loadBalancingRevocations(
+                "connector",
+                configured.connectors().size(),
+                workers,
+                WorkerLoad::connectors
+        );
+        Map<String, Set<ConnectorTaskId>> taskRevocations = loadBalancingRevocations(
+                "task",
+                configured.tasks().size(),
+                workers,
+                WorkerLoad::tasks
+        );
+
+        connectorRevocations.forEach((worker, revoked) ->
+                result.computeIfAbsent(worker, w -> ConnectorsAndTasks.builder())
+                        .addConnectors(revoked)
+        );
+        taskRevocations.forEach((worker, revoked) ->
+                result.computeIfAbsent(worker, w -> ConnectorsAndTasks.builder())
+                        .addTasks(revoked)
+        );
+
+        return buildAll(result);
+    }
+
+    private <E> Map<String, Set<E>> loadBalancingRevocations(
+            final String allocatedResourceName,
+            final int totalToAllocate,
+            final Collection<WorkerLoad> workers,
+            final Function<WorkerLoad, Collection<E>> workerAllocation
+    ) {
+        final int totalWorkers = workers.size();
+        // The minimum instances of this resource that should be assigned to each worker
+        final int minAllocatedPerWorker = totalToAllocate / totalWorkers;
+        // How many workers are going to have to be allocated exactly one extra instance
+        // (since the total number to allocate may not be a perfect multiple of the number of workers)
+        final int extrasToAllocate = totalToAllocate % totalWorkers;
+        // Useful function to determine exactly how many instances of the resource a given worker is currently allocated
+        final Function<WorkerLoad, Integer> workerAllocationSize = workerAllocation.andThen(Collection::size);
+
+        final long workersAllocatedMinimum = workers.stream()
+                .map(workerAllocationSize)
+                .filter(n -> n == minAllocatedPerWorker)
+                .count();
+        final long workersAllocatedSingleExtra = workers.stream()
+                .map(workerAllocationSize)
+                .filter(n -> n == minAllocatedPerWorker + 1)
+                .count();
+        if (workersAllocatedSingleExtra == extrasToAllocate
+                && workersAllocatedMinimum + workersAllocatedSingleExtra == totalWorkers) {
+            log.trace(
+                    "No load-balancing {} revocations required; the current allocations, when combined with any newly-created {}, should be balanced",
+                    allocatedResourceName,
+                    allocatedResourceName
+            );
+            return Collections.emptyMap();
         }
-        for (WorkerLoad existing : existingWorkers) {
-            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
-            numToRevoke = existing.tasksSize() - ceilTasks;
-            log.debug("Tasks on worker {} is higher than ceiling, so revoking {} tasks", existing, numToRevoke);
-            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
-                ConnectorsAndTasks resources = revoking.computeIfAbsent(
-                        existing.worker(),
-                        w -> new ConnectorsAndTasks.Builder().build());
-                resources.tasks().add(tasks.next());
+        final Map<String, Set<E>> result = new HashMap<>();
+        // How many workers we've allocated a single extra resource instance to
+        int allocatedExtras = 0;
+        for (WorkerLoad worker : workers) {
+            int currentAllocationSizeForWorker = workerAllocationSize.apply(worker);
+            if (currentAllocationSizeForWorker <= minAllocatedPerWorker) {
+                // This worker isn't allocated more than the minimum; no need to revoke anything
+                continue;
+            }
+            int maxAllocationForWorker;
+            if (allocatedExtras < extrasToAllocate) {
+                // We'll allocate one of the extra resource instances to this worker
+                allocatedExtras++;
+                if (currentAllocationSizeForWorker == minAllocatedPerWorker + 1) {
+                    // If the worker's running exactly one more than the minimum, and we're allowed to
+                    // allocate an extra to it, there's no need to revoke anything
+                    continue;
+                }
+                maxAllocationForWorker = minAllocatedPerWorker + 1;
+            } else {
+                maxAllocationForWorker = minAllocatedPerWorker;
+            }
+
+            Set<E> revokedFromWorker = new LinkedHashSet<>();
+            result.put(worker.worker(), revokedFromWorker);
+
+            Iterator<E> currentWorkerAllocation = workerAllocation.apply(worker).iterator();
+            // Revoke resources from the worker until it isn't allocated any more than it should be
+            while (workerAllocationSize.apply(worker) > maxAllocationForWorker) {
+                if (!currentWorkerAllocation.hasNext()) {
+                    // Should never happen, but better to log a warning and move on than die and fail the whole rebalance if it does
+                    log.warn(
+                            "Unexpectedly ran out of {}s to revoke from worker {} while performing load-balancing revocations; " +
+                                    "worker appears to still be allocated {} instances, which is more than the intended allocation of {}",
+                            allocatedResourceName,
+                            worker.worker(),
+                            workerAllocationSize.apply(worker),
+                            maxAllocationForWorker
+                    );
+                    break;
+                }
+                E revocation = currentWorkerAllocation.next();
+                revokedFromWorker.add(revocation);
+                // Make sure to remove the resource from the worker load so that later operations
+                // (such as assigning newly-created connectors and tasks) can take that into account
+                currentWorkerAllocation.remove();
             }
         }
+        return result;
+    }
-        return revoking;
+    private int calculateDelay(long now) {
Review Comment:
I can see that `final` parameters are used in a lot of Kafka code, so this suggestion is just for code convention and safety:
```suggestion
    private int calculateDelay(final long now) {
```
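
Separately, since the arithmetic in `loadBalancingRevocations` is the heart of this change, it may help other reviewers to see the balanced-allocation math in isolation. Below is a minimal standalone sketch, not code from this PR; the class and method names (`BalancedAllocationSketch`, `balancedCaps`) are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class BalancedAllocationSketch {

    // Per-worker cap on resource instances under a balanced allocation:
    // every worker keeps at least the floor average (total / workers), and the
    // first (total % workers) workers may keep one extra, mirroring how the
    // assignor hands out "extras" before revoking a worker down to its cap.
    static List<Integer> balancedCaps(int totalToAllocate, int totalWorkers) {
        int minAllocatedPerWorker = totalToAllocate / totalWorkers; // floor average
        int extrasToAllocate = totalToAllocate % totalWorkers;      // workers allowed one extra
        List<Integer> caps = new ArrayList<>();
        for (int worker = 0; worker < totalWorkers; worker++) {
            caps.add(minAllocatedPerWorker + (worker < extrasToAllocate ? 1 : 0));
        }
        return caps;
    }

    public static void main(String[] args) {
        // 8 tasks across 3 workers -> caps [3, 3, 2]: no two caps differ by more
        // than one, and a worker currently running more than its cap would have
        // (current - cap) tasks revoked.
        System.out.println(balancedCaps(8, 3));
    }
}
```

Because the caps never differ by more than one, revoking each worker down to its cap yields exactly the balance guarantee described in the new javadoc.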