lucasbru commented on code in PR #22551:
URL: https://github.com/apache/kafka/pull/22551#discussion_r3411919402
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -624,15 +647,19 @@ public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message()),
Map.of(),
+ -1,
+ -1,
-1
)
);
}
+ final String heartbeatGroupId = request.groupId();
Review Comment:
Can we not use `request.groupId()` directly below, instead of introducing the `heartbeatGroupId` local?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -352,6 +359,14 @@ public GroupCoordinatorService build() {
*/
private final PartitionMetadataClient partitionMetadataClient;
+ /**
+ * The broker-level component that owns the streams-group topology description plugin
+ * (KIP-1331): plugin reference, per-group push back-off, and the three entry points
+ * the service delegates into — heartbeat post-processing, the push RPC, and the
+ * pre-tombstone hook on DeleteGroups.
+ */
+ private final TopologyDescriptionManager topologyDescriptionManager;
Review Comment:
Should we rename it to StreamsGroupTopologyDescriptionManager?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResultTest.java:
##########
@@ -30,38 +30,51 @@
public class StreamsGroupHeartbeatResultTest {
@Test
- public void testThreeArgConstructorPreservesTopologyEpoch() {
+ public void testConstructorPreservesEpochs() {
StreamsGroupHeartbeatResult result = new StreamsGroupHeartbeatResult(
- new StreamsGroupHeartbeatResponseData(), Map.of(), 7);
+ new StreamsGroupHeartbeatResponseData(), Map.of(), 7, 5, 3);
assertEquals(7, result.currentTopologyEpoch());
+ assertEquals(5, result.storedDescriptionTopologyEpoch());
+ assertEquals(3, result.failedDescriptionTopologyEpoch());
}
@Test
public void testCurrentTopologyEpochIsPartOfEquality() {
- // Records derive equals from all components; two results with different topology epochs are unequal.
StreamsGroupHeartbeatResult a = new StreamsGroupHeartbeatResult(
- new StreamsGroupHeartbeatResponseData(), Map.of(), 1);
+ new StreamsGroupHeartbeatResponseData(), Map.of(), 1, -1, -1);
StreamsGroupHeartbeatResult b = new StreamsGroupHeartbeatResult(
- new StreamsGroupHeartbeatResponseData(), Map.of(), 2);
+ new StreamsGroupHeartbeatResponseData(), Map.of(), 2, -1, -1);
assertNotEquals(a, b);
StreamsGroupHeartbeatResult c = new StreamsGroupHeartbeatResult(
- new StreamsGroupHeartbeatResponseData(), Map.of(), 1);
+ new StreamsGroupHeartbeatResponseData(), Map.of(), 1, -1, -1);
assertEquals(a, c);
}
+ @Test
+ public void testStoredAndFailedEpochsArePartOfEquality() {
+ StreamsGroupHeartbeatResult a = new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData(), Map.of(), 1, 1, -1);
+ StreamsGroupHeartbeatResult differentStored = new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData(), Map.of(), 1, 0, -1);
+ StreamsGroupHeartbeatResult differentFailed = new StreamsGroupHeartbeatResult(
+ new StreamsGroupHeartbeatResponseData(), Map.of(), 1, 1, 0);
+ assertNotEquals(a, differentStored);
+ assertNotEquals(a, differentFailed);
+ }
+
@Test
public void testCreatableTopicsMapIsImmutable() {
StreamsGroupHeartbeatResult result = new StreamsGroupHeartbeatResult(
- new StreamsGroupHeartbeatResponseData(), Map.of(), -1);
+ new StreamsGroupHeartbeatResponseData(), Map.of(), -1, -1, -1);
assertThrows(UnsupportedOperationException.class,
() -> result.creatableTopics().put("t", null));
}
@Test
public void testNullDataIsRejected() {
assertThrows(NullPointerException.class,
- () -> new StreamsGroupHeartbeatResult(null, Map.of(), -1));
+ () -> new StreamsGroupHeartbeatResult(null, Map.of(), -1, -1, -1));
assertTrue(true);
Review Comment:
Seems like `assertTrue(true)` doesn't do anything?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]