Re: [PR] KAFKA-15875 Make Snapshot public [kafka]

2024-04-15 Thread via GitHub


github-actions[bot] commented on PR #14816:
URL: https://github.com/apache/kafka/pull/14816#issuecomment-2058166826

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16543:There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 [kafka]

2024-04-15 Thread via GitHub


hudeqi closed pull request #15706: KAFKA-16543:There may be ambiguous deletions 
in the `cleanupGroupMetadata` when the generation of the group is less than or 
equal to 0
URL: https://github.com/apache/kafka/pull/15706





Re: [PR] KAFKA-16543:There may be ambiguous deletions in the `cleanupGroupMetadata` when the generation of the group is less than or equal to 0 [kafka]

2024-04-15 Thread via GitHub


hudeqi commented on PR #15706:
URL: https://github.com/apache/kafka/pull/15706#issuecomment-2058152299

   > Hi @hudeqi. Thanks for the patch. I would like to better understand it. My 
first question is how would Flink commit offsets with a generationId equal to -1? 
The generation of the group is only managed by the group. It is not possible to 
alter it from an external system. The -1 passed in the offset commit request is 
only used for validation purposes.
   > 
   > The reason why we don't write a tombstone in this case is because the 
group was never materialized in the log if it stayed at generation 0. I am not 
sure it is a worthwhile optimization though.
   
   @dajac Thank you for your review. To answer the first question: Flink only 
uses Kafka to commit and store offsets; its group membership is not managed by 
Kafka. By default, the generation sent with its commits is always -1. Since the 
generation only changes when members are managed by Kafka, Flink's generation 
stays at -1. To answer the second question: even when Kafka is used only to 
store and commit offsets, the group is still initialized in the in-memory 
`groupMetadataCache`, and the group's metadata and offset metadata are also 
written to `__consumer_offsets` in the log. Therefore, for data consistency, 
all of them should be cleaned up when the group is purged.





Re: [PR] KAFKA-15685: added compatibility for MinGW [kafka]

2024-04-15 Thread via GitHub


Zta commented on PR #13321:
URL: https://github.com/apache/kafka/pull/13321#issuecomment-2058148690

   My OS is RHEL 8.7 and 8.8
   
   In 3.5.1, I ran this script with kafka.tools.GetOffsetShell just fine.
   
   In 3.7.0, I got the following error:
   Error: Could not find or load main class kafka.tools.GetOffsetShell
   Caused by: java.lang.ClassNotFoundException: kafka.tools.GetOffsetShell





[PR] MINOR: fix some typos [kafka]

2024-04-15 Thread via GitHub


Nicklee007 opened a new pull request, #15725:
URL: https://github.com/apache/kafka/pull/15725

   Fix some typos.





Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-15 Thread via GitHub


KevinZTW commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1566619569


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();

Review Comment:
   Oh, thanks! I forgot the hook would clean up files in each test automatically.






Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-15 Thread via GitHub


KevinZTW commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1566612658


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+}
+
+@Test
+public void testMaybeFlushWithTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+
+assertDoesNotThrow(() -> {
+List<String> lines = Files.readAllLines(file.toPath());
+assertEquals(2, lines.size());
+assertEquals("version: 0", lines.get(0));
+assertEquals("topic_id: " + topicId, lines.get(1));
+});
+}
+
+@Test
+public void testMaybeFlushWithNoTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+assertEquals(0, file.length());
+}
+
+@Test
+public void testRead() {
+File file = PartitionMetadataFile.newFile(dir);
+LogDirFailureChannel channel = 
Mockito.mock(LogDirFailureChannel.class);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, channel);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));

Review Comment:
   I think the only exception that could be thrown in this case is 
`InconsistentTopicIdException`.






Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-15 Thread via GitHub


KevinZTW commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1566612024


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+}
+
+@Test
+public void testMaybeFlushWithTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+
+assertDoesNotThrow(() -> {
+List<String> lines = Files.readAllLines(file.toPath());
+assertEquals(2, lines.size());
+assertEquals("version: 0", lines.get(0));
+assertEquals("topic_id: " + topicId, lines.get(1));
+});
+}
+
+@Test
+public void testMaybeFlushWithNoTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+assertEquals(0, file.length());
+}
+
+@Test
+public void testRead() {
+File file = PartitionMetadataFile.newFile(dir);
+LogDirFailureChannel channel = 
Mockito.mock(LogDirFailureChannel.class);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, channel);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));

Review Comment:
   Oh, the reason I used `assertDoesNotThrow` here is that we expect the first 
call to `record` with the `topicId` not to throw any exception, since there is 
no `dirtyTopicIdOpt` yet.
   
   Do you think that makes sense, or would it be redundant?
   






Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-15 Thread via GitHub


KevinZTW commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1566609363


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+}
+
+@Test
+public void testMaybeFlushWithTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+
+assertDoesNotThrow(() -> {
+List<String> lines = Files.readAllLines(file.toPath());
+assertEquals(2, lines.size());
+assertEquals("version: 0", lines.get(0));

Review Comment:
   ok! thanks






Re: [PR] KAFKA-16556: SubscriptionState should not prematurely reset 'pending' partitions [kafka]

2024-04-15 Thread via GitHub


kirktrue commented on PR #15724:
URL: https://github.com/apache/kafka/pull/15724#issuecomment-2058081763

   @lucasbru—please kindly take a look at this issue that's occasionally 
causing consumers to ignore committed offsets. The result is the consumer 
resets the partition's position back to 0, causing reprocessing of messages.
   
   cc @lianetm @philipnee





Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-15 Thread via GitHub


KevinZTW commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1566606914


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class PartitionMetadataFileTest  {
+private final File dir = TestUtils.tempDirectory();
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+}
+
+@Test
+public void testMaybeFlushWithTopicIdPresent() {
+File file = PartitionMetadataFile.newFile(dir);
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+
+Uuid topicId = Uuid.randomUuid();
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));
+assertDoesNotThrow(partitionMetadataFile::maybeFlush);
+
+assertDoesNotThrow(() -> {
+List<String> lines = Files.readAllLines(file.toPath());
+assertEquals(2, lines.size());

Review Comment:
   I see, thanks!






[PR] KAFKA-16556: SubscriptionState should not prematurely reset 'pending' partitions [kafka]

2024-04-15 Thread via GitHub


kirktrue opened a new pull request, #15724:
URL: https://github.com/apache/kafka/pull/15724

   Partitions that are marked as `pendingOnAssignedCallback` should not be 
reset in `resetInitializingPositions()`. Pending partitions are omitted from 
the set returned by `initializingPartitions()`. As a result, the Consumer does 
not include them in the set of partitions for which it attempts to load 
committed offsets. The code used by the Consumer to reset positions 
(`resetInitializingPositions()`) should likewise skip partitions marked as 
pending.
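
The intended behavior can be sketched roughly as follows. This is a minimal, hypothetical model and not the actual `SubscriptionState` code: the class name `PendingAwareSubscriptionState`, the string partition names, the two-value `FetchState` enum, and the `assign`/`fetchState` helpers are all assumptions made for illustration. Only the method names `initializingPartitions()`, `resetInitializingPositions()`, and `enablePartitionsAwaitingCallback` and the `pendingOnAssignedCallback` flag come from the description above.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the consumer's subscription state.
// Partitions are plain strings here; the real class is far richer.
class PendingAwareSubscriptionState {
    enum FetchState { INITIALIZING, AWAIT_RESET }

    private static final class PartitionState {
        FetchState fetchState = FetchState.INITIALIZING;
        boolean pendingOnAssignedCallback;
    }

    private final Map<String, PartitionState> assignment = new LinkedHashMap<>();

    // New partitions start out INITIALIZING; 'pending' stays true until the
    // rebalance listener callback has completed.
    void assign(String partition, boolean pendingOnAssignedCallback) {
        PartitionState state = new PartitionState();
        state.pendingOnAssignedCallback = pendingOnAssignedCallback;
        assignment.put(partition, state);
    }

    // Clears the 'pending' flag once the callback has run.
    void enablePartitionsAwaitingCallback(Collection<String> partitions) {
        for (String partition : partitions) {
            PartitionState state = assignment.get(partition);
            if (state != null) {
                state.pendingOnAssignedCallback = false;
            }
        }
    }

    // Shared predicate: both methods below must agree on what they skip.
    private static boolean isInitializing(PartitionState state) {
        return state.fetchState == FetchState.INITIALIZING
                && !state.pendingOnAssignedCallback;
    }

    // Partitions for which committed offsets should be fetched.
    Set<String> initializingPartitions() {
        Set<String> result = new LinkedHashSet<>();
        assignment.forEach((partition, state) -> {
            if (isInitializing(state)) {
                result.add(partition);
            }
        });
        return result;
    }

    // Marks initializing partitions for reset, skipping pending ones.
    void resetInitializingPositions() {
        assignment.values().forEach(state -> {
            if (isInitializing(state)) {
                state.fetchState = FetchState.AWAIT_RESET;
            }
        });
    }

    FetchState fetchState(String partition) {
        return assignment.get(partition).fetchState;
    }
}
```

Routing both methods through one predicate is a design choice of this sketch; the point is only that a pending partition stays untouched until its callback completes, after which it becomes eligible for the committed-offset lookup.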
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16556) SubscriptionState should not prematurely reset 'pending' partitions

2024-04-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16556:
--
Summary: SubscriptionState should not prematurely reset 'pending' 
partitions  (was: Race condition between ConsumerRebalanceListener and 
SubscriptionState causes commit offsets to be reset)

> SubscriptionState should not prematurely reset 'pending' partitions
> ---
>
> Key: KAFKA-16556
> URL: https://issues.apache.org/jira/browse/KAFKA-16556
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.7.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> There appears to be a race condition between invoking the 
> {{ConsumerRebalanceListener}} callbacks on reconciliation and 
> {{initWithCommittedOffsetsIfNeeded}} in the consumer.
>  
> The membership manager adds the newly assigned partitions to the 
> {{SubscriptionState}}, but marks them as 
> {{pendingOnAssignedCallback}}. Then, after the 
> {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the 
> membership manager will invoke {{enablePartitionsAwaitingCallback}} to set 
> all of those partitions' 'pending' flag to false.
>  
> During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to 
> call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already 
> cached. Inside {{initWithCommittedOffsetsIfNeeded}}, the consumer calls 
> the subscription's {{initializingPartitions}} method to get a set of the 
> partitions for which to fetch their committed offsets. However, 
> {{SubscriptionState.initializingPartitions()}} only returns partitions that 
> have the {{pendingOnAssignedCallback}} flag set to false.
>  
> The result is:
>  * If the {{MembershipManagerImpl.assignPartitions()}} future is completed 
> on the background thread first, the 'pending' flag is set to false. On the 
> application thread, when {{SubscriptionState.initializingPartitions()}} is 
> called, it returns the partition, and we fetch its committed offsets.
>  * If instead the application thread calls 
> {{SubscriptionState.initializingPartitions()}} first, the partition's 
> 'pending' flag is still set to true, and so the partition is omitted from 
> the returned set. The {{updateFetchPositions()}} method then continues on and 
> re-initializes the partition's fetch offset to 0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-15 Thread via GitHub


HenryCaiHaiying commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1566566286


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+allMetrics.forEach((metricName, metric) -> {
+if (metricName.name().equals(quotaMetricName.name()) && 
metricName.group().equals(quotaMetricName.group())) {
+Map<String, String> metricTags = metricName.tags();
+LOGGER.info("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", metricTags, newQuota);

Review Comment:
   LOGGER.warn ?






Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-15 Thread via GitHub


kirktrue commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2057988468

   @lucasbru—please kindly take a look at this issue that's occasionally 
causing duplicate heartbeat requests.
   
   cc @lianetm @philipnee 





[PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-15 Thread via GitHub


kirktrue opened a new pull request, #15723:
URL: https://github.com/apache/kafka/pull/15723

   In some cases, the network layer is _very_ fast and can send out multiple 
requests within the same millisecond timestamp.
   
   The previous logic for tracking inflight status used timestamps: if the 
timestamp from the last received response was less than the timestamp from the 
last sent request, we'd interpret that as having an inflight request. However, 
this approach would incorrectly return `false` from 
`RequestState.requestInFlight()` if the two timestamps were _equal_.
   
   One result of this faulty logic is that in such cases, the consumer would 
accidentally send multiple heartbeat requests to the consumer group 
coordinator. The consumer group coordinator would interpret these requests as 
'join group' requests and create members for each request. Therefore, the 
coordinator was under the false understanding that there were more members in 
the group than there really were. Consequently, if your luck was _really_ bad, 
the coordinator might assign partitions to one of the duplicate members. Those 
partitions would be assigned to a phantom consumer that was not reading any 
data, and this led to flaky tests.
   
   The new implementation in `RequestState` uses a simple boolean flag that is set in 
`onSendAttempt` and cleared in `onSuccessfulAttempt`, `onFailedAttempt`, and 
`reset`. A new unit test has been added. The change has been tested against all of 
the consumer unit and integration tests, and it has removed all known occurrences 
of phantom consumer group members in the system tests.
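
As a rough illustration of the flag-based tracking described above, here is a minimal sketch. It is not the actual `RequestState` code: the class name `InflightFlagSketch` is made up, and the method signatures are simplified (no timestamp arguments) for the sake of the example.

```java
// Hypothetical sketch; not the real RequestState implementation.
class InflightFlagSketch {
    // True from the moment a request is sent until a response, failure, or reset.
    private boolean requestInFlight = false;

    // Set the flag when a request is handed to the network layer.
    void onSendAttempt() {
        requestInFlight = true;
    }

    // Clear the flag when a successful response arrives.
    void onSuccessfulAttempt() {
        requestInFlight = false;
    }

    // Clear the flag when the request fails.
    void onFailedAttempt() {
        requestInFlight = false;
    }

    // Clear the flag when the state is reset.
    void reset() {
        requestInFlight = false;
    }

    // The answer no longer depends on comparing two millisecond timestamps.
    boolean requestInFlight() {
        return requestInFlight;
    }
}
```

Because the flag does not depend on the clock, a send and a receive that land in the same millisecond cannot be confused with each other.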
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-15974) Enforce that event processing respects user-provided timeout

2024-04-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15974:
--
Reviewer: Lucas Brutschy  (was: Bruno Cadonna)

> Enforce that event processing respects user-provided timeout
> 
>
> Key: KAFKA-15974
> URL: https://issues.apache.org/jira/browse/KAFKA-15974
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> The intention of the {{CompletableApplicationEvent}} is for a {{Consumer}} to 
> block waiting for the event to complete. The application thread will block 
> for the timeout, but there is not yet a consistent manner in which events are 
> timed out.
> Enforce at the event handler/event processing layer that timeouts are 
> respected per the design in KAFKA-15848.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-15 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1566471100


##
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##
@@ -423,6 +441,17 @@ public void replay(ConfigRecord record) {
 log.info("Replayed ConfigRecord for {} which set configuration {} 
to {}",
 configResource, record.name(), record.value());
 }
+if (record.name() == TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG) {
+try {
+if (type == Type.TOPIC) {
+
minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.of(record.resourceName()));
+} else {
+
minIsrConfigUpdatePartitionHandler.addRecordsForMinIsrUpdate(Optional.empty());
+}
+} catch (InterruptedException | ExecutionException e) {
+throw new Throwable("Fail to append partition updates for the min isr update: " + e.getMessage());
+}

Review Comment:
   Removed.






Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-15 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1566455070


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   Got it, we basically only need to call the appendWriteEvents and do not wait 
for the replay().






[jira] [Updated] (KAFKA-16556) Race condition between ConsumerRebalanceListener and SubscriptionState causes commit offsets to be reset

2024-04-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16556:
--
Summary: Race condition between ConsumerRebalanceListener and 
SubscriptionState causes commit offsets to be reset  (was: Race condition 
between ConsumerRebalanceListener and SubscriptionState)

> Race condition between ConsumerRebalanceListener and SubscriptionState causes 
> commit offsets to be reset
> 
>
> Key: KAFKA-16556
> URL: https://issues.apache.org/jira/browse/KAFKA-16556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> There appears to be a race condition between invoking the 
> {{ConsumerRebalanceListener}} callbacks on reconciliation and 
> {{initWithCommittedOffsetsIfNeeded}} in the consumer.
>  
> The membership manager adds the newly assigned partitions to the 
> {{{}SubscriptionState{}}}, but marks them as 
> {{{}pendingOnAssignedCallback{}}}. Then, after the 
> {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the 
> membership manager will invoke {{enablePartitionsAwaitingCallback}} to set 
> all of those partitions' 'pending' flag to false.
>  
> During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to 
> call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already 
> cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls 
> the subscription's {{initializingPartitions}} method to get a set of the 
> partitions for which to fetch their committed offsets. However, 
> {{SubscriptionState.initializingPartitions()}} only returns partitions that 
> have the {{pendingOnAssignedCallback}} flag set to false.
>  
> The result is:
>  * If the {{MembershipManagerImpl.assignPartitions()}} future is completed 
> on the background thread first, the 'pending' flag is set to false. On the 
> application thread, when {{SubscriptionState.initializingPartitions()}} is 
> called, it returns the partition, and we fetch its committed offsets.
>  * If instead the application thread calls 
> {{SubscriptionState.initializingPartitions()}} first, the partition's 
> 'pending' flag is still set to true, and so the partition is omitted from 
> the returned set. The {{updateFetchPositions()}} method then continues on and 
> re-initializes the partition's fetch offset to 0.
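
The two orderings described in this ticket can be sketched in a few lines of single-threaded Java. This is only an illustration: the class `PendingFilterSketch` and its string-keyed map are hypothetical, and it models nothing but the `pendingOnAssignedCallback` flag (the real `SubscriptionState` tracks more state than this).

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the 'pending' flag only; not the real SubscriptionState.
class PendingFilterSketch {
    // Partition name -> pendingOnAssignedCallback flag.
    private final Map<String, Boolean> pending = new LinkedHashMap<>();

    // Newly assigned partitions stay pending until the listener callback completes.
    void assign(String partition) {
        pending.put(partition, true);
    }

    // Called after onPartitionsAssigned() completes: clears every 'pending' flag.
    void enablePartitionsAwaitingCallback() {
        pending.replaceAll((partition, flag) -> false);
    }

    // Only partitions whose 'pending' flag is false are returned.
    Set<String> initializingPartitions() {
        Set<String> result = new LinkedHashSet<>();
        for (Map.Entry<String, Boolean> entry : pending.entrySet()) {
            if (!entry.getValue()) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

If `initializingPartitions()` is called right after `assign("topic-0")`, the result is empty, so no committed offset would be fetched for that partition. If `enablePartitionsAwaitingCallback()` runs first, the same call returns `topic-0`.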





Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]

2024-04-15 Thread via GitHub


AyoubOm commented on code in PR #15713:
URL: https://github.com/apache/kafka/pull/15713#discussion_r1566436979


##
streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java:
##
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * Default interface which can be used to personalized the named of 
operations, internal topics or store.
+ * Default interface which can be used to customize the named of operations, 
internal topics or store.

Review Comment:
   Thanks @johnnychhsu, fixed.






[PR] MINOR: Fix io-[wait-]ratio metrics description [kafka]

2024-04-15 Thread via GitHub


emitskevich-blp opened a new pull request, #15722:
URL: https://github.com/apache/kafka/pull/15722

   The [implementation of KIP-773](https://github.com/apache/kafka/pull/11302) 
deprecated the `iotime-total` and `io-waittime-total` metrics. It was not meant 
to mark `io-ratio` and `io-wait-ratio` as deprecated. However, they now have 
`*Deprecated*` in their descriptions. Here is the reason:
   1. register `io-ratio` (desc: `*Deprecated* The fraction of time ...`) -> 
registered
   2. register `iotime-total` (desc: `*Deprecated* The total time ...`) -> 
registered 
   3. register `io-ratio` (desc: `The fraction of time ...`) -> **skipped, the 
same name already exists in registry**
   4. register `io-time-ns-total` (desc: `The total time ...`) -> registered 
   
   As a result, `io-ratio` has an incorrect description. The same applies to 
`io-wait-ratio`. This PR fixes these descriptions.
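
The registration order above can be reproduced with a tiny first-registration-wins map. This is a sketch under assumptions: `FirstWinsRegistrySketch` is a made-up stand-in, not Kafka's metrics registry, and the descriptions are abbreviated the same way as in the list.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a registry that skips names it already knows.
class FirstWinsRegistrySketch {
    private final Map<String, String> descriptions = new LinkedHashMap<>();

    // Returns true if the metric was registered, false if the name already existed.
    boolean register(String name, String description) {
        return descriptions.putIfAbsent(name, description) == null;
    }

    // Returns the description that was stored for the given name.
    String description(String name) {
        return descriptions.get(name);
    }
}
```

Replaying the four steps against this sketch, the third call returns `false` and `io-ratio` keeps the description that starts with `*Deprecated*`, which matches the behaviour reported here.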
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)





[jira] [Created] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()

2024-04-15 Thread Kirk True (Jira)
Kirk True created KAFKA-16558:
-

 Summary: Implement HeartbeatRequestState.toStringBase()
 Key: KAFKA-16558
 URL: https://issues.apache.org/jira/browse/KAFKA-16558
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The code incorrectly overrides the {{toString()}} method instead of overriding 
{{{}toStringBase(){}}}. This affects debugging and troubleshooting consumer 
issues.





[jira] [Updated] (KAFKA-16558) Implement HeartbeatRequestState.toStringBase()

2024-04-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16558:
--
Description: The inner class {{HeartbeatRequestState}} does not override 
the {{toStringBase()}} method. This affects debugging and troubleshooting 
consumer issues.  (was: The code incorrectly overrides the {{toString()}} 
method instead of overriding {{{}toStringBase(){}}}. This affects debugging and 
troubleshooting consumer issues.)

> Implement HeartbeatRequestState.toStringBase()
> --
>
> Key: KAFKA-16558
> URL: https://issues.apache.org/jira/browse/KAFKA-16558
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The inner class {{HeartbeatRequestState}} does not override the 
> {{toStringBase()}} method. This affects debugging and troubleshooting 
> consumer issues.





[jira] [Updated] (KAFKA-16557) Fix OffsetFetchRequestState.toString()

2024-04-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16557:
--
Summary: Fix OffsetFetchRequestState.toString()  (was: Fix 
CommitRequestManager’s OffsetFetchRequestState.toString())

> Fix OffsetFetchRequestState.toString()
> --
>
> Key: KAFKA-16557
> URL: https://issues.apache.org/jira/browse/KAFKA-16557
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, logging
> Fix For: 3.8.0
>
>
> The code incorrectly overrides the {{toString()}} method instead of 
> overriding {{{}toStringBase(){}}}. This affects debugging and troubleshooting 
> consumer issues.





[jira] [Created] (KAFKA-16557) Fix CommitRequestManager’s OffsetFetchRequestState.toString()

2024-04-15 Thread Kirk True (Jira)
Kirk True created KAFKA-16557:
-

 Summary: Fix CommitRequestManager’s 
OffsetFetchRequestState.toString()
 Key: KAFKA-16557
 URL: https://issues.apache.org/jira/browse/KAFKA-16557
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


The code incorrectly overrides the {{toString()}} method instead of overriding 
{{{}toStringBase(){}}}. This affects debugging and troubleshooting consumer 
issues.
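
To illustrate why the choice of method matters, here is a minimal sketch of the pattern. It assumes the base class builds its `toString()` output from `toStringBase()`; the class names, fields, and values are hypothetical and are not taken from the Kafka code.

```java
// Hypothetical base class: toString() wraps whatever toStringBase() returns.
class BaseRequestStateSketch {
    protected final long backoffMs = 100;

    // Subclasses are expected to extend this with their own fields.
    protected String toStringBase() {
        return "backoffMs=" + backoffMs;
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + "{" + toStringBase() + "}";
    }
}

// Hypothetical subclass that extends toStringBase() and leaves toString() alone.
class OffsetFetchSketch extends BaseRequestStateSketch {
    private final String memberId = "m1";

    @Override
    protected String toStringBase() {
        return super.toStringBase() + ", memberId=" + memberId;
    }
}
```

With this arrangement, `new OffsetFetchSketch().toString()` contains both the base fields and the subclass fields. Overriding `toString()` directly in the subclass would bypass the base class formatting, which is the kind of logging inconsistency the ticket is about.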





Re: [PR] Kafka-16540: Update partitions if min isr config is changed. [kafka]

2024-04-15 Thread via GitHub


CalvinConfluent commented on code in PR #15702:
URL: https://github.com/apache/kafka/pull/15702#discussion_r1566242343


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -2360,4 +2363,9 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
 void handleUncleanBrokerShutdown(int brokerId, List 
records) {
 replicationControl.handleBrokerUncleanShutdown(brokerId, records);
 }
+
+void maybeTriggerMinIsrConfigUpdate(Optional topicName) throws 
InterruptedException, ExecutionException {
+appendWriteEvent("partitionUpdateForMinIsrChange", 
OptionalLong.empty(),
+() -> 
replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get();
+}

Review Comment:
   Then we only need to call appendWriteEvent here? We don't have to wait for 
the replay().






[jira] [Updated] (KAFKA-16556) Race condition between ConsumerRebalanceListener and SubscriptionState

2024-04-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16556:
--
Description: 
There appears to be a race condition between invoking the 
{{ConsumerRebalanceListener}} callbacks on reconciliation and 
{{initWithCommittedOffsetsIfNeeded}} in the consumer.
 
The membership manager adds the newly assigned partitions to the 
{{{}SubscriptionState{}}}, but marks them as {{{}pendingOnAssignedCallback{}}}. 
Then, after the {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, 
the membership manager will invoke {{enablePartitionsAwaitingCallback}} to set 
all of those partitions' 'pending' flag to false.
 
During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to 
call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already 
cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls the 
subscription's {{initializingPartitions}} method to get a set of the partitions 
for which to fetch their committed offsets. However, 
{{SubscriptionState.initializingPartitions()}} only returns partitions that 
have the {{pendingOnAssignedCallback}} flag set to false.
 
The result is:
 * If the {{MembershipManagerImpl.assignPartitions()}} future is completed on 
the background thread first, the 'pending' flag is set to false. On the 
application thread, when {{SubscriptionState.initializingPartitions()}} is 
called, it returns the partition, and we fetch its committed offsets.
 * If instead the application thread calls 
{{SubscriptionState.initializingPartitions()}} first, the partition's 
'pending' flag is still set to true, and so the partition is omitted from the 
returned set. The {{updateFetchPositions()}} method then continues on and 
re-initializes the partition's fetch offset to 0.

  was:
There appears to be a race condition between invoking the 
{{ConsumerRebalanceListener}} callbacks on reconciliation and 
{{initWithCommittedOffsetsIfNeeded}} in the consumer.
 
The membership manager adds the newly assigned partitions to the 
{{{}SubscriptionState{}}}, but marks them as {{{}pendingOnAssignedCallback{}}}. 
Then, after the {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, 
the membership manager will invoke {{enablePartitionsAwaitingCallback}} to set 
all of those partitions' 'pending' flag to false.
 
During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to 
call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already 
cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls the 
subscription's {{initializingPartitions}} method to get a set of the partitions 
for which to fetch their committed offsets. However, 
{{SubscriptionState.initializingPartitions()}} only returns partitions that 
have the {{pendingOnAssignedCallback}} flag set to false.
 
The result is: * If the {{MembershipManagerImpl.assignPartitions()}} future is 
completed on the background thread first, the 'pending' flag is set to false. 
On the application thread, when {{SubscriptionState.initializingPartitions()}} 
is called, it returns the partition, and we fetch its committed offsets
 * If instead the application thread calls 
{{SubscriptionState.initializingPartitions()}} first, the partition's 
'pending' flag is still set to true, and so the partition is omitted from the 
returned set. The {{updateFetchPositions()}} method then continues on and 
re-initializes the partition's fetch offset to 0.


> Race condition between ConsumerRebalanceListener and SubscriptionState
> --
>
> Key: KAFKA-16556
> URL: https://issues.apache.org/jira/browse/KAFKA-16556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> There appears to be a race condition between invoking the 
> {{ConsumerRebalanceListener}} callbacks on reconciliation and 
> {{initWithCommittedOffsetsIfNeeded}} in the consumer.
>  
> The membership manager adds the newly assigned partitions to the 
> {{{}SubscriptionState{}}}, but marks them as 
> {{{}pendingOnAssignedCallback{}}}. Then, after the 
> {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the 
> membership manager will invoke {{enablePartitionsAwaitingCallback}} to set 
> all of those partitions' 'pending' flag to false.
>  
> During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to 
> call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already 
> cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls 
> the subscription's {{initializingPartitions}} method 

[jira] [Assigned] (KAFKA-16556) Race condition between ConsumerRebalanceListener and SubscriptionState

2024-04-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16556:
-

Assignee: Kirk True

> Race condition between ConsumerRebalanceListener and SubscriptionState
> --
>
> Key: KAFKA-16556
> URL: https://issues.apache.org/jira/browse/KAFKA-16556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> There appears to be a race condition between invoking the 
> {{ConsumerRebalanceListener}} callbacks on reconciliation and 
> {{initWithCommittedOffsetsIfNeeded}} in the consumer.
>  
> The membership manager adds the newly assigned partitions to the 
> {{{}SubscriptionState{}}}, but marks them as 
> {{{}pendingOnAssignedCallback{}}}. Then, after the 
> {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, the 
> membership manager will invoke {{enablePartitionsAwaitingCallback}} to set 
> all of those partitions' 'pending' flag to false.
>  
> During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to 
> call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already 
> cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls 
> the subscription's {{initializingPartitions}} method to get a set of the 
> partitions for which to fetch their committed offsets. However, 
> {{SubscriptionState.initializingPartitions()}} only returns partitions that 
> have the {{pendingOnAssignedCallback}} flag set to false.
>  
> The result is:
>  * If the {{MembershipManagerImpl.assignPartitions()}} future is completed 
> on the background thread first, the 'pending' flag is set to false. On the 
> application thread, when {{SubscriptionState.initializingPartitions()}} is 
> called, it returns the partition, and we fetch its committed offsets.
>  * If instead the application thread calls 
> {{SubscriptionState.initializingPartitions()}} first, the partition's 
> 'pending' flag is still set to true, and so the partition is omitted from 
> the returned set. The {{updateFetchPositions()}} method then continues on and 
> re-initializes the partition's fetch offset to 0.





[jira] [Created] (KAFKA-16556) Race condition between ConsumerRebalanceListener and SubscriptionState

2024-04-15 Thread Kirk True (Jira)
Kirk True created KAFKA-16556:
-

 Summary: Race condition between ConsumerRebalanceListener and 
SubscriptionState
 Key: KAFKA-16556
 URL: https://issues.apache.org/jira/browse/KAFKA-16556
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
 Fix For: 3.8.0


There appears to be a race condition between invoking the 
{{ConsumerRebalanceListener}} callbacks on reconciliation and 
{{initWithCommittedOffsetsIfNeeded}} in the consumer.
 
The membership manager adds the newly assigned partitions to the 
{{{}SubscriptionState{}}}, but marks them as {{{}pendingOnAssignedCallback{}}}. 
Then, after the {{ConsumerRebalanceListener.onPartitionsAssigned()}} completes, 
the membership manager will invoke {{enablePartitionsAwaitingCallback}} to set 
all of those partitions' 'pending' flag to false.
 
During the main {{Consumer.poll()}} loop, {{AsyncKafkaConsumer}} may need to 
call {{initWithCommittedOffsetsIfNeeded()}} if the positions aren't already 
cached. Inside {{{}initWithCommittedOffsetsIfNeeded{}}}, the consumer calls the 
subscription's {{initializingPartitions}} method to get a set of the partitions 
for which to fetch their committed offsets. However, 
{{SubscriptionState.initializingPartitions()}} only returns partitions that 
have the {{pendingOnAssignedCallback}} flag set to false.
 
The result is:
 * If the {{MembershipManagerImpl.assignPartitions()}} future is completed on 
the background thread first, the 'pending' flag is set to false. On the 
application thread, when {{SubscriptionState.initializingPartitions()}} is 
called, it returns the partition, and we fetch its committed offsets.
 * If instead the application thread calls 
{{SubscriptionState.initializingPartitions()}} first, the partition's 
'pending' flag is still set to true, and so the partition is omitted from the 
returned set. The {{updateFetchPositions()}} method then continues on and 
re-initializes the partition's fetch offset to 0.





[jira] [Created] (KAFKA-16555) Consumer's RequestState has incorrect logic to determine if inflight

2024-04-15 Thread Kirk True (Jira)
Kirk True created KAFKA-16555:
-

 Summary: Consumer's RequestState has incorrect logic to determine 
if inflight
 Key: KAFKA-16555
 URL: https://issues.apache.org/jira/browse/KAFKA-16555
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 3.8.0


When running system tests for the new consumer, I've hit an issue where the 
{{HeartbeatRequestManager}} is sending out multiple concurrent 
{{CONSUMER_GROUP_REQUEST}} RPCs. The effect is the coordinator creates multiple 
members which causes downstream assignment problems.

Here's the order of events:

* Time 202: {{HeartbeatRequestManager.poll()}} determines it's OK to send a 
request. In so doing, it updates the {{RequestState}}'s {{lastSentMs}} to the 
current timestamp, 202
* Time 236: the response is received and the response handler is invoked, setting 
the {{RequestState}}'s {{lastReceivedMs}} to the current timestamp, 236
* Time 236: {{HeartbeatRequestManager.poll()}} is invoked again, and it sees 
that it's OK to send a request. It creates another request, once again updating 
the {{RequestState}}'s {{lastSentMs}} to the current timestamp, 236
* Time 237: {{HeartbeatRequestManager.poll()}} is invoked again, and 
ERRONEOUSLY decides it's OK to send another request, despite one already being in 
flight.

Here's the problem with {{requestInFlight()}}:

{code:java}
public boolean requestInFlight() {
    return this.lastSentMs > -1 && this.lastReceivedMs < this.lastSentMs;
}
{code}

In our case, {{lastReceivedMs}} is 236 and {{lastSentMs}} is _also_ 236. So the 
received timestamp is _equal_ to the sent timestamp, not _less_, and 
{{requestInFlight()}} returns {{false}} even though a request is in flight.
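
The sequence of events can be replayed with a small stand-alone sketch. The class `TimestampInflightSketch` is hypothetical; only the comparison inside `requestInFlight()` is copied from the snippet in this ticket, and the timestamps are the ones from the list of events.

```java
// Hypothetical reproduction of the timestamp comparison; not the real RequestState.
class TimestampInflightSketch {
    long lastSentMs = -1;
    long lastReceivedMs = -1;

    // Same comparison as the snippet quoted in the ticket.
    boolean requestInFlight() {
        return lastSentMs > -1 && lastReceivedMs < lastSentMs;
    }

    public static void main(String[] args) {
        TimestampInflightSketch state = new TimestampInflightSketch();

        state.lastSentMs = 202;      // time 202: first request sent
        System.out.println(state.requestInFlight()); // true

        state.lastReceivedMs = 236;  // time 236: response received
        System.out.println(state.requestInFlight()); // false

        state.lastSentMs = 236;      // time 236: second request sent in the same millisecond
        System.out.println(state.requestInFlight()); // false, although a request is in flight
    }
}
```

The last check is the problematic one: with both timestamps at 236, the strict `<` comparison cannot tell "response already received" apart from "request just sent".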





[PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-15 Thread via GitHub


dongnuo123 opened a new pull request, #15721:
URL: https://github.com/apache/kafka/pull/15721

   Online downgrade from a consumer group to a classic group is triggered when 
the last consumer that uses the consumer protocol leaves the group. A rebalance 
is manually triggered after the group conversion. 
   
   This patch adds consumer group downgrade validation and conversion.
   
   https://issues.apache.org/jira/browse/KAFKA-16554
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566238375


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,259 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"1000", "1"})
+private int memberCount;
+
+@Param({"10", "50"})
+private int partitionsPerTopicCount;
+
+@Param({"1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isRackAware;

Review Comment:
   The numbers change based on the assignor and these factors though, right? 
Which assignor and params would you want for the baseline of this benchmark?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566237268


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private PartitionAssignor partitionAssignor;
+
+private final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+Map<Integer, Set<String>> partitionRacks = isRackAware ?
+mkMapOfPartitionRacks(partitionsPerTopicCount) :
+Collections.emptyMap();
+
+for (int i = 1; i <= topicCount; i++) {
+Uuid topicUuid = Uuid.randomUuid();
+String topicName = "topic" + i;
+topicMetadata.put(topicUuid, new TopicMetadata(
+topicUuid, topicName, partitionsPerTopicCount, partitionRacks));
+}
+
+addTopicSubscriptions(topicMetadata);
+this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+if (isRangeAssignor) {
+this.partitionAssignor = new RangeAssignor();
+} else {
+this.partitionAssignor = new UniformAssignor();
+}
+
+if (isReassignment) {
+GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+Map<String, MemberAssignment> members;
+
+members = initialAssignment.members();
+
+// Update the AssignmentSpec with the results from the initial assignment.
+Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
+
+members.forEach((memberId, memberAssignment) -> {
+AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId);
+updatedMembers.put(memberId, new AssignmentMemberSpec(
+memberSpec.instanceId(),
+memberSpec.rackId(),
+memberSpec.subscribedTopicIds(),
+memberAssignment.targetPartitions()
+));
+});
+
+// Add new member to trigger a reassignment.
+Optional<String> rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty();
+
+updatedMembers.put("newMember", new AssignmentMemberSpec(
+Optional.empty(),
+rackId,
+topicMetadata.keySet(),
+Collections.emptyMap()
+ 

Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566234265


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private PartitionAssignor partitionAssignor;
+
+private final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+Map<Integer, Set<String>> partitionRacks = isRackAware ?
+mkMapOfPartitionRacks(partitionsPerTopicCount) :
+Collections.emptyMap();
+
+for (int i = 1; i <= topicCount; i++) {
+Uuid topicUuid = Uuid.randomUuid();
+String topicName = "topic" + i;
+topicMetadata.put(topicUuid, new TopicMetadata(
+topicUuid, topicName, partitionsPerTopicCount, partitionRacks));
+}
+
+addTopicSubscriptions(topicMetadata);
+this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+if (isRangeAssignor) {
+this.partitionAssignor = new RangeAssignor();
+} else {
+this.partitionAssignor = new UniformAssignor();
+}
+
+if (isReassignment) {
+GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+Map<String, MemberAssignment> members;
+
+members = initialAssignment.members();
+
+// Update the AssignmentSpec with the results from the initial assignment.
+Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
+
+members.forEach((memberId, memberAssignment) -> {
+AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId);
+updatedMembers.put(memberId, new AssignmentMemberSpec(
+memberSpec.instanceId(),
+memberSpec.rackId(),
+memberSpec.subscribedTopicIds(),
+memberAssignment.targetPartitions()
+));
+});
+
+// Add new member to trigger a reassignment.
+Optional<String> rackId = isRackAware ? Optional.of("rack" + (memberCount + 1) % numberOfRacks) : Optional.empty();

Review Comment:
   It's pretty simple and just one line, right? Do we really need a helper 
method for it?
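
   For comparison, a minimal sketch of what such a helper could look like. The 
class name `RackIdSketch`, the method name `rackIdFor`, and its parameters are 
assumptions for illustration and are not code from this PR; the expression 
itself mirrors the inline one in the diff above.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; not part of the PR.
final class RackIdSketch {
    private RackIdSketch() { }

    // Returns "rack<N>" with N = memberIndex % numberOfRacks when rack awareness
    // is enabled, and an empty Optional otherwise.
    static Optional<String> rackIdFor(int memberIndex, int numberOfRacks, boolean isRackAware) {
        if (!isRackAware) {
            return Optional.empty();
        }
        // % binds tighter than +, so this concatenates "rack" with the remainder.
        return Optional.of("rack" + memberIndex % numberOfRacks);
    }

    public static void main(String[] args) {
        int memberCount = 500;
        int numberOfRacks = 3;
        // Rack for the extra member added to trigger a reassignment.
        System.out.println(rackIdFor(memberCount + 1, numberOfRacks, true));
        System.out.println(rackIdFor(memberCount + 1, numberOfRacks, false));
    }
}
```

   Whether this is worth a method probably depends on how many call sites end 
up computing a rack ID the same way.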




Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566233067


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private PartitionAssignor partitionAssignor;
+
+private final int numberOfRacks = 3;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
+Map<Integer, Set<String>> partitionRacks = isRackAware ?
+mkMapOfPartitionRacks(partitionsPerTopicCount) :
+Collections.emptyMap();
+
+for (int i = 1; i <= topicCount; i++) {
+Uuid topicUuid = Uuid.randomUuid();
+String topicName = "topic" + i;
+topicMetadata.put(topicUuid, new TopicMetadata(
+topicUuid, topicName, partitionsPerTopicCount, partitionRacks));
+}
+
+addTopicSubscriptions(topicMetadata);
+this.subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);
+
+if (isRangeAssignor) {
+this.partitionAssignor = new RangeAssignor();
+} else {
+this.partitionAssignor = new UniformAssignor();
+}
+
+if (isReassignment) {
+GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
+Map<String, MemberAssignment> members;
+
+members = initialAssignment.members();

Review Comment:
   Oh yes, I missed changing it.






Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566224264


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java:
##
@@ -0,0 +1,153 @@
+package org.apache.kafka.jmh.group_coordinator;

Review Comment:
   I figured we would add all future benchmarks related to the group 
coordinator to this package. For example, the target assignment builder 
benchmark isn't directly an assignor benchmark. Let me know what you think.






Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566219575


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;

Review Comment:
   That makes sense, I'll adopt something similar






[jira] [Created] (KAFKA-16554) Online downgrade triggering and group type conversion

2024-04-15 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-16554:
---

     Summary: Online downgrade triggering and group type conversion
         Key: KAFKA-16554
         URL: https://issues.apache.org/jira/browse/KAFKA-16554
     Project: Kafka
  Issue Type: Sub-task
    Reporter: Dongnuo Lyu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566211483


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java:
##
@@ -0,0 +1,198 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000", "1"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();
+
+private final int numBrokerRacks = 3;
+
+private final int replicationFactor = 2;
+
+protected AbstractPartitionAssignor assignor;

Review Comment:
   And we use the PartitionAssignor interface in the server-side benchmark.






Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566211207


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ClientSideAssignorBenchmark.java:
##
@@ -0,0 +1,198 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ClientSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000", "1"})
+private int memberCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"true", "false"})
+private boolean isSubscriptionUniform;
+
+@Param({"true", "false"})
+private boolean isRangeAssignor;
+
+@Param({"true", "false"})
+private boolean isReassignment;
+
+private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();
+
+private final int numBrokerRacks = 3;
+
+private final int replicationFactor = 2;
+
+protected AbstractPartitionAssignor assignor;

Review Comment:
   Hmm, makes sense. I guess it would then be more accurate, since it would 
include the time taken to create the metadata that is passed to the assignor in 
the assign function. I'll look into it.






Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566204495


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;

Review Comment:
   Hmm, interesting. That way we can have more realistic ratios of members to 
topics/partitions. Let me look into it.
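
   One way this could look, as a rough sketch only: derive the topic and 
partition counts from the member count through fixed ratios instead of listing 
each count independently. All names here (`RatioSizingSketch`, 
`membersPerTopic`, `partitionsPerMember`) are hypothetical and do not come 
from the PR or from the reviewer's suggestion.

```java
// Hypothetical sizing helper for illustration only; not part of the PR.
final class RatioSizingSketch {
    private RatioSizingSketch() { }

    // Number of topics such that there is roughly one topic per
    // membersPerTopic members, with a floor of one topic.
    static int topicCount(int memberCount, int membersPerTopic) {
        if (memberCount <= 0 || membersPerTopic <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        return Math.max(1, memberCount / membersPerTopic);
    }

    // Partitions per topic such that the group has at least
    // memberCount * partitionsPerMember partitions in total (rounded up).
    static int partitionsPerTopic(int memberCount, int partitionsPerMember, int topicCount) {
        if (memberCount <= 0 || partitionsPerMember <= 0 || topicCount <= 0) {
            throw new IllegalArgumentException("counts must be positive");
        }
        long totalPartitions = (long) memberCount * partitionsPerMember;
        long perTopic = (totalPartitions + topicCount - 1) / topicCount; // ceiling division
        return Math.toIntExact(perTopic);
    }

    public static void main(String[] args) {
        for (int memberCount : new int[] {500, 1000}) {
            int topics = topicCount(memberCount, 10);
            int partitions = partitionsPerTopic(memberCount, 5, topics);
            System.out.println(memberCount + " members -> " + topics
                + " topics x " + partitions + " partitions");
        }
    }
}
```

   In a JMH benchmark the ratios would be the `@Param` fields and the derived 
counts would be computed in `setup()`, so the parameter grid only contains 
combinations that keep the ratios fixed.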






Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-15 Thread via GitHub


appchemist commented on PR #15647:
URL: https://github.com/apache/kafka/pull/15647#issuecomment-2057454239

   > thanks for the PR! just curious, what is the problem without the newly 
added state? does it cause any issues?
   
   @johnnychhsu 
   When a Kafka consumer encounters a FENCED_LEADER_EPOCH error, or another 
error that requires a metadata update, a metadata update is triggered.
   However, the subscription's fetch state remains FETCHING.
   In this state, the consumer keeps fetching from the old leader, because 
PreferredReadReplica has been reset and the metadata has not yet been 
updated.
   
   This patch makes the consumer wait, without sending fetch requests, while 
the metadata is being updated.
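
   The idea can be sketched as a small state machine. This is a simplified 
illustration and not the actual `SubscriptionState` code; the class name, the 
state name `AWAITING_METADATA_UPDATE`, and the method names are assumptions.

```java
// Simplified, hypothetical model of a partition's fetch state; not Kafka code.
final class FetchStateSketch {
    enum FetchState {
        FETCHING,                 // fetch requests may be sent
        AWAITING_METADATA_UPDATE  // hypothetical new state: hold fetches
    }

    private FetchState state = FetchState.FETCHING;

    // Called when an error such as FENCED_LEADER_EPOCH requires fresh metadata.
    void onErrorRequiringMetadataUpdate() {
        state = FetchState.AWAITING_METADATA_UPDATE;
    }

    // Called once the metadata update has completed.
    void onMetadataUpdated() {
        state = FetchState.FETCHING;
    }

    // The fetcher would skip partitions for which this returns false.
    boolean isFetchable() {
        return state == FetchState.FETCHING;
    }

    public static void main(String[] args) {
        FetchStateSketch partition = new FetchStateSketch();
        partition.onErrorRequiringMetadataUpdate();
        System.out.println("fetchable while waiting: " + partition.isFetchable());
        partition.onMetadataUpdated();
        System.out.println("fetchable after update: " + partition.isFetchable());
    }
}
```

   Without the extra state, the partition stays fetchable after the error, so 
requests keep going to the stale leader until the new metadata arrives.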





Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566203863


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,185 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+@Param({"10", "50", "100"})
+private int partitionsPerTopicCount;
+
+@Param({"100"})
+private int topicCount;
+
+@Param({"500", "1000"})
+private int memberCount;

Review Comment:
   Yes! I was going to ask what cases we want in the final benchmarks. They are 
not uniform or finalized yet.






Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566200808


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java:
##
@@ -0,0 +1,153 @@
+package org.apache.kafka.jmh.group_coordinator;

Review Comment:
   Oh yes! Thanks, I completely missed that!






Re: [PR] JMH Benchmarks for testing the performance of the Server Side Rebalances: KIP_848 [kafka]

2024-04-15 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1566199529


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/group_coordinator/AssignPartitionsMicroBenchmark.java:
##
@@ -0,0 +1,153 @@
+package org.apache.kafka.jmh.group_coordinator;
+
+import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.server.common.TopicIdPartition;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 7)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class AssignPartitionsMicroBenchmark {

Review Comment:
   Yep agreed, I'll remove it






Re: [PR] KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters [kafka]

2024-04-15 Thread via GitHub


C0urante commented on PR #14309:
URL: https://github.com/apache/kafka/pull/14309#issuecomment-2057408474

   Hi @gharris1727 sorry for the delay. I've resolved the merge conflicts; let 
me know what you think if you have a moment.





[jira] [Commented] (KAFKA-13105) Expose a method in KafkaConfig to write the configs to a logger

2024-04-15 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837366#comment-17837366
 ] 

Johnny Hsu commented on KAFKA-13105:


hi [~cmccabe] I would like to work on this if no one is on it now :D

> Expose a method in KafkaConfig to write the configs to a logger
> ---
>
> Key: KAFKA-13105
> URL: https://issues.apache.org/jira/browse/KAFKA-13105
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Minor
>  Labels: 4.0-blocker
>
> We should expose a method in KafkaConfig to write the configs to a logger. 
> Currently there is no good way to write them out except creating a new 
> KafkaConfig object with doLog = true, which is unintuitive.
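
One way to read the request above, sketched in Java under stated assumptions: `ConfigDump`, `render`, and `logTo` are hypothetical names and are not part of KafkaConfig or any Kafka API. The sketch only shows logging as an explicit call instead of a side effect of constructing the object with doLog = true.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical illustration only; this is not Kafka's KafkaConfig.
final class ConfigDump {
    private final Map<String, Object> values;

    ConfigDump(Map<String, ?> values) {
        // Sort keys so the logged output is stable across runs.
        this.values = new TreeMap<String, Object>(values);
    }

    // Renders one "key = value" line per config entry.
    List<String> render() {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Object> e : values.entrySet()) {
            lines.add(e.getKey() + " = " + e.getValue());
        }
        return lines;
    }

    // Writes the rendered lines to any sink, for example a logger method.
    void logTo(Consumer<String> sink) {
        render().forEach(sink);
    }
}
```

A caller could pass a method reference such as `log::info` as the sink, so the same object can be logged on demand and more than once.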



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-15 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1566156280


##
server/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+/**
+ * Common home for broker-side log configs which need to be accessible from 
the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaLogConfigs {
+public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
+public static final int NUM_PARTITIONS_DEFAULT = 1;
+public static final String NUM_PARTITIONS_DOC = "The default number of log 
partitions per topic";
+
+public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
+public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
+public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
+public static final String LOG_DIR_DOC = "The directory in which the log 
data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
+public static final String LOG_DIRS_DOC = "A comma-separated list of the 
directories where the log data is stored. If not set, the value in " + 
LOG_DIR_CONFIG + " is used.";
+
+public static final String LOG_SEGMENT_BYTES_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
+public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a 
single log file";
+
+public static final String LOG_ROLL_TIME_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG);
+public static final String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + 
"roll.hours";
+public static final String LOG_ROLL_TIME_MILLIS_DOC = "The maximum time 
before a new log segment is rolled out (in milliseconds). If not set, the value 
in " + LOG_ROLL_TIME_HOURS_CONFIG + " is used";
+public static final String LOG_ROLL_TIME_HOURS_DOC = "The maximum time 
before a new log segment is rolled out (in hours), secondary to " + 
LOG_ROLL_TIME_MILLIS_CONFIG + " property";
+
+public static final String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
+public static final String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX 
+ "roll.jitter.hours";
+public static final String LOG_ROLL_TIME_JITTER_MILLIS_DOC = "The maximum 
jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the 
value in " + LOG_ROLL_TIME_JITTER_HOURS_CONFIG + " is used";
+public static final String LOG_ROLL_TIME_JITTER_HOURS_DOC = "The maximum 
jitter to subtract from logRollTimeMillis (in hours), secondary to " + 
LOG_ROLL_TIME_JITTER_MILLIS_CONFIG + " property";
+
+
+public static final String LOG_RETENTION_TIME_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG);
+public static final String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX 
+ "retention.minutes";
+public static final String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + 
"retention.hours";
+public static final String LOG_RETENTION_TIME_MILLIS_DOC = "The number of 
milliseconds to keep a log file before deleting it (in milliseconds), If not 
set, the value in " + LOG_RETENTION_TIME_MINUTES_CONFIG + " is used. If set to 
-1, no time limit is applied.";
+public static final String LOG_RETENTION_TIME_MINUTES_DOC = "The number of 
minutes to keep a log file before deleting it (in minutes), secondary to " + 
LOG_RETENTION_TIME_MILLIS_CONFIG + " property. If not set, the value in " + 
LOG_RETENTION_TIME_HOURS_CONFIG + " is used";
+public static final String LOG_RETENTION_TIME_HOURS_DOC = "The number of 
hours to keep a log file before deleting it (in hours), tertiary to " + 
LOG_RETENTION_TIME_MILLIS_CONFIG + " property";
+
+public static final String LOG_RETENTION_BYTES_CONFIG = 
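
The retention doc strings in the diff above describe a precedence: the milliseconds setting is used first, then minutes, then hours. A minimal Java sketch of that resolution order, assuming an unset value is represented as null and that hours always has a value; `RetentionResolver` and `resolveMillis` are hypothetical names, and this is not the broker's actual implementation.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the documented precedence only.
final class RetentionResolver {
    // Returns the retention time in milliseconds; -1 means no time limit.
    static long resolveMillis(Long millis, Integer minutes, int hours) {
        if (millis != null) {
            // Primary setting; a value of -1 is passed through unchanged.
            return millis;
        }
        if (minutes != null) {
            // Secondary setting, used only when millis is unset.
            return TimeUnit.MINUTES.toMillis(minutes);
        }
        // Tertiary setting, used when neither of the others is set.
        return TimeUnit.HOURS.toMillis(hours);
    }
}
```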

[PR] [WIP] KAFKA-16475: add test for TopicImageNodeTest [kafka]

2024-04-15 Thread via GitHub


johnnychhsu opened a new pull request, #15720:
URL: https://github.com/apache/kafka/pull/15720

   ## Context
   Add unit test for TopicImageNodeTest
   Jira ticket: https://issues.apache.org/jira/browse/KAFKA-16475
   
   ## Test
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-15 Thread via GitHub


chia7712 commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1566129710


##
server/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+/**
+ * Common home for broker-side log configs which need to be accessible from 
the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaLogConfigs {
+public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
+public static final int NUM_PARTITIONS_DEFAULT = 1;
+public static final String NUM_PARTITIONS_DOC = "The default number of log 
partitions per topic";
+
+public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
+public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
+public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
+public static final String LOG_DIR_DOC = "The directory in which the log 
data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
+public static final String LOG_DIRS_DOC = "A comma-separated list of the 
directories where the log data is stored. If not set, the value in " + 
LOG_DIR_CONFIG + " is used.";
+
+public static final String LOG_SEGMENT_BYTES_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
+public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a 
single log file";
+
+public static final String LOG_ROLL_TIME_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG);
+public static final String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + 
"roll.hours";
+public static final String LOG_ROLL_TIME_MILLIS_DOC = "The maximum time 
before a new log segment is rolled out (in milliseconds). If not set, the value 
in " + LOG_ROLL_TIME_HOURS_CONFIG + " is used";
+public static final String LOG_ROLL_TIME_HOURS_DOC = "The maximum time 
before a new log segment is rolled out (in hours), secondary to " + 
LOG_ROLL_TIME_MILLIS_CONFIG + " property";
+
+public static final String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
+public static final String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX 
+ "roll.jitter.hours";
+public static final String LOG_ROLL_TIME_JITTER_MILLIS_DOC = "The maximum 
jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the 
value in " + LOG_ROLL_TIME_JITTER_HOURS_CONFIG + " is used";
+public static final String LOG_ROLL_TIME_JITTER_HOURS_DOC = "The maximum 
jitter to subtract from logRollTimeMillis (in hours), secondary to " + 
LOG_ROLL_TIME_JITTER_MILLIS_CONFIG + " property";
+
+
+public static final String LOG_RETENTION_TIME_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG);
+public static final String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX 
+ "retention.minutes";
+public static final String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + 
"retention.hours";
+public static final String LOG_RETENTION_TIME_MILLIS_DOC = "The number of 
milliseconds to keep a log file before deleting it (in milliseconds), If not 
set, the value in " + LOG_RETENTION_TIME_MINUTES_CONFIG + " is used. If set to 
-1, no time limit is applied.";
+public static final String LOG_RETENTION_TIME_MINUTES_DOC = "The number of 
minutes to keep a log file before deleting it (in minutes), secondary to " + 
LOG_RETENTION_TIME_MILLIS_CONFIG + " property. If not set, the value in " + 
LOG_RETENTION_TIME_HOURS_CONFIG + " is used";
+public static final String LOG_RETENTION_TIME_HOURS_DOC = "The number of 
hours to keep a log file before deleting it (in hours), tertiary to " + 
LOG_RETENTION_TIME_MILLIS_CONFIG + " property";
+
+public static final String LOG_RETENTION_BYTES_CONFIG = 

Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]

2024-04-15 Thread via GitHub


OmniaGM commented on code in PR #15575:
URL: https://github.com/apache/kafka/pull/15575#discussion_r1566114227


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1458,40 +1395,40 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def logMessageDownConversionEnable: Boolean = 
getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
 
   /** * Replication configuration ***/
-  val controllerSocketTimeoutMs: Int = 
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
-  val defaultReplicationFactor: Int = 
getInt(KafkaConfig.DefaultReplicationFactorProp)
-  val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp)
-  val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp)
-  val replicaSocketReceiveBufferBytes = 
getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp)
-  val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
-  val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
-  val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
-  val replicaFetchResponseMaxBytes = 
getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp)
-  val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
-  def numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
-  val replicaHighWatermarkCheckpointIntervalMs = 
getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
-  val fetchPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
-  val producerPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
-  val deleteRecordsPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
-  val autoLeaderRebalanceEnable = 
getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
-  val leaderImbalancePerBrokerPercentage = 
getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
-  val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
-  def uncleanLeaderElectionEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+  val controllerSocketTimeoutMs: Int = 
getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
+  val defaultReplicationFactor: Int = 
getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
+  val replicaLagTimeMaxMs = 
getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG)
+  val replicaSocketTimeoutMs = 
getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG)
+  val replicaSocketReceiveBufferBytes = 
getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
+  val replicaFetchMaxBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG)
+  val replicaFetchWaitMaxMs = 
getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG)
+  val replicaFetchMinBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG)
+  val replicaFetchResponseMaxBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG)
+  val replicaFetchBackoffMs = 
getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG)
+  def numReplicaFetchers = 
getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
+  val replicaHighWatermarkCheckpointIntervalMs = 
getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG)
+  val fetchPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val producerPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val deleteRecordsPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val autoLeaderRebalanceEnable = 
getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG)
+  val leaderImbalancePerBrokerPercentage = 
getInt(ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG)
+  val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG)
+  def uncleanLeaderElectionEnable: Boolean = 
getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)

Review Comment:
   All of this will be converted to Java soon as part of the same KAFKA-15853 effort.



##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1458,40 +1395,40 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def logMessageDownConversionEnable: Boolean = 
getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
 
   /** * Replication configuration ***/
-  val controllerSocketTimeoutMs: Int = 
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
-  val defaultReplicationFactor: Int = 
getInt(KafkaConfig.DefaultReplicationFactorProp)
-  val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp)
-  val replicaSocketTimeoutMs = 

Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]

2024-04-15 Thread via GitHub


OmniaGM commented on code in PR #15575:
URL: https://github.com/apache/kafka/pull/15575#discussion_r1566111409


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1458,40 +1395,40 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def logMessageDownConversionEnable: Boolean = 
getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
 
   /** * Replication configuration ***/
-  val controllerSocketTimeoutMs: Int = 
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
-  val defaultReplicationFactor: Int = 
getInt(KafkaConfig.DefaultReplicationFactorProp)
-  val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp)
-  val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp)
-  val replicaSocketReceiveBufferBytes = 
getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp)
-  val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
-  val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
-  val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
-  val replicaFetchResponseMaxBytes = 
getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp)
-  val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
-  def numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
-  val replicaHighWatermarkCheckpointIntervalMs = 
getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
-  val fetchPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
-  val producerPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
-  val deleteRecordsPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
-  val autoLeaderRebalanceEnable = 
getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
-  val leaderImbalancePerBrokerPercentage = 
getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
-  val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
-  def uncleanLeaderElectionEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+  val controllerSocketTimeoutMs: Int = 
getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
+  val defaultReplicationFactor: Int = 
getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
+  val replicaLagTimeMaxMs = 
getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG)
+  val replicaSocketTimeoutMs = 
getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG)
+  val replicaSocketReceiveBufferBytes = 
getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
+  val replicaFetchMaxBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG)
+  val replicaFetchWaitMaxMs = 
getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG)
+  val replicaFetchMinBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG)
+  val replicaFetchResponseMaxBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG)
+  val replicaFetchBackoffMs = 
getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG)
+  def numReplicaFetchers = 
getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
+  val replicaHighWatermarkCheckpointIntervalMs = 
getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG)
+  val fetchPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val producerPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val deleteRecordsPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val autoLeaderRebalanceEnable = 
getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG)
+  val leaderImbalancePerBrokerPercentage = 
getInt(ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG)
+  val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG)
+  def uncleanLeaderElectionEnable: Boolean = 
getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)

Review Comment:
   Revert it, as this was already the original declaration.






Re: [PR] KAFKA-16316: Configure reprocessing with addGlobalStateStore [kafka]

2024-04-15 Thread via GitHub


AyoubOm commented on code in PR #15619:
URL: https://github.com/apache/kafka/pull/15619#discussion_r1566096443


##
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##
@@ -613,6 +616,74 @@ public synchronized  StreamsBuilder 
addGlobalStore(final StoreBuilder<
 return this;
 }
 
+/**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+ * 
+ * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+ * of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create an
+ * {@link Processor} that will receive all records forwarded from the 
{@link SourceNode}.
+ * The supplier should always generate a new instance. Creating a single 
{@link Processor} object
+ * and returning the same object reference in {@link 
ProcessorSupplier#get()} is a
+ * violation of the supplier pattern and leads to runtime exceptions.
+ * This {@link Processor} should be used to keep the {@link StateStore} 
up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+ * 
+ * It is not required to connect a global store to the {@link Processor 
Processors},
+ * {@link Transformer Transformers}, or {@link ValueTransformer 
ValueTransformer}; those have read-only access to all global stores by default.
+ *
+ * @param storeBuilder  user defined {@link StoreBuilder}; can't 
be {@code null}
+ * @param topic the topic to source the data from
+ * @param consumed  the instance of {@link Consumed} used to 
define optional parameters; can't be {@code null}
+ * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+ * @param reprocessOnRestore   restore by reprocessing the data using a 
processor supplied by stateUpdateSupplier or loads the data in byte for byte

Review Comment:
   nit: 
   ```suggestion
* @param reprocessOnRestore   restore by reprocessing the data using a 
processor supplied by stateUpdateSupplier or load the data in byte for byte
   ```
   Wondering if it would make it clearer to say "if true, restore ..., 
otherwise load .."



##
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##
@@ -613,6 +616,74 @@ public synchronized  StreamsBuilder 
addGlobalStore(final StoreBuilder<
 return this;
 }
 
+/**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+ * 
+ * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+ * of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create an

Review Comment:
   nit: 
   ```suggestion
* The provided {@link ProcessorSupplier} will be used to create a
   ```



##
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##
@@ -613,6 +616,74 @@ public synchronized  StreamsBuilder 
addGlobalStore(final StoreBuilder<
 return this;
 }
 
+/**
+ * Adds a global {@link StateStore} to the topology.
+ * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
+ * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
+ * 
+ * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
+ * of the input topic.
+ * 
+ * The provided {@link ProcessorSupplier} will be used to create an
+ * {@link Processor} that will receive all records forwarded from the 
{@link SourceNode}.
+ * The supplier should always generate a new instance. Creating a single 
{@link Processor} object
+ * and returning the same object reference in {@link 
ProcessorSupplier#get()} is a
+ * violation of the supplier pattern and leads to runtime exceptions.
+ * This {@link Processor} should be used to keep the {@link StateStore} 
up-to-date.
+ * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
+ * 
+ * It is not required to connect a global store to the {@link Processor 
Processors},
+ * {@link Transformer Transformers}, or {@link ValueTransformer 
ValueTransformer}; those have read-only access to all global stores by default.
+ *
+ * @param storeBuilder  user defined {@link StoreBuilder}; can't 
be {@code null}
+ * @param topic 

Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]

2024-04-15 Thread via GitHub


chia7712 commented on code in PR #15575:
URL: https://github.com/apache/kafka/pull/15575#discussion_r1566098229


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -1458,40 +1395,40 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def logMessageDownConversionEnable: Boolean = 
getBoolean(KafkaConfig.LogMessageDownConversionEnableProp)
 
   /** * Replication configuration ***/
-  val controllerSocketTimeoutMs: Int = 
getInt(KafkaConfig.ControllerSocketTimeoutMsProp)
-  val defaultReplicationFactor: Int = 
getInt(KafkaConfig.DefaultReplicationFactorProp)
-  val replicaLagTimeMaxMs = getLong(KafkaConfig.ReplicaLagTimeMaxMsProp)
-  val replicaSocketTimeoutMs = getInt(KafkaConfig.ReplicaSocketTimeoutMsProp)
-  val replicaSocketReceiveBufferBytes = 
getInt(KafkaConfig.ReplicaSocketReceiveBufferBytesProp)
-  val replicaFetchMaxBytes = getInt(KafkaConfig.ReplicaFetchMaxBytesProp)
-  val replicaFetchWaitMaxMs = getInt(KafkaConfig.ReplicaFetchWaitMaxMsProp)
-  val replicaFetchMinBytes = getInt(KafkaConfig.ReplicaFetchMinBytesProp)
-  val replicaFetchResponseMaxBytes = 
getInt(KafkaConfig.ReplicaFetchResponseMaxBytesProp)
-  val replicaFetchBackoffMs = getInt(KafkaConfig.ReplicaFetchBackoffMsProp)
-  def numReplicaFetchers = getInt(KafkaConfig.NumReplicaFetchersProp)
-  val replicaHighWatermarkCheckpointIntervalMs = 
getLong(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp)
-  val fetchPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp)
-  val producerPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp)
-  val deleteRecordsPurgatoryPurgeIntervalRequests = 
getInt(KafkaConfig.DeleteRecordsPurgatoryPurgeIntervalRequestsProp)
-  val autoLeaderRebalanceEnable = 
getBoolean(KafkaConfig.AutoLeaderRebalanceEnableProp)
-  val leaderImbalancePerBrokerPercentage = 
getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
-  val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
-  def uncleanLeaderElectionEnable: java.lang.Boolean = 
getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
+  val controllerSocketTimeoutMs: Int = 
getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
+  val defaultReplicationFactor: Int = 
getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
+  val replicaLagTimeMaxMs = 
getLong(ReplicationConfigs.REPLICA_LAG_TIME_MAX_MS_CONFIG)
+  val replicaSocketTimeoutMs = 
getInt(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG)
+  val replicaSocketReceiveBufferBytes = 
getInt(ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
+  val replicaFetchMaxBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG)
+  val replicaFetchWaitMaxMs = 
getInt(ReplicationConfigs.REPLICA_FETCH_WAIT_MAX_MS_CONFIG)
+  val replicaFetchMinBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_MIN_BYTES_CONFIG)
+  val replicaFetchResponseMaxBytes = 
getInt(ReplicationConfigs.REPLICA_FETCH_RESPONSE_MAX_BYTES_CONFIG)
+  val replicaFetchBackoffMs = 
getInt(ReplicationConfigs.REPLICA_FETCH_BACKOFF_MS_CONFIG)
+  def numReplicaFetchers = 
getInt(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
+  val replicaHighWatermarkCheckpointIntervalMs = 
getLong(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG)
+  val fetchPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val producerPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.PRODUCER_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val deleteRecordsPurgatoryPurgeIntervalRequests = 
getInt(ReplicationConfigs.DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG)
+  val autoLeaderRebalanceEnable = 
getBoolean(ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG)
+  val leaderImbalancePerBrokerPercentage = 
getInt(ReplicationConfigs.LEADER_IMBALANCE_PER_BROKER_PERCENTAGE_CONFIG)
+  val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG)
+  def uncleanLeaderElectionEnable: Boolean = 
getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)

Review Comment:
   We have to either remove the type declaration or keep `java.lang.Boolean`; 
otherwise the build won't pass.






[PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-15 Thread via GitHub


brandboat opened a new pull request, #15719:
URL: https://github.com/apache/kafka/pull/15719

   related to KAFKA-16552,
   
   Introduce a new internal config `log.initial.task.delay.ms` to control 
InitialTaskDelayMs in LogManager to speed up tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-04-15 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837342#comment-17837342
 ] 

Philip Nee commented on KAFKA-16474:


The log was attached:

See lines 7481 and 7492 in:
{code:java}
AssignmentValidationTest/test_valid_assignment/metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer.group_remote_assignor=uniform/2/VerifiableConsumer-0-281473320420544/ducker11/verifiable_consumer.log
 {code}
{code:java}
7480 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Found least loaded 
node ducker10:9092 (id: 2 rack: null) connected with no in-flight requests 
(org.apache.kafka.clients.NetworkClient)
7481 [2024-04-15 16:03:35,964] DEBUG [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Sending 
CONSUMER_GROUP_HEARTBEAT request with header 
RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, 
clientId=consumer-test_group_id-1, correlationId=108, headerVersion=2) and 
timeout 3 to node 2147483646: 
ConsumerGroupHeartbeatRequestData(groupId='test_group_id', memberId='', 
memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=30, 
subscribedTopicNames=[test_topic], serverAssignor='uniform', 
topicPartitions=[]) (org.apache.kafka.clients.NetworkClient)
7482 [2024-04-15 16:03:35,964] TRACE For telemetry state SUBSCRIPTION_NEEDED, 
returning the value 299843 ms;  
(org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
7483 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Starting processing 
of 1 event (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
7484 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Processing event: 
ValidatePositionsEvent{type=VALIDATE_POSITIONS, id=nMQf3YgtRU6vaJu-oVpiQg, 
future=java.util.concurrent.CompletableFuture@57bc27f5[Not completed, 1 
dependents], deadlineMs=9223372036854775807} 
(org.apache.kafka.clients.consumer.internals.events.EventProcessor)
7485 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Completed processing 
(org.apache.kafka.clients.consumer.internals.events.EventProcessor)
7486 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] 999 ms remain before 
another request should be sent for 
RequestState{owner='org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager$HeartbeatRequestState', 
exponentialBackoff=ExponentialBackoff{multiplier=2, expMax=3.3219280948873626, 
initialInterval=100, jitter=30.0}, lastSentMs=1713197015963, 
lastReceivedMs=1713197015963, numAttempts=1, backoffMs=1000} 
(org.apache.kafka.clients.consumer.internals.RequestState)
7487 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Polling for fetches 
with timeout 0 (org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer)
7488 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Found least loaded 
node ducker10:9092 (id: 2 rack: null) connected with no in-flight requests 
(org.apache.kafka.clients.NetworkClient)
7489 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Enqueued event: 
PollEvent{type=POLL, id=T9IwUDeHQoGrnqJJIyzb8Q, pollTimeMs=1713197015964} 
(org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler)
7490 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] No events to process 
(org.apache.kafka.clients.consumer.internals.events.EventProcessor)
7491 [2024-04-15 16:03:35,964] TRACE [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Enqueued event: 
ValidatePositionsEvent{type=VALIDATE_POSITIONS, id=otKgqakkS9COd01SkJ3btQ, 
future=java.util.concurrent.CompletableFuture@5fb759d6[Not completed], 
deadlineMs=9223372036854775807} 
(org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler)
7492 [2024-04-15 16:03:35,964] DEBUG [Consumer 
clientId=consumer-test_group_id-1, groupId=test_group_id] Sending 
CONSUMER_GROUP_HEARTBEAT request with header 
RequestHeader(apiKey=CONSUMER_GROUP_HEARTBEAT, apiVersion=0, 
clientId=consumer-test_group_id-1, correlationId=109, headerVersion=2) and 
timeout 3 to node 2147483646: 
ConsumerGroupHeartbeatRequestData(groupId='test_group_id', memberId='', 
memberEpoch=0, instanceId=null, rackId=null, rebalanceTimeoutMs=30, 
subscribedTopicNames=[test_topic], serverAssignor='uniform', 
topicPartitions=[]) (org.apache.kafka.clients.NetworkClient) {code}

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 

[jira] [Updated] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-04-15 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16474:
---
Attachment: failing_results.zip

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  
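
A minimal sketch of the in-flight guard implied by the first issue above. `HeartbeatGate` and its methods are hypothetical names used for illustration, not the actual `HeartbeatRequestManager` code: a heartbeat may only be sent while no earlier heartbeat is still awaiting its response.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; none of these names are Kafka APIs.
final class HeartbeatGate {
    // True while a heartbeat has been sent and its response is still pending.
    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    /** Returns true if the caller may send a heartbeat now. */
    boolean tryAcquire() {
        // Succeeds only when no earlier heartbeat is awaiting a response.
        return inFlight.compareAndSet(false, true);
    }

    /** Call when the response (or a failure) for the outstanding heartbeat arrives. */
    void onResponseOrFailure() {
        inFlight.set(false);
    }
}
```

With a guard like this, the second heartbeat with epoch=0 and memberId='' in the sequence above would be held back until the first response has updated the local state.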



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-15 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1566001316


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##
@@ -1046,4 +1051,101 @@ public void testIsInStatesCaseInsensitive() {
 assertTrue(group.isInStates(Collections.singleton("stable"), 1));
 assertFalse(group.isInStates(Collections.singleton("empty"), 1));
 }
+
+@Test
+public void testClassicMembersSupportedProtocols() {
+ConsumerGroup consumerGroup = createConsumerGroup("foo");
+List<ConsumerGroupMemberMetadataValue.ClassicProtocol> rangeProtocol = 
new ArrayList<>();
+rangeProtocol.add(new 
ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+.setMetadata(new byte[0]));
+
+List<ConsumerGroupMemberMetadataValue.ClassicProtocol> 
roundRobinAndRangeProtocols = new ArrayList<>();
+roundRobinAndRangeProtocols.add(new 
ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("roundrobin")
+.setMetadata(new byte[0]));
+roundRobinAndRangeProtocols.add(new 
ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+.setMetadata(new byte[0]));
+
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder("member-1")
+.setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSupportedProtocols(rangeProtocol))
+.build();
+consumerGroup.updateMember(member1);
+
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder("member-2")
+.setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSupportedProtocols(roundRobinAndRangeProtocols))
+.build();
+consumerGroup.updateMember(member2);
+
+assertEquals(2, 
consumerGroup.classicMembersSupportedProtocols().get("range"));
+assertEquals(1, 
consumerGroup.classicMembersSupportedProtocols().get("roundrobin"));
+
assertTrue(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE,
 new HashSet<>(Arrays.asList("range", "sticky"))));
+
assertFalse(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE,
 new HashSet<>(Arrays.asList("sticky", "roundrobin"))));
+
+member2 = new ConsumerGroupMember.Builder(member2)
+.setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSupportedProtocols(rangeProtocol))
+.build();
+consumerGroup.updateMember(member2);
+
+assertEquals(2, 
consumerGroup.classicMembersSupportedProtocols().get("range"));
+
assertFalse(consumerGroup.classicMembersSupportedProtocols().containsKey("roundrobin"));
+
+member1 = new ConsumerGroupMember.Builder(member1)
+.setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSupportedProtocols(roundRobinAndRangeProtocols))
+.build();
+consumerGroup.updateMember(member1);
+member2 = new ConsumerGroupMember.Builder(member2)
+.setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSupportedProtocols(roundRobinAndRangeProtocols))
+.build();
+consumerGroup.updateMember(member2);
+
+assertEquals(2, 
consumerGroup.classicMembersSupportedProtocols().get("range"));
+assertEquals(2, 
consumerGroup.classicMembersSupportedProtocols().get("roundrobin"));
+
assertTrue(consumerGroup.supportsClassicProtocols(ConsumerProtocol.PROTOCOL_TYPE,
 new HashSet<>(Arrays.asList("sticky", "roundrobin"))));
+}
+
+@Test
+public void testAllUseClassicProtocol() {

Review Comment:
   nit: testAllMembersUseClassicProtocol






Re: [PR] MINOR: Various cleanups in tools [kafka]

2024-04-15 Thread via GitHub


mimaison merged PR #15709:
URL: https://github.com/apache/kafka/pull/15709





Re: [PR] MINOR: Various cleanups in tools [kafka]

2024-04-15 Thread via GitHub


mimaison commented on PR #15709:
URL: https://github.com/apache/kafka/pull/15709#issuecomment-2057128107

   None of the failures seem related, merging to trunk.





Re: [PR] KAFKA-16417:When initializeResources throws an exception in TopicBasedRemoteLogMetadataManager.scala, initializationFailed needs to be set to true [kafka]

2024-04-15 Thread via GitHub


johnnychhsu commented on code in PR #15595:
URL: https://github.com/apache/kafka/pull/15595#discussion_r1565975540


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##
@@ -436,6 +436,7 @@ private void initializeResources() {
 log.info("Initialized topic-based RLMM resources 
successfully");
 } catch (Exception e) {
 log.error("Encountered error while initializing 
producer/consumer", e);
+initializationFailed = true;

Review Comment:
   Currently `initializationFailed` is set to true when topic creation fails, 
but not when initializing the producer/consumer fails. Do we need to set 
`initializationFailed` when the producer/consumer initialization fails?
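
To make the question concrete, here is a minimal, self-contained sketch of the pattern under discussion. The class `ResourceInitializer` and the two `Runnable` steps are hypothetical stand-ins, not the real `TopicBasedRemoteLogMetadataManager` code; the point is only that the failure flag is set on both the topic-creation path and the producer/consumer path.

```java
// Hypothetical illustration; names do not come from the Kafka code base.
final class ResourceInitializer {
    private volatile boolean initializationFailed = false;
    private volatile boolean initialized = false;

    // Each step may throw a RuntimeException to simulate a failure.
    void initialize(Runnable createTopic, Runnable createClients) {
        try {
            createTopic.run();
        } catch (RuntimeException e) {
            // Topic creation failed: record it and stop.
            initializationFailed = true;
            return;
        }
        try {
            createClients.run();
            initialized = true;
        } catch (RuntimeException e) {
            // Producer/consumer setup failed: record it as well, so callers
            // do not keep waiting for an initialization that never completes.
            initializationFailed = true;
        }
    }

    boolean hasInitializationFailed() {
        return initializationFailed;
    }

    boolean isInitialized() {
        return initialized;
    }
}
```

If the second `catch` did not set the flag, a caller polling both accessors would see neither "initialized" nor "failed" after a client setup error.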






Re: [PR] KAFKA-15649: Handle directory failure timeout [kafka]

2024-04-15 Thread via GitHub


mimaison commented on code in PR #15697:
URL: https://github.com/apache/kafka/pull/15697#discussion_r1565973156


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -870,6 +875,7 @@ object KafkaConfig {
   .define(CreateTopicPolicyClassNameProp, CLASS, null, LOW, 
CreateTopicPolicyClassNameDoc)
   .define(AlterConfigPolicyClassNameProp, CLASS, null, LOW, 
AlterConfigPolicyClassNameDoc)
   .define(LogMessageDownConversionEnableProp, BOOLEAN, 
LogConfig.DEFAULT_MESSAGE_DOWNCONVERSION_ENABLE, LOW, 
LogMessageDownConversionEnableDoc)
+  .define(LogDirFailureTimeoutMsProp, LONG, 
Defaults.LOG_DIR_FAILURE_TIMEOUT_MS, atLeast(0), MEDIUM, 
LogDirFailureTimeoutMsDoc)

Review Comment:
   In the KIP the accepted value range is defined as >= 1. I wonder if values 
below 1s actually make much sense. 
   Also the importance was defined as low.
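
A minimal sketch of the lower-bound check being discussed, written as a plain-Java stand-in rather than Kafka's own `ConfigDef` validators. The class `MinValueValidator` and the config name `example.timeout.ms` are assumptions for illustration only.

```java
// Hypothetical stand-in for a range validator; not Kafka's ConfigDef API.
final class MinValueValidator {
    private final long min;

    MinValueValidator(long min) {
        this.min = min;
    }

    // Throws if the value is below the configured lower bound.
    void ensureValid(String name, long value) {
        if (value < min) {
            throw new IllegalArgumentException(
                "Invalid value " + value + " for configuration " + name
                    + ": value must be at least " + min);
        }
    }

    // Convenience wrapper that reports validity as a boolean.
    boolean isValid(String name, long value) {
        try {
            ensureValid(name, value);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

Under this sketch, a bound of 1 rejects 0 while a bound of 0 accepts it, which is the difference between the range stated in the KIP and the `atLeast(0)` in the diff.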






Re: [PR] MINOR: fix typo [kafka]

2024-04-15 Thread via GitHub


johnnychhsu commented on PR #15643:
URL: https://github.com/apache/kafka/pull/15643#issuecomment-2057106113

   thanks for the PR. this looks good to me





Re: [PR] KAFKA-15615: Improve handling of fetching during metadata updates. [kafka]

2024-04-15 Thread via GitHub


johnnychhsu commented on PR #15647:
URL: https://github.com/apache/kafka/pull/15647#issuecomment-2057103180

   thanks for the PR!
   just curious, what is the problem without the newly added state? does it 
cause any issues? 





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-15 Thread via GitHub


johnnychhsu commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1565898495


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##
@@ -800,8 +802,23 @@ private Void deleteTypeIfExists(StorageAction delete, Stri
 try {
 if (delete.execute())
 LOGGER.info("Deleted {} {}.", fileType, 
file.getAbsolutePath());
-else if (logIfMissing)
-LOGGER.info("Failed to delete {} {} because it does not 
exist.", fileType, file.getAbsolutePath());
+else {
+if (logIfMissing) {

Review Comment:
   Why can't we just add the fallback logic inside `if (logIfMissing)`, 
instead of adding a new if-else block inside this else block? 






Re: [PR] MINOR: Various cleanups in server and server-common [kafka]

2024-04-15 Thread via GitHub


mimaison commented on code in PR #15710:
URL: https://github.com/apache/kafka/pull/15710#discussion_r1565897312


##
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java:
##
@@ -116,8 +116,7 @@ public synchronized boolean 
maybeUpdatePushRequestTimestamp(long currentTime) {
 */
 boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp;

Review Comment:
   It's shorter but I'm not sure it's more readable.






Re: [PR] KAFKA-16373: Adding code to support Apache Kafka Docker Official Images [kafka]

2024-04-15 Thread via GitHub


soarez commented on code in PR #15704:
URL: https://github.com/apache/kafka/pull/15704#discussion_r1565856796


##
docker/generate_kafka_pr_template.sh:
##
@@ -0,0 +1,76 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Ensure script exits on error or unset variable
+set -eu
+
+# Define the 'self' variable with the script's basename
+self="$(basename "$BASH_SOURCE")"
+
+# Navigate to the script's directory and then to docker_official_images
+cd "$(dirname "$(readlink -f "$BASH_SOURCE")")/docker_official_images"
+
+# Source common utilities
+source ../common.sh
+
+# Initialize an empty variable for the highest version
+highest_version=""
+
+# Output header information
+cat <<-EOH
+# This file is generated via https://github.com/apache/kafka/blob/$(fileCommit 
"../$self")/docker/generate_kafka_pr_template.sh
+
+Maintainers: The Apache Kafka Project  (@ApacheKafka)
+GitRepo: https://github.com/apache/kafka.git
+EOH
+
+# Find all versions, excluding -rc, sort them, and determine the globally 
highest version
+versions=$(find . -mindepth 1 -maxdepth 1 -type d ! -name "*-rc" | sort -V)
+for dir in $versions; do
+version=$(basename "$dir")
+highest_version="$version" # Continuously update to ensure the last is the 
highest
+done

Review Comment:
   ```suggestion
   highest_version=$(find . -mindepth 1 -maxdepth 1 -type d ! -name "*-rc" | 
sort -Vr | head -n 1 | xargs basename)
   ```
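
For reference, a small sketch of what the loop and the suggested one-liner are meant to compute: the highest version among the directory names that do not end in `-rc`. It assumes purely numeric dotted versions (for example `3.7.0`); `HighestVersion` and its methods are hypothetical names, and the comparison only approximates `sort -V`.

```java
import java.util.Collection;
import java.util.Optional;

// Hypothetical illustration; assumes names like "3.7.0" with numeric parts only.
final class HighestVersion {
    // Compares two dotted versions part by part, treating missing parts as 0.
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // Skips names ending in "-rc" and returns the highest remaining version, if any.
    static Optional<String> highest(Collection<String> dirNames) {
        return dirNames.stream()
            .filter(name -> !name.endsWith("-rc"))
            .max(HighestVersion::compareVersions);
    }
}
```

A numeric comparison matters here because a plain string sort would rank `3.9.1` above `3.10.0`.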



##
docker/docker_official_image_build_test.py:
##
@@ -0,0 +1,128 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Python script to build and test a docker image
+This script is used to generate a test report
+
+Usage:
+docker_build_test.py --help
+Get detailed description of each option
+
+Example command:-
+docker_build_test.py  --image-tag  --image-type 
 --kafka-url 
+
+This command will build an image with  as image name, 
 as image_tag (it will be latest by default),
+ as image type (jvm by default),  for the kafka 
inside the image and run tests on the image.
+-b can be passed as additional argument if you just want to build the 
image.
+-t can be passed if you just want to run tests on the image.
+"""
+
+from datetime import date
+import argparse
+from distutils.dir_util import copy_tree
+import shutil
+from test.docker_sanity_test import run_tests
+from common import execute, jvm_image
+import tempfile
+import os
+
+
+def set_executable_permissions(directory):
+"""
+Sets executable permissions for all files in the specified directory and 
its subdirectories.
+"""
+for root, _, files in os.walk(directory):
+for file in files:
+path = os.path.join(root, file)
+os.chmod(path, os.stat(path).st_mode | 0o111)
+
+
+def build_jvm(image, tag, kafka_version):
+image = f'{image}:{tag}'
+current_dir = os.path.dirname(os.path.realpath(__file__))
+temp_dir_path = tempfile.mkdtemp()
+directories = [
+f'{current_dir}/docker_official_images/{kafka_version}/jvm',
+f'{current_dir}/docker_official_images/{kafka_version}/jvm/resources'
+]
+for directory in directories:
+set_executable_permissions(directory)
+copy_tree(f"{current_dir}/docker_official_images/{kafka_version}/jvm",
+  f"{temp_dir_path}/jvm")
+
copy_tree(f"{current_dir}/docker_official_images/{kafka_version}/jvm/resources",
+  

Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]

2024-04-15 Thread via GitHub


johnnychhsu commented on code in PR #15713:
URL: https://github.com/apache/kafka/pull/15713#discussion_r1565877195


##
streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java:
##
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.kstream;
 
 /**
- * Default interface which can be used to personalized the named of 
operations, internal topics or store.
+ * Default interface which can be used to customize the named of operations, 
internal topics or store.

Review Comment:
   shall we also change the description to 'the name of operations'? 






Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-04-15 Thread via GitHub


C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-2056973237

   Thanks Sagar, great catch! I suspected this would be a gnarly one to tackle 
but it's turning out to be even harder than I thought.
   
   I think there's still an issue with the current state of the PR. It looks 
like we aren't blocking on the future returned by `setPrimaryThenSecondary`, 
which means that we may spuriously return early from `get` in the future we're 
returning from `ConnectorOffsetBackingStore::set` if the write to the primary 
store hasn't completed yet. I believe this is missed by tests because the 
producer writes we mock out all take place synchronously; maybe we can use the 
`MockProducer` more idiomatically to simulate records being ack'd after calls 
to `MockProducer::send` have returned?
   
   I've sketched a new kind of `Future` implementation that seems to do the 
trick, though I haven't tested it rigorously:
   
   ```java
   private class ChainedOffsetWriteFuture implements Future<Void> {
   
       private final OffsetBackingStore primaryStore;
       private final OffsetBackingStore secondaryStore;
       private final Map<ByteBuffer, ByteBuffer> completeOffsets;
       private final Map<ByteBuffer, ByteBuffer> regularOffsets;
       private final Callback<Void> callback;
       private final AtomicReference<Throwable> writeError;
       private final CountDownLatch completed;
   
       public ChainedOffsetWriteFuture(
               OffsetBackingStore primaryStore,
               OffsetBackingStore secondaryStore,
               Map<ByteBuffer, ByteBuffer> completeOffsets,
               Map<ByteBuffer, ByteBuffer> regularOffsets,
               Map<ByteBuffer, ByteBuffer> tombstoneOffsets,
               Callback<Void> callback
       ) {
           this.primaryStore = primaryStore;
           this.secondaryStore = secondaryStore;
           this.completeOffsets = completeOffsets;
           this.regularOffsets = regularOffsets;
           this.callback = callback;
           this.writeError = new AtomicReference<>();
           this.completed = new CountDownLatch(1);
   
           // Write the tombstones to the secondary store first; the rest of the chain runs from its callback
           secondaryStore.set(tombstoneOffsets, this::onFirstWrite);
       }
   
       private void onFirstWrite(Throwable error, Void ignored) {
           if (error != null) {
               log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", error);
               try (LoggingContext context = loggingContext()) {
                   callback.onCompletion(error, ignored);
                   writeError.compareAndSet(null, error);
                   completed.countDown();
               }
               return;
           }
           setPrimaryThenSecondary(primaryStore, secondaryStore, completeOffsets, regularOffsets, this::onSecondWrite);
       }
   
       private void onSecondWrite(Throwable error, Void ignored) {
           callback.onCompletion(error, ignored);
           writeError.compareAndSet(null, error);
           completed.countDown();
       }
   
       @Override
       public boolean cancel(boolean mayInterruptIfRunning) {
           return false;
       }
   
       @Override
       public boolean isCancelled() {
           return false;
       }
   
       @Override
       public boolean isDone() {
           return completed.getCount() == 0;
       }
   
       @Override
       public Void get() throws InterruptedException, ExecutionException {
           completed.await();
           if (writeError.get() != null) {
               throw new ExecutionException(writeError.get());
           }
           return null;
       }
   
       @Override
       public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
           if (!completed.await(timeout, unit)) {
               throw new TimeoutException("Failed to complete offset write in time");
           }
           if (writeError.get() != null) {
               throw new ExecutionException(writeError.get());
           }
           return null;
       }
   }
   ```
   
   (I've omitted an implementation of `cancel` and `isCancelled` for now since 
I'm not sure it's really necessary, but LMK if I've missed a case where this 
would make a difference.)
   
   The new class can be used at the end of `ConnectorOffsetBackingStore::set` 
like this:
   
   ```java
   if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
       return new ChainedOffsetWriteFuture(
               primaryStore,
               secondaryStore,
               values,
               regularOffsets,
               tombstoneOffsets,
               callback
       );
   } else {
       return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
   }
   ```
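
The chaining idea above can also be exercised without Kafka on the classpath. The following is a minimal standalone sketch, not the PR's code: `DeferredStore` and `ChainedWrite` are hypothetical names, and `DeferredStore` simply holds each callback until the test "acks" it, which is the asynchronous behaviour the synchronous mocks are said to hide.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Hypothetical stand-in for an offset store whose writes are acknowledged later.
final class DeferredStore {
    private BiConsumer<Throwable, Void> pending;

    // Records the callback instead of invoking it, like a send that has not been ack'd yet.
    void set(BiConsumer<Throwable, Void> callback) {
        this.pending = callback;
    }

    boolean hasPendingWrite() {
        return pending != null;
    }

    // Simulates the ack (error == null) or the failure of the pending write.
    void complete(Throwable error) {
        BiConsumer<Throwable, Void> cb = pending;
        if (cb == null) {
            throw new IllegalStateException("no pending write");
        }
        pending = null;
        cb.accept(error, null);
    }
}

// Completes only after the secondary write and then the primary write have both finished.
final class ChainedWrite {
    private final CountDownLatch completed = new CountDownLatch(1);
    private final AtomicReference<Throwable> writeError = new AtomicReference<>();
    private final DeferredStore primary;

    ChainedWrite(DeferredStore primary, DeferredStore secondary) {
        this.primary = primary;
        secondary.set(this::onFirstWrite);
    }

    private void onFirstWrite(Throwable error, Void ignored) {
        if (error != null) {
            finish(error); // the primary write is skipped entirely
            return;
        }
        primary.set(this::onSecondWrite);
    }

    private void onSecondWrite(Throwable error, Void ignored) {
        finish(error);
    }

    private void finish(Throwable error) {
        writeError.compareAndSet(null, error);
        completed.countDown();
    }

    boolean isDone() {
        return completed.getCount() == 0;
    }

    void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!completed.await(timeout, unit)) {
            throw new TimeoutException("write still in flight");
        }
        if (writeError.get() != null) {
            throw new ExecutionException(writeError.get());
        }
    }
}
```

A test can then create a `ChainedWrite`, check that `isDone()` stays false until both stores have been completed in order, and check that a failed secondary write never reaches the primary. With the real `MockProducer`, constructing it with `autoComplete` set to false and calling `completeNext()` or `errorNext(...)` after `send` has returned should play a similar role, though that wiring is not shown here.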



Re: [PR] MINOR: fix duplicated return and other streams docs typos [kafka]

2024-04-15 Thread via GitHub


johnnychhsu commented on PR #15713:
URL: https://github.com/apache/kafka/pull/15713#issuecomment-2056975870

   Thanks for the PR!
   This looks good to me; I left one small comment.





Re: [PR] KAFKA-16436: Online upgrade triggering and group type conversion [kafka]

2024-04-15 Thread via GitHub


dajac commented on code in PR #15662:
URL: https://github.com/apache/kafka/pull/15662#discussion_r1565866968


##
group-coordinator/src/main/resources/common/message/ConsumerGroupMemberMetadataValue.json:
##
@@ -35,6 +35,20 @@
 { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", 
"default": -1,
   "about": "The rebalance timeout" },
 { "name": "ServerAssignor", "versions": "0+", "nullableVersions": "0+", 
"type": "string",
-  "about": "The server assignor to use; or null if not used." }
+  "about": "The server assignor to use; or null if not used." },
+{ "name": "ClassicMemberMetadata", "versions": "0+", "nullableVersions": 
"0+", "type": "ClassicMemberMetadata",

Review Comment:
   Let's make it a tagged field in order to not break the backward 
compatibility of the record. You can do it by adding `"taggedVersions": "0+", 
"tag": 0`.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -998,4 +1090,177 @@ public ConsumerGroupDescribeResponseData.DescribedGroup 
asDescribedGroup(
 );
 return describedGroup;
 }
+
+/**
+ * Create a new consumer group according to the given classic group.
+ *
+ * @param snapshotRegistry  The SnapshotRegistry.
+ * @param metrics   The GroupCoordinatorMetricsShard.
+ * @param classicGroup  The converted classic group.
+ * @param topicsImage   The TopicsImage for topic id and topic name 
conversion.
+ * @param log   The logger to use.
+ * @return  The created ConsumerGroup.
+ */
+public static ConsumerGroup fromClassicGroup(
+SnapshotRegistry snapshotRegistry,
+GroupCoordinatorMetricsShard metrics,
+ClassicGroup classicGroup,
+TopicsImage topicsImage,
+Logger log
+) {
+String groupId = classicGroup.groupId();
+ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, 
groupId, metrics);
+consumerGroup.setGroupEpoch(classicGroup.generationId());
+consumerGroup.setTargetAssignmentEpoch(classicGroup.generationId());
+
+classicGroup.allMembers().forEach(classicGroupMember -> {
+ConsumerPartitionAssignor.Subscription subscription = 
deserializeSubscription(
+classicGroupMember.metadata(classicGroup.protocolName().get()),
+log,
+"group upgrade"
+);
+Map<Uuid, Set<Integer>> partitions = 
topicPartitionMapFromList(subscription.ownedPartitions(), topicsImage);
+
+ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(classicGroupMember.memberId())
+.setMemberEpoch(classicGroup.generationId())
+.setState(MemberState.STABLE)
+.setPreviousMemberEpoch(classicGroup.generationId())
+
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
+.setRackId(subscription.rackId().orElse(null))
+.setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
+.setClientId(classicGroupMember.clientId())
+.setClientHost(classicGroupMember.clientHost())
+.setSubscribedTopicNames(subscription.topics())
+.setAssignedPartitions(partitions)
+
.setSupportedClassicProtocols(classicGroupMember.supportedProtocols())
+.build();
+consumerGroup.updateMember(newMember);
+consumerGroup.updateTargetAssignment(newMember.memberId(), new 
Assignment(partitions));
+});
+
+return consumerGroup;
+}
+
+/**
+ * Populate the record list with the records needed to create the given 
consumer group.
+ *
+ * @param consumerGroup The consumer group to create.
+ * @param records   The list to which the new records are added.
+ */
+public static void createConsumerGroupRecords(
+ConsumerGroup consumerGroup,
+List records
+) {
+String groupId = consumerGroup.groupId;
+
+consumerGroup.members().forEach((__, consumerGroupMember) ->
+records.add(RecordHelpers.newMemberSubscriptionRecord(groupId, 
consumerGroupMember))
+);
+
+records.add(RecordHelpers.newGroupEpochRecord(groupId, 
consumerGroup.groupEpoch()));
+
+consumerGroup.members().forEach((consumerGroupMemberId, 
consumerGroupMember) ->
+records.add(RecordHelpers.newTargetAssignmentRecord(groupId, 
consumerGroupMemberId, 
consumerGroup.targetAssignment(consumerGroupMemberId).partitions()))
+);
+
+records.add(RecordHelpers.newTargetAssignmentEpochRecord(groupId, 
consumerGroup.groupEpoch()));
+
+consumerGroup.members().forEach((__, consumerGroupMember) ->
+records.add(RecordHelpers.newCurrentAssignmentRecord(groupId, 
consumerGroupMember))
+);
+}
+
+/**
+ * Converts the list 

Re: [PR] MINOR: Various cleanups in server and server-common [kafka]

2024-04-15 Thread via GitHub


mimaison commented on code in PR #15710:
URL: https://github.com/apache/kafka/pull/15710#discussion_r1565856071


##
server/src/main/java/org/apache/kafka/server/AssignmentsManager.java:
##
@@ -321,7 +321,7 @@ public void run() throws Exception {
 AssignReplicasToDirsResponseData data = 
((AssignReplicasToDirsResponse) response.responseBody()).data();
 
 Set failed = filterFailures(data, inflight);
-Set completed = Utils.diff(HashSet::new, 
inflight.values().stream().collect(Collectors.toSet()), failed);
+Set completed = Utils.diff(HashSet::new, new 
HashSet<>(inflight.values()), failed);

Review Comment:
   I'd prefer only sticking to simple refactorings in this PR. We can do this 
optimization in another PR.
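
For context on the hunk being discussed, here is a small sketch suggesting that the stream-based copy and the `HashSet` copy constructor build the same set, so the change only affects how the set is produced. `InflightDiff` and its `diff` helper are hypothetical stand-ins written for this note; they are not Kafka's `Utils.diff`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class InflightDiff {
    // Hypothetical helper: the elements of `all` that are not in `excluded`.
    static <T> Set<T> diff(Set<T> all, Set<T> excluded) {
        Set<T> result = new HashSet<>(all);
        result.removeAll(excluded);
        return result;
    }

    // The shape of the removed line: collect the map values through a stream.
    static Set<String> viaStream(Map<Integer, String> inflight) {
        return inflight.values().stream().collect(Collectors.toSet());
    }

    // The shape of the added line: copy the values with the HashSet constructor.
    static Set<String> viaCopyConstructor(Map<Integer, String> inflight) {
        return new HashSet<>(inflight.values());
    }
}
```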






Re: [PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-15 Thread via GitHub


soarez commented on code in PR #15718:
URL: https://github.com/apache/kafka/pull/15718#discussion_r1565822707


##
docs/ops.html:
##
@@ -2126,7 +2142,7 @@ https://github.com/apache/kafka/blob/3617dda9a5415ee1597967f754985fb73a5350c6/core/src/main/scala/kafka/controller/KafkaController.scala#L74



##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ https://github.com/apache/kafka/blob/3617dda9a5415ee1597967f754985fb73a5350c6/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java#L54-L55



##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ https://github.com/apache/kafka/blob/3617dda9a5415ee1597967f754985fb73a5350c6/metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationState.java#L33-L72



##
docs/ops.html:
##
@@ -2113,6 +2113,22 @@ https://github.com/apache/kafka/blob/3617dda9a5415ee1597967f754985fb73a5350c6/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java#L42-L43






Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-15 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1565783934


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -269,6 +269,9 @@ private NetworkClientDelegate.UnsentRequest 
makeHeartbeatRequest(final long curr
 heartbeatRequestState.onSendAttempt(currentTimeMs);
 membershipManager.onHeartbeatRequestSent();
 metricsManager.recordHeartbeatSentMs(currentTimeMs);
+// Reset timer when sending the request, to make sure that, if waiting 
for the interval,
+// we don't include the response time (which may introduce delay)

Review Comment:
   You're right, I don't think it adds anything that isn't already clear from 
the function name and the action themselves. Removed.
   This is covered in the new test I added 
[here](https://github.com/apache/kafka/blob/fe483ff816b62133291f77f29b00e3bc706b581f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L258).
   I've also added an assert message along the lines of this comment to make 
it clearer in the test.
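
The timing argument in the removed code comment can be checked with a little arithmetic. This is only an illustrative sketch: `HeartbeatTiming` and both method names are made up for this note, and it assumes at most one heartbeat is in flight at a time.

```java
final class HeartbeatTiming {
    // Interval timer reset when the request is sent: the response time is not added to the interval.
    // Assumption: the next heartbeat still cannot go out before the previous response has arrived.
    static long nextSendIfResetOnSend(long sendTimeMs, long responseDelayMs, long intervalMs) {
        return Math.max(sendTimeMs + intervalMs, sendTimeMs + responseDelayMs);
    }

    // Interval timer reset only when the response arrives: every cycle is stretched by the response time.
    static long nextSendIfResetOnResponse(long sendTimeMs, long responseDelayMs, long intervalMs) {
        return sendTimeMs + responseDelayMs + intervalMs;
    }

    public static void main(String[] args) {
        long intervalMs = 1000;
        long responseDelayMs = 300;
        System.out.println("reset on send:     " + nextSendIfResetOnSend(0, responseDelayMs, intervalMs));
        System.out.println("reset on response: " + nextSendIfResetOnResponse(0, responseDelayMs, intervalMs));
    }
}
```

With a 1000 ms interval and a 300 ms response, resetting on send keeps the heartbeats 1000 ms apart, while resetting on response would space them 1300 ms apart.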






[PR] MINOR: Add missing docs for migration metrics [kafka]

2024-04-15 Thread via GitHub


soarez opened a new pull request, #15718:
URL: https://github.com/apache/kafka/pull/15718

   (no comment)





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-15 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2056883089

   Hey @cadonna, thanks a lot for your feedback! All comments addressed. 





Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-15 Thread via GitHub


lianetm commented on code in PR #15698:
URL: https://github.com/apache/kafka/pull/15698#discussion_r1565815246


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -482,6 +484,14 @@ public long nextHeartbeatMs(final long currentTimeMs) {
 return heartbeatTimer.remainingMs();
 }
 
+public void onFailedAttempt(final long currentTimeMs) {
+// Expire timer to allow sending HB after a failure. After a 
failure, a next HB may be
+// needed with backoff (ex. errors that lead to retries, like 
coordinator load error),
+// or immediately (ex. errors that lead to rejoining, like fencing 
errors).
+heartbeatTimer.update(heartbeatTimer.currentTimeMs() + 
heartbeatTimer.remainingMs());

Review Comment:
   Done, good point. I changed it to reset to 0, which shows the intention of 
not having an interval to wait for. That is what we want in these failure 
scenarios.
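
To illustrate the difference between the two ways of expiring the timer, here is a minimal sketch. `SimpleTimer` is a hypothetical class written for this note; it only mirrors the idea of a resettable deadline and is not Kafka's `Timer`.

```java
// Hypothetical minimal countdown timer with an explicit clock.
final class SimpleTimer {
    private long currentTimeMs;
    private long deadlineMs;

    SimpleTimer(long nowMs, long timeoutMs) {
        this.currentTimeMs = nowMs;
        this.deadlineMs = nowMs + timeoutMs;
    }

    // Advances the timer's view of the clock; it never moves backwards.
    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    // Restarts the countdown from the current time.
    void reset(long timeoutMs) {
        deadlineMs = currentTimeMs + timeoutMs;
    }

    long currentTimeMs() {
        return currentTimeMs;
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - currentTimeMs);
    }

    boolean isExpired() {
        return remainingMs() == 0;
    }
}
```

In this sketch both `timer.update(timer.currentTimeMs() + timer.remainingMs())` and `timer.reset(0)` leave the timer expired. The first does it by pretending the clock has jumped forward, while the second states directly that there is no interval left to wait for.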






Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-04-15 Thread via GitHub


mimaison commented on code in PR #15705:
URL: https://github.com/apache/kafka/pull/15705#discussion_r1565764251


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java:
##
@@ -136,8 +136,8 @@ private CoordinatorKey 
requireSingletonAndType(Set keys) {
 }
 
 private void ensureSameType(Set keys) {
-if (keys.size() < 1) {
-throw new IllegalArgumentException("Unexpected size of key set: 
expected >= 1, but got " + keys.size());
+if (keys.isEmpty()) {
+throw new IllegalArgumentException("Unexpected size of key set: 
expected >= 1, but got 0");

Review Comment:
   It's the same really. I just simplified the error message to be a fixed 
string.






Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-04-15 Thread via GitHub


mimaison commented on code in PR #15705:
URL: https://github.com/apache/kafka/pull/15705#discussion_r1565721043


##
clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java:
##
@@ -79,12 +79,8 @@ public ByteBuffer get(int size) {
 @Override
 public void release(ByteBuffer buffer) {
 buffer.clear();
-Deque bufferQueue = bufferMap.get(buffer.capacity());
-if (bufferQueue == null) {
-// We currently keep a single buffer in flight, so optimise 
for that case
-bufferQueue = new ArrayDeque<>(1);
-bufferMap.put(buffer.capacity(), bufferQueue);
-}
+Deque bufferQueue = 
bufferMap.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>(1));
+// We currently keep a single buffer in flight, so optimise for 
that case

Review Comment:
   Good point!
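
The `computeIfAbsent` pattern in this hunk can be tried in isolation. The sketch below is a hypothetical, non-thread-safe pool (`TinyBufferPool` is a made-up name, not Kafka's `BufferSupplier`) that keeps released buffers in a per-capacity queue created on first use.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical buffer pool keyed by capacity; illustration only.
final class TinyBufferPool {
    private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>();

    // Reuses a pooled buffer of the requested capacity, or allocates a new one.
    ByteBuffer get(int capacity) {
        Deque<ByteBuffer> queue = bufferMap.get(capacity);
        ByteBuffer cached = queue == null ? null : queue.pollFirst();
        return cached != null ? cached : ByteBuffer.allocate(capacity);
    }

    // Returns a buffer to the pool. The queue is created on first use; the
    // initial size of 1 is only a sizing hint for the common single-buffer case.
    void release(ByteBuffer buffer) {
        buffer.clear();
        bufferMap.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>(1)).addLast(buffer);
    }

    // Number of buffers currently pooled for the given capacity.
    int pooled(int capacity) {
        Deque<ByteBuffer> queue = bufferMap.get(capacity);
        return queue == null ? 0 : queue.size();
    }
}
```

Compared with the explicit null check it replaces, `computeIfAbsent` performs the lookup and the conditional insert in one call, so the comment about sizing the queue for a single in-flight buffer reads best when it sits next to the lambda that creates the queue.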






Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-04-15 Thread via GitHub


mimaison commented on code in PR #15705:
URL: https://github.com/apache/kafka/pull/15705#discussion_r1565719621


##
clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java:
##
@@ -37,13 +37,13 @@ public class GarbageCollectedMemoryPool extends 
SimpleMemoryPool implements Auto
 //serves 2 purposes - 1st it maintains the ref objects reachable (which is 
a requirement for them
 //to ever be enqueued), 2nd keeps some (small) metadata for every buffer 
allocated
 private final Map buffersInFlight = new 
ConcurrentHashMap<>();
-private final GarbageCollectionListener gcListener = new 
GarbageCollectionListener();
 private final Thread gcListenerThread;
-private volatile boolean alive = true;
+private volatile boolean alive;
 
 public GarbageCollectedMemoryPool(long sizeBytes, int 
maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {
 super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor);
 this.alive = true;

Review Comment:
   Yes I agree






Re: [PR] MINOR: Various cleanups in clients [kafka]

2024-04-15 Thread via GitHub


mimaison commented on code in PR #15705:
URL: https://github.com/apache/kafka/pull/15705#discussion_r1565718889


##
clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java:
##
@@ -60,7 +58,6 @@ public SslChannelBuilder(Mode mode,
 this.mode = mode;
 this.listenerName = listenerName;
 this.isInterBrokerListener = isInterBrokerListener;
-this.log = logContext.logger(getClass());

Review Comment:
   Since this field is unused, I thought we should just delete it.






[jira] [Created] (KAFKA-16553) log controller configs when startup

2024-04-15 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16553:
--

 Summary: log controller configs when startup
 Key: KAFKA-16553
 URL: https://issues.apache.org/jira/browse/KAFKA-16553
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


We can't observe the controller configs from the log file. We can copy the 
solution used by the broker 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L492).

Alternatively, this issue could be blocked by 
https://issues.apache.org/jira/browse/KAFKA-13105 to wait for a more graceful 
solution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16371: fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE [kafka]

2024-04-15 Thread via GitHub


dajac commented on code in PR #15536:
URL: https://github.com/apache/kafka/pull/15536#discussion_r1565639650


##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
 assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+val memberId = ""
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 
1, "foo")
+val offset = 37
+val requireStable = true;
+
+groupMetadataManager.addOwnedPartition(groupPartitionId)
+val group = new GroupMetadata(groupId, Empty, time)
+groupMetadataManager.addGroup(group)
+
+val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataManager.partitionFor(group.groupId))
+val offsets = immutable.Map(
+  topicIdPartition -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds()),
+  validTopicIdPartition -> OffsetAndMetadata(offset, "", 
time.milliseconds()))
+
+expectAppendMessage(Errors.NONE)
+
+var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+  commitErrors = Some(errors)
+}
+
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, 
offsets, callback, verificationGuard = None)
+assertTrue(group.hasOffsets)
+
+assertFalse(commitErrors.isEmpty)
+assertEquals(
+  Some(Errors.OFFSET_METADATA_TOO_LARGE),
+  commitErrors.get.get(topicIdPartition)
+)
+assertEquals(
+  Some(Errors.NONE),
+  commitErrors.get.get(validTopicIdPartition)
+)

Review Comment:
   nit: Would it be possible to use `assertEquals(expectedMap, 
commitErrors.get)`? We usually prefer this way because it ensures that the Map 
only contains what we expect.
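
The reasoning behind this nit can be shown with plain maps. The test under review is Scala, so the following is only a sketch in Java with made-up keys and a hypothetical `MapAssertionDemo` class: per-key checks still pass when the actual map carries an extra entry, while a whole-map comparison does not.

```java
import java.util.Map;

final class MapAssertionDemo {
    // Per-key check: passes as long as every expected key has the expected value.
    static boolean perKeyMatches(Map<String, String> expected, Map<String, String> actual) {
        return expected.entrySet().stream()
            .allMatch(e -> e.getValue().equals(actual.get(e.getKey())));
    }

    // Whole-map check: also fails when `actual` contains entries nobody expected.
    static boolean wholeMapMatches(Map<String, String> expected, Map<String, String> actual) {
        return expected.equals(actual);
    }
}
```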



##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
 assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+val memberId = ""
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 
1, "foo")
+val offset = 37
+val requireStable = true;
+
+groupMetadataManager.addOwnedPartition(groupPartitionId)
+val group = new GroupMetadata(groupId, Empty, time)
+groupMetadataManager.addGroup(group)
+
+val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupMetadataManager.partitionFor(group.groupId))
+val offsets = immutable.Map(
+  topicIdPartition -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds()),
+  validTopicIdPartition -> OffsetAndMetadata(offset, "", 
time.milliseconds()))
+
+expectAppendMessage(Errors.NONE)
+
+var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
+  commitErrors = Some(errors)
+}
+
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, 
offsets, callback, verificationGuard = None)
+assertTrue(group.hasOffsets)
+
+assertFalse(commitErrors.isEmpty)
+assertEquals(
+  Some(Errors.OFFSET_METADATA_TOO_LARGE),
+  commitErrors.get.get(topicIdPartition)
+)
+assertEquals(
+  Some(Errors.NONE),
+  commitErrors.get.get(validTopicIdPartition)
+)
+
+val cachedOffsets = groupMetadataManager.getOffsets(
+  groupId,
+  requireStable,
+  Some(Seq(topicIdPartition.topicPartition, 
validTopicIdPartition.topicPartition))
+)
+assertEquals(
+  Some(OffsetFetchResponse.INVALID_OFFSET),
+  cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+)
+assertEquals(
+  Some(Errors.NONE),
+  cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
+)
+assertEquals(
+  Some(offset),
+  cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
+)

Review Comment:
   nit: Same comment as the previous one.



##
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##
@@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest {
 assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
+  @Test
+  def testOffsetMetadataTooLargePartialFailure(): Unit = {
+val memberId = ""
+val 

Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-15 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1565638692


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+  val tp = futureLog.topicPartition
+
+  futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+  futureLog.removeLogMetrics()
+  futureLogs.remove(tp)
+
+  currentLog.foreach { log =>
+log.removeLogMetrics()
+log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = 
false)
+addLogToBeDeleted(log)
+info(s"Old log for partition ${tp} is renamed to 
${log.dir.getAbsolutePath} and is scheduled for deletion")
+  }
+
+  currentLogs.put(tp, futureLog)
+  futureLog.newMetrics()
+
+  info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   That's a great suggestion! I'm going to try refactoring this to use 
`replaceCurrentWithFutureLog`






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-15 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1565635052


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,7 +92,72 @@ public interface TierStateMachine {
  */
 PartitionFetchState start(TopicPartition topicPartition,
   PartitionFetchState currentFetchState,
-  PartitionData fetchPartitionData) throws 
Exception;
+  PartitionData fetchPartitionData) throws 
Exception {
+OffsetAndEpoch epochAndLeaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+long offsetToFetch;
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+try {
+offsetToFetch = buildRemoteLogAuxState(topicPartition, 
currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, 
fetchPartitionData.logStartOffset());
+} catch (RemoteStorageException e) {
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+throw e;
+}
+
+OffsetAndEpoch fetchLatestOffsetResult = 
leader.fetchLatestOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+long initialLag = leaderEndOffset - offsetToFetch;
+
+return PartitionFetchState.apply(currentFetchState.topicId(), 
offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+Fetching$.MODULE$, 
replicaMgr.localLogOrException(topicPartition).latestEpoch());

Review Comment:
   Ah, nice catch! Interestingly, no test caught this error. Let me write a 
test for it.






Re: [PR] MINOR: Various cleanups in storage [kafka]

2024-04-15 Thread via GitHub


mimaison merged PR #15711:
URL: https://github.com/apache/kafka/pull/15711





[jira] [Resolved] (KAFKA-15230) ApiVersions data between controllers is not reliable

2024-04-15 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15230.

Fix Version/s: (was: 3.7.0)
   Resolution: Duplicate

This is fixed by https://issues.apache.org/jira/browse/KAFKA-15369

> ApiVersions data between controllers is not reliable
> 
>
> Key: KAFKA-15230
> URL: https://issues.apache.org/jira/browse/KAFKA-15230
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: Colin McCabe
>Priority: Critical
>
> While testing ZK migrations, I noticed a case where the controller was not 
> starting the migration because ApiVersions data from the other controllers 
> was missing. This was unexpected because the quorum was running and the 
> followers were replicating the metadata log as expected. A heap dump of the 
> leader confirmed that the ApiVersions map of NodeApiVersions was empty.
>
> After further investigation and offline discussion with [~jsancio], we 
> realized that after the initial leader election, the connection from the Raft 
> leader to the followers becomes idle and eventually times out and closes. 
> This causes NetworkClient to purge the NodeApiVersions data for the closed 
> connections.
>
> There are two main side effects of this behavior: 
> 1) If migrations are not started within the idle timeout period (10 minutes 
> by default), they cannot be started at all. After this timeout period, I was 
> unable to restart the controllers in such a way that the leader had active 
> connections with all followers.
> 2) Dynamically updating features, such as "metadata.version", is not 
> guaranteed to be safe.
>
> There is a partial workaround for the migration issue. If we set 
> "connections.max.idle.ms" to -1, the Raft leader will never disconnect from 
> the followers. However, if a follower restarts, the leader will not 
> re-establish a connection.
>
> The feature update issue has no safe workarounds.
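
The purge described in the report can be pictured with a toy model. This is only a sketch under stated assumptions: `IdlePurgeSketch` and its methods are hypothetical illustration names, not Kafka's NetworkClient, and the only facts taken from the report are the 10-minute default idle timeout, the loss of ApiVersions data when an idle connection closes, and the effect of setting the timeout to -1.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy model only; this is not Kafka's NetworkClient.
final class IdlePurgeSketch {
    private final long maxIdleMs; // a negative value means idle connections are never closed
    private final Map<String, Long> lastActiveMs = new HashMap<>();
    private final Map<String, String> nodeApiVersions = new HashMap<>();

    IdlePurgeSketch(long maxIdleMs) {
        this.maxIdleMs = maxIdleMs;
    }

    // Records a connection and the ApiVersions data learned from that node.
    void connect(String nodeId, String apiVersions, long nowMs) {
        lastActiveMs.put(nodeId, nowMs);
        nodeApiVersions.put(nodeId, apiVersions);
    }

    // Closes connections idle for longer than maxIdleMs and drops their ApiVersions entry.
    void poll(long nowMs) {
        if (maxIdleMs < 0) {
            return;
        }
        Iterator<Map.Entry<String, Long>> it = lastActiveMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > maxIdleMs) {
                nodeApiVersions.remove(entry.getKey());
                it.remove();
            }
        }
    }

    int knownApiVersions() {
        return nodeApiVersions.size();
    }

    public static void main(String[] args) {
        IdlePurgeSketch leader = new IdlePurgeSketch(600_000L); // 10 minutes
        leader.connect("follower-1", "hypothetical-api-versions", 0L);
        leader.poll(600_001L); // just past the idle timeout
        System.out.println(leader.knownApiVersions());
    }
}
```

In this model nothing re-adds an entry after a purge unless `connect` is called again, which mirrors the report's observation that the leader ends up with an empty map.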



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-15230) ApiVersions data between controllers is not reliable

2024-04-15 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reopened KAFKA-15230:


Reopening to mark it as a duplicate.

> ApiVersions data between controllers is not reliable
> 
>
> Key: KAFKA-15230
> URL: https://issues.apache.org/jira/browse/KAFKA-15230
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Assignee: Colin McCabe
>Priority: Critical
> Fix For: 3.7.0
>
>





[jira] [Resolved] (KAFKA-15369) Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add Controller Registration

2024-04-15 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15369.

Fix Version/s: 3.7.0
 Assignee: Colin McCabe
   Resolution: Fixed

> Allow AdminClient to Talk Directly with the KRaft Controller Quorum and add 
> Controller Registration
> ---
>
> Key: KAFKA-15369
> URL: https://issues.apache.org/jira/browse/KAFKA-15369
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.7.0
>
>






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-15 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1565572433


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")

Review Comment:
   `${tp}` -> `$tp`






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-15 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1565571409


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -81,8 +82,7 @@ public BrokerNode build(
 logDataDirectories = Collections.
 singletonList(String.format("combined_%d", id));
 } else {
-logDataDirectories = Collections.
-singletonList(String.format("broker_%d_data0", id));
+logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment:
   Personally, I'd like to change setNumBrokerNodes(int) to setBrokerNodes(int, 
int). The second argument is used to define the number of data folders. After 
all, not all tests require 2+ folders. Maybe we can address that in another PR.
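
A rough sketch of what that suggestion could look like follows. Everything here is an assumption made for illustration: `setBrokerNodes(int, int)` is only the name proposed in the comment, and `BrokerDirsSketch` and `dataDirsFor` are hypothetical names rather than part of kafka.testkit. Only the `broker_%d_data%d` directory naming is taken from the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; not the actual kafka.testkit builder.
final class BrokerDirsSketch {
    private int numBrokerNodes = 1;
    private int numDataDirsPerBroker = 1;

    // Assumed replacement for setNumBrokerNodes(int): the second argument
    // is the number of data folders created for each broker.
    BrokerDirsSketch setBrokerNodes(int numBrokerNodes, int numDataDirsPerBroker) {
        if (numBrokerNodes < 0 || numDataDirsPerBroker < 1) {
            throw new IllegalArgumentException("invalid node or directory count");
        }
        this.numBrokerNodes = numBrokerNodes;
        this.numDataDirsPerBroker = numDataDirsPerBroker;
        return this;
    }

    int numBrokerNodes() {
        return numBrokerNodes;
    }

    // Builds directory names following the broker_%d_data%d pattern from the diff.
    List<String> dataDirsFor(int brokerId) {
        List<String> dirs = new ArrayList<>();
        for (int i = 0; i < numDataDirsPerBroker; i++) {
            dirs.add(String.format("broker_%d_data%d", brokerId, i));
        }
        return Collections.unmodifiableList(dirs);
    }

    public static void main(String[] args) {
        BrokerDirsSketch builder = new BrokerDirsSketch().setBrokerNodes(3, 2);
        System.out.println(builder.dataDirsFor(0));
    }
}
```

With a default of one directory per broker, tests that do not need 2+ folders would keep a single `broker_%d_data0` directory, which is the concern raised in the comment.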






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-15 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1565569471


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   It seems to me that we need to at least call `closeHandlers` if we don't do 
the refactoring in this PR.






Re: [PR] MINOR: Various cleanups in storage [kafka]

2024-04-15 Thread via GitHub


mimaison commented on PR #15711:
URL: https://github.com/apache/kafka/pull/15711#issuecomment-2056497697

   None of the test failures are related, merging to trunk.





[jira] [Commented] (KAFKA-16552) Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests

2024-04-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837164#comment-17837164
 ] 

Luke Chen commented on KAFKA-16552:
---

Thanks [~brandboat] !

> Create an internal config to control InitialTaskDelayMs in LogManager to 
> speed up tests
> ---
>
> Key: KAFKA-16552
> URL: https://issues.apache.org/jira/browse/KAFKA-16552
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>
> When LogManager starts up, it creates scheduled tasks such as the 
> kafka-log-retention and kafka-recovery-point-checkpoint threads, among others 
> ([here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L629]).
> All of them have public configs for the interval, such as 
> `log.retention.check.interval.ms`. But in addition to the scheduler interval, 
> there is a hard-coded InitialTaskDelayMs (30 seconds) for all of them. That 
> might not be a problem in a production environment, since it lets the Kafka 
> server start up faster. But in a test environment, the 30-second delay means 
> that tests verifying behaviors such as log retention take at least 30 seconds 
> to complete.
> To speed up tests, we should create an internal config (e.g. 
> "log.initial.task.delay.ms") to control InitialTaskDelayMs in LogManager. 
> This is not intended for normal users; it is only for speeding up tests.
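
The proposal can be sketched roughly as follows. This is an illustration under assumptions, not the LogManager change itself: `InitialDelaySketch` is a hypothetical class, reading the value from `java.util.Properties` is assumed for simplicity, and a plain `ScheduledExecutorService` stands in for Kafka's scheduler. The key name and the 30-second default come from the issue text.

```java
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the actual LogManager code.
final class InitialDelaySketch {
    // Key name from the issue's example; reading it from Properties is an assumption.
    static final String INITIAL_TASK_DELAY_MS_CONFIG = "log.initial.task.delay.ms";
    // The hard-coded value mentioned in the issue (30 seconds).
    static final long DEFAULT_INITIAL_TASK_DELAY_MS = 30_000L;

    // Returns the configured delay, falling back to the 30-second default when unset.
    static long initialTaskDelayMs(Properties props) {
        String raw = props.getProperty(INITIAL_TASK_DELAY_MS_CONFIG);
        return raw == null ? DEFAULT_INITIAL_TASK_DELAY_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) throws InterruptedException {
        Properties testProps = new Properties();
        testProps.setProperty(INITIAL_TASK_DELAY_MS_CONFIG, "10");

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // The first run happens after the configured initial delay, then every 100 ms.
        scheduler.scheduleAtFixedRate(
            () -> System.out.println("retention check ran"),
            initialTaskDelayMs(testProps), 100L, TimeUnit.MILLISECONDS);
        Thread.sleep(50L);
        scheduler.shutdownNow();
    }
}
```

A test that overrides the key waits only for the short delay before the first scheduled run, while an unset key keeps the existing 30-second behavior.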





[jira] [Assigned] (KAFKA-16552) Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests

2024-04-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-16552:
-

Assignee: Kuan Po Tseng  (was: Luke Chen)

> Create an internal config to control InitialTaskDelayMs in LogManager to 
> speed up tests
> ---
>
> Key: KAFKA-16552
> URL: https://issues.apache.org/jira/browse/KAFKA-16552
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
>





[jira] [Commented] (KAFKA-16552) Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests

2024-04-15 Thread Kuan Po Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17837163#comment-17837163
 ] 

Kuan Po Tseng commented on KAFKA-16552:
---

+1 on this !

> Create an internal config to control InitialTaskDelayMs in LogManager to 
> speed up tests
> ---
>
> Key: KAFKA-16552
> URL: https://issues.apache.org/jira/browse/KAFKA-16552
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>




