dajac commented on code in PR #21652:
URL: https://github.com/apache/kafka/pull/21652#discussion_r2895141585
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##########
@@ -113,6 +113,11 @@ public static class DeadlineAndEpoch {
*/
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, String>>
invertedTargetAssignment;
+ /**
+ * The time at which the target assignment calculation finished.
+ */
+ protected final TimelineLong targetAssignmentTimestamp;
Review Comment:
Instead of having a single long here, have you considered using
`TimelineObject` with a record(epoch, ts)? It would reduce the number of
timeline objects that we have to track and it makes sense as they both always
go together. We could also have a single API to set epoch + ts instead of
having two APIs.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java:
##########
@@ -491,7 +510,8 @@ public TargetAssignmentResult build() throws
PartitionAssignorException {
}
// Bump the target assignment epoch.
- records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
+ long timestampMs = time.milliseconds();
Review Comment:
I am debating whether we should just pass `now` to the build method as it is
the only thing that we need in the end. Thoughts?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsCoordinatorRecordHelpers.java:
##########
@@ -227,7 +235,8 @@ public static CoordinatorRecord
newStreamsGroupTargetAssignmentEpochRecord(
.setGroupId(groupId),
new ApiMessageAndVersion(
new StreamsGroupTargetAssignmentMetadataValue()
- .setAssignmentEpoch(assignmentEpoch),
+ .setAssignmentEpoch(assignmentEpoch)
+ .setTimestamp(timestamp),
Review Comment:
I have noticed that we are a bit inconsistent across the PR with the naming.
We use either Timestamp or AssignmentTimestamp. Should we align on the latter?
##########
group-coordinator/src/main/resources/common/message/ConsumerGroupTargetAssignmentMetadataValue.json:
##########
@@ -21,6 +21,9 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "AssignmentEpoch", "versions": "0+", "type": "int32",
- "about": "The assignment epoch." }
+ "about": "The assignment epoch." },
+ // Added in 4.3 (KIP-1263).
+ { "name": "Timestamp", "versions": "0+", "taggedVersions": "0+", "tag": 0,
"type": "int64", "default": 0, "ignorable": true,
Review Comment:
We actually discussed it in the KIP's discuss thread. Using zero makes sense
here because this is actually the default value that we want to have when
reading old records. `-1` would have been helpful if knowing that the value was
not provided was required but it is not.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##########
@@ -457,7 +457,7 @@ SyncGroupRequestData build() {
}
public static class Builder {
- private MockTime time = new MockTime(0, 0, 0);
+ private MockTime time = new MockTime(0, 1000, 1000);
Review Comment:
Is it better to change the default or to just set `Time` for those new tests?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -22417,6 +22509,7 @@ public void testConsumerGroupMemberClearsRegex() {
.setSubscribedTopicRegex("")
.setServerAssignor("range")
.setTopicPartitions(List.of()));
+ long targetAssignmentTimestamp = context.time.milliseconds();
Review Comment:
Is this variable really needed or could we inline it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]