Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569055496


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
 );
 return describedGroup;
 }
+
+/**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry  The SnapshotRegistry.
+ * @param metrics           The GroupCoordinatorMetricsShard.
+ * @param classicGroup      The converted classic group.
+ * @param topicsImage       The TopicsImage for topic id and topic name conversion.
+ * @return The created ConsumerGroup.
+ */
+public static ConsumerGroup fromClassicGroup(
+    SnapshotRegistry snapshotRegistry,
+    GroupCoordinatorMetricsShard metrics,
+    ClassicGroup classicGroup,
+    TopicsImage topicsImage
+) {
+    String groupId = classicGroup.groupId();
+    ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
+    consumerGroup.setGroupEpoch(classicGroup.generationId());
+    consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+    classicGroup.allMembers().forEach(classicGroupMember -> {
+        ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(
+            ByteBuffer.wrap(classicGroupMember.assignment())
+        );
+        Map<Uuid, Set<Integer>> partitions = topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+        ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(
+            ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
+        );
+
+        // The target assignment and the assigned partitions of each member are set based on the last
+        // assignment of the classic group. All the members are put in the Stable state. If the classic
+        // group was in Preparing Rebalance or Completing Rebalance states, the classic members are
+        // asked to rejoin the group to re-trigger a rebalance or collect their assignments.
+        ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
+            .setMemberEpoch(classicGroup.generationId())
+            .setState(MemberState.STABLE)
+            .setPreviousMemberEpoch(classicGroup.generationId())
+            .setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+            .setRackId(subscription.rackId().orElse(null))
+            .setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+            .setClientId(classicGroupMember.clientId())
+            .setClientHost(classicGroupMember.clientHost())
+            .setSubscribedTopicNames(subscription.topics())
+            .setAssignedPartitions(partitions)
+            .setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+            .build();
+        consumerGroup.updateTargetAssignment(newMember.memberId(), new Assignment(partitions));
+        consumerGroup.updateMember(newMember);
+    });
+
+    return consumerGroup;
+}
+
+/**
+ * Populate the record list with the records needed to create the given consumer group.
+ *
+ * @param records The list to which the new records are added.
+ */
+public void createConsumerGroupRecords(
+    List<Record> records
+) {
+    members().forEach((__, consumerGroupMember) ->
+        records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), consumerGroupMember))
+    );
+
+    records.add(RecordHelpers.newGroupEpochRecord(groupId(), groupEpoch()));
+
+    members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+        records.add(RecordHelpers.newTargetAssignmentRecord(
+            groupId(),
+            consumerGroupMemberId,
+            targetAssignment(consumerGroupMemberId).partitions()
+        ))
+    );
+
+    records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), groupEpoch()));
+
+    members().forEach((__, consumerGroupMember) ->
+        records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), consumerGroupMember))
+    );
+}
+
+/**
+ * @return The map of topic id and partition set converted from the list of TopicPartition.
+ */
+private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
+    List<TopicPartition> partitions,
+    TopicsImage topicsImage
+) {
+    Map<Uuid, Set<Integer>> topicPartitionMap = new HashMap<>();
+    partitions.forEach(topicPartition -> {
+        TopicImage topicImage = topicsImage.getTopic(topicPartition.topic());
+        if (topicImage != null) {
+            topicPartitionMap

Re: [PR] KAFKA-16445: Add PATCH method for connector config [kafka]

2024-04-17 Thread via GitHub


C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1568959278


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 return response.entity(createdInfo.result()).build();
 }
 
+@PATCH
+@Path("/{connector}/config")
+public Response patchConnectorConfig(final @PathParam("connector") String connector,
+                                     final @Context HttpHeaders headers,
+                                     final @QueryParam("forward") Boolean forward,
+                                     final Map<String, String> connectorConfigPatch) throws Throwable {
+    FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+    herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+    Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
+        "PATCH", headers, connectorConfigPatch, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
+    return Response.ok().entity(createdInfo.result()).build();

Review Comment:
   Just realizing now that we don't actually specify the status and body of the 
REST response in the KIP. I agree with what's here, especially since it matches 
 the existing `PUT /connectors/{name}/config` endpoint, but it's worth 
specifying in the KIP for completeness.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
 verifyNoMoreInteractions(worker, member, configBackingStore, 
statusBackingStore);
 }
 
+@Test
+public void testPatchConnectorConfigNotFound() {
+    ClusterConfigState clusterConfigState = new ClusterConfigState(
+        0,
+        null,
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptyMap(),
+        Collections.emptySet(),
+        Collections.emptySet());
+    expectConfigRefreshAndSnapshot(clusterConfigState);
+
+    Map<String, String> connConfigPatch = new HashMap<>();
+    connConfigPatch.put("foo1", "baz1");
+
+    FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
+    herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+    herder.tick();
+    assertTrue(patchCallback.isDone());
+    ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
+    assertInstanceOf(NotFoundException.class, exception.getCause());
+}
+
+@Test
+public void testPatchConnectorConfig() throws Exception {
+    when(member.memberId()).thenReturn("leader");
+    expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
+    when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+    Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+    originalConnConfig.put("foo1", "bar1");
+    originalConnConfig.put("foo2", "bar2");

Review Comment:
   Nit: can we add one more key/value pair that should be unchanged after the 
patch is applied? This would catch bugs where the set of post-patch keys is 
derived from the patch instead of the combination of the patch and the prior 
configuration.
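   A minimal, self-contained sketch of that suggestion, assuming a hypothetical `applyPatch` helper and key names (this is not the PR's actual herder logic, only an illustration of why the extra untouched pair catches the bug):
   ```java
import java.util.HashMap;
import java.util.Map;

public class PatchSemanticsSketch {
    // Hypothetical stand-in for the patch semantics: non-null values overwrite,
    // null values remove, and keys absent from the patch must survive unchanged.
    static Map<String, String> applyPatch(Map<String, String> original, Map<String, String> patch) {
        Map<String, String> result = new HashMap<>(original);
        patch.forEach((k, v) -> {
            if (v == null) result.remove(k); else result.put(k, v);
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> original = new HashMap<>();
        original.put("foo1", "bar1");
        original.put("foo2", "bar2");
        original.put("foo3", "bar3"); // the extra pair that must be left untouched

        Map<String, String> patch = new HashMap<>();
        patch.put("foo1", "baz1");

        Map<String, String> patched = applyPatch(original, patch);
        // This check would fail if the post-patch config were derived from the patch alone.
        if (!"bar3".equals(patched.get("foo3"))) {
            throw new AssertionError("untouched keys must survive the patch");
        }
        System.out.println(patched);
    }
}
   ```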



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final 
@PathParam("connector") String connecto
 return response.entity(createdInfo.result()).build();
 }
 
+@PATCH
+@Path("/{connector}/config")
+public Response patchConnectorConfig(final @PathParam("connector") String 
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean 
forward,

Review Comment:
   Nit:
   ```suggestion
                                     final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
 verifyNoMoreInteractions(connectorConfigCb);
 }
 
+@Test
+public void testPatchConnectorConfigNotFound() {
+    Map<String, String> connConfigPatch = new HashMap<>();
+    connConfigPatch.put("foo1", "baz1");
+
+    Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class);
+    herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, 

Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1569051446


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
+    if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+        log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
+            consumerGroup.groupId());
+        return false;
+    } else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+        log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() <= 1) {
+        log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
+            consumerGroup.groupId());
+        return false;
+    } else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+        log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
+            consumerGroup.groupId());
+    }
+    return true;
+}
+
+public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
+    consumerGroup.createGroupTombstoneRecords(records);
+    ClassicGroup classicGroup;
+    try {
+        classicGroup = consumerGroup.toClassicGroup(
+            leavingMemberId,
+            logContext,
+            time,
+            consumerGroupSessionTimeoutMs,
+            metadataImage,
+            records
+        );
+    } catch (SchemaException e) {
+        log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
+            "the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);
+
+        throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
+            consumerGroup.groupId(), e.getMessage()));
+    }
+
+    groups.put(consumerGroup.groupId(), classicGroup);
+    metrics.onClassicGroupStateTransition(null, classicGroup.currentState());
+
+    CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+    appendFuture.whenComplete((__, t) -> {
+        if (t == null) {
+            classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+            prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));

Review Comment:
   I do agree with the `appendFuture` part of your explanation. However, I still believe that we should schedule the session timeouts and start the rebalance before it.
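   To make the ordering concrete, a generic illustration with a plain `CompletableFuture` (not the coordinator code): anything registered via `whenComplete` only runs once the append completes, while work done before registering it happens immediately.
   ```java
import java.util.concurrent.CompletableFuture;

public class AppendFutureOrderingSketch {
    public static void main(String[] args) {
        CompletableFuture<Void> appendFuture = new CompletableFuture<>();

        // Runs only after the (simulated) record append completes.
        appendFuture.whenComplete((ignored, error) -> {
            if (error == null) {
                System.out.println("2. rebalance prepared after the append completed");
            }
        });

        // Runs immediately, before the append is durable.
        System.out.println("1. session timeouts scheduled right away");

        appendFuture.complete(null); // simulate the append finishing
    }
}
   ```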



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Refactor org.apache.kafka.server.config.Defaults to follow Java config pattern [kafka]

2024-04-17 Thread via GitHub


OmniaGM commented on PR #15260:
URL: https://github.com/apache/kafka/pull/15260#issuecomment-2061522296

   I am going to close this as we have already started moving defaults out of `Defaults` and into the scoped config classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-17 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1569024432


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
 try {
     if (delete.execute())
         LOGGER.info("Deleted {} {}.", fileType, file.getAbsolutePath());
-    else if (logIfMissing)
-        LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+    else {
+        if (logIfMissing) {
+            LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath());
+        }
+
+        // During alter log dir, the log segment may be moved to a new directory, so async delete may fail.
+        // Fallback to delete the file in the new directory to avoid orphan file.
+        Pattern dirPattern = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   I think we can create a file like `LogDirUtils.java` in the future, so we don't need to define the same variables, like `FutureDirPattern` and `FutureDirSuffix`, in both `LogSegment.java` and `LocalLog.scala`. WDYT?
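   For illustration, a rough sketch of what such a shared utility could look like (the class and constant names are hypothetical; the pattern string is the one from the diff above):
   ```java
import java.util.regex.Pattern;

// Hypothetical shared home for the "-future" log dir naming, so LogSegment.java and
// LocalLog.scala would not each have to define their own copies of the suffix and pattern.
public final class LogDirUtils {
    public static final String FUTURE_DIR_SUFFIX = "-future";
    public static final Pattern FUTURE_DIR_PATTERN = Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)" + FUTURE_DIR_SUFFIX);

    private LogDirUtils() { }

    public static boolean isFutureDirName(String dirName) {
        return FUTURE_DIR_PATTERN.matcher(dirName).matches();
    }
}
   ```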



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]

2024-04-17 Thread via GitHub


OmniaGM closed pull request #15501: KAFKA-15853: Move KafkaConfig properties 
definition out of core
URL: https://github.com/apache/kafka/pull/15501


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig properties definition out of core [kafka]

2024-04-17 Thread via GitHub


OmniaGM commented on PR #15501:
URL: https://github.com/apache/kafka/pull/15501#issuecomment-2061520938

   I am going to close this one as it has drifted away from the approach we took to break the configs out.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Refactor org.apache.kafka.server.config.Defaults to follow Java config pattern [kafka]

2024-04-17 Thread via GitHub


OmniaGM closed pull request #15260: KAFKA-15853: Refactor 
org.apache.kafka.server.config.Defaults to follow Java config pattern
URL: https://github.com/apache/kafka/pull/15260


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-17 Thread via GitHub


brandboat commented on code in PR #15719:
URL: https://github.com/apache/kafka/pull/15719#discussion_r1569022628


##
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##
@@ -413,7 +413,7 @@ class LogManagerTest {
 assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, 
"Check we have the expected number of segments.")
 
 // this cleanup shouldn't find any expired segments but should delete some 
to reduce size
-time.sleep(logManager.InitialTaskDelayMs)
+time.sleep(logManager.initialTaskDelayMs)
 assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 
segments")
 time.sleep(log.config.fileDeleteDelayMs + 1)

Review Comment:
   Thanks for the explanation!
   > or maybe we can directly add verification inside these tests?
   
   I decided to follow the comment you mentioned earlier, and updated `initialTaskDelayMs` to 10s in `LogManagerTest`. Please take another look :smiley:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


johnnychhsu commented on code in PR #15744:
URL: https://github.com/apache/kafka/pull/15744#discussion_r1569005655


##
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##
@@ -467,13 +470,33 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
    * @param rootEntityType entity type
    * @param sanitizedEntityName entity name
    * @throws KeeperException if there is an error while setting or creating the znode
+   * @throws ControllerMovedException if no controller is defined, or a KRaft controller is defined
    */
   def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties): Unit = {
+    val controllerRegistration = getControllerRegistration match {
+      case Some(registration) => registration
+      case None =>
+        // This case is mainly here to make tests less flaky. In practice, there will always be a /controller ZNode

Review Comment:
   May I know why omitting this could make the tests flaky?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16561: Disable `allow.auto.create.topics` in MirrorMaker2 Consumer Config [kafka]

2024-04-17 Thread via GitHub


OmniaGM commented on PR #15728:
URL: https://github.com/apache/kafka/pull/15728#issuecomment-2061493764

   Hi @aaron-ai, thanks for the PR. Can you please have a look into the failing tests here: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15728/1/#showFailuresLink I believe they are related. For example, [MirrorConnectorConfigTest](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15728/1/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorConfigTest/Build___JDK_21_and_Scala_2_13___testSourceConsumerConfigWithSourcePrefix__/) is failing with
   ```
   org.opentest4j.AssertionFailedError: source.consumer. source consumer config 
not matching ==> expected: <{enable.auto.commit=false, 
max.poll.interval.ms=100, auto.offset.reset=latest, 
client.id=source1->target2|ConnectorName|test}> but was: 
<{enable.auto.commit=false, max.poll.interval.ms=100, 
allow.auto.create.topics=false, auto.offset.reset=latest, 
client.id=source1->target2|ConnectorName|test}>
   ```
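   Assuming the new default is intended, the fix is presumably to extend the expected map in those tests with the new entry; an illustrative fragment (not the actual test code, values taken from the assertion message above):
   ```java
import java.util.HashMap;
import java.util.Map;

public class ExpectedSourceConsumerPropsSketch {
    public static void main(String[] args) {
        Map<String, String> expected = new HashMap<>();
        expected.put("enable.auto.commit", "false");
        expected.put("max.poll.interval.ms", "100");
        expected.put("auto.offset.reset", "latest");
        expected.put("client.id", "source1->target2|ConnectorName|test");
        expected.put("allow.auto.create.topics", "false"); // the newly injected default
        System.out.println(expected);
    }
}
   ```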


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-17 Thread via GitHub


emitskevich-blp commented on code in PR #15722:
URL: https://github.com/apache/kafka/pull/15722#discussion_r1569005969


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -1281,14 +1281,14 @@ private Meter createMeter(Metrics metrics, String groupName, Map<String, String> metricTags,
                               String baseName, String action) {
         MetricName rateMetricName = metrics.metricName(baseName + "-ratio", groupName,
-            String.format("*Deprecated* The fraction of time the I/O thread spent %s", action), metricTags);
+            String.format("The fraction of time the I/O thread spent %s", action), metricTags);

Review Comment:
   Please find it in the PR description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




Re: [PR] KAFKA-15309: Add custom error handler to Producer [kafka]

2024-04-17 Thread via GitHub


OmniaGM commented on PR #15731:
URL: https://github.com/apache/kafka/pull/15731#issuecomment-2061484219

   Hi @aliehsaeedii, thanks for the PoC and the KIP. Can you please mark this PoC as a draft PR, as the KIP is still under discussion?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568994033


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
 );
 return describedGroup;
 }
+
+/**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry  The SnapshotRegistry.
+ * @param metrics   The GroupCoordinatorMetricsShard.
+ * @param classicGroup  The converted classic group.
+ * @param topicsImage   The TopicsImage for topic id and topic name 
conversion.
+ * @return  The created ConsumerGroup.
+ */
+public static ConsumerGroup fromClassicGroup(
+SnapshotRegistry snapshotRegistry,
+GroupCoordinatorMetricsShard metrics,
+ClassicGroup classicGroup,
+TopicsImage topicsImage
+) {
+String groupId = classicGroup.groupId();
+ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+consumerGroup.setGroupEpoch(classicGroup.generationId());
+consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+classicGroup.allMembers().forEach(classicGroupMember -> {
+ConsumerPartitionAssignor.Assignment assignment = 
ConsumerProtocol.deserializeAssignment(
+ByteBuffer.wrap(classicGroupMember.assignment())
+);
+Map> partitions = 
topicPartitionMapFromList(assignment.partitions(), topicsImage);
+
+ConsumerPartitionAssignor.Subscription subscription = 
ConsumerProtocol.deserializeSubscription(
+
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().get()))
+);
+
+// The target assignment and the assigned partitions of each 
member are set based on the last
+// assignment of the classic group. All the members are put in the 
Stable state. If the classic
+// group was in Preparing Rebalance or Completing Rebalance 
states, the classic members are
+// asked to rejoin the group to re-trigger a rebalance or collect 
their assignments.
+ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+.setMemberEpoch(classicGroup.generationId())
+.setState(MemberState.STABLE)
+.setPreviousMemberEpoch(classicGroup.generationId())
+
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+.setRackId(subscription.rackId().orElse(null))
+.setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+.setClientId(classicGroupMember.clientId())
+.setClientHost(classicGroupMember.clientHost())
+.setSubscribedTopicNames(subscription.topics())
+.setAssignedPartitions(partitions)
+
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+.build();
+consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+consumerGroup.updateMember(newMember);
+});
+
+return consumerGroup;
+}
+
+/**
+ * Populate the record list with the records needed to create the given 
consumer group.
+ *
+ * @param records The list to which the new records are added.
+ */
+public void createConsumerGroupRecords(
+List records
+) {
+members().forEach((__, consumerGroupMember) ->
+records.add(RecordHelpers.newMemberSubscriptionRecord(groupId(), 
consumerGroupMember))
+);
+
+records.add(RecordHelpers.newGroupEpochRecord(groupId(), 
groupEpoch()));
+
+members().forEach((consumerGroupMemberId, consumerGroupMember) ->
+records.add(RecordHelpers.newTargetAssignmentRecord(
+groupId(),
+consumerGroupMemberId,
+targetAssignment(consumerGroupMemberId).partitions()
+))
+);
+
+records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId(), 
groupEpoch()));
+
+members().forEach((__, consumerGroupMember) ->
+records.add(RecordHelpers.newCurrentAssignmentRecord(groupId(), 
consumerGroupMember))
+);
+}
+
+/**
+ * @return The map of topic id and partition set converted from the list 
of TopicPartition.
+ */
+private static Map> topicPartitionMapFromList(
+List partitions,
+TopicsImage topicsImage
+) {
+Map> topicPartitionMap = new HashMap<>();
+partitions.forEach(topicPartition -> {
+TopicImage topicImage = 
topicsImage.getTopic(topicPartition.topic());
+if (topicImage != null) {
+topicPartitionMap

Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1568991555


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) {
-    ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+    ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks(
         rebalanceListenerInvoker,
         event.methodName(),
         event.partitions(),
         event.future()
     );
     applicationEventHandler.add(invokedEvent);
+    if (invokedEvent.error().isPresent()) {
+        throw invokedEvent.error().get();

Review Comment:
   I was expecting to see the logic for wrapping the callback error into a `KafkaException` here, but I see it is at a lower level, in `invokeRebalanceCallbacks`, which is a bit more obscured, I would say. Still, I see how it's deeply tied to the `ConsumerRebalanceListenerCallbackCompletedEvent`, so it's OK for me to leave it as is if we feel it's clear enough.
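   For reference, the wrapping rule under discussion boils down to something like this (an illustrative sketch, not the actual `invokeRebalanceCallbacks` code):
   ```java
import org.apache.kafka.common.KafkaException;

public class CallbackErrorWrappingSketch {
    // Re-throw user callback failures as KafkaException, but avoid double-wrapping
    // when the callback already threw a KafkaException.
    static KafkaException asKafkaException(RuntimeException callbackError) {
        if (callbackError instanceof KafkaException) {
            return (KafkaException) callbackError;
        }
        return new KafkaException("User rebalance callback throws an error", callbackError);
    }

    public static void main(String[] args) {
        System.out.println(asKafkaException(new RuntimeException("Intentional error")));
        System.out.println(asKafkaException(new KafkaException("Intentional error")));
    }
}
   ```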



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16539 Fix IncrementalAlterConfigs during ZK migration [kafka]

2024-04-17 Thread via GitHub


mumrah opened a new pull request, #15744:
URL: https://github.com/apache/kafka/pull/15744

   This patch fixes two issues with IncrementalAlterConfigs and the ZK 
migration. First, it changes the handling of IncrementalAlterConfigs to check 
if the controller is ZK vs KRaft and only forward for KRaft. Second, it adds a 
check in KafkaZkClient#setOrCreateEntityConfigs to ensure a ZK broker is not 
directly modifying configs in ZK if there is a KRaft controller. This closes 
the race condition between KRaft taking over as the active controller and the 
ZK brokers learning about this.
   
   
   
   During the ZK migration, there is a time when the ZK brokers are running 
with migrations enabled, but KRaft has yet to take over as the controller.  
Prior to KRaft taking over as the controller, the ZK brokers in migration mode 
were unconditionally forwarding IncrementalAlterConfigs (IAC) to the ZK 
controller. This works for some config types, but breaks when setting BROKER 
and BROKER_LOGGER configs for a specific broker. The behavior in KafkaApis for 
IAC was to always forward if the forwarding manager was defined. Since ZK 
brokers in migration mode have forwarding enabled, the forwarding would happen, 
and the special logic for BROKER and BROKER_LOGGER would be missed, causing the 
request to fail.
   
   With this fix, the IAC handler will check if the controller is KRaft or ZK 
and only forward for KRaft.
   
    
   
   As part of KIP-500, we moved most (but not all) ZK mutations to the ZK 
controller. One of the things we did not move fully to the controller was 
entity configs. This is because there was some special logic that needed to run 
on the broker for certain config updates. If a broker-specific config was set, 
AdminClient would route the request to the proper broker. In KRaft, we have a 
different mechanism for handling broker-specific config updates.
   
   Leaving this ZK update on the broker side would be okay if we were guarding 
writes on the controller epoch, but it turns out 
KafkaZkClient#setOrCreateEntityConfigs does unprotected "last writer wins" 
updates to ZK. This means a ZK broker could update the contents of ZK _after_ 
the metadata had been migrated to KRaft. No good! To fix this, this patch adds 
a check on the controller epoch to KafkaZkClient#setOrCreateEntityConfigs but 
also adds logic to fail the update if the controller is a KRaft controller.
   
   The new logic in setOrCreateEntityConfigs adds STALE_CONTROLLER_EPOCH as a 
new exception that can be thrown while updating configs.
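   A rough sketch of the two guards described above (hypothetical helper types, not the actual KafkaApis/KafkaZkClient code; the real ZK-side guard throws ControllerMovedException per the javadoc elsewhere in this patch):
   ```java
// Illustrative only: during migration, forward IncrementalAlterConfigs exclusively when the
// active controller is KRaft, and refuse direct "last writer wins" config writes to ZK once
// a KRaft controller has taken over.
enum ActiveController { NONE, ZK, KRAFT }

final class IncrementalAlterConfigsGuards {
    static boolean shouldForwardToController(ActiveController controller) {
        return controller == ActiveController.KRAFT;
    }

    static void ensureDirectZkConfigWriteAllowed(ActiveController controller) {
        if (controller != ActiveController.ZK) {
            throw new IllegalStateException("Controller moved or is KRaft; refusing direct ZK config write");
        }
    }
}
   ```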


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


lianetm commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1568972567


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -1333,32 +1342,41 @@ public void 
testListenerCallbacksInvoke(List listenerCallbacksInvokeSource() {
 Optional<RuntimeException> empty = Optional.empty();
 Optional<RuntimeException> error = Optional.of(new RuntimeException("Intentional error"));
+Optional<RuntimeException> kafkaException = Optional.of(new KafkaException("Intentional error"));
+Optional<RuntimeException> wrappedException = Optional.of(new KafkaException("User rebalance callback throws an error", error.get()));
 
 return Stream.of(
 // Tests if we don't have an event, the listener doesn't get 
called.
-Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 
0),
+Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 
0, empty),
 
 // Tests if we get an event for a revocation, that we invoke our 
listener.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
empty, empty, empty, 1, 0, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
empty, empty, empty, 1, 0, 0, empty),
 
 // Tests if we get an event for an assignment, that we invoke our 
listener.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, empty, empty, 0, 1, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, empty, empty, 0, 1, 0, empty),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, empty, 0, 0, 1),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, empty, 0, 0, 1, empty),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
error, empty, empty, 1, 0, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), 
error, empty, empty, 1, 0, 0, wrappedException),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, error, empty, 0, 1, 0),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), 
empty, error, empty, 0, 1, 0, wrappedException),
 
 // Tests that we invoke our listener even if it encounters an 
exception.
-Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, error, 0, 0, 1),
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, error, 0, 0, 1, wrappedException),
+
+// Tests that we invoke our listener even if it encounters an 
exception. Special case to test that a kafka exception is not wrapped.
+Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, 
empty, kafkaException, 0, 0, 1, kafkaException),

Review Comment:
   Nice addition, indeed part of what the legacy logic does for not wrapping a 
KafkaException  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-17 Thread via GitHub


KevinZTW commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1568968759


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+}
+
+@Test
+public void testMaybeFlushWithTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+
+assertDoesNotThrow(() -> {
+    List<String> lines = Files.readAllLines(file.toPath());
+    assertEquals(2, lines.size());
+    assertEquals("version: 0", lines.get(0));
+    assertEquals("topic_id: " + topicId, lines.get(1));
+});
+}
+
+@Test
+public void testMaybeFlushWithNoTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+assertEquals(0, file.length());
+}
+
+@Test
+public void testRead() {
+File file = PartitionMetadataFile.newFile(dir);
+LogDirFailureChannel channel = 
Mockito.mock(LogDirFailureChannel.class);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, channel);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));

Review Comment:
   I see, thank you very much for the detailed explanation! I've revised it as 
suggested. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568962281


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String 
memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {
+consumerGroup.createGroupTombstoneRecords(records);
+ClassicGroup classicGroup;
+try {
+classicGroup = consumerGroup.toClassicGroup(
+leavingMemberId,
+logContext,
+time,
+consumerGroupSessionTimeoutMs,
+metadataImage,
+records
+);
+} catch (SchemaException e) {
+log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+consumerGroup.groupId(), e.getMessage()));
+}
+
+groups.put(consumerGroup.groupId(), classicGroup);
+metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
+
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {
+if (t == null) {
+classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+prepareRebalance(classicGroup, String.format("Downgrade group 
%s.", classicGroup.groupId()));

Review Comment:
   If we create all the state immediately, we'll get an error when replaying the old ConsumerGroup tombstone, because `group.get(groupId)` has already become a ClassicGroup. We can't rely on replaying the records to update the state either, because we need the new classicGroup reference to trigger the rebalance. So the only way is not to replay the records, by setting the appendFuture.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568963034


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String 
memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {
+consumerGroup.createGroupTombstoneRecords(records);
+ClassicGroup classicGroup;
+try {
+classicGroup = consumerGroup.toClassicGroup(
+leavingMemberId,
+logContext,
+time,
+consumerGroupSessionTimeoutMs,
+metadataImage,
+records
+);
+} catch (SchemaException e) {
+log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+consumerGroup.groupId(), e.getMessage()));
+}
+
+groups.put(consumerGroup.groupId(), classicGroup);
+metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
+
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {
+if (t == null) {
+classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+prepareRebalance(classicGroup, String.format("Downgrade group 
%s.", classicGroup.groupId()));

Review Comment:
   Yes, we should revert `onClassicGroupStateTransition`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568962281


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String 
memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {
+consumerGroup.createGroupTombstoneRecords(records);
+ClassicGroup classicGroup;
+try {
+classicGroup = consumerGroup.toClassicGroup(
+leavingMemberId,
+logContext,
+time,
+consumerGroupSessionTimeoutMs,
+metadataImage,
+records
+);
+} catch (SchemaException e) {
+log.warn("Cannot downgrade the consumer group " + 
consumerGroup.groupId() + ": fail to parse " +
+"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + 
".", e);
+
+throw new GroupIdNotFoundException(String.format("Cannot downgrade 
the classic group %s: %s.",
+consumerGroup.groupId(), e.getMessage()));
+}
+
+groups.put(consumerGroup.groupId(), classicGroup);
+metrics.onClassicGroupStateTransition(null, 
classicGroup.currentState());
+
+CompletableFuture appendFuture = new CompletableFuture<>();
+appendFuture.whenComplete((__, t) -> {
+if (t == null) {
+classicGroup.allMembers().forEach(member -> 
rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
+prepareRebalance(classicGroup, String.format("Downgrade group 
%s.", classicGroup.groupId()));

Review Comment:
   If we create all the state immediately, we'll get an error when replaying the old ConsumerGroup tombstone, because `group.get(groupId)` has already become a ClassicGroup. We can't rely on replaying the records to update the state either, because we need the new classicGroup reference to trigger the rebalance. So the only way is not to replay the records.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Fix typo [kafka]

2024-04-17 Thread via GitHub


birdoplank opened a new pull request, #15743:
URL: https://github.com/apache/kafka/pull/15743

   Testing potential security vulnerability in the pipeline to be reported as 
part of Apache vulnerability disclosure program:
   https://apache.org/security/#vulnerability-handling
   
   ### Committer Checklist (excluded from commit message)
   - [ X ] Verify design and implementation 
   - [ X ] Verify test coverage and CI build status
   - [ X ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1568904632


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+}
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+val topicPartition = destLog.topicPartition
+info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
 
-  // Now that future replica has been successfully renamed to be the 
current replica
-  // Update the cached map and log cleaner as appropriate.
-  futureLogs.remove(topicPartition)
-  currentLogs.put(topicPartition, destLog)
-  if (cleaner != null) {
-cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, 
destLog.parentDirFile)
-resumeCleaning(topicPartition)
-  }
+destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
+// the metrics tags still contain "future", so we have to remove it.
+// we will add metrics back after sourceLog remove the metrics
+destLog.removeLogMetrics()
+if (updateHighWatermark && sourceLog.isDefined) {
+  destLog.updateHighWatermark(sourceLog.get.highWatermark)
+}
 
-  try {
-sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), 
shouldReinitialize = true)
+// Now that future replica has been successfully renamed to be the current 
replica
+// Update the cached map and log cleaner as appropriate.
+futureLogs.remove(topicPartition)
+currentLogs.put(topicPartition, destLog)
+if (cleaner != null) {
+  cleaner.alterCheckpointDir(topicPartition, 
sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment:
   It seems `cleaner.alterCheckpointDir` will do nothing if `sourceLog` is 
empty. Maybe we can revert those changes and run `alterCheckpointDir` only if 
`sourceLog` is defined.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-17 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1568897953


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -801,8 +803,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
 try {
 if (delete.execute())
 LOGGER.info("Deleted {} {}.", fileType, 
file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {
+LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+}
+
+// During alter log dir, the log segment may be moved to a new 
directory, so async delete may fail.
+// Fallback to delete the file in the new directory to avoid 
orphan file.
+Pattern dirPattern = 
Pattern.compile("^(\\S+)-(\\S+)\\.(\\S+)-future");

Review Comment:
   Updated it. Thanks for the review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1568882032


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+  val tp = futureLog.topicPartition
+
+  futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+  futureLog.removeLogMetrics()
+  futureLogs.remove(tp)
+
+  currentLog.foreach { log =>
+log.removeLogMetrics()
+log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = 
false)
+addLogToBeDeleted(log)
+info(s"Old log for partition ${tp} is renamed to 
${log.dir.getAbsolutePath} and is scheduled for deletion")
+  }
+
+  currentLogs.put(tp, futureLog)
+  futureLog.newMetrics()
+
+  info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   I've refactored LogManager to use `replaceCurrentWithFutureLog` in 
[b87a21f](https://github.com/apache/kafka/pull/15136/commits/b87a21f3bd0eea6e4083a2d14b41053361f7b40a).
 PTAL



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1568883111


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -81,8 +82,7 @@ public BrokerNode build(
 logDataDirectories = Collections.
 singletonList(String.format("combined_%d", id));
 } else {
-logDataDirectories = Collections.
-singletonList(String.format("broker_%d_data0", id));
+logDataDirectories = 
Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", 
id), String.format("broker_%d_data1", id)));

Review Comment:
   Rebased to use the new method in the builder



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16467) Add README to docs folder

2024-04-17 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838190#comment-17838190
 ] 

ASF GitHub Bot commented on KAFKA-16467:


FrankYang0529 commented on code in PR #596:
URL: https://github.com/apache/kafka-site/pull/596#discussion_r1568848118


##
README.md:
##
@@ -10,4 +10,32 @@ You can run it with the following command, note that it 
requires docker:
 
 Then you can open [localhost:8080](http://localhost:8080) on your browser and 
browse the documentation.
 
-To kill the process, just type ctrl + c
\ No newline at end of file
+To kill the process, just type ctrl + c.
+
+## How to preview the latest documentation changes in Kafka repository?
+
+1. Generating document from kafka repository:
+
+```shell
+# change directory into kafka repository
+cd KAFKA_REPO
+./gradlew clean siteDocTar
+# supposing built with scala 2.13
+tar zxvf core/build/distributions/kafka_2.13-$(./gradlew properties | grep 
version: | awk '{print $NF}' | head -n 1)-site-docs.tgz
+```
+
+2. Copying the generated documents from Kafka repository into kafka-site, and 
preview them (note that it requires docker):
+
+```shell
+# change directory into kafka-site repository
+cd KAFKA_SITE_REPO
+# copy the generated documents into dev folder
+rm -rf dev
+mkdir dev
+# change directory into kafka repository
+cp -r KAFKA_REPO/site-docs/* dev

Review Comment:
   I removed `# change directory into kafka repository`. Thank you.





> Add README to docs folder
> -
>
> Key: KAFKA-16467
> URL: https://issues.apache.org/jira/browse/KAFKA-16467
> Project: Kafka
>  Issue Type: Improvement
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>
> We don't have a guide in the project root folder or the docs folder to show how 
> to run the local website. It would be good to provide a way to run the 
> documentation with the kafka-site repository.
>  
> Option 1: Add links to wiki page 
> [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes]
>  and 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67634793]. 
> Option 2: Show how to run the document within container. For example: moving 
> `site-docs` from kafka to kafka-site repository and run `./start-preview.sh`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]

2024-04-17 Thread via GitHub


chia7712 merged PR #15713:
URL: https://github.com/apache/kafka/pull/15713


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on PR #15713:
URL: https://github.com/apache/kafka/pull/15713#issuecomment-2061259436

   @AyoubOm thanks for your contribution


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15722:
URL: https://github.com/apache/kafka/pull/15722#discussion_r1568836486


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -1281,14 +1281,14 @@ private Meter createMeter(Metrics metrics, String 
groupName,  Map metricTags,
 String baseName, String action) {
 MetricName rateMetricName = metrics.metricName(baseName + 
"-ratio", groupName,
-String.format("*Deprecated* The fraction of time the I/O 
thread spent %s", action), metricTags);
+String.format("The fraction of time the I/O thread spent 
%s", action), metricTags);

Review Comment:
   Nice find! Could you add a comment to explain why we remove the 
`deprecated` marker?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16103: AsyncConsumer should await pending async commits on commitSync and close [kafka]

2024-04-17 Thread via GitHub


lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1568770827


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1388,6 +1393,31 @@ public void commitSync(Map offsets, Duration
 }
 }
 
+private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer 
timer, boolean disableWakeup) {
+if (lastPendingAsyncCommit == null) {
+return;
+}
+
+try {
+final CompletableFuture futureToAwait = new 
CompletableFuture<>();
+// We don't want the wake-up trigger to complete our pending async 
commit future,
+// so create new future here. Any errors in the pending async 
commit will be handled
+// by the async commit future / the commit callback - here, we 
just want to wait for it to complete.
+lastPendingAsyncCommit.whenComplete((v, t) -> 
futureToAwait.complete(null));
+if (!disableWakeup) {
+wakeupTrigger.setActiveTask(futureToAwait);
+}
+ConsumerUtils.getResult(futureToAwait, timer);

Review Comment:
   Yes. I think if `lastPendingAsyncCommit` is completed before entering here, 
the `whenComplete` will execute immediately.
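   
   For illustration, a tiny standalone sketch of that behaviour (plain Java, 
names made up, not the consumer code itself):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   
   public class WhenCompleteDemo {
       public static void main(String[] args) {
           // Stand-in for an already-completed lastPendingAsyncCommit.
           CompletableFuture<Void> pendingAsyncCommit = CompletableFuture.completedFuture(null);
           CompletableFuture<Void> futureToAwait = new CompletableFuture<>();
           // The callback runs inline because pendingAsyncCommit is already complete,
           // so futureToAwait is done before we ever wait on it.
           pendingAsyncCommit.whenComplete((v, t) -> futureToAwait.complete(null));
           System.out.println(futureToAwait.isDone()); // true
       }
   }
   ```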



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -158,7 +158,11 @@ public class AsyncKafkaConsumerTest {
 public void resetAll() {
 backgroundEventQueue.clear();
 if (consumer != null) {
-consumer.close(Duration.ZERO);
+try {
+consumer.close(Duration.ZERO);
+} catch (Exception e) {
+assertInstanceOf(KafkaException.class, e);
+}

Review Comment:
   `resetAll` isn't supposed to test anything, so this also shouldn't mask 
anything. It's purely for cleanup. In this case, it only affects two tests that 
will time out on close (since we don't mock an async commit response). So let me 
do it. But in general, I wonder whether adding clean-up logic to the tests 
themselves won't reduce the readability/clarity of the actual test cases.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();
+
+// Commit async is not completed yet, so commit sync should wait for 
it to complete (time out)
+assertThrows(TimeoutException.class, () -> 
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+// Complete async commit event
+final ArgumentCaptor commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+verify(applicationEventHandler).add(commitEventCaptor.capture());

Review Comment:
   Yes. I agree, it's a bit weird, but mockito is recording those invocations 
with copies (references) of all arguments. That's also why spying on lots of 
objects in busy event loops will accumulate lots of such "invocation objects" 
in memory.
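   
   A minimal self-contained example of that recording behaviour, as a hedged 
sketch (not the consumer test; names are made up):
   
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.mockingDetails;
   
   import java.util.List;
   
   public class MockInvocationRecordingDemo {
       @SuppressWarnings("unchecked")
       public static void main(String[] args) {
           List<String> events = mock(List.class);
           events.add("first");
           events.add("second");
           // Mockito retains an Invocation object (holding references to the
           // arguments) for every call on the mock; ArgumentCaptor reads these back.
           System.out.println(mockingDetails(events).getInvocations().size()); // 2
       }
   }
   ```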



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -616,6 +620,80 @@ public void 
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
 assertEquals("Get fenced exception for group.instance.id 
groupInstanceId1", e.getMessage());
 }
 
+@Test
+public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+time = new MockTime(1);
+consumer = newConsumer();
+
+// Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+final TopicPartition tp = new TopicPartition("foo", 0);
+consumer.assign(Collections.singleton(tp));
+consumer.seek(tp, 20);
+consumer.commitAsync();

Review Comment:
   I guess I may be less concerned with code duplication in test setup than my 
reviewers :). Done. Added another helper method that contains the lines up to 
the incomplete async commit.



##

Re: [PR] MINOR: Various cleanups in connect [kafka]

2024-04-17 Thread via GitHub


chia7712 merged PR #15734:
URL: https://github.com/apache/kafka/pull/15734


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1568737120


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String 
memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {
+consumerGroup.createGroupTombstoneRecords(records);
+ClassicGroup classicGroup;

Review Comment:
   nit: Let's add an empty line before this one.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -775,6 +777,126 @@ public ClassicGroup classicGroup(
 }
 }
 
+public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String 
memberId) {
+if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
+log.info("Cannot downgrade consumer group {} to classic group 
because the online downgrade is disabled.",
+consumerGroup.groupId());
+return false;
+} else if 
(!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
+log.debug("Cannot downgrade consumer group {} to classic group 
because not all its members use the classic protocol.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() <= 1) {
+log.info("Skip downgrading the consumer group {} to classic group 
because it's empty.",
+consumerGroup.groupId());
+return false;
+} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
+log.info("Cannot downgrade consumer group {} to classic group 
because its group size is greater than classic group max size.",
+consumerGroup.groupId());
+}
+return true;
+}
+
+public CompletableFuture convertToClassicGroup(ConsumerGroup 
consumerGroup, String leavingMemberId, List records) {

Review Comment:
   Does it need to be public? Let's add some javadoc please.
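   
   Purely as a sketch of the requested javadoc (wording is a suggestion based 
only on what is visible in the diff, not taken from the PR):
   
   ```java
   /**
    * Converts the given consumer group into a classic group.
    *
    * @param consumerGroup   The consumer group to be converted.
    * @param leavingMemberId The id of the member whose departure triggers the downgrade.
    * @param records         The list to which the conversion records are added.
    * @return A future that completes once the conversion has been applied.
    */
   ```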



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -9623,6 +10332,186 @@ public void 
testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
 .setErrorCode(NOT_COORDINATOR.code()), 
pendingMemberSyncResult.syncFuture.get());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testLastClassicProtocolMemberLeavingConsumerGroup(boolean 
appendLogSuccessfully) {

Review Comment:
   Should we also test the session expiration path and the rebalance expiration 
path?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1094,232 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
 );
 return describedGroup;
 }
+
+/**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry  The SnapshotRegistry.
+ * @param metrics   The GroupCoordinatorMetricsShard.
+ * @param classicGroup  The converted classic group.
+ * @param topicsImage   The TopicsImage for topic id and topic name 
conversion.
+ * @return  The created ConsumerGroup.
+ */
+public static ConsumerGroup fromClassicGroup(
+SnapshotRegistry snapshotRegistry,
+GroupCoordinatorMetricsShard metrics,
+ClassicGroup classicGroup,
+TopicsImage topicsImage
+) {
+String groupId = classicGroup.groupId();
+ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+

Re: [PR] KAFKA-15853: Move consumer group and group coordinator configs out of core [kafka]

2024-04-17 Thread via GitHub


chia7712 merged PR #15684:
URL: https://github.com/apache/kafka/pull/15684


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-04-17 Thread via GitHub


mimaison commented on code in PR #15469:
URL: https://github.com/apache/kafka/pull/15469#discussion_r1568692760


##
connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
##
@@ -766,135 +852,23 @@ protected static boolean 
canParseSingleTokenLiteral(Parser parser, boolean embed
 protected static SchemaAndValue parse(Parser parser, boolean embedded) 
throws NoSuchElementException {

Review Comment:
   Ah right, Parser is also part of the public API; I thought it was only a 
private inner class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]

2024-04-17 Thread via GitHub


AyoubOm commented on PR #15713:
URL: https://github.com/apache/kafka/pull/15713#issuecomment-2060923661

   @chia7712 please check this when you have time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-04-17 Thread via GitHub


mimaison commented on code in PR #15469:
URL: https://github.com/apache/kafka/pull/15469#discussion_r1568541184


##
connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
##
@@ -766,135 +852,23 @@ protected static boolean 
canParseSingleTokenLiteral(Parser parser, boolean embed
 protected static SchemaAndValue parse(Parser parser, boolean embedded) 
throws NoSuchElementException {

Review Comment:
   I wonder if it would make sense to move all these `parse<>()` methods to the 
`Parser` class, and extract `Parser` to its own file. WDYT?
   
   I made a quick attempt in 
https://github.com/apache/kafka/commit/10f4910bfc5e0d47782d7a70f8ad22dee97efe12#diff-024f49f1f6adf07bcc1cab6aa8caa0d931ba2c6be887d96ab575ae032be4d051



##
connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
##
@@ -177,7 +213,12 @@ public static Long convertToLong(Schema schema, Object 
value) throws DataExcepti
  * @throws DataException if the value could not be converted to a float
  */
 public static Float convertToFloat(Schema schema, Object value) throws 
DataException {

Review Comment:
   A few of these `convertTo<>()` methods are not covered by unit tests. It's 
ok not to address this in this PR if you'd prefer as it's already huge.
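   
   For example, a minimal test of the missing kind could look like this 
(illustrative values only, not part of the PR):
   
   ```java
   import org.apache.kafka.connect.data.Values;
   import org.junit.jupiter.api.Test;
   
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   public class ValuesConvertToFloatSketchTest {
       @Test
       public void shouldConvertDoubleToFloat() {
           // convertToFloat should narrow a plain Number value down to a Float.
           assertEquals(1.5f, Values.convertToFloat(null, 1.5d), 0.0f);
       }
   }
   ```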



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-04-17 Thread via GitHub


gharris1727 commented on code in PR #15469:
URL: https://github.com/apache/kafka/pull/15469#discussion_r1568679258


##
connect/api/src/main/java/org/apache/kafka/connect/data/Values.java:
##
@@ -766,135 +852,23 @@ protected static boolean 
canParseSingleTokenLiteral(Parser parser, boolean embed
 protected static SchemaAndValue parse(Parser parser, boolean embedded) 
throws NoSuchElementException {

Review Comment:
   That's a cool idea; I think it makes a lot of sense since a bunch of these 
static methods take a Parser argument anyway.
   
   Since this is in the public API, I'll focus on moving some of the internal 
methods to instance methods of a protected/package local Parser, and leave the 
public static methods in Values.
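   
   Roughly the shape being discussed, as a hedged sketch with made-up names (the 
real classes are Values, Parser and SchemaAndValue in connect-api):
   
   ```java
   // Package-local parser that owns the internal parsing logic.
   class ParserSketch {
       ResultSketch parse(boolean embedded) {
           // ... tokenizing and parsing would happen here ...
           return new ResultSketch();
       }
   }
   
   class ResultSketch { }
   
   // The public entry point stays on the Values-like class and simply delegates.
   class ValuesSketch {
       public static ResultSketch parse(ParserSketch parser, boolean embedded) {
           return parser.parse(embedded);
       }
   }
   ```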



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR adds '-parameters' compiler option for :core tests [kafka]

2024-04-17 Thread via GitHub


chia7712 merged PR #15729:
URL: https://github.com/apache/kafka/pull/15729


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16363: Storage tool crashes if dir is unavailable [kafka]

2024-04-17 Thread via GitHub


chia7712 merged PR #15733:
URL: https://github.com/apache/kafka/pull/15733


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll

2024-04-17 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16298:
--

Assignee: Lucas Brutschy  (was: Kirk True)

> Ensure user callbacks exceptions are propagated to the user on consumer poll
> 
>
> Key: KAFKA-16298
> URL: https://issues.apache.org/jira/browse/KAFKA-16298
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Blocker
>  Labels: callback, kip-848-client-support
> Fix For: 3.8.0
>
>
> When user-defined callbacks fail with an exception, the expectation is that 
> the error should be propagated to the user as a KafkaException and break the 
> poll loop (behaviour in the legacy coordinator). The new consumer executes 
> callbacks in the application thread, and sends an event to the background 
> with the callback result and error if any, [passing the error along with the 
> event 
> here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882]
>  to the background thread, but does not seem to propagate the exception to 
> the user. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16566: Fix consumer static membership system test with new protocol [kafka]

2024-04-17 Thread via GitHub


lucasbru commented on PR #15738:
URL: https://github.com/apache/kafka/pull/15738#issuecomment-2060943337

   @lianetm did you mean to close the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15588 ConfigCommandIntegrationTest rewritten in java [kafka]

2024-04-17 Thread via GitHub


nizhikov commented on code in PR #15645:
URL: https://github.com/apache/kafka/pull/15645#discussion_r1568506632


##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
+import kafka.server.KafkaConfig;
+import kafka.server.QuorumTestHarness;
+import kafka.zk.AdminZkClient;
+import kafka.zk.BrokerInfo;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.security.PasswordEncoder;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+public class ConfigCommandIntegrationTest extends QuorumTestHarness {

Review Comment:
   Yes. I struggled to make the test work for both kraft and zk modes and found 
the bug from #15729 :)
   
   > Is is possible to use ClusterTestExtensions to rewrite this test?
   
   Will try to do it. 
   I stopped at the step "Is it possible to apply the same test cases for kraft 
mode" :)



##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.cluster.Broker;
+import kafka.cluster.EndPoint;
+import kafka.server.KafkaConfig;
+import kafka.server.QuorumTestHarness;
+import kafka.zk.AdminZkClient;
+import kafka.zk.BrokerInfo;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.security.PasswordEncoder;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ZooKeeperInternals;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import 

Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


lucasbru commented on PR #15742:
URL: https://github.com/apache/kafka/pull/15742#issuecomment-2060865659

   @kirktrue @lianetm could you have a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-17 Thread via GitHub


lucasbru opened a new pull request, #15742:
URL: https://github.com/apache/kafka/pull/15742

   When user-defined rebalance listeners fail with an exception, the 
expectation is that the error should be propagated to the user as a 
KafkaException and break the poll loop (behaviour in the legacy coordinator). 
The new consumer executes callbacks in the application thread, and sends an 
event to the background with the callback result and error if any, [passing the 
error along with the event 
here](https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882)
 to the background thread, but does not seem to propagate the exception to the 
user.
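   
   As a hedged illustration of the expected behaviour (not this PR's code; names 
are made up apart from the public Kafka classes), an error thrown from a user 
listener should surface from poll() wrapped in a KafkaException:
   
   ```java
   import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
   import org.apache.kafka.common.KafkaException;
   import org.apache.kafka.common.TopicPartition;
   
   import java.util.Collection;
   
   public class ListenerErrorPropagationSketch {
       static class FailingListener implements ConsumerRebalanceListener {
           @Override
           public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
           @Override
           public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
               throw new IllegalStateException("callback failed");
           }
       }
   
       // Roughly what the poll path is expected to do with a recorded callback error.
       static void maybeThrowCallbackError(Throwable callbackError) {
           if (callbackError != null) {
               throw new KafkaException("User rebalance listener failed", callbackError);
           }
       }
   }
   ```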
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568482951


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"true", "false"})
+private boolean simulateRebalanceTrigger;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+createAssignmentSpec();
+
+partitionAssignor = assignorType.assignor();
+
+if (simulateRebalanceTrigger) {
+simulateIncrementalRebalance(topicMetadata);
+}
+}
+
+private Map createTopicMetadata() {
+Map topicMetadata = new 

Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]

2024-04-17 Thread via GitHub


vamossagar12 commented on PR #13283:
URL: https://github.com/apache/kafka/pull/13283#issuecomment-2060631731

   Hey @ableegoldman, I am interested in wrapping this up, but I haven't looked 
at the streams codebase for some time and things seem to have changed a bit. 
I am not aware of the 3.8 timelines; I will have to check those.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-17 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1568383960


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private PartitionAssignor partitionAssignor;
+
+private final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = new HashMap<>();
+Map> partitionRacks = isRackAware ?
+mkMapOfPartitionRacks(partitionsPerTopicCount) :
+Collections.emptyMap();
+
+for (int i = 1; i <= topicCount; i++) {
+Uuid topicUuid = Uuid.randomUuid();
+String topicName = "topic" + i;
+topicMetadata.put(topicUuid, new TopicMetadata(
+topicUuid, topicName, partitionsPerTopicCount, 
partitionRacks));
+}
+
+addTopicSubscriptions(topicMetadata);
+this.subscribedTopicDescriber = new 
SubscribedTopicMetadata(topicMetadata);
+
+if (isRangeAssignor) {
+this.partitionAssignor = new RangeAssignor();
+} else {
+this.partitionAssignor = new UniformAssignor();
+}
+
+if (isReassignment) {
+GroupAssignment initialAssignment = 
partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+Map members;
+
+members = initialAssignment.members();
+
+// Update the AssignmentSpec with the results from the initial 
assignment.
+Map updatedMembers = new HashMap<>();
+
+members.forEach((memberId, memberAssignment) -> {
+AssignmentMemberSpec memberSpec = 
assignmentSpec.members().get(memberId);
+updatedMembers.put(memberId, new AssignmentMemberSpec(
+memberSpec.instanceId(),
+memberSpec.rackId(),
+memberSpec.subscribedTopicIds(),
+memberAssignment.targetPartitions()
+));
+});
+
+// Add new member to trigger a reassignment.
+Optional rackId = isRackAware ? Optional.of("rack" + 
(memberCount + 1) % numberOfRacks) : Optional.empty();
+
+updatedMembers.put("newMember", new AssignmentMemberSpec(
+Optional.empty(),
+rackId,
+topicMetadata.keySet(),
+Collections.emptyMap()
+

Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-17 Thread via GitHub


jeqo commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1568385972


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) 
efficiently,
+ * instead of multiple individual {@link SingleFieldPath single-field paths}.
+ *
+ * If the SMT requires accessing a single field on the same data object,
+ * use {@link SingleFieldPath} instead.
+ *
+ * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures;>KIP-821
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths {
+final Trie trie = new Trie();
+
+MultiFieldPaths(Set paths) {
+paths.forEach(trie::insert);
+}
+
+public static MultiFieldPaths of(List fields, FieldSyntaxVersion 
syntaxVersion) {
+return new MultiFieldPaths(fields.stream()
+.map(f -> new SingleFieldPath(f, syntaxVersion))
+.collect(Collectors.toSet()));
+}
+
+/**
+ * Find values at the field paths
+ *
+ * @param struct data value
+ * @return map of field paths and field/values
+ */
+public Map> 
fieldAndValuesFrom(Struct struct) {
+if (trie.isEmpty()) return Collections.emptyMap();
+return findFieldAndValues(struct, trie.root, new HashMap<>());
+}
+
+private Map> findFieldAndValues(
+Struct originalValue,
+TrieNode trieAt,
+Map> fieldAndValueMap
+) {
+for (Map.Entry step : trieAt.steps().entrySet()) {
+Field field = originalValue.schema().field(step.getKey());
+if (step.getValue().isLeaf()) {
+Map.Entry fieldAndValue =
+field != null
+? new AbstractMap.SimpleImmutableEntry<>(field, 
originalValue.get(field))
+: null;
+fieldAndValueMap.put(step.getValue().path, fieldAndValue);
+} else {
+if (field.schema().type() == Type.STRUCT) {
+findFieldAndValues(
+originalValue.getStruct(field.name()),
+step.getValue(),
+fieldAndValueMap
+);
+}
+}
+}
+return fieldAndValueMap;
+}
+
+/**
+ * Find values at the field paths
+ *
+ * @param value data value
+ * @return map of field paths and field/values
+ */
+public Map> 
fieldAndValuesFrom(Map value) {
+if (trie.isEmpty()) return Collections.emptyMap();
+return findFieldAndValues(value, trie.root, new HashMap<>());
+}
+
+@SuppressWarnings("unchecked")
+private Map> findFieldAndValues(
+Map value,
+TrieNode trieAt,
+Map> fieldAndValueMap
+) {
+for (Map.Entry step : trieAt.steps().entrySet()) {
+Object fieldValue = value.get(step.getKey());
+if (step.getValue().isLeaf()) {
+fieldAndValueMap.put(
+step.getValue().path,
+new AbstractMap.SimpleImmutableEntry<>(step.getKey(), 
fieldValue)
+);
+} else {
+if (fieldValue instanceof Map) {
+findFieldAndValues(
+(Map) fieldValue,
+step.getValue(),
+fieldAndValueMap
+  

Re: [PR] KAFKA-14226: feat(connect:transform): Introduce FieldPath abstraction [kafka]

2024-04-17 Thread via GitHub


jeqo commented on code in PR #15379:
URL: https://github.com/apache/kafka/pull/15379#discussion_r1568385072


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/MultiFieldPaths.java:
##
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms.field;
+
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Multiple field paths to access data objects ({@code Struct} or {@code Map}) 
efficiently,
+ * instead of multiple individual {@link SingleFieldPath single-field paths}.
+ *
+ * If the SMT requires accessing a single field on the same data object,
+ * use {@link SingleFieldPath} instead.
+ *
+ * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures;>KIP-821
+ * @see SingleFieldPath
+ * @see FieldSyntaxVersion
+ */
+public class MultiFieldPaths {
+final Trie trie = new Trie();
+
+MultiFieldPaths(Set paths) {
+paths.forEach(trie::insert);
+}
+
+public static MultiFieldPaths of(List fields, FieldSyntaxVersion 
syntaxVersion) {
+return new MultiFieldPaths(fields.stream()
+.map(f -> new SingleFieldPath(f, syntaxVersion))
+.collect(Collectors.toSet()));
+}
+
+/**
+ * Find values at the field paths
+ *
+ * @param struct data value
+ * @return map of field paths and field/values
+ */
+public Map> 
fieldAndValuesFrom(Struct struct) {
+if (trie.isEmpty()) return Collections.emptyMap();
+return findFieldAndValues(struct, trie.root, new HashMap<>());
+}
+
+private Map> findFieldAndValues(
+Struct originalValue,
+TrieNode trieAt,
+Map> fieldAndValueMap
+) {
+for (Map.Entry step : trieAt.steps().entrySet()) {
+Field field = originalValue.schema().field(step.getKey());
+if (step.getValue().isLeaf()) {
+Map.Entry fieldAndValue =
+field != null
+? new AbstractMap.SimpleImmutableEntry<>(field, 
originalValue.get(field))
+: null;
+fieldAndValueMap.put(step.getValue().path, fieldAndValue);
+} else {
+if (field.schema().type() == Type.STRUCT) {
+findFieldAndValues(
+originalValue.getStruct(field.name()),
+step.getValue(),
+fieldAndValueMap
+);
+}
+}
+}
+return fieldAndValueMap;
+}
+
+/**
+ * Find values at the field paths
+ *
+ * @param value data value
+ * @return map of field paths and field/values
+ */
+public Map> 
fieldAndValuesFrom(Map value) {
+if (trie.isEmpty()) return Collections.emptyMap();
+return findFieldAndValues(value, trie.root, new HashMap<>());
+}
+
+@SuppressWarnings("unchecked")
+private Map> findFieldAndValues(
+Map value,
+TrieNode trieAt,
+Map> fieldAndValueMap
+) {
+for (Map.Entry step : trieAt.steps().entrySet()) {
+Object fieldValue = value.get(step.getKey());
+if (step.getValue().isLeaf()) {
+fieldAndValueMap.put(
+step.getValue().path,
+new AbstractMap.SimpleImmutableEntry<>(step.getKey(), 
fieldValue)
+);
+} else {
+if (fieldValue instanceof Map) {
+findFieldAndValues(
+(Map) fieldValue,
+step.getValue(),
+fieldAndValueMap
+  

Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568327439


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the retryHandler will be 
overridden if needed
+public void retryHandler() { }
 @SuppressWarnings("ThrowableNotThrown")
 @Override
 public void handleException(Throwable e) {
 if (e instanceof MigrationClientAuthException) {
 
KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper 
authentication in " + this, e);
 } else if (e instanceof MigrationClientException) {
 log.info(String.format("Encountered ZooKeeper error during 
event %s. Will retry.", this), e.getCause());
+retryHandler();

Review Comment:
   > No, as you said above, the MigrationClientException retryHandler won't be 
triggered in other migrationState because they will be handled in other event 
handler, which is not related to pollEvent. And because the default 
retryHandler is no-op, there will be no retry for other migrationStates. As 
long as pollEvent is keep polling, they can be retried later.
   
   you are right :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-17 Thread via GitHub


showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568325101


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the retryHandler will be 
overridden if needed
+public void retryHandler() { }
 @SuppressWarnings("ThrowableNotThrown")
 @Override
 public void handleException(Throwable e) {
 if (e instanceof MigrationClientAuthException) {
 
KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper 
authentication in " + this, e);
 } else if (e instanceof MigrationClientException) {
 log.info(String.format("Encountered ZooKeeper error during 
event %s. Will retry.", this), e.getCause());
+retryHandler();

Review Comment:
   > My question is "why we did not handle UNINITIALIZED by another event"? If 
we move recoverMigrationStateFromZK to another event, we don't need to add 
extra retryHandler.
   
   That's a good question, @chia7712! Let me think about it.
   
   > Also, the solution offered by this PR has a side effect that we will put 2 
PollEvent if the exception MigrationClientException happens in other 
migrationState
   
   No, as you said above, the `MigrationClientException` retryHandler won't be 
triggered in other migration states because they are handled by other event 
handlers, which are not related to pollEvent. And because the default 
retryHandler is a no-op, there will be no retry for other migration states. As 
long as `pollEvent` keeps polling, they can be retried later.
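   
   A rough, self-contained sketch of the pattern (illustrative names, not the 
actual KRaftMigrationDriver types): the base event keeps a no-op retryHandler, 
and only the event that must re-enqueue itself overrides it.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   public class RetryHandlerSketch {
       static final Deque<MigrationEventSketch> QUEUE = new ArrayDeque<>();
   
       abstract static class MigrationEventSketch {
           // No-op by default; overridden only where a retry makes sense.
           public void retryHandler() { }
   
           public void handleException(Throwable e) {
               // Stand-in for the MigrationClientException branch: retriable errors retry.
               retryHandler();
           }
       }
   
       static class PollEventSketch extends MigrationEventSketch {
           @Override
           public void retryHandler() {
               QUEUE.add(new PollEventSketch()); // keep polling by re-enqueueing
           }
       }
   
       public static void main(String[] args) {
           new PollEventSketch().handleException(new RuntimeException("transient ZK error"));
           System.out.println("re-enqueued events: " + QUEUE.size()); // 1
       }
   }
   ```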



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568311272


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the retryHandler will be overridden if needed
+public void retryHandler() { }
 @SuppressWarnings("ThrowableNotThrown")
 @Override
 public void handleException(Throwable e) {
 if (e instanceof MigrationClientAuthException) {
 KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
 } else if (e instanceof MigrationClientException) {
 log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
+retryHandler();

Review Comment:
   Also, the solution offered by this PR has a side effect: we will put 2 
`PollEvent`s on the queue if a `MigrationClientException` happens in another 
`migrationState`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568308820


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the retryHandler will be overridden if needed
+public void retryHandler() { }
 @SuppressWarnings("ThrowableNotThrown")
 @Override
 public void handleException(Throwable e) {
 if (e instanceof MigrationClientAuthException) {
 KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
 } else if (e instanceof MigrationClientException) {
 log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
+retryHandler();

Review Comment:
   I feel the `retry` already exists except for `UNINITIALIZED`, since 
`UNINITIALIZED` is not run by another event. For the other event types, 
`PollEvent` puts (a do-something event + one deferred `PollEvent`) onto the 
queue, so the deferred `PollEvent` is the "retry".
   
   My question is "why did we not handle `UNINITIALIZED` by another event"? If 
we move `recoverMigrationStateFromZK` to another event, we don't need to add 
an extra `retryHandler`.
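   
   To make that loop concrete, here is a rough sketch under stated assumptions: 
a plain `ScheduledExecutorService` stands in for the driver's event queue, and 
the 100 ms delay, the failing step, and all names are made up for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DeferredPollSketch {
    static final ScheduledExecutorService QUEUE = Executors.newSingleThreadScheduledExecutor();
    static int attempts = 0;

    // Each poll enqueues the state-specific work plus another deferred poll,
    // so work that failed is simply attempted again on the next poll cycle.
    static void pollEvent() {
        if (QUEUE.isShutdown()) {
            return; // nothing left to do once the step has completed
        }
        QUEUE.execute(DeferredPollSketch::doMigrationStep);
        QUEUE.schedule(DeferredPollSketch::pollEvent, 100, TimeUnit.MILLISECONDS);
    }

    static void doMigrationStep() {
        attempts++;
        if (attempts < 3) {
            System.out.println("step failed (attempt " + attempts + "), the deferred poll will retry it");
            return; // the failure is swallowed; the deferred poll is the retry
        }
        System.out.println("step succeeded on attempt " + attempts);
        QUEUE.shutdown(); // stop polling once the state has advanced
    }

    public static void main(String[] args) {
        QUEUE.execute(DeferredPollSketch::pollEvent);
    }
}
```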
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] debug for 15679 [kafka]

2024-04-17 Thread via GitHub


chia7712 opened a new pull request, #15741:
URL: https://github.com/apache/kafka/pull/15741

   as title


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-17 Thread via GitHub


showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1568277518


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,20 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the handleException will be overridden if needed
+private Consumer<Throwable> retryHandler = NO_OP_HANDLER;
+
+public void retryHandler(Consumer<Throwable> retryHandler) {
+this.retryHandler = retryHandler;
+}

Review Comment:
   > Did you consider simply defining an empty public void 
retryHandler(Throwable thrown) that PollEvent can override?
   
   Nice suggestion! Updated!
   
   > Also, should we call wakeup (run next poll ASAP) rather than 
scheduleDeferred if the exception is retryable?
   
   Thanks for the suggestion. I think that's not appropriate because, if the 
retriable error needs some time to be fixed (e.g. a ZK connection issue), the 
pollEvent would be invoked many times (and keep retrying) in a short period 
of time.
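   
   A toy comparison of the two options, using made-up outage and deferral 
durations (not values from the PR), shows why the deferred retry keeps the 
attempt count bounded while the error persists:

```java
public class WakeupVsDeferredSketch {

    // Stand-in for a poll attempt that keeps failing until the outage ends.
    static boolean pollOnce(long outageEndMillis) {
        return System.currentTimeMillis() >= outageEndMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        long outageMs = 200;   // hypothetical time for ZK connectivity to recover
        long deferralMs = 50;  // hypothetical deferred-poll delay

        // Immediate wakeup: retry as soon as the previous attempt fails.
        long end = System.currentTimeMillis() + outageMs;
        long immediateAttempts = 0;
        while (!pollOnce(end)) {
            immediateAttempts++;
        }

        // Deferred retry: wait deferralMs between attempts.
        end = System.currentTimeMillis() + outageMs;
        long deferredAttempts = 0;
        while (!pollOnce(end)) {
            deferredAttempts++;
            Thread.sleep(deferralMs);
        }

        System.out.println("immediate wakeup attempts during outage: " + immediateAttempts);
        System.out.println("deferred poll attempts during outage:    " + deferredAttempts);
    }
}
```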
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


