squah-confluent commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2542895318
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
+ private final LogContext logContext;
+
+ private final Logger log;
Review Comment:
nit: I'd move this to the top of the fields, before
`TimelineObject<ConsumerGroupState> state`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
+ private final LogContext logContext;
Review Comment:
We can drop this field since we never use it.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
+ private final LogContext logContext;
+
+ private final Logger log;
+
+ public ConsumerGroup(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ String groupId
+ ) {
Review Comment:
Can we remove the other constructor? In the tests we can fill in the extra
parameter with `new LogContext()`.
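
A rough sketch of the single-constructor shape, using simplified stand-in types rather than the real Kafka classes. `LogContextSketch` and `ConsumerGroupSketch` are hypothetical names, and `java.util.logging` is used only to keep the snippet dependency-free; the real `LogContext` hands out an slf4j `Logger`. The logger is created first in the constructor and the context itself is not stored.

```java
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for org.apache.kafka.common.utils.LogContext.
final class LogContextSketch {
    private final String prefix;

    // No-arg constructor, so tests can pass a default context.
    LogContextSketch() {
        this("");
    }

    LogContextSketch(String prefix) {
        this.prefix = prefix;
    }

    String prefix() {
        return prefix;
    }

    // Assumption: java.util.logging replaces slf4j to avoid external dependencies.
    Logger logger(Class<?> clazz) {
        return Logger.getLogger(clazz.getName());
    }
}

// Hypothetical stand-in for ConsumerGroup with exactly one constructor.
final class ConsumerGroupSketch {
    private final Logger log;
    private final String groupId;

    ConsumerGroupSketch(LogContextSketch logContext, String groupId) {
        // Create the logger first; the LogContext itself is not kept as a field.
        this.log = logContext.logger(ConsumerGroupSketch.class);
        this.groupId = groupId;
    }

    String groupId() {
        return groupId;
    }

    Logger log() {
        return log;
    }
}
```

A test would then construct the group as `new ConsumerGroupSketch(new LogContextSketch(), "group-1")`, with no logger-free overload to maintain.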
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -147,6 +149,29 @@ public String toLowerCaseString() {
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
+ private final LogContext logContext;
+
+ private final Logger log;
+
+ public ConsumerGroup(
+ LogContext logContext,
+ SnapshotRegistry snapshotRegistry,
+ String groupId
+ ) {
+ super(snapshotRegistry, groupId);
+ this.state = new TimelineObject<>(snapshotRegistry, EMPTY);
+ this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.serverAssignors = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.numClassicProtocolMembers = new TimelineInteger(snapshotRegistry);
+ this.classicProtocolMembersSupportedProtocols = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.subscribedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.resolvedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.hasSubscriptionMetadataRecord = new TimelineObject<>(snapshotRegistry, false);
+ this.logContext = logContext;
+ this.log = logContext.logger(ConsumerGroup.class);
Review Comment:
nit: same, I'd move this to the top of the constructor, after super().
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -24057,4 +24057,318 @@ private Map<Uuid, DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsRes
return responseTopics.stream()
.collect(Collectors.toMap(DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic::topicId, Function.identity()));
}
+
+ @Test
+ public void testConsumerGroupResolvesOnCompaction() {
Review Comment:
Is it possible to simplify the tests by removing the consumer group setup,
target assignment records and heartbeats? To check what's in the
`currentPartitionEpoch` map, we can assert on
`consumerGroup.currentPartitionEpoch(topicId, partitionId)`.
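
To illustrate the assertion style, here is a minimal self-contained sketch. `PartitionEpochSketch` and `addPartitionEpoch` are hypothetical stand-ins for the group's bookkeeping, `java.util.UUID` stands in for Kafka's `Uuid`, and returning `-1` for an untracked partition is an assumed convention.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for the partition epoch bookkeeping inside ConsumerGroup.
final class PartitionEpochSketch {
    // topicId -> (partitionId -> epoch of the member that owns the partition)
    private final Map<UUID, Map<Integer, Integer>> currentPartitionEpoch = new HashMap<>();

    // Records that the given partition is owned at the given epoch.
    void addPartitionEpoch(UUID topicId, int partitionId, int epoch) {
        currentPartitionEpoch
            .computeIfAbsent(topicId, id -> new HashMap<>())
            .put(partitionId, epoch);
    }

    // Returns the tracked epoch, or -1 when the partition is not tracked
    // (assumed convention for this sketch).
    int currentPartitionEpoch(UUID topicId, int partitionId) {
        Map<Integer, Integer> partitions = currentPartitionEpoch.get(topicId);
        if (partitions == null) {
            return -1;
        }
        return partitions.getOrDefault(partitionId, -1);
    }
}
```

With an accessor like this, the test can replay only the records under test and then check the result directly, e.g. `assertEquals(expectedEpoch, consumerGroup.currentPartitionEpoch(topicId, partitionId))`, instead of driving the state through heartbeats.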
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.