chia7712 commented on code in PR #16898:
URL: https://github.com/apache/kafka/pull/16898#discussion_r1750828687
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -156,17 +164,42 @@ class CoordinatorLoaderImpl[T](
} else {
batch.asScala.foreach { record =>
numRecords = numRecords + 1
Review Comment:
(this comment is unrelated to this PR)
the `numBytes` is based on whole `memoryRecords`. By contrast, the
`numRecords` does not include control batch. Is it expected?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -14285,6 +14285,162 @@ public void testConsumerGroupDynamicConfigs() {
context.assertNoRebalanceTimeout(groupId, memberId);
}
+ @Test
+ public void testReplayConsumerGroupMemberMetadata() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
+ .setClientId("clientid")
+ .setClientHost("clienthost")
+ .setServerAssignorName("range")
+ .setRackId("rackid")
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .build();
+
+ // The group and the member are created if they do not exist.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord("foo",
member));
+ assertEquals(member,
context.groupMetadataManager.consumerGroup("foo").getOrMaybeCreateMember("member",
false));
+ }
+
+ @Test
+ public void testReplayConsumerGroupMemberMetadataTombstone() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ // The group still exists but the member is already gone. Replaying the
+ // ConsumerGroupMemberMetadata tombstone should be a no-op.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord("foo",
10));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("foo",
"m1"));
+
+ // The group may not exist at all. Replaying the
ConsumerGroupMemberMetadata tombstone
+ // should a no-op.
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord("bar",
"m1"));
Review Comment:
Should we check the existence of group/member to ensure it is indeed no-op
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -140,12 +140,20 @@ class CoordinatorLoaderImpl[T](
batch.asScala.foreach { record =>
val controlRecord = ControlRecordType.parse(record.key)
if (controlRecord == ControlRecordType.COMMIT) {
+ if (isTraceEnabled) {
+ trace(s"Replaying end transaction marker from $tp at
offset ${record.offset} to commit transaction " +
+ s"with producer id ${batch.producerId} and producer
epoch ${batch.producerEpoch}.")
+ }
coordinator.replayEndTransactionMarker(
batch.producerId,
batch.producerEpoch,
TransactionResult.COMMIT
)
} else if (controlRecord == ControlRecordType.ABORT) {
+ if (isTraceEnabled) {
Review Comment:
Do we need to log something for other `ControlRecordType` instead of
ignoring them?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]