Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2067936507

> This PR is good but it seems to me `LogSegment` should NOT guess the directory structure managed by the upper class (i.e. `LogManager`).
>
> It seems the root cause is the following sequence of steps:
>
> 1. the segments to be deleted are removed from `LocalLog`
> 2. `LocalLog#renameDir` moves the whole folder
> 3. `LocalLog#renameDir` updates the parent folder for all segments. However, the segments to be deleted have already been removed from the inner collection. Hence, some `Segment#log` has a stale file.
>
> If I understand correctly, another solution is that we pass a function to get the latest dir when calling `deleteSegmentFiles` (https://github.com/apache/kafka/blob/2d4abb85bf4a3afb1e3359a05786ab8f3fda127e/core/src/main/scala/kafka/log/LocalLog.scala#L904). If deleting a segment gets a not-found error, we call `updateParentDir` and delete it again.
> WDYT?

Hi @chia7712, thanks for the great suggestion. I took a look at `LogSegment#deleteIfExists` and `LogSegment#deleteTypeIfExists`. If we want to handle fallback deletion in `LocalLog`, we may need to return true/false from those two functions. However, `LogSegment#deleteIfExists` uses `Utils.tryAll` to handle 4 try/catch blocks. If we want to return true/false, we need to refactor `Utils.tryAll` as well. Finally, `LogSegment#deleteIfExists` is used not only by `LocalLog.deleteSegmentFiles` but also by `LocalLog.splitOverflowedSegment`, so we need to handle the `LocalLog.splitOverflowedSegment` path, too. I think we can use another Jira to track the change. Thanks.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
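For readers following the thread, the fallback idea discussed above (retry the delete against the latest directory when the first attempt finds nothing) can be sketched roughly as below. This is only an illustration under stated assumptions: `SegmentFile`, `deleteWithFallback`, and `demo` are hypothetical names, not Kafka APIs, and the real `LogSegment` deletes several files through `Utils.tryAll`, which this sketch does not model.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.function.Supplier;

public class FallbackDelete {
    // Hypothetical stand-in for a segment whose cached parent directory may be stale.
    static final class SegmentFile {
        private File file;

        SegmentFile(File file) {
            this.file = file;
        }

        // Re-point the cached path at the same file name under a new parent directory.
        void updateParentDir(File newDir) {
            file = new File(newDir, file.getName());
        }

        // Returns true only if a file was actually deleted.
        boolean deleteIfExists() throws IOException {
            return Files.deleteIfExists(file.toPath());
        }
    }

    // Try the cached path first; if nothing was there, ask for the latest dir and retry once.
    static boolean deleteWithFallback(SegmentFile segment, Supplier<File> latestDir) throws IOException {
        if (segment.deleteIfExists()) {
            return true;
        }
        segment.updateParentDir(latestDir.get());
        return segment.deleteIfExists();
    }

    // Simulates a directory rename: the file lives in newDir, the segment still points at oldDir.
    static boolean demo() {
        try {
            File oldDir = Files.createTempDirectory("old-dir").toFile();
            File newDir = Files.createTempDirectory("new-dir").toFile();
            File moved = new File(newDir, "00000000000000000000.log.deleted");
            Files.createFile(moved.toPath());
            SegmentFile stale = new SegmentFile(new File(oldDir, moved.getName()));
            return deleteWithFallback(stale, () -> newDir) && !moved.exists();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true"
    }
}
```

The sketch also shows why the delete methods would need to report whether anything was deleted: without a boolean result, the caller cannot tell that a retry is needed.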
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1573652117 ## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ## @@ -116,6 +118,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } + @Test + def testAlterReplicaLogDirsRequestWithRetention(): Unit = { +val partitionNum = 1 + +// Alter replica dir before topic creation +val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath +val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap +val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) + +// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions +val tp = new TopicPartition(topic, 0) +assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) +assertTrue(servers.head.logManager.getLog(tp).isEmpty) + +val topicProperties = new Properties() +topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024") +topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1") +topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024") + +createTopic(topic, partitionNum, 1, topicProperties) +assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent) + +// send enough records to trigger log rolling +(0 until 20).foreach { _ => + TestUtils.generateAndProduceMessages(servers, topic, 10, 1) +} +TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1, + "timed out waiting for log segment to roll") + +// Wait for log segment retention. LogManager#InitialTaskDelayMs is 30 seconds. +// The first retention task is executed after 30 seconds, so waiting for 35 seconds should be enough. 
+TestUtils.waitUntilTrue(() => {
+  new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+}, "timed out waiting for log segment to retention", 35000)

Review Comment:
Thank you. I updated `LOG_INITIAL_TASK_DELAY_MS_CONFIG` to 5 seconds.
[PR] KAFKA-16549: suppress the warnings from RemoteLogManager [kafka]
charliecheng630 opened a new pull request, #15767: URL: https://github.com/apache/kafka/pull/15767 - suppress the warnings from RemoteLogManager ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [x] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] MINOR: fix javadoc argument name in KRaftMetadataCache#getTopicMetadataForDescribeTopicResponse [kafka]
brandboat commented on code in PR #15764:
URL: https://github.com/apache/kafka/pull/15764#discussion_r1573667382

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ##
@@ -267,7 +267,7 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
    *
    * @param topics The iterator of topics and their corresponding first partition id to fetch.
    * @param listenerName The listener name.
-   * @param firstTopicPartitionStartIndex The start partition index for the first topic
+   * @param topicPartitionStartIndex The start partition index for the first topic

Review Comment:
Looks like the description here isn't correct. Could you help modify it? Huge thanks :smiley:
Maybe `The function that returns the start partition index for the topic`.
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
The-Gamer-01 commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573677249

## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ##
@@ -74,25 +72,24 @@ protected final Map createRemoteLogMetadataTr
         return map;
     }

-    protected final Map createRemoteLogStorageClassToApiKeyMap() {
-        Map map = new HashMap<>();
-        map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-        map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-        map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
-        map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-        return map;
-    }
-
     public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-        Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-        if (apiKey == null) {
-            throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-                    + " does not exist.");
-        }
-        @SuppressWarnings("unchecked")
-        ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+        RemoteLogMetadataTransform metadataTransform;
+
+        if(remoteLogMetadata.getClass() == RemoteLogSegmentMetadata.class) {

Review Comment:
Why doesn't this PR use **instanceof**?
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
chia7712 commented on PR #15616:
URL: https://github.com/apache/kafka/pull/15616#issuecomment-2067966296

> thanks for the great suggestion. I took a look at LogSegment#deleteIfExists and LogSegment#deleteTypeIfExists. If we want to handle fallback deletion in LocalLog, we may need to return true/false from those two functions. However, LogSegment#deleteIfExists uses Utils.tryAll to handle 4 try/catch blocks. If we want to return true/false, we need to refactor Utils.tryAll as well. Finally, LogSegment#deleteIfExists is used not only by LocalLog.deleteSegmentFiles but also by LocalLog.splitOverflowedSegment, so we need to handle the LocalLog.splitOverflowedSegment path, too. I think we can use another Jira to track the change. Thanks.

Agree. Let's ship it first.
[jira] [Commented] (KAFKA-16551) add integration test for ClusterTool
[ https://issues.apache.org/jira/browse/KAFKA-16551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839367#comment-17839367 ] Yin Chen Liao commented on KAFKA-16551: --- I want to take over this issue. > add integration test for ClusterTool > > > Key: KAFKA-16551 > URL: https://issues.apache.org/jira/browse/KAFKA-16551 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > as title -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-1655: new test for ClusterTool, integration with ClusterTest [kafka]
dboyliao commented on PR #15768:
URL: https://github.com/apache/kafka/pull/15768#issuecomment-2067968427

The blockers here are `testClusterTooOldToHaveId` and `testUnregisterBroker`. For `testClusterTooOldToHaveId`, I'm still working out which metadata version is old enough to be identified as too old. For `testUnregisterBroker`, the error message is `WARN [BrokerLifecycleManager id=0] Broker 0 sent a heartbeat request but received error STALE_BROKER_EPOCH. (kafka.server.BrokerLifecycleManager:70)` and the test seems to be blocked on this.
[jira] [Commented] (KAFKA-16550) add integration test for LogDirsCommand
[ https://issues.apache.org/jira/browse/KAFKA-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839369#comment-17839369 ] JiaChi Wang commented on KAFKA-16550: - Hi [~chia7712] I'm interested in this issue. Can I take this one? Thanks. > add integration test for LogDirsCommand > --- > > Key: KAFKA-16550 > URL: https://issues.apache.org/jira/browse/KAFKA-16550 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > Currently LogDirsCommand have only UT -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16550) add integration test for LogDirsCommand
[ https://issues.apache.org/jira/browse/KAFKA-16550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16550: -- Assignee: JiaChi Wang (was: Chia-Ping Tsai) > add integration test for LogDirsCommand > --- > > Key: KAFKA-16550 > URL: https://issues.apache.org/jira/browse/KAFKA-16550 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: JiaChi Wang >Priority: Minor > > Currently LogDirsCommand have only UT -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: use assertInstanceOf to replace assertTrue [kafka]
evalaiyc98 opened a new pull request, #15769:
URL: https://github.com/apache/kafka/pull/15769

I am a newbie and this is a minor change that uses `assertInstanceOf` to replace `assertTrue`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] KAFKA-15853: Refactor KafkaConfig to use PasswordEncoderConfigs [kafka]
OmniaGM opened a new pull request, #15770:
URL: https://github.com/apache/kafka/pull/15770

- Move docs to `PasswordEncoderConfigs`
- Rename the configs in `PasswordEncoderConfigs` to match the `_CONFIG` suffix pattern
- Move default values to `PasswordEncoderConfigs`
- Replace `KafkaConfig.PasswordEncoder*Prop` with `PasswordEncoderConfigs.*_CONFIGS`

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] Minor: Fix CDS in docker image [kafka]
VedarthConfluent opened a new pull request, #15771:
URL: https://github.com/apache/kafka/pull/15771

Fix CDS in docker image. Because the packages present when the jsa files were generated differ from those present when the docker image is generated, a warning is logged when the docker image starts:

`[0.001s][warning][cds] The shared archive file has a bad magic number: 0`

This PR fixes the warning.
[jira] [Created] (KAFKA-16594) Add a test to detect CDS errors
Vedarth Sharma created KAFKA-16594: -- Summary: Add a test to detect CDS errors Key: KAFKA-16594 URL: https://issues.apache.org/jira/browse/KAFKA-16594 Project: Kafka Issue Type: Sub-task Reporter: Vedarth Sharma Assignee: Vedarth Sharma Currently pipeline cannot detect whether CDS is working as expected or not. A test for this will help. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1573721007 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -74,25 +72,24 @@ protected final Map createRemoteLogMetadataTr return map; } -protected final Map createRemoteLogStorageClassToApiKeyMap() { -Map map = new HashMap<>(); -map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); -map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); -map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); -map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY); -return map; -} - public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { -Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); -if (apiKey == null) { -throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() - + " does not exist."); -} -@SuppressWarnings("unchecked") -ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata); +RemoteLogMetadataTransform metadataTransform; + +if(remoteLogMetadata.getClass() == RemoteLogSegmentMetadata.class) { Review Comment: Is there an advantage in using instanceof instead of using getClass() in this scenario? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
gharris1727 commented on code in PR #15620:
URL: https://github.com/apache/kafka/pull/15620#discussion_r1573729186

## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ##
@@ -74,25 +72,24 @@ protected final Map createRemoteLogMetadataTr
         return map;
     }

-    protected final Map createRemoteLogStorageClassToApiKeyMap() {
-        Map map = new HashMap<>();
-        map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
-        map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
-        map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
-        map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
-        return map;
-    }
-
     public byte[] serialize(RemoteLogMetadata remoteLogMetadata) {
-        Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName());
-        if (apiKey == null) {
-            throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass()
-                    + " does not exist.");
-        }
-        @SuppressWarnings("unchecked")
-        ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata);
+        RemoteLogMetadataTransform metadataTransform;
+
+        if(remoteLogMetadata.getClass() == RemoteLogSegmentMetadata.class) {

Review Comment:
I don't think it makes a correctness difference since this is a flat hierarchy and there aren't any subclasses of each of the serialized types. There might be a slight performance difference, but not a significant one when the code is overall unoptimized, and we can't know without measuring it.

I would have reached for `instanceof` because it is more null safe, but if there's an explicit null guard somewhere else then there is again no difference in correctness, just style.
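To make the comparison in this review thread concrete, here is a small self-contained sketch of where `instanceof` and a `getClass()` equality check agree and where they diverge. The class names are hypothetical stand-ins, not the Kafka metadata types, and the subclass exists only to show the divergence; the review comment above notes that the real hierarchy is flat.

```java
public class InstanceofVsGetClass {
    // Hypothetical types for illustration only.
    static class Metadata {}
    static class SegmentMetadata extends Metadata {}
    static class ExtendedSegmentMetadata extends SegmentMetadata {}

    // True for SegmentMetadata and any subclass; false for null.
    static boolean byInstanceof(Metadata m) {
        return m instanceof SegmentMetadata;
    }

    // True only for exactly SegmentMetadata; throws NullPointerException for null.
    static boolean byGetClass(Metadata m) {
        return m.getClass() == SegmentMetadata.class;
    }

    public static void main(String[] args) {
        System.out.println(byInstanceof(new SegmentMetadata()));         // true
        System.out.println(byGetClass(new SegmentMetadata()));           // true
        System.out.println(byInstanceof(new ExtendedSegmentMetadata())); // true
        System.out.println(byGetClass(new ExtendedSegmentMetadata()));   // false
        System.out.println(byInstanceof(null));                          // false
        try {
            byGetClass(null);
        } catch (NullPointerException e) {
            System.out.println("getClass() on null throws NullPointerException");
        }
    }
}
```

With a flat hierarchy and a non-null argument the two checks give the same answer, which matches the point made in the comment that the choice is then mostly one of style.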
[PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
OmniaGM opened a new pull request, #15772: URL: https://github.com/apache/kafka/pull/15772 - Move socket configs and docs out of core and into `org.apache.kafka.network.SocketServerConfigs` - Move default values for socket configs into same class ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1573755412 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -74,25 +72,24 @@ protected final Map createRemoteLogMetadataTr return map; } -protected final Map createRemoteLogStorageClassToApiKeyMap() { -Map map = new HashMap<>(); -map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); -map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); -map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); -map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY); -return map; -} - public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { -Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); -if (apiKey == null) { -throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() - + " does not exist."); -} -@SuppressWarnings("unchecked") -ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata); +RemoteLogMetadataTransform metadataTransform; + +if(remoteLogMetadata.getClass() == RemoteLogSegmentMetadata.class) { Review Comment: Okay, thanks for the clarification! I will change the comparison to use instanceof operator since it is already used in the project. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: use assertInstanceOf to replace assertTrue [kafka]
evalaiyc98 commented on PR #15769: URL: https://github.com/apache/kafka/pull/15769#issuecomment-2068055223 @chia7712 Could you help review this? If there is a need for any further improvement, please let me know. Thanks!
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
gharris1727 commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1573784112 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -65,50 +59,45 @@ protected ApiMessage newApiMessage(short apiKey) { return MetadataRecordType.fromId(apiKey).newMetadataRecord(); } -protected final Map createRemoteLogMetadataTransforms() { -Map map = new HashMap<>(); -map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform()); -map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform()); -map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform()); -map.put(REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY, new RemoteLogSegmentMetadataSnapshotTransform()); -return map; -} - -protected final Map createRemoteLogStorageClassToApiKeyMap() { -Map map = new HashMap<>(); -map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); -map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); -map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); -map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY); -return map; -} - public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { -Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); -if (apiKey == null) { -throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() - + " does not exist."); -} -@SuppressWarnings("unchecked") -ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata); +ApiMessageAndVersion apiMessageAndVersion; +if (remoteLogMetadata instanceof RemoteLogSegmentMetadata) { +RemoteLogSegmentMetadataTransform metadataTransform = new 
RemoteLogSegmentMetadataTransform(); Review Comment: Please move these variables to fields, and the `new` calls to the serde constructor. We can reuse the same fields for the deserialization.
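The request above (create the transforms once in the constructor, keep them as fields, and dispatch with `instanceof`) might look roughly like the sketch below. All types here are hypothetical simplifications: the real serde works with `ApiMessageAndVersion` and four metadata types, whereas this sketch uses two toy types and plain strings to stay self-contained.

```java
import java.nio.charset.StandardCharsets;

public class SerdeWithFields {
    // Hypothetical metadata types, standing in for the real ones.
    interface Metadata {}

    static final class SegmentMetadata implements Metadata {
        final String id;
        SegmentMetadata(String id) { this.id = id; }
    }

    static final class PartitionDeleteMetadata implements Metadata {
        final String topic;
        PartitionDeleteMetadata(String topic) { this.topic = topic; }
    }

    // Hypothetical transform interface; the real one produces an API message, not a String.
    interface Transform<T extends Metadata> {
        String toMessage(T metadata);
    }

    // Transforms are created once and reused by every serialize call.
    private final Transform<SegmentMetadata> segmentTransform;
    private final Transform<PartitionDeleteMetadata> deleteTransform;

    public SerdeWithFields() {
        this.segmentTransform = m -> "segment:" + m.id;
        this.deleteTransform = m -> "delete:" + m.topic;
    }

    public byte[] serialize(Metadata metadata) {
        String message;
        if (metadata instanceof SegmentMetadata) {
            message = segmentTransform.toMessage((SegmentMetadata) metadata);
        } else if (metadata instanceof PartitionDeleteMetadata) {
            message = deleteTransform.toMessage((PartitionDeleteMetadata) metadata);
        } else {
            // Also reached for null, because instanceof is false for null.
            throw new IllegalArgumentException("No transform for metadata: " + metadata);
        }
        return message.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        SerdeWithFields serde = new SerdeWithFields();
        byte[] bytes = serde.serialize(new SegmentMetadata("s1"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints "segment:s1"
    }
}
```

Holding the transforms in typed fields avoids allocating a new transform per call and removes the unchecked cast that a map keyed by API key would need.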
[PR] KAFKA-16588: broker shutdown hangs when file.delete.delay.ms is zero [kafka]
FrankYang0529 opened a new pull request, #15773:
URL: https://github.com/apache/kafka/pull/15773

If `file.delete.delay.ms` is zero, we call `take` even though `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` calls `shutdown` rather than `shutdownNow` (https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134). Hence, the thread never completes, and it blocks the shutdown of the broker.

We should replace `take` with `poll`, since we have already checked for the element.

Zero is a valid value: https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
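The difference between `take` and `poll` that this PR relies on can be seen with a plain `java.util.concurrent.BlockingQueue`. The sketch below is not the Kafka code: the queue name is borrowed from the description, the element type and the 200 ms wait are arbitrary choices, and the worker thread is interrupted by hand, which an executor's `shutdown()` would not do for a running task (only `shutdownNow()` attempts to interrupt).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TakeVsPoll {
    // poll() on an empty queue returns null immediately instead of waiting.
    static String pollOnEmpty() {
        BlockingQueue<String> logsToBeDeleted = new LinkedBlockingQueue<>();
        return logsToBeDeleted.poll();
    }

    // take() on an empty queue blocks until an element arrives or the thread is interrupted.
    static boolean takeBlocksOnEmpty() {
        BlockingQueue<String> logsToBeDeleted = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                logsToBeDeleted.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            worker.start();
            worker.join(200);                        // give take() time to block
            boolean stillBlocked = worker.isAlive(); // the worker has not returned
            worker.interrupt();                      // manual interrupt so the demo can finish
            worker.join();
            return stillBlocked;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pollOnEmpty());       // prints "null"
        System.out.println(takeBlocksOnEmpty()); // prints "true"
    }
}
```

Because nothing interrupts the task during a graceful shutdown, a `take` on an empty queue can keep the scheduler thread alive indefinitely, while `poll` lets the task return.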
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1573795818

## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ##
@@ -16,7 +16,15 @@
  */
 package org.apache.kafka.tools.consumer.group;

+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
 import kafka.utils.TestUtils;

Review Comment:
It looks like the only place using `subscribeAndWaitForRecords` will be `PlaintextAdminIntegrationTest` after this PR is merged. I will create another minor PR to refactor it. Thanks.
Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]
ilyazr commented on PR #15701: URL: https://github.com/apache/kafka/pull/15701#issuecomment-2068079457 @soarez Hi! I've added some changes to fix those tests, but now it shows one failing check. Could you tell me what has gone wrong now?
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
vamossagar12 commented on code in PR #15762: URL: https://github.com/apache/kafka/pull/15762#discussion_r1573818929 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -1256,6 +1256,15 @@ public static class ConfigKey { public final boolean internalConfig; public final String alternativeString; +public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Review Comment: Done.
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
vamossagar12 commented on PR #15762:
URL: https://github.com/apache/kafka/pull/15762#issuecomment-2068083995

Thanks @chia7712, I am thinking we can make the constructors private and let users use only the `define` method. Even in the AK codebase, I don't see the constructor being used widely; all declarations are done using `define`. I can file a ticket for this if we agree.
Re: [PR] KAFKA-14585: Move StorageTool to tools [kafka]
fvaleri commented on PR #14847:
URL: https://github.com/apache/kafka/pull/14847#issuecomment-2068086501

@showuon @mimaison I think this is now ready for review.

The changes are now well isolated. There is no code refactoring and there are no Kafka configuration changes, so comparison with the original code should be straightforward.

I'm using `LogConfig` to get all required configurations. Most keys are taken from the configuration objects migrated from `KafkaConfig`. Only 3 of them are missing; I define those as constants in `StorageTool`, and they will be removed as soon as they are migrated.

I tried all commands and options, comparing the output with the old implementation. I also ran a local 3-node Kafka cluster in KRaft mode. Finally, I ran all unit and integration tests.

Please have a look. Thanks.
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
chia7712 commented on code in PR #15762: URL: https://github.com/apache/kafka/pull/15762#discussion_r1573826054 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -1256,6 +1256,16 @@ public static class ConfigKey { public final boolean internalConfig; public final String alternativeString; +// This constructor is present for backward compatibility reasons. +public ConfigKey(String name, Type type, Object defaultValue, Validator validator, + Importance importance, String documentation, String group, + int orderInGroup, Width width, String displayName, + List dependents, Recommender recommender, + boolean internalConfig) { +this(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, +dependents, recommender, internalConfig, null); +} + public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Review Comment: As #13909 is not in any release, we can change this constructor from `public` to `private` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
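The backward-compatibility pattern in the diff above (keep the older constructor signature and have it delegate to the newer one with a default for the added parameter) can be reduced to the following sketch. `ConfigKeySketch` is a hypothetical, heavily trimmed stand-in for `ConfigDef.ConfigKey` with only three fields, so it shows the shape of the change rather than the real signature.

```java
public class ConfigKeySketch {
    public final String name;
    public final boolean internalConfig;
    public final String alternativeString;

    // Older signature, kept so that existing callers continue to compile.
    public ConfigKeySketch(String name, boolean internalConfig) {
        this(name, internalConfig, null); // default for the newly added parameter
    }

    // Newer signature with the extra parameter.
    public ConfigKeySketch(String name, boolean internalConfig, String alternativeString) {
        this.name = name;
        this.internalConfig = internalConfig;
        this.alternativeString = alternativeString;
    }

    public static void main(String[] args) {
        ConfigKeySketch oldStyle = new ConfigKeySketch("retention.ms", false);
        ConfigKeySketch newStyle = new ConfigKeySketch("retention.ms", false, "alt");
        System.out.println(oldStyle.alternativeString); // prints "null"
        System.out.println(newStyle.alternativeString); // prints "alt"
    }
}
```

Whether the wider constructor then stays `public` or becomes `private` is the open question in the review; the delegation itself works either way.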
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1573832025 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -16,7 +16,15 @@ */ package org.apache.kafka.tools.consumer.group; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; import kafka.utils.TestUtils; Review Comment: > I will create another minor PR to refactor it. Thanks. nice!!! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1573834240 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -173,7 +208,11 @@ private void produceRecord() { private void withStableConsumerGroup(Runnable body) { Consumer consumer = createConsumer(new Properties()); try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +consumer.subscribe(Collections.singletonList(TOPIC)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); +if (records.isEmpty()) { Review Comment: Could we rewrite it by `assertNotEquals(0, records.count())`? ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -184,7 +223,11 @@ private void withStableConsumerGroup(Runnable body) { private void withEmptyConsumerGroup(Runnable body) { Consumer consumer = createConsumer(new Properties()); Review Comment: please use try-with-resources ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -173,7 +208,11 @@ private void produceRecord() { private void withStableConsumerGroup(Runnable body) { Consumer consumer = createConsumer(new Properties()); Review Comment: please use try-with-resources -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
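The two "please use try-with-resources" comments above can be illustrated with a small sketch. `FakeConsumer` and `withConsumer` are hypothetical stand-ins (they are not part of the PR and not the Kafka `Consumer` API); the fake resource only records lifecycle events, so the example shows that the resource is closed on both the normal and the exceptional path, and that `close()` runs before the `catch` block.

```java
import java.util.ArrayList;
import java.util.List;

public class TryWithResourcesSketch {
    // Hypothetical stand-in for a consumer: records what happens to it.
    static final class FakeConsumer implements AutoCloseable {
        private final List<String> events;

        FakeConsumer(List<String> events) {
            this.events = events;
            events.add("open");
        }

        void poll() {
            events.add("poll");
        }

        @Override
        public void close() {
            events.add("close");
        }
    }

    // Runs the body while the resource is open and returns the recorded events.
    static List<String> withConsumer(Runnable body) {
        List<String> events = new ArrayList<>();
        // The resource declared here is closed automatically when the block exits,
        // whether it exits normally or because of an exception.
        try (FakeConsumer consumer = new FakeConsumer(events)) {
            consumer.poll();
            body.run();
        } catch (RuntimeException e) {
            // By the time this handler runs, close() has already been called.
            events.add("caught");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(withConsumer(() -> { }));
        System.out.println(withConsumer(() -> { throw new IllegalStateException("boom"); }));
    }
}
```

Compared with an explicit `try`/`finally`, this form cannot forget the `close()` call, which is presumably the motivation behind the review comment.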
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573839545 ## core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java: ## @@ -290,7 +287,7 @@ public void waitForReadyBrokers() throws InterruptedException { } @Override -public void rollingBrokerRestart() { +public void rollingBrokerRestart(Optional clusterConfig) { Review Comment: As not all implementation support this method, we should remove it from interface. The callers can use `getUnderlying` to get zk instance and call that method ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -211,13 +186,36 @@ public static class Builder { private String listenerName; private File trustStoreFile; private MetadataVersion metadataVersion; -private Properties serverProperties = new Properties(); -private Properties producerProperties = new Properties(); -private Properties consumerProperties = new Properties(); -private Properties adminClientProperties = new Properties(); -private Properties saslServerProperties = new Properties(); -private Properties saslClientProperties = new Properties(); -private final Map perBrokerOverrideProperties = new HashMap<>(); +private Map serverProperties = new HashMap<>(); +private Map producerProperties = new HashMap<>(); +private Map consumerProperties = new HashMap<>(); +private Map adminClientProperties = new HashMap<>(); +private Map saslServerProperties = new HashMap<>(); +private Map saslClientProperties = new HashMap<>(); +private Map> perBrokerOverrideProperties = new HashMap<>(); + +Builder() {} + +Builder(ClusterConfig clusterConfig) { +this.type = clusterConfig.type; +this.brokers = clusterConfig.brokers; +this.controllers = clusterConfig.controllers; +this.name = clusterConfig.name; +this.autoStart = clusterConfig.autoStart; +this.securityProtocol = clusterConfig.securityProtocol; +this.listenerName = clusterConfig.listenerName; +this.trustStoreFile = clusterConfig.trustStoreFile; +this.metadataVersion = 
clusterConfig.metadataVersion; +this.serverProperties = new HashMap<>(clusterConfig.serverProperties); +this.producerProperties = new HashMap<>(clusterConfig.producerProperties); +this.consumerProperties = new HashMap<>(clusterConfig.consumerProperties); +this.adminClientProperties = new HashMap<>(clusterConfig.adminClientProperties); +this.saslServerProperties = new HashMap<>(clusterConfig.saslServerProperties); +this.saslClientProperties = new HashMap<>(clusterConfig.saslClientProperties); +Map> perBrokerOverrideProps = new HashMap<>(); +clusterConfig.perBrokerOverrideProperties.forEach((k, v) -> perBrokerOverrideProps.put(k, new HashMap<>(v))); +this.perBrokerOverrideProperties = perBrokerOverrideProps; Review Comment: ```java this.perBrokerOverrideProperties = clusterConfig.perBrokerOverrideProperties.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashMap<>(e.getValue()))); ``` ## core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala: ## @@ -18,41 +18,58 @@ package kafka.server import java.net.Socket import java.util.Collections - import kafka.api.{KafkaSasl, SaslSetup} -import kafka.test.annotation.{ClusterTest, Type} +import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms} +import kafka.test.annotation.{ClusterTemplate, Type} import kafka.test.junit.ClusterTestExtensions -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} import kafka.utils.JaasTestUtils +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse} import org.apache.kafka.common.security.auth.SecurityProtocol +import 
org.apache.kafka.server.config.KafkaSecurityConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{AfterEach, BeforeEach} import scala.jdk.CollectionConverters._ +object SaslApiVersionsRequestTest { + val kafkaClientSaslMechanism = "PLAIN" + val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN") + val controlPlaneListenerName = "CONTROL_PLANE" + val securityProtocol = SecurityProto
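The stream-based copy suggested in the review comment on `ClusterConfig.Builder` above can be sketched in isolation. This is a minimal sketch under assumptions: the archive stripped the generic parameters, so the `Map<Integer, Map<String, String>>` type below is a guess at the shape of `perBrokerOverrideProperties`, and `DeepCopySketch` / `deepCopy` are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class DeepCopySketch {
    // Copies the outer map and every inner map, so that later changes to the
    // copy's inner maps do not leak back into the source.
    // Assumption: keys are broker ids (Integer), values are String-to-String overrides.
    static Map<Integer, Map<String, String>> deepCopy(Map<Integer, Map<String, String>> source) {
        return source.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashMap<>(e.getValue())));
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, String>> original = new HashMap<>();
        Map<String, String> broker0 = new HashMap<>();
        broker0.put("log.dirs", "/tmp/a");
        original.put(0, broker0);

        Map<Integer, Map<String, String>> copy = deepCopy(original);
        copy.get(0).put("log.dirs", "/tmp/b");

        // The original inner map is untouched by the change made through the copy.
        System.out.println(original.get(0).get("log.dirs"));
    }
}
```

Note that `Collectors.toMap` rejects `null` values, so this form only suits maps whose inner maps are never `null`; the `forEach` version in the diff has the same practical constraint because `new HashMap<>(null)` also throws.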
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573848774 ## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ## @@ -46,6 +42,10 @@ public class Defaults { public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500; public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024; public static final boolean DELETE_TOPIC_ENABLE = true; +public static final int REQUEST_TIMEOUT_MS = 3; +public static final long CONNECTION_SETUP_TIMEOUT_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS; +public static final long CONNECTION_SETUP_TIMEOUT_MAX_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS; Review Comment: ditto ## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.network; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.config.ReplicationConfigs; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class SocketServerConfigs { +public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = "listener.security.protocol.map"; +public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = Arrays.stream(SecurityProtocol.values()) +.collect(Collectors.toMap(sp -> ListenerName.forSecurityProtocol(sp), sp -> sp)) Review Comment: `.collect(Collectors.toMap(ListenerName::forSecurityProtocol, sp -> sp))` ## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ## @@ -46,6 +42,10 @@ public class Defaults { public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500; public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024; public static final boolean DELETE_TOPIC_ENABLE = true; +public static final int REQUEST_TIMEOUT_MS = 3; +public static final long CONNECTION_SETUP_TIMEOUT_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS; Review Comment: Do we need `CONNECTION_SETUP_TIMEOUT_MS` as it is used by `KafkaConfig.scala` only. ## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.config.ReplicationConfigs; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class SocketServerConfigs { Review Comment: Should we call it `ServerSocketConfigs` so as to align it with `ServerLogConfigs`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
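The method-reference suggestion for `LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT` in the review above is a pure readability change. The sketch below shows the equivalence on stand-in types: the `Protocol` enum and `listenerNameFor` helper are hypothetical and only mimic the roles of `SecurityProtocol` and `ListenerName.forSecurityProtocol`; they are not the Kafka classes.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class MethodRefSketch {
    // Hypothetical stand-in for SecurityProtocol.
    enum Protocol { PLAINTEXT, SSL }

    // Hypothetical stand-in for ListenerName.forSecurityProtocol:
    // derives a listener name from the protocol.
    static String listenerNameFor(Protocol protocol) {
        return protocol.name();
    }

    // Key mapper written as an explicit lambda, as in the original diff.
    static Map<String, Protocol> withLambda() {
        return Arrays.stream(Protocol.values())
            .collect(Collectors.toMap(sp -> listenerNameFor(sp), sp -> sp));
    }

    // Same key mapper written as a method reference, as the reviewer suggests.
    static Map<String, Protocol> withMethodRef() {
        return Arrays.stream(Protocol.values())
            .collect(Collectors.toMap(MethodRefSketch::listenerNameFor, sp -> sp));
    }

    public static void main(String[] args) {
        // Both forms build the same map.
        System.out.println(withLambda().equals(withMethodRef()));
    }
}
```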
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573849975 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: Currently we can't add `ClusterTemplate` in `ClusterTests`. Should we do that in this PR?
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573850549 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: > Currently we can't add ClusterTemplate in ClusterTests. Should we do that in this PR? Could you file a Jira for that? Also, we can leave a TODO here.
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
OmniaGM commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573850333 ## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.config.ReplicationConfigs; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class SocketServerConfigs { Review Comment: I named it `SocketServerConfigs` instead of `ServerSocketConfigs` for the following reasons: 1. This is part of `server.network`. 2. We have `SocketServer`, which will move to the same package at some point. 3. Java has `java.net.ServerSocket`, so we may need to differentiate classes related to `SocketServer` from the built-in class. Am not very attached to the name but it made more sense for me during the refactor. WDYT?
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
OmniaGM commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573851135 ## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ## @@ -46,6 +42,10 @@ public class Defaults { public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500; public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024; public static final boolean DELETE_TOPIC_ENABLE = true; +public static final int REQUEST_TIMEOUT_MS = 3; +public static final long CONNECTION_SETUP_TIMEOUT_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS; Review Comment: These will move soon, in the final couple of PRs for moving `KafkaConfig` out of core. So I would keep it for now until I fully finish this Jira.
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
OmniaGM commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573851135 ## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ## @@ -46,6 +42,10 @@ public class Defaults { public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500; public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024; public static final boolean DELETE_TOPIC_ENABLE = true; +public static final int REQUEST_TIMEOUT_MS = 3; +public static final long CONNECTION_SETUP_TIMEOUT_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS; Review Comment: These will move soon, in the final couple of PRs for moving `KafkaConfig` out of core. So I would keep it for now until I fully finish this Jira. `KafkaConfig.scala` will join the rest of the configs in `server` pretty soon.
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573851263 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: OK, let's discuss this in that Jira. What I'm wondering is how to reduce the duplicated `ClusterConfigProperty` entries here. Adding `ClusterTemplate` to `ClusterTests` is one way, but there may be other solutions.
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
OmniaGM commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573851135 ## server/src/main/java/org/apache/kafka/server/config/Defaults.java: ## @@ -46,6 +42,10 @@ public class Defaults { public static final int METADATA_MAX_IDLE_INTERVAL_MS = 500; public static final int METADATA_MAX_RETENTION_BYTES = 100 * 1024 * 1024; public static final boolean DELETE_TOPIC_ENABLE = true; +public static final int REQUEST_TIMEOUT_MS = 3; +public static final long CONNECTION_SETUP_TIMEOUT_MS = CommonClientConfigs.DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS; Review Comment: These will move soon, in the final couple of PRs for moving `KafkaConfig` out of core. There will be some refactoring to move more stuff into `CommonClientConfigs` and some into `ServerCommonConfigs`. So I would keep it for now until I fully finish this Jira. `KafkaConfig.scala` will join the rest of the configs in `server` pretty soon.
Re: [PR] KAFKA-15853: Refactor KafkaConfig to use PasswordEncoderConfigs [kafka]
chia7712 merged PR #15770: URL: https://github.com/apache/kafka/pull/15770 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573852618 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: Filed https://issues.apache.org/jira/browse/KAFKA-16595 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -17,48 +17,78 @@ package kafka.server -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.ClusterInstance import 
org.apache.kafka.common.message.ApiVersionsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.ApiVersionsRequest -import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions import org.apache.kafka.server.common.MetadataVersion import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.extension.ExtendWith @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1) +@ClusterTestDefaults(brokers = 1) class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - @BeforeEach - def setup(config: ClusterConfig): Unit = { -super.brokerPropertyOverrides(config.serverProperties()) - } - - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "true"), + @ClusterTests(Array( +new ClusterTest(clusterType = Type.ZK, metadataVersion = MetadataVersion.IBP_3_8_IV0, serverProperties = Array( Review Comment: gentle ping @chia7712 , Filed https://issues.apache.org/jira/browse/KAFKA-16595 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16595) Introduce template in ClusterTests
Kuan Po Tseng created KAFKA-16595: Summary: Introduce template in ClusterTests Key: KAFKA-16595 URL: https://issues.apache.org/jira/browse/KAFKA-16595 Project: Kafka Issue Type: Improvement Reporter: Kuan Po Tseng Assignee: Kuan Po Tseng Discussed in https://github.com/apache/kafka/pull/15761#discussion_r1573850549 Currently we can't apply a template in `ClusterTests`, so every `ClusterConfigProperty` has to be written out in each `ClusterTest` inside `ClusterTests`, which leaves a lot of duplicate code. We need a way to reduce that duplication; introducing a template in `ClusterTests` could be one solution. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 commented on PR #15772: URL: https://github.com/apache/kafka/pull/15772#issuecomment-2068132025 @OmniaGM I merged #15770 first since it is a smaller PR. Please fix the conflicts. Thanks!
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573853257 ## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.config.ReplicationConfigs; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class SocketServerConfigs { Review Comment: > Am not very attached to the name but it made more sense for me during the refactor. WDYT? that makes sense to me. +1 :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1573854456 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -65,50 +59,45 @@ protected ApiMessage newApiMessage(short apiKey) { return MetadataRecordType.fromId(apiKey).newMetadataRecord(); } -protected final Map createRemoteLogMetadataTransforms() { -Map map = new HashMap<>(); -map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform()); -map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform()); -map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform()); -map.put(REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY, new RemoteLogSegmentMetadataSnapshotTransform()); -return map; -} - -protected final Map createRemoteLogStorageClassToApiKeyMap() { -Map map = new HashMap<>(); -map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY); -map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY); -map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY); -map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY); -return map; -} - public byte[] serialize(RemoteLogMetadata remoteLogMetadata) { -Short apiKey = remoteLogStorageClassToApiKey.get(remoteLogMetadata.getClass().getName()); -if (apiKey == null) { -throw new IllegalArgumentException("ApiKey for given RemoteStorageMetadata class: " + remoteLogMetadata.getClass() - + " does not exist."); -} -@SuppressWarnings("unchecked") -ApiMessageAndVersion apiMessageAndVersion = remoteLogMetadataTransform(apiKey).toApiMessageAndVersion(remoteLogMetadata); +ApiMessageAndVersion apiMessageAndVersion; +if (remoteLogMetadata instanceof RemoteLogSegmentMetadata) { +RemoteLogSegmentMetadataTransform metadataTransform = new 
RemoteLogSegmentMetadataTransform(); Review Comment: Updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
gharris1727 commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1573855150 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde { private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey(); -private final Map remoteLogStorageClassToApiKey; -private final Map keyToTransform; private final BytesApiMessageSerde bytesApiMessageSerde; +private ApiMessageAndVersion apiMessageAndVersion; + +private final RemoteLogSegmentMetadataTransform remoteLogSegmentMetadataTransform; Review Comment: This looks pretty verbose, can you shorten the variable names? We're already in the `RemoteLogMetadataSerde`, so `remoteLog` and `Metadata` end up being visual clutter. ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde { private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey(); -private final Map remoteLogStorageClassToApiKey; -private final Map keyToTransform; private final BytesApiMessageSerde bytesApiMessageSerde; +private ApiMessageAndVersion apiMessageAndVersion; Review Comment: This shouldn't be an instance variable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573855899 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -67,13 +69,16 @@ public class ClusterConfig { this.listenerName = listenerName; this.trustStoreFile = trustStoreFile; this.metadataVersion = metadataVersion; -this.serverProperties = copyOf(serverProperties); -this.producerProperties = copyOf(producerProperties); -this.consumerProperties = copyOf(consumerProperties); -this.adminClientProperties = copyOf(adminClientProperties); -this.saslServerProperties = copyOf(saslServerProperties); -this.saslClientProperties = copyOf(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> this.perBrokerOverrideProperties.put(brokerId, copyOf(props))); +this.serverProperties = Collections.unmodifiableMap(serverProperties); +this.producerProperties = Collections.unmodifiableMap(producerProperties); +this.consumerProperties = Collections.unmodifiableMap(consumerProperties); +this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties); +this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties); +this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties); +this.perBrokerOverrideProperties = Collections.unmodifiableMap( Review Comment: Pardon me, I'm not quite clear about this comment. Could you explain more? :smiley: For convenience, this PR adds methods like `public Builder putServerProperty(String key, String value)` for each configuration, such as `serverProperties` and `consumerProperties`, so we don't need to do a deep copy in `ClusterConfig.Builder`. Or did you mean we should use `setServerProperty(Map serverProperties)` instead of `putServerProperty(String key, String value)`?
Re: [PR] [DRAFT] KAFKA-16593: wip [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1573855465 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,477 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import 
java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.singleton; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The 
expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +@ClusterTest +public void testDeleteWithTopicOption() { +try (Admin admin = cluster.createAdminClient()) { +admin.createTopics(buildSingletonTestTopic()); Review Comment: please call `get` to make sure the request is completed. ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,477 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
OmniaGM commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573850333 ## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.config.ReplicationConfigs; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class SocketServerConfigs { Review Comment: I named it `SocketServerConfigs` instead of `ServerSocketConfigs` for the following reasons: 1. This is part of `server.network`. 2. We have `SocketServer`, which will move to the same package at some point. 3. Java has a class with a similar name, `java.net.ServerSocket`, so we may need to distinguish classes related to `SocketServer` from the built-in class. Am not very attached to the name but it made more sense for me during the refactor. WDYT?
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573857883 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -67,13 +69,16 @@ public class ClusterConfig { this.listenerName = listenerName; this.trustStoreFile = trustStoreFile; this.metadataVersion = metadataVersion; -this.serverProperties = copyOf(serverProperties); -this.producerProperties = copyOf(producerProperties); -this.consumerProperties = copyOf(consumerProperties); -this.adminClientProperties = copyOf(adminClientProperties); -this.saslServerProperties = copyOf(saslServerProperties); -this.saslClientProperties = copyOf(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> this.perBrokerOverrideProperties.put(brokerId, copyOf(props))); +this.serverProperties = Collections.unmodifiableMap(serverProperties); +this.producerProperties = Collections.unmodifiableMap(producerProperties); +this.consumerProperties = Collections.unmodifiableMap(consumerProperties); +this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties); +this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties); +this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties); +this.perBrokerOverrideProperties = Collections.unmodifiableMap( Review Comment: The purpose of this PR is to make `ClusterConfig` immutable. However, `ClusterConfig` holds only an immutable "view" of `serverProperties`, `producerProperties`, etc. That means `ClusterConfig` is still a mutable object, since callers can change its inner variables by updating `ClusterConfig#Builder`. For example, a user can call `putSaslServerProperty` to change the `saslServerProperties` of `ClusterConfig`.
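The distinction raised in this review comment, an unmodifiable view versus a defensive copy, can be illustrated with a minimal sketch. `ViewConfig` and `CopyConfig` are hypothetical stand-ins invented for this illustration and are not the actual `ClusterConfig` code; the property key is likewise only an example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableViewDemo {
    // Hypothetical config that only wraps the caller's map in a read-only view.
    static final class ViewConfig {
        final Map<String, String> props;

        ViewConfig(Map<String, String> props) {
            // The view rejects writes, but it still reads from the caller's map.
            this.props = Collections.unmodifiableMap(props);
        }
    }

    // Hypothetical config that takes a private snapshot before wrapping it.
    static final class CopyConfig {
        final Map<String, String> props;

        CopyConfig(Map<String, String> props) {
            // Copy first, so later changes to the caller's map are not visible here.
            this.props = Collections.unmodifiableMap(new HashMap<>(props));
        }
    }

    public static void main(String[] args) {
        Map<String, String> builderProps = new HashMap<>();
        builderProps.put("sasl.mechanism", "PLAIN");

        ViewConfig view = new ViewConfig(builderProps);
        CopyConfig copy = new CopyConfig(builderProps);

        // Mutating the builder's map after construction leaks into the view only.
        builderProps.put("sasl.mechanism", "GSSAPI");

        System.out.println("view sees: " + view.props.get("sasl.mechanism"));
        System.out.println("copy sees: " + copy.props.get("sasl.mechanism"));
    }
}
```

Under these assumptions, the view reflects the later change while the copy keeps the value it was built with, which is the sense in which an object holding only a view remains mutable.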
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1573858499 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -67,13 +69,16 @@ public class ClusterConfig { this.listenerName = listenerName; this.trustStoreFile = trustStoreFile; this.metadataVersion = metadataVersion; -this.serverProperties = copyOf(serverProperties); -this.producerProperties = copyOf(producerProperties); -this.consumerProperties = copyOf(consumerProperties); -this.adminClientProperties = copyOf(adminClientProperties); -this.saslServerProperties = copyOf(saslServerProperties); -this.saslClientProperties = copyOf(saslClientProperties); -perBrokerOverrideProperties.forEach((brokerId, props) -> this.perBrokerOverrideProperties.put(brokerId, copyOf(props))); +this.serverProperties = Collections.unmodifiableMap(serverProperties); +this.producerProperties = Collections.unmodifiableMap(producerProperties); +this.consumerProperties = Collections.unmodifiableMap(consumerProperties); +this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties); +this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties); +this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties); +this.perBrokerOverrideProperties = Collections.unmodifiableMap( Review Comment: Got it, thanks for explanation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 commented on PR #15772: URL: https://github.com/apache/kafka/pull/15772#issuecomment-2068139760
```
Code  Warning
FS    Format string should use %n rather than \n in org.apache.kafka.network.SocketServerConfigs.()
      Bug type VA_FORMAT_STRING_USES_NEWLINE
      In class org.apache.kafka.network.SocketServerConfigs
      In method org.apache.kafka.network.SocketServerConfigs.()
      Called method String.format(String, Object[])
      Format string " If the listener name is not a security protocol, %s must also be set.\n"
      At SocketServerConfigs.java:[line 48]
FS    Format string should use %n rather than \n in org.apache.kafka.network.SocketServerConfigs.()
      Bug type VA_FORMAT_STRING_USES_NEWLINE
      In class org.apache.kafka.network.SocketServerConfigs
      In method org.apache.kafka.network.SocketServerConfigs.()
      Called method String.format(String, Object[])
      Format string "Listeners to publish to ZooKeeper for clients to use, if different than the %s config property. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for %1$1s will be used. Unlike %1$1s, it is not valid to advertise the 0.0.0.0 meta-address.\n Also unlike %1$1s, there can be duplicated ports in this property, so that one listener can be configured to advertise another listener's address. This can be useful in some cases where external load balancers are used."
      At SocketServerConfigs.java:[line 60]
FS    Format string should use %n rather than \n in org.apache.kafka.network.SocketServerConfigs.()
      Bug type VA_FORMAT_STRING_USES_NEWLINE
      In class org.apache.kafka.network.SocketServerConfigs
      In method org.apache.kafka.network.SocketServerConfigs.()
      Called method String.format(String, Object[])
      Format string "Name of listener used for communication between controller and brokers. A broker will use the %s to locate the endpoint in $ListenersProp list, to listen for connections from the controller. For example, if a broker's config is:\nlisteners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSLcontrol.plane.listener.name = CONTROLLER\nOn startup, the broker will start listening on "192.1.1.8:9094" with security protocol "SSL".\nOn the controller side, when it discovers a broker's published endpoints through ZooKeeper, it will use the %1$1s to find the endpoint, which it will use to establish connection to the broker.\nFor example, if the broker's published endpoints on ZooKeeper are:\n "endpoints" : ["INTERNAL://broker1.example.com:9092","EXTERNAL://broker1.example.com:9093","CONTROLLER://broker1.example.com:9094"]</code>\n and the controller's config is:\nlistener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSLcontrol.plane.listener.name = CONTROLLER\nthen the controller will use "broker1.example.com:9094" with security protocol "SSL" to connect to the broker.\nIf not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections.\nIf explicitly configured, the value cannot be the same as the value of %s."
      At SocketServerConfigs.java:[line 71]
```
There are build errors.
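For readers unfamiliar with the `VA_FORMAT_STRING_USES_NEWLINE` warning above, here is a minimal sketch of the difference between `\n` and `%n` in a format string. The class and method names are hypothetical and exist only for this illustration; only `String.format` and `System.lineSeparator` are standard library calls.

```java
public class FormatNewlineDemo {
    static final String CONFIG = "listener.security.protocol.map";

    // Inside a format string, %n expands to the platform line separator.
    static String withPercentN() {
        return String.format(" %s must also be set.%n", CONFIG);
    }

    // A literal \n is always a single line-feed character, on every platform.
    static String withBackslashN() {
        return String.format(" %s must also be set.\n", CONFIG);
    }

    // Outside a format call, "%n" is not expanded; it stays as '%' followed by 'n'.
    static String plainLiteral() {
        return " Listener names must be unique unless %n";
    }

    public static void main(String[] args) {
        System.out.println(withPercentN().endsWith(System.lineSeparator()));
        System.out.println(withBackslashN().endsWith("\n"));
        System.out.println(plainLiteral().contains("%n"));
    }
}
```

One consequence worth keeping in mind when applying this fix: `%n` only has its special meaning in text that is actually passed through a formatter, so swapping `\n` for `%n` in a plain concatenated literal would change the text rather than the line ending.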
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1573870979 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde { private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey(); -private final Map remoteLogStorageClassToApiKey; -private final Map keyToTransform; private final BytesApiMessageSerde bytesApiMessageSerde; +private ApiMessageAndVersion apiMessageAndVersion; Review Comment: Okay, updating this to a class variable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1573871668 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde { private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey(); -private final Map remoteLogStorageClassToApiKey; -private final Map keyToTransform; private final BytesApiMessageSerde bytesApiMessageSerde; +private ApiMessageAndVersion apiMessageAndVersion; + +private final RemoteLogSegmentMetadataTransform remoteLogSegmentMetadataTransform; Review Comment: Updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
gharris1727 commented on code in PR #15620: URL: https://github.com/apache/kafka/pull/15620#discussion_r1573875832 ## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java: ## @@ -46,69 +44,65 @@ public class RemoteLogMetadataSerde { private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey(); private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey(); -private final Map remoteLogStorageClassToApiKey; -private final Map keyToTransform; private final BytesApiMessageSerde bytesApiMessageSerde; +private ApiMessageAndVersion apiMessageAndVersion; Review Comment: I meant it should be a local variable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
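As a rough illustration of the reviewer's request, the sketch below keeps the intermediate result in a local variable and dispatches with `instanceof` checks. All types here (`Metadata`, `SegmentMetadata`, `PartitionDeleteMetadata`, `Serde`) are hypothetical stand-ins and not the actual `RemoteLogMetadataSerde` code; the reviewer did not state a reason, but one common motivation is that a local variable cannot be overwritten by a concurrent call, whereas a shared field can.

```java
public class LocalVariableDispatchDemo {
    // Hypothetical stand-ins for the metadata types discussed in the review.
    interface Metadata {}

    static final class SegmentMetadata implements Metadata {}

    static final class PartitionDeleteMetadata implements Metadata {}

    // Hypothetical serde: no per-call state is stored on the instance.
    static final class Serde {
        String serialize(Metadata metadata) {
            String message; // local to this call, not an instance or class field
            if (metadata instanceof SegmentMetadata) {
                message = "segment";
            } else if (metadata instanceof PartitionDeleteMetadata) {
                message = "partition-delete";
            } else {
                throw new IllegalArgumentException(
                    "Unsupported metadata type: " + metadata.getClass());
            }
            return message;
        }
    }

    public static void main(String[] args) {
        Serde serde = new Serde();
        System.out.println(serde.serialize(new SegmentMetadata()));
        System.out.println(serde.serialize(new PartitionDeleteMetadata()));
    }
}
```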
Re: [PR] KAFKA-16356 RemoteLogMetadataSerde: Serializer via class-name dispatch removed and replaced with if-elseif-else conditions [kafka]
linu-shibu commented on PR #15620: URL: https://github.com/apache/kafka/pull/15620#issuecomment-2068156085 > Thanks @linu-shibu this is a lot closer to what I expected. > > Can you also add the build.gradle patch I mentioned earlier? I think this is the only raw types used in the storage project. Done. I was under the impression that the patch was only for debugging and fixing the warning locally.
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573880096 ## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.config.ReplicationConfigs; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class SocketServerConfigs { +public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = "listener.security.protocol.map"; +public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = Arrays.stream(SecurityProtocol.values()) +.collect(Collectors.toMap(ListenerName::forSecurityProtocol, sp -> sp)) +.entrySet() +.stream() +.map(entry -> entry.getKey().value() + ":" + entry.getValue().name()) +.collect(Collectors.joining(",")); +public static final String LISTENER_SECURITY_PROTOCOL_MAP_DOC = "Map between listener names and security protocols. This must be defined for " + +"the same security protocol to be usable in more than one port or IP. 
For example, internal and " + +"external traffic can be separated even if SSL is required for both. Concretely, the user could define listeners " + +"with names INTERNAL and EXTERNAL and this property as: INTERNAL:SSL,EXTERNAL:SSL. As shown, key and value are " + +"separated by a colon and map entries are separated by commas. Each listener name should only appear once in the map. " + +"Different security (SSL and SASL) settings can be configured for each listener by adding a normalised " + +"prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " + +"INTERNAL listener, a config with name listener.name.internal.ssl.keystore.location would be set. " + +"If the config for the listener name is not set, the config will fallback to the generic config (i.e. ssl.keystore.location). " + +"Note that in KRaft a default mapping from the listener names defined by controller.listener.names to PLAINTEXT " + +"is assumed if no explicit mapping is provided and no other security protocol is in use."; + +public static final String LISTENERS_CONFIG = "listeners"; +public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092"; +public static final String LISTENERS_DOC = "Listener List - Comma-separated list of URIs we will listen on and the listener names." 
+ +String.format(" If the listener name is not a security protocol, %s must also be set.%n", LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) + +" Listener names and port numbers must be unique unless %n" + +" one listener is an IPv4 address and the other listener is %n" + +" an IPv6 address (for the same port).%n" + +" Specify hostname as 0.0.0.0 to bind to all interfaces.%n" + +" Leave hostname empty to bind to default interface.%n" + +" Examples of legal listener lists:%n" + +" PLAINTEXT://myhost:9092,SSL://:9091%n" + +" CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093%n" + +" PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092%n"; + +public static final String ADVERTISED_LISTENERS_CONFIG = "advertised.listeners"; +public static final String ADVERTISED_LISTENERS_DOC = String.format( +"Listeners to publish to ZooKeeper for clients to use, if different than the %s config property." + +" In IaaS environments, this may need to be different from the interface to which the broker binds." + +" If this is not set, the value for %1$1s will be used." + +" Unlike %1$1s, it is not valid to advertise the 0.0.0.0 meta-address.%n" + +" Also unlike %1$1s, there can be duplicated ports in this property," + +" so that one listener ca
Re: [PR] Fix typo [kafka]
Janmm14 commented on PR #15743: URL: https://github.com/apache/kafka/pull/15743#issuecomment-2068177862 @ijuma Apache CI might've run untrusted 3rd party code.
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
OmniaGM commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1573897839 ## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.config.ReplicationConfigs; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class SocketServerConfigs { +public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = "listener.security.protocol.map"; +public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = Arrays.stream(SecurityProtocol.values()) +.collect(Collectors.toMap(ListenerName::forSecurityProtocol, sp -> sp)) +.entrySet() +.stream() +.map(entry -> entry.getKey().value() + ":" + entry.getValue().name()) +.collect(Collectors.joining(",")); +public static final String LISTENER_SECURITY_PROTOCOL_MAP_DOC = "Map between listener names and security protocols. This must be defined for " + +"the same security protocol to be usable in more than one port or IP. 
For example, internal and " + +"external traffic can be separated even if SSL is required for both. Concretely, the user could define listeners " + +"with names INTERNAL and EXTERNAL and this property as: INTERNAL:SSL,EXTERNAL:SSL. As shown, key and value are " + +"separated by a colon and map entries are separated by commas. Each listener name should only appear once in the map. " + +"Different security (SSL and SASL) settings can be configured for each listener by adding a normalised " + +"prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " + +"INTERNAL listener, a config with name listener.name.internal.ssl.keystore.location would be set. " + +"If the config for the listener name is not set, the config will fallback to the generic config (i.e. ssl.keystore.location). " + +"Note that in KRaft a default mapping from the listener names defined by controller.listener.names to PLAINTEXT " + +"is assumed if no explicit mapping is provided and no other security protocol is in use."; + +public static final String LISTENERS_CONFIG = "listeners"; +public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092"; +public static final String LISTENERS_DOC = "Listener List - Comma-separated list of URIs we will listen on and the listener names." 
+ +String.format(" If the listener name is not a security protocol, %s must also be set.%n", LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) + +" Listener names and port numbers must be unique unless %n" + +" one listener is an IPv4 address and the other listener is %n" + +" an IPv6 address (for the same port).%n" + +" Specify hostname as 0.0.0.0 to bind to all interfaces.%n" + +" Leave hostname empty to bind to default interface.%n" + +" Examples of legal listener lists:%n" + +" PLAINTEXT://myhost:9092,SSL://:9091%n" + +" CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093%n" + +" PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092%n"; + +public static final String ADVERTISED_LISTENERS_CONFIG = "advertised.listeners"; +public static final String ADVERTISED_LISTENERS_DOC = String.format( +"Listeners to publish to ZooKeeper for clients to use, if different than the %s config property." + +" In IaaS environments, this may need to be different from the interface to which the broker binds." + +" If this is not set, the value for %1$1s will be used." + +" Unlike %1$1s, it is not valid to advertise the 0.0.0.0 meta-address.%n" + +" Also unlike %1$1s, there can be duplicated ports in this property," + +" so that one listener can
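The `LISTENER_SECURITY_PROTOCOL_MAP_DOC` text quoted in this diff describes the value format: map entries separated by commas, key and value separated by a colon, and each listener name appearing only once. A minimal sketch of parsing that format in plain Java might look like the following. The names `ListenerMapSketch` and `parse` are hypothetical, this is not Kafka's own parser, and protocol names are kept as plain strings instead of `SecurityProtocol` values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ListenerMapSketch {
    // Parses a value such as "INTERNAL:SSL,EXTERNAL:SSL" into an ordered
    // map of listener name -> protocol name (hypothetical helper).
    public static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        if (value.trim().isEmpty()) {
            return result;
        }
        for (String entry : value.split(",")) {
            // Key and value are separated by the first colon.
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Expected NAME:PROTOCOL but got '" + entry + "'");
            }
            // Each listener name should only appear once in the map.
            if (result.putIfAbsent(parts[0], parts[1]) != null) {
                throw new IllegalArgumentException("Listener name appears more than once: " + parts[0]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("INTERNAL:SSL,EXTERNAL:SSL")); // prints {INTERNAL=SSL, EXTERNAL=SSL}
    }
}
```

Rejecting duplicate names at parse time is one way to enforce the "only appear once" rule from the doc string; where the real validation happens in Kafka is not shown in the quoted diff.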
[PR] KAFKA-15853: Move quota configs into server-common package [kafka]
OmniaGM opened a new pull request, #15774:
URL: https://github.com/apache/kafka/pull/15774

- Pull all quota configs from `KafkaConfig`, `DynamicConfig` and `LogConfig` into `ServerQuotaConfigs` in `server-common` as replication configs are shared between server, storage and tools packages.
- Moved all quota configs out of `ClientQuotaManagerConfig` and `ReplicationQuotaManagerConfig` as all quotas have the same default values for sample and window size.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[PR] Move KRAFT configs out of KafkaConfig [kafka]
OmniaGM opened a new pull request, #15775:
URL: https://github.com/apache/kafka/pull/15775

- Move all KRaft configs/docs/defaults into `KRaftConfigs`.
- Note: We already have `RaftConfig`, but it seems to contain a limited number of configs that only configure the `controller` raft, and it shouldn't include configs shared by both brokers and controllers in KRaft mode.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1574007862

## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ##
@@ -173,7 +208,11 @@ private void produceRecord() {
 private void withStableConsumerGroup(Runnable body) {
 Consumer consumer = createConsumer(new Properties());
 try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS);
+consumer.subscribe(Collections.singletonList(TOPIC));
+ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+if (records.isEmpty()) {

Review Comment:
Hi @chia7712, thanks for the review. I have addressed all following comments.
[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839440#comment-17839440 ] Matthias J. Sax commented on KAFKA-16567: - I see – this raises a few questions... Given that KIP-869 is not fully implemented yet, and the new metrics are not added, I am wondering if we can consider the other metric effectively deprecated or not? [~cadonna] WDYT? Should we push out KAFKA-16336 to 5.0 release? > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Assignee: Walter Hernandez >Priority: Blocker > Labels: kip > Fix For: 4.0.0 > > > Add the following metrics to the state updater: > * restoring-active-tasks: count > * restoring-standby-tasks: count > * paused-active-tasks: count > * paused-standby-tasks: count > * idle-ratio: percentage > * restore-ratio: percentage > * checkpoint-ratio: percentage > * restore-records-total: count > * restore-records-rate: rate > * restore-call-rate: rate -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839441#comment-17839441 ] Matthias J. Sax commented on KAFKA-16514: - You are right that there is always a member-id etc – I am not sure though if generating a random group.instance.id would be the right way forward. Maybe making the Consumer#close() call flexible and allowing users to pass in a CloseOption similar to what we do in KafkaStreams would be the cleaner approach? An alternative might be (not sure what the exact scope would be) to add a new AdminClient method that allows passing in a `member.id` to remove a consumer from the group? Another question is: is this "hack" we put into KS to not send a leave group request still relevant? A lot of things got improved on the rebalance protocol over the years, and it might not be necessary any longer? Curious to hear what [~ableegoldman] and [~cadonna] think. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. 
> I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
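The workaround in the report comes down to adding one extra entry to the streams properties. A minimal sketch, using only `java.util.Properties`, of keeping that entry behind an explicit switch is shown below. The class name `LeaveGroupWorkaroundSketch`, the method `buildProps` and the `inMemoryStoresOnly` parameter are hypothetical; the plain string keys are assumed to be the values behind the `StreamsConfig` constants used in the report; and `internal.leave.group.on.close` is the undocumented internal key quoted there, so it may change or disappear between releases.

```java
import java.util.Properties;

public class LeaveGroupWorkaroundSketch {
    // Key quoted in the report; internal and undocumented, so treat it as unstable.
    static final String LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";

    // Builds a subset of the reported configuration; the flag is hypothetical.
    public static Properties buildProps(boolean inMemoryStoresOnly) {
        Properties props = new Properties();
        // String keys assumed to match the StreamsConfig constants in the report.
        props.put("application.id", "someApplicationId");
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("num.standby.replicas", 1);
        if (inMemoryStoresOnly) {
            // Only opt in when nothing on local disk makes a delayed rebalance useful.
            props.put(LEAVE_GROUP_ON_CLOSE, true);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildProps(true).get(LEAVE_GROUP_ON_CLOSE));          // prints true
        System.out.println(buildProps(false).containsKey(LEAVE_GROUP_ON_CLOSE)); // prints false
    }
}
```

Keeping the internal key in one named constant makes it easier to remove the workaround if `CloseOptions.leaveGroup` starts behaving as the report expects.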
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839452#comment-17839452 ] Sal Sorrentino commented on KAFKA-16514: IMHO: I think if you have a replication factor of 0 and your application uses persistent container (such as a k8s stateful pod) that it "could" be relevant...but I think the recommended replication factor is 1, in which case an incremental rebalance is probably preferable to a partition blackout during an application bounce, especially if you are using spring for dependency injection as application boot times are not exactly speedy.
[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839452#comment-17839452 ] Sal Sorrentino edited comment on KAFKA-16514 at 4/22/24 2:10 AM: - IMHO: I think if you have a replication factor of 0 and your application uses persistent container (such as a k8s stateful pod) that it "could" be relevant...but I think the recommended replication factor is 1, in which case an incremental rebalance is probably preferable to a partition blackout during an application bounce, especially if you are using spring for dependency injection as application boot times are not exactly speedy. was (Author: JIRAUSER305028): IMHO: I think if you have a replication factor of 0 and your application uses persistent container (such as a k8s stateful pod) that it "could" be relevant...but I think the recommended replication factor would is 1, in which case an incremental rebalance is probably preferable to a partition blackout during an application bounce, especially if you are using spring for dependency injection as application boot times are not exactly speedy.
[jira] [Comment Edited] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839452#comment-17839452 ] Sal Sorrentino edited comment on KAFKA-16514 at 4/22/24 2:13 AM: - IMHO: I think if you have a replication factor of 0 and your application uses persistent container (such as a k8s stateful pod) that it "could" be relevant...but I think the recommended replication factor is 1. In which case an incremental rebalance is probably preferable to a partition blackout during an application bounce, especially if you are using something like spring for dependency injection as application boot times are not exactly speedy. If you are using in memory state stores at all, you would want to leave the group in every scenario I would think. Long story short, I think the "hack" is relevant if you have a replication factor of 0. was (Author: JIRAUSER305028): IMHO: I think if you have a replication factor of 0 and your application uses persistent container (such as a k8s stateful pod) that it "could" be relevant...but I think the recommended replication factor is 1, in which case an incremental rebalance is probably preferable to a partition blackout during an application bounce, especially if you are using spring for dependency injection as application boot times are not exactly speedy.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1574049613

## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -116,6 +123,57 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
 }
+
+ @Test
+ def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+createTopic(topic, partitionNum, 1, topicProperties)
+assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+// send enough records to trigger log rolling
+(0 until 20).foreach { _ =>
+ TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+}
+TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+ "timed out waiting for log segment to roll")
+
+// Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+// The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.
+TestUtils.waitUntilTrue(() => {
+ new File(logDir1, tp.toString).listFiles().count(_.getName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) > 0
+}, "timed out waiting for log segment to retention", 1)

Review Comment:
nit: I think we can leave the timeout as default value. That is, removing the 3rd parameter directly.

## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ##
@@ -37,6 +40,10 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 val topic = "topic"
+
+ override def brokerPropertyOverrides(properties: Properties): Unit = {
+properties.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "5000")

Review Comment:
Why do we need to wait 5 secs for it? I would say we can set to 0 to speed up the test. WDYT?
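The retention check in the quoted test polls the partition directory until at least one file name carries the deleted-file suffix. A self-contained Java sketch of that pattern, using only the JDK, could look like this. The names `DeletedSegmentWaitSketch`, `countDeleted` and `waitUntilTrue` are hypothetical stand-ins and not the Kafka `TestUtils` API; the suffix `.deleted` is assumed to be the value of `LogFileUtils.DELETED_FILE_SUFFIX`; and unlike a test helper that fails on timeout, this version simply returns `false`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;

public class DeletedSegmentWaitSketch {
    // Assumed value of LogFileUtils.DELETED_FILE_SUFFIX.
    static final String DELETED_FILE_SUFFIX = ".deleted";

    // Counts the files directly inside dir whose name ends with the deleted suffix.
    public static long countDeleted(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(p -> p.getFileName().toString().endsWith(DELETED_FILE_SUFFIX)).count();
        }
    }

    // Polls the condition until it holds or timeoutMs elapses; returns whether it held.
    public static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pauseMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            Thread.sleep(pauseMs);
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("topic-0");
        // Simulate a segment that retention has marked for deletion.
        Files.createFile(dir.resolve("00000000000000000000.log" + DELETED_FILE_SUFFIX));
        boolean seen = waitUntilTrue(() -> {
            try {
                return countDeleted(dir) > 0;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, 1000, 10);
        System.out.println(seen); // prints true
    }
}
```

Because the condition is checked before the deadline is consulted, even a very short timeout still evaluates it at least once, which matters when deciding whether an explicit timeout argument is needed at all.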
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on PR #15732: URL: https://github.com/apache/kafka/pull/15732#issuecomment-2068407055 @akhileshchg @mumrah @cmccabe, could you take a look when available? Thanks.