dajac commented on code in PR #22488:
URL: https://github.com/apache/kafka/pull/22488#discussion_r3430983551
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4149,19 +4173,98 @@ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
group.groupId(), groupEpoch, preferredServerAssignor,
assignorTimeMs);
}
- records.addAll(assignmentResult.records());
+ return assignmentResult;
+ };
+
+ if (offloadAssignor) {
+ assignmentResultBuilder.freezeInputs();
Review Comment:
This smells a bit to be honest with you. Why do we need it? Have you
considered having the builder build the input the assignor (e.g. as a record)?
The prepared input could be a copy or not depending on whether offloading is
enabled.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2453,6 +2456,7 @@ private
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
) throws ApiException {
final long currentTimeMs = time.milliseconds();
final List<CoordinatorRecord> records = new ArrayList<>();
+ final CompletableFuture<Void> appendFuture = new CompletableFuture<>();
Review Comment:
Could we avoid having to rely on the `appendFuture`? We mainly introduce it
for the classic protocol and I would like to contain is as much as possible.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4149,19 +4173,98 @@ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
group.groupId(), groupEpoch, preferredServerAssignor,
assignorTimeMs);
}
- records.addAll(assignmentResult.records());
+ return assignmentResult;
+ };
+
+ if (offloadAssignor) {
+ assignmentResultBuilder.freezeInputs();
+ Map<String, String> previousStaticMembers =
Map.copyOf(updatedMembersAndTargetAssignment.staticMembers());
+ // The view is backed by the group's timeline data structures,
which may have been
+ // modified by the time the executor task runs.
+ updatedMembersAndTargetAssignment.close();
+
+ executor.schedule(
+ targetAssignmentUpdateKey,
+ buildTargetAssignment::get,
+ (result, exception) -> {
+ if (exception != null) {
+ log.error("[GroupId {}] Failed to compute a new target
assignment for epoch {}: {}.", group.groupId(), groupEpoch,
exception.getMessage(), exception);
+ return new CoordinatorResult<>(List.of());
+ }
+
+ try {
+ ConsumerGroup consumerGroup =
consumerGroup(group.groupId());
+ if (consumerGroup.assignmentEpoch() >= groupEpoch) {
+ // The assignment epoch is already caught up.
+ // Writing this record would backslide it.
+ log.debug("[GroupId {}] Discarding stale offloaded
target assignment for epoch {} (current assignment epoch is {}).",
+ group.groupId(), groupEpoch,
consumerGroup.assignmentEpoch());
+ return new CoordinatorResult<>(List.of());
+ }
+
+ log.debug("[GroupId {}] Received updated target
assignment for epoch {}: {}.",
+ group.groupId(), groupEpoch,
result.targetAssignment());
+
+ TargetAssignmentRecordsBuilder<Assignment>
assignmentRecordsBuilder =
+ new
TargetAssignmentRecordsBuilder.ConsumerTargetAssignmentRecordsBuilder(logContext,
group.groupId())
+ .withAssignmentEpoch(groupEpoch)
+
.withAssignmentTimestampMs(result.assignmentTimestampMs())
+ .withCurrentMemberIds(group.members().keySet())
+
.withPreviousStaticMembers(previousStaticMembers)
+
.withCurrentStaticMembers(group.staticMembers())
+
.withCurrentTargetAssignment(group.targetAssignment())
+
.withNewTargetAssignment(result.targetAssignment());
+
+ return new
CoordinatorResult<>(assignmentRecordsBuilder.build());
+ } catch (GroupIdNotFoundException ex) {
+ log.debug("[GroupId {}] Received updated target
assignment but the consumer group no longer exists.", group.groupId());
+ return new CoordinatorResult<>(List.of());
+ } catch (Throwable t) {
+ log.error("[GroupId {}] Failed to compute a new target
assignment for epoch {}: {}.", group.groupId(), groupEpoch, t.getMessage(), t);
+ return new CoordinatorResult<>(List.of());
+ }
+ }
+ );
- MemberAssignment newMemberAssignment =
assignmentResult.targetAssignment().get(updatedMember.memberId());
+ appendFuture.whenComplete((__, t) -> {
+ if (t != null) {
+ // The subscription change and group epoch have been
reverted. We must not use
+ // the new target assignment, since it will not correspond
to the latest
+ // subscription even if the group epoch matches.
+ executor.cancel(targetAssignmentUpdateKey);
Review Comment:
This would not work with client side assignor. I wonder if we could use a
different construct which is more generic. Have you thought about it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java:
##########
@@ -417,32 +301,31 @@ public U withTopicAssignablePartitionsMap(
}
/**
- * Adds or updates a member. This is useful when the updated member is
- * not yet materialized in memory.
- *
- * @param memberId The member id.
- * @param member The member to add or update.
- * @return This object.
+ * Takes copies of the inputs backed by mutable collections, so that
+ * {@link TargetAssignmentBuilder#build()} can run on another thread.
*/
- public U addOrUpdateMember(
- String memberId,
- T member
- ) {
- this.updatedMembers.put(memberId, member);
- return self();
- }
+ public void freezeInputs() {
+ if (Thread.currentThread().getId() != creationThreadId) {
Review Comment:
I am not a fan of this. In my opinion, the builder must only be used to
build something. In our case, it must be the input to the assignor. We should
not carry on the builder itself as it kind of break the encapsulation.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4149,19 +4173,98 @@ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
group.groupId(), groupEpoch, preferredServerAssignor,
assignorTimeMs);
}
- records.addAll(assignmentResult.records());
+ return assignmentResult;
+ };
+
+ if (offloadAssignor) {
+ assignmentResultBuilder.freezeInputs();
+ Map<String, String> previousStaticMembers =
Map.copyOf(updatedMembersAndTargetAssignment.staticMembers());
+ // The view is backed by the group's timeline data structures,
which may have been
+ // modified by the time the executor task runs.
+ updatedMembersAndTargetAssignment.close();
+
+ executor.schedule(
+ targetAssignmentUpdateKey,
+ buildTargetAssignment::get,
+ (result, exception) -> {
+ if (exception != null) {
+ log.error("[GroupId {}] Failed to compute a new target
assignment for epoch {}: {}.", group.groupId(), groupEpoch,
exception.getMessage(), exception);
+ return new CoordinatorResult<>(List.of());
+ }
+
+ try {
+ ConsumerGroup consumerGroup =
consumerGroup(group.groupId());
+ if (consumerGroup.assignmentEpoch() >= groupEpoch) {
+ // The assignment epoch is already caught up.
+ // Writing this record would backslide it.
+ log.debug("[GroupId {}] Discarding stale offloaded
target assignment for epoch {} (current assignment epoch is {}).",
+ group.groupId(), groupEpoch,
consumerGroup.assignmentEpoch());
+ return new CoordinatorResult<>(List.of());
+ }
+
+ log.debug("[GroupId {}] Received updated target
assignment for epoch {}: {}.",
+ group.groupId(), groupEpoch,
result.targetAssignment());
+
+ TargetAssignmentRecordsBuilder<Assignment>
assignmentRecordsBuilder =
+ new
TargetAssignmentRecordsBuilder.ConsumerTargetAssignmentRecordsBuilder(logContext,
group.groupId())
+ .withAssignmentEpoch(groupEpoch)
+
.withAssignmentTimestampMs(result.assignmentTimestampMs())
+ .withCurrentMemberIds(group.members().keySet())
+
.withPreviousStaticMembers(previousStaticMembers)
Review Comment:
If I got it right, this is the only dependence on the group state at the
schedule time. Is it correct?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/OverlayMap.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A map which wraps an underlying base map and accepts incremental updates
+ * that overlay on top of it. This class expects the underlying base map to
+ * be immutable.
+ *
+ * <p>Null values are not supported.
+ *
+ * @param <K> The key type.
+ * @param <V> The value type.
+ */
+public class OverlayMap<K, V> extends AbstractMap<K, V> {
+ /** Whether the map has been closed. */
+ private boolean closed = false;
Review Comment:
closing a map is a bit weird, isn't it?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4149,19 +4173,98 @@ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
group.groupId(), groupEpoch, preferredServerAssignor,
assignorTimeMs);
}
- records.addAll(assignmentResult.records());
+ return assignmentResult;
+ };
+
+ if (offloadAssignor) {
+ assignmentResultBuilder.freezeInputs();
+ Map<String, String> previousStaticMembers =
Map.copyOf(updatedMembersAndTargetAssignment.staticMembers());
+ // The view is backed by the group's timeline data structures,
which may have been
+ // modified by the time the executor task runs.
+ updatedMembersAndTargetAssignment.close();
+
+ executor.schedule(
+ targetAssignmentUpdateKey,
+ buildTargetAssignment::get,
+ (result, exception) -> {
Review Comment:
nit: Should we define a method for this handler?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/OverlayMap.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A map which wraps an underlying base map and accepts incremental updates
+ * that overlay on top of it. This class expects the underlying base map to
+ * be immutable.
+ *
+ * <p>Null values are not supported.
+ *
+ * @param <K> The key type.
+ * @param <V> The value type.
+ */
+public class OverlayMap<K, V> extends AbstractMap<K, V> {
+ /** Whether the map has been closed. */
+ private boolean closed = false;
+
+ private final Map<K, V> base;
+
+ /** Entries whose keys are not in the base. */
+ private final Map<K, V> additions = new HashMap<>();
+
+ /** Entries whose keys are in the base, with a value that supersedes the
base value. */
+ private final Map<K, V> replacements = new HashMap<>();
+
+ /** Keys that are in the base but have been removed. */
+ private final Set<Object> removals = new HashSet<>();
+
+ private transient Set<Entry<K, V>> entrySet;
Review Comment:
It is the first time that I see `transient`. What's its purpose here?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4149,19 +4173,98 @@ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
group.groupId(), groupEpoch, preferredServerAssignor,
assignorTimeMs);
}
- records.addAll(assignmentResult.records());
+ return assignmentResult;
+ };
+
+ if (offloadAssignor) {
+ assignmentResultBuilder.freezeInputs();
+ Map<String, String> previousStaticMembers =
Map.copyOf(updatedMembersAndTargetAssignment.staticMembers());
+ // The view is backed by the group's timeline data structures,
which may have been
+ // modified by the time the executor task runs.
+ updatedMembersAndTargetAssignment.close();
Review Comment:
Does `close()` mean "duplicate it'?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]