Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]
vamossagar12 commented on PR #13283: URL: https://github.com/apache/kafka/pull/13283#issuecomment-2071431998 No worries, and thanks for the proposed help! There are a few things I need to wrap up first, but I do hope to close this out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
showuon commented on PR #15773: URL: https://github.com/apache/kafka/pull/15773#issuecomment-2071375745 > Yeah, I tried to use a unit test. However, I didn't find a good way to check whether the kafka-delete-logs task has run or not. I also tried to check whether the scheduler can be stopped. However, the mock scheduler can still add new tasks during scheduler shutdown. Could we use `KafkaScheduler` instead?
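The testing difficulty discussed above (proving that a scheduled task actually ran before the scheduler shut down) can be sketched without Kafka at all. The probe below is a hypothetical JDK-only illustration; the class and method names are made up and are not part of `KafkaScheduler`'s API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical probe: instead of inspecting scheduler internals, let the task
// itself record that it ran (via a latch), then assert on the latch and on a
// clean shutdown. The same idea applies to a kafka-delete-logs-style task.
final class SchedulerProbe {
    static boolean taskRanAndSchedulerStopped(long delayMs) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch ran = new CountDownLatch(1);
        scheduler.schedule(ran::countDown, delayMs, TimeUnit.MILLISECONDS);
        boolean taskRan = ran.await(5, TimeUnit.SECONDS); // bounded wait, no fixed sleep
        scheduler.shutdown();                             // no new tasks accepted from here on
        return taskRan && scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The design point is that the assertion targets an observable side effect of the task rather than the scheduler's internal task queue, which is what made the mock-scheduler approach awkward.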
[PR] KAFKA-16587: Add subscription model information to group state [kafka]
rreddy-22 opened a new pull request, #15785: URL: https://github.com/apache/kafka/pull/15785 When all the subscriptions are equal, we iterate through every member and check equality of the topic lists, which is very expensive when there are more members.
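The cost concern above suggests tracking the subscription model incrementally instead of re-checking topic-list equality across all members on every pass. The sketch below is a rough, hypothetical illustration of that idea (the class and method names are illustrative, not the actual group-coordinator code): count members per distinct topic set, so the homogeneity check becomes an O(1) lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: maintain a count of members per distinct subscription.
// The group is homogeneous exactly when there is at most one distinct topic set,
// so the check no longer iterates over every member.
final class SubscriptionCounts {
    private final Map<Set<String>, Integer> membersPerSubscription = new HashMap<>();

    void memberSubscribed(Set<String> topics) {
        membersPerSubscription.merge(topics, 1, Integer::sum);
    }

    void memberUnsubscribed(Set<String> topics) {
        // Drop the entry entirely when the last member with this topic set leaves.
        membersPerSubscription.computeIfPresent(topics, (t, n) -> n == 1 ? null : n - 1);
    }

    boolean isHomogeneous() {
        return membersPerSubscription.size() <= 1;
    }
}
```

Updates happen once per member join/leave/subscription change, amortizing the cost that was previously paid on every equality scan.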
Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]
github-actions[bot] commented on PR #15231: URL: https://github.com/apache/kafka/pull/15231#issuecomment-2071344453 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575591635 ## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ## @@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } + @Test + def testAlterReplicaLogDirsRequestWithRetention(): Unit = { +val partitionNum = 1 + +// Alter replica dir before topic creation +val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath +val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap +val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) + +// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions +val tp = new TopicPartition(topic, 0) +assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) +assertTrue(servers.head.logManager.getLog(tp).isEmpty) + +val topicProperties = new Properties() +topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024") +topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1") +topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024") + +createTopic(topic, partitionNum, 1, topicProperties) +assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent) + +// send enough records to trigger log rolling +(0 until 20).foreach { _ => + TestUtils.generateAndProduceMessages(servers, topic, 10, 1) +} +TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1, + "timed out waiting for log segment to roll") + +// Wait for log segment retention. Override initialTaskDelayMs as 5 seconds. +// The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough. 
Review Comment: This comment is not correct now. ## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ## @@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } + @Test + def testAlterReplicaLogDirsRequestWithRetention(): Unit = { +val partitionNum = 1 + +// Alter replica dir before topic creation +val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath +val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap +val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) + +// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions +val tp = new TopicPartition(topic, 0) +assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) +assertTrue(servers.head.logManager.getLog(tp).isEmpty) + +val topicProperties = new Properties() +topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024") +topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1") Review Comment: I see, makes sense. It's for L158's assertion, right? Could you add a comment here to explain it?
[jira] [Updated] (KAFKA-16603) Data loss when kafka connect sending data to Kafka
[ https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anil Dasari updated KAFKA-16603: Description: We are experiencing data loss when the Kafka Source connector fails to send data to the Kafka topic and the offset topic. Kafka cluster and Kafka Connect details: # Kafka Connect version (client): Confluent community version 7.3.1, i.e. Kafka 3.3.1 # Kafka version: 0.11.0 (server) # Cluster size: 3 brokers # Number of partitions in all topics = 3 # Replication factor = 3 # Min ISR set to 2 # No transformations are used in the Kafka connector # Default error tolerance, i.e. None. Our connector checkpoints the offset info received in SourceTask#commitRecord and resumes data processing from the persisted checkpoint. The data loss is noticed when the broker is unresponsive for a few minutes due to high load and the Kafka connector is restarted. Also, the Kafka connector's graceful shutdown failed. Logs: {code:java} [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted. Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected. Apr 22, 2024 @ 15:56:16.708 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 disconnected. Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 disconnected. 
Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be attempted. Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log **) Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was unreachable for 3000ms. Revoking previous assignment Assignment{error=0, leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', leaderUrl='http://10.75.100.46:8083/', offset=4, connectorIds=[d094a5d7bbb046b99d62398cb84d648c], taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the group Apr 22, 2024 @ 15:56:19.866 Stopping connector d094a5d7bbb046b99d62398cb84d648c Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0 Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c} Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' failed to properly shut down, has become unresponsive, and may be consuming external resources. Correct the configuration for this connector or remove the connector. After fixing the connector, it may be necessary to restart this worker to release any consumed resources. Apr 22, 2024 @ 15:56:24.110 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the Kafka producer with timeoutMillis = 0 ms. Apr 22, 2024 @ 15:56:24.110 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms. 
Apr 22, 2024 @ 15:56:24.112 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning shutdown of Kafka producer I/O thread, sending remaining records. Apr 22, 2024 @ 15:56:24.112 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting incomplete batches due to forced shutdown Apr 22, 2024 @ 15:56:24.113 WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Committing offsets Apr 22, 2024 @ 15:56:24.113 WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. Apr 22, 2024 @ 15:56:24.146 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Finished stopping tasks in preparation for rebalance Apr 22, 2024 @ 15:56:24.165 WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Finished commitO
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575583127 ## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ## @@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } + @Test + def testAlterReplicaLogDirsRequestWithRetention(): Unit = { +val partitionNum = 1 + +// Alter replica dir before topic creation +val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath +val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap +val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) + +// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions +val tp = new TopicPartition(topic, 0) +assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) +assertTrue(servers.head.logManager.getLog(tp).isEmpty) + +val topicProperties = new Properties() +topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024") +topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1") Review Comment: The default value is 1 minute. We want some files to get the `.deleted` suffix but not be removed too quickly, because we want to wait for the dir movement to happen.
[jira] [Created] (KAFKA-16603) Data loss when kafka connect sending data to Kafka
Anil Dasari created KAFKA-16603: --- Summary: Data loss when kafka connect sending data to Kafka Key: KAFKA-16603 URL: https://issues.apache.org/jira/browse/KAFKA-16603 Project: Kafka Issue Type: Bug Components: clients, producer Affects Versions: 3.3.1 Reporter: Anil Dasari We are experiencing data loss when the Kafka Source connector fails to send data to the Kafka topic and the offset topic. Kafka cluster and Kafka Connect details: # Kafka version: Confluent community version 7.3.1, i.e. Kafka 3.3.1 # Cluster size: 3 brokers # Number of partitions in all topics = 3 # Replication factor = 3 # Min ISR set to 2 # No transformations are used in the Kafka connector # Default error tolerance, i.e. None. Our connector checkpoints the offset info received in SourceTask#commitRecord and resumes data processing from the persisted checkpoint. The data loss is noticed when the broker is unresponsive for a few minutes due to high load and the Kafka connector is restarted. However, the Kafka connector's graceful shutdown failed. Logs: {code:java} [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be attempted. Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected. Apr 22, 2024 @ 15:56:16.708 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 disconnected. 
Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 disconnected. Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be attempted. Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log **) Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was unreachable for 3000ms. Revoking previous assignment Assignment{error=0, leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', leaderUrl='http://10.75.100.46:8083/', offset=4, connectorIds=[d094a5d7bbb046b99d62398cb84d648c], taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the group Apr 22, 2024 @ 15:56:19.866 Stopping connector d094a5d7bbb046b99d62398cb84d648c Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0 Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c} Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' failed to properly shut down, has become unresponsive, and may be consuming external resources. Correct the configuration for this connector or remove the connector. After fixing the connector, it may be necessary to restart this worker to release any consumed resources. Apr 22, 2024 @ 15:56:24.110 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the Kafka producer with timeoutMillis = 0 ms. 
Apr 22, 2024 @ 15:56:24.110 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms. Apr 22, 2024 @ 15:56:24.112 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning shutdown of Kafka producer I/O thread, sending remaining records. Apr 22, 2024 @ 15:56:24.112 [Producer clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting incomplete batches due to forced shutdown Apr 22, 2024 @ 15:56:24.113 WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Committing offsets Apr 22, 2024 @ 15:56:24.113 WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. Apr 22, 2024 @ 15:56:24.146 [Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Finished stopping tasks in p
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
showuon commented on code in PR #15748: URL: https://github.com/apache/kafka/pull/15748#discussion_r1575561675 ## core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala: ## @@ -121,9 +121,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati // Set the last modified time to an old value to force deletion of old segments val endOffset = log.logEndOffset log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs)) -TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset, +TestUtils.waitUntilTrue(() => 1 == log.numberOfSegments, "Timed out waiting for deletion of old segments") -assertEquals(1, log.numberOfSegments) +assertEquals(log.logStartOffset, endOffset) Review Comment: I'm re-triggering the CI since there are 2 jobs that haven't finished. Meanwhile, could you help explain why we need these 2 changes in v3.6 but not in v3.7/trunk?
Re: [PR] MINOR: Reduce the time taken to execute the TieredStorage tests. [kafka]
showuon merged PR #15780: URL: https://github.com/apache/kafka/pull/15780
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
showuon commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575558298 ## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ## @@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } + @Test + def testAlterReplicaLogDirsRequestWithRetention(): Unit = { +val partitionNum = 1 + +// Alter replica dir before topic creation +val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath +val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap +val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) + +// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions +val tp = new TopicPartition(topic, 0) +assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) +assertTrue(servers.head.logManager.getLog(tp).isEmpty) + +val topicProperties = new Properties() +topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024") +topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1") Review Comment: Why 10 seconds here?
Re: [PR] MINOR: Reduce the time taken to execute the TieredStorage tests. [kafka]
kamalcph commented on PR #15780: URL: https://github.com/apache/kafka/pull/15780#issuecomment-2071280063 Test failures are unrelated. 3 remote storage tests failed; those are existing flaky tests.
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph commented on PR #15748: URL: https://github.com/apache/kafka/pull/15748#issuecomment-2071274526 @showuon Test failures are fixed! Please take another look. Let me know if we have to fix the tests in 3.7 and trunk as well (though they weren't flaky there).
[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor
[ https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839910#comment-17839910 ] Matthias J. Sax commented on KAFKA-16585: - Well, the use-case makes sense, but the question is: how can the runtime ensure that the key is not changed? The idea of `FixedKeyProcessor` is to ensure that the key is not changed, but if we allow setting a key, you could set anything and the runtime cannot ensure that the key is "correct". It would be up to the user code to do the right thing... What is unclear from your use-case description is why you can't use a regular `Processor`? > No way to forward message from punctuation method in the FixedKeyProcessor > -- > > Key: KAFKA-16585 > URL: https://issues.apache.org/jira/browse/KAFKA-16585 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Stanislav Spiridonov >Priority: Major > > The FixedKeyProcessorContext can forward only FixedKeyRecord. This class > doesn't have a public constructor and can be created based on existing > records. But such a record is usually absent in the punctuation method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
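The constraint discussed above can be modeled in isolation. The class below is a simplified, hypothetical stand-in for a fixed-key record type (it is not Kafka's `FixedKeyRecord` implementation): the constructor is hidden, new records can only be derived from an existing one, and so user code provably never changes the key. A punctuator has no input record to derive from, which is exactly why nothing can be forwarded there and why a regular `Processor` is the suggested alternative.

```java
// Hypothetical model of the fixed-key invariant, not Kafka's actual class:
// no public constructor, so the only way to obtain a record with a given key
// is to derive it from a record that already carries that key.
final class FixedKeyRecordModel<K, V> {
    private final K key;
    private final V value;

    private FixedKeyRecordModel(K key, V value) {
        this.key = key;
        this.value = value;
    }

    // Stand-in for the runtime, which creates the initial record from the input.
    static <K, V> FixedKeyRecordModel<K, V> fromInput(K key, V value) {
        return new FixedKeyRecordModel<>(key, value);
    }

    // User code may swap the value but can never touch the key.
    FixedKeyRecordModel<K, V> withValue(V newValue) {
        return new FixedKeyRecordModel<>(key, newValue);
    }

    K key() { return key; }
    V value() { return value; }
}
```

Allowing a public constructor (or a key setter) would break the invariant the type exists to enforce, which is the trade-off raised in the comment.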
[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839907#comment-17839907 ] Matthias J. Sax commented on KAFKA-16567: - Thanks Bruno – makes sense to me – should we move K16336 out of the 4.0 ticket and into the 5.0 one? If we enable the state updater in 3.8, it needs to honor our deprecation period and can only remove the metrics in 5.0, right? For this ticket, I'll link it to the KIP and update it accordingly. It's not a blocker for 4.0. > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Priority: Blocker > Labels: kip > Fix For: 4.0.0 > > > Add the following metrics to the state updater: > * restoring-active-tasks: count > * restoring-standby-tasks: count > * paused-active-tasks: count > * paused-standby-tasks: count > * idle-ratio: percentage > * restore-ratio: percentage > * checkpoint-ratio: percentage > * restore-records-total: count > * restore-records-rate: rate > * restore-call-rate: rate
[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16567: Priority: Major (was: Blocker) > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Priority: Major > Labels: kip > > Add the following metrics to the state updater: > * restoring-active-tasks: count > * restoring-standby-tasks: count > * paused-active-tasks: count > * paused-standby-tasks: count > * idle-ratio: percentage > * restore-ratio: percentage > * checkpoint-ratio: percentage > * restore-records-total: count > * restore-records-rate: rate > * restore-call-rate: rate -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16567: Fix Version/s: (was: 4.0.0) > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Priority: Blocker > Labels: kip > > Add the following metrics to the state updater: > * restoring-active-tasks: count > * restoring-standby-tasks: count > * paused-active-tasks: count > * paused-standby-tasks: count > * idle-ratio: percentage > * restore-ratio: percentage > * checkpoint-ratio: percentage > * restore-records-total: count > * restore-records-rate: rate > * restore-call-rate: rate -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839905#comment-17839905 ] Matthias J. Sax commented on KAFKA-16514: - Well, in general we can, but the internal flag / config we set on the consumer is immutable – configs are in general immutable, so it's not something we can just change. Thus, to get some flexibility into `close()` we need a KIP to change the consumer. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. 
> The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
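The point about immutable configs in the comment above can be illustrated with a short sketch (plain Python, not the Kafka client; the `Consumer` class and its behavior here are hypothetical): the flag is snapshotted at construction, so there is no way to change the close-time behavior afterwards without a new API, hence the need for a KIP.

```python
# Hypothetical illustration: a consumer-like object snapshots its config
# at construction time, so mutating the config dict later has no effect.
# A close-time flag must therefore be chosen up front.

class Consumer:
    def __init__(self, config):
        # Snapshot at construction; later edits to `config` are invisible.
        self._leave_group_on_close = bool(config.get("internal.leave.group.on.close", False))

    def close(self):
        return "leave group" if self._leave_group_on_close else "await session timeout"

config = {"internal.leave.group.on.close": True}
consumer = Consumer(config)
config["internal.leave.group.on.close"] = False  # too late: ignored
print(consumer.close())  # leave group
```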
[jira] [Commented] (KAFKA-16466) QuorumController is swallowing some exception messages
[ https://issues.apache.org/jira/browse/KAFKA-16466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839902#comment-17839902 ] Igor Soarez commented on KAFKA-16466: - Cherry-picked to 3.7 > QuorumController is swallowing some exception messages > -- > > Key: KAFKA-16466 > URL: https://issues.apache.org/jira/browse/KAFKA-16466 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 3.7.0 >Reporter: David Arthur >Assignee: Ilya Zakharov >Priority: Major > Labels: good-first-issue > Fix For: 3.8.0, 3.7.1 > > > In some cases in QuorumController, we throw exceptions from the control > manager methods. Unless these are explicitly caught and handled, they will > eventually bubble up to the ControllerReadEvent/ControllerWriteEvent and hit > the generic error handler. > In the generic error handler of QuorumController, we examine the exception to > determine if it is a fault or not. In the case where it is not a fault, we > log the error like: > {code:java} > log.info("{}: {}", name, failureMessage); > {code} > which results in messages like > {code:java} > [2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: > event failed with UnsupportedVersionException in 167 microseconds. > (org.apache.kafka.controller.QuorumController:544) > {code} > In this case, the exception actually has more details in its own message > {code:java} > Unable to register because the broker does not support version 8 of > metadata.version. It wants a version between 20 and 20, inclusive. > {code} > We should include the exception's message in the log output for non-fault > errors as it includes very useful debugging info. > This was found while writing an integration test for KRaft migration where > the brokers and controllers have a mismatched MetadataVersion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
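The fix the issue asks for — including the exception's own message when logging non-fault errors — can be sketched as follows. This is an illustrative Python mirror, not the QuorumController code; the helper name and message layout are made up.

```python
# Sketch of the logging change: when a non-fault error carries an
# exception, append the exception's own message so details like
# "Unable to register because..." are not swallowed.

def format_failure(name, failure_message, exception=None):
    """Build the log line; include the exception's message when present."""
    if exception is not None and str(exception):
        return "%s: %s Reason: %s" % (name, failure_message, exception)
    return "%s: %s" % (name, failure_message)

before = format_failure(
    "QuorumController id=3000",
    "registerBroker: event failed with UnsupportedVersionException in 167 microseconds.")

after = format_failure(
    "QuorumController id=3000",
    "registerBroker: event failed with UnsupportedVersionException in 167 microseconds.",
    ValueError("Unable to register because the broker does not support version 8 of metadata.version."))

print(after)  # the cause's message is now part of the log line
```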
Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]
soarez merged PR #15701: URL: https://github.com/apache/kafka/pull/15701 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]
soarez commented on PR #15701: URL: https://github.com/apache/kafka/pull/15701#issuecomment-2071198673 Failing tests are unrelated, except for `org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()` ([KAFKA-16602](https://issues.apache.org/jira/browse/KAFKA-16602)) which didn't reproduce locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16602) Flaky test – org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
Igor Soarez created KAFKA-16602: --- Summary: Flaky test – org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord() Key: KAFKA-16602 URL: https://issues.apache.org/jira/browse/KAFKA-16602 Project: Kafka Issue Type: Test Components: controller Reporter: Igor Soarez org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord() failed with: h4. Error {code:java} org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0{code} h4. Stacktrace {code:java} org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0 at app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808) at app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270) at app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547) at app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is larger than the current epoch 0{code} Source: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16601) Flaky test – org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()
Igor Soarez created KAFKA-16601: --- Summary: Flaky test – org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics() Key: KAFKA-16601 URL: https://issues.apache.org/jira/browse/KAFKA-16601 Project: Kafka Issue Type: Test Components: controller Reporter: Igor Soarez org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics() failed with: h4. Error {code:java} org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0{code} h4. Stacktrace {code:java} org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0 at app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808) at app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270) at app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547) at app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is larger than 
the current epoch 0{code} Source: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-16577: New consumer fails with stop within allotted timeout in consumer_test.py system test [kafka]
kirktrue opened a new pull request, #15784: URL: https://github.com/apache/kafka/pull/15784 WIP ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
kirktrue commented on PR #15723: URL: https://github.com/apache/kafka/pull/15723#issuecomment-2071054599 @lucasbru, @lianetm, @philipnee—I have re-introduced the `lastSentMs` instance variable, not as a driver for logic, but for informational purposes in the logs. This is ready for re-review. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close
[ https://issues.apache.org/jira/browse/KAFKA-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839871#comment-17839871 ] Alex Leung commented on KAFKA-16600: Hi [~ableegoldman] – added a log with more context! > Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state > is PENDING_SHUTDOWN" during streams close > --- > > Key: KAFKA-16600 > URL: https://issues.apache.org/jira/browse/KAFKA-16600 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Alex Leung >Priority: Minor > Attachments: kafka-16600.log > > > From time to time, we observe the following ERROR message during streams > close: > {code:java} > 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client > [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING > to PENDING_SHUTDOWN > 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client > [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to > PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN > {code} > These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have > not changed any code related to streams shutdown. > When the problem does not occur (most of the time), it looks like the > following: > {code:java} > 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client > [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING > to PENDING_SHUTDOWN > 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client > [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in > PENDING_SHUTDOWN, all resources are being closed and the client will be > stopped. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close
[ https://issues.apache.org/jira/browse/KAFKA-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Leung updated KAFKA-16600: --- Attachment: kafka-16600.log > Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state > is PENDING_SHUTDOWN" during streams close > --- > > Key: KAFKA-16600 > URL: https://issues.apache.org/jira/browse/KAFKA-16600 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Alex Leung >Priority: Minor > Attachments: kafka-16600.log > > > From time to time, we observe the following ERROR message during streams > close: > {code:java} > 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client > [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING > to PENDING_SHUTDOWN > 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client > [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to > PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN > {code} > These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have > not changed any code related to streams shutdown. > When the problem does not occur (most of the time), it looks like the > following: > {code:java} > 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client > [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING > to PENDING_SHUTDOWN > 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client > [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in > PENDING_SHUTDOWN, all resources are being closed and the client will be > stopped. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Fix logging of KafkaStreams close [kafka]
appchemist opened a new pull request, #15783: URL: https://github.com/apache/kafka/pull/15783 Remove an unnecessary "}" in the logging of KafkaStreams close ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned
[ https://issues.apache.org/jira/browse/KAFKA-16565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16565: -- Reviewer: Lucas Brutschy Description: In {{verifiable_consumer.py}}, the {{IncrementalAssignmentConsumerEventHandler}} contains this logic: {code:python} def handle_partitions_revoked(self, event): self.revoked_count += 1 self.state = ConsumerState.Rebalancing self.position = {} for topic_partition in event["partitions"]: topic = topic_partition["topic"] partition = topic_partition["partition"] self.assignment.remove(TopicPartition(topic, partition)) {code} If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that isn't in the list, a generic Python list error is thrown. For now, we should first check that the {{TopicPartition}} is in the list with an {{assert}} that provides better information . was: In {{{}verifiable_consumer.py{}}}, the Incremental {code:java} def handle_partitions_revoked(self, event): self.revoked_count += 1 self.state = ConsumerState.Rebalancing self.position = {} for topic_partition in event["partitions"]: topic = topic_partition["topic"] partition = topic_partition["partition"] self.assignment.remove(TopicPartition(topic, partition)) {code} If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that isn't in the list, an error is thrown. For now, we should first check that the {{TopicPartition}} is in the list, and if not, log a warning or something. 
> IncrementalAssignmentConsumerEventHandler throws error when attempting to > remove a partition that isn't assigned > > > Key: KAFKA-16565 > URL: https://issues.apache.org/jira/browse/KAFKA-16565 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, kip-848-client-support, > system-tests > Fix For: 3.8.0 > > > In {{verifiable_consumer.py}}, the > {{IncrementalAssignmentConsumerEventHandler}} contains this logic: > {code:python} > def handle_partitions_revoked(self, event): > self.revoked_count += 1 > self.state = ConsumerState.Rebalancing > self.position = {} > for topic_partition in event["partitions"]: > topic = topic_partition["topic"] > partition = topic_partition["partition"] > self.assignment.remove(TopicPartition(topic, partition)) > {code} > If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that > isn't in the list, a generic Python list error is thrown. For now, we should > first check that the {{TopicPartition}} is in the list with an {{assert}} > that provides better information . -- This message was sent by Atlassian Jira (v8.20.10#820010)
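The failure mode and the proposed fix above can be seen in a minimal sketch (plain Python; the `TopicPartition` values here are stand-in tuples and `revoke` is a hypothetical helper, not the kafkatest classes):

```python
# list.remove() on a missing element raises a bare ValueError with no
# context; checking membership first with an assert lets the harness say
# which partition and host were involved.

def revoke(assignment, tp, hostname):
    assert tp in assignment, \
        "Topic partition %s cannot be revoked from %s as it was not previously assigned" % (tp, hostname)
    assignment.remove(tp)

assignment = [("foo", 0), ("foo", 1)]
revoke(assignment, ("foo", 0), "worker1")      # ok: partition was assigned

try:
    revoke(assignment, ("foo", 2), "worker1")  # never assigned
except AssertionError as e:
    print(e)  # names the partition and host instead of a bare ValueError
```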
[jira] [Updated] (KAFKA-16461) New consumer fails to consume records in security_test.py system test
[ https://issues.apache.org/jira/browse/KAFKA-16461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16461: -- Reviewer: Lucas Brutschy > New consumer fails to consume records in security_test.py system test > - > > Key: KAFKA-16461 > URL: https://issues.apache.org/jira/browse/KAFKA-16461 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > The {{security_test.py}} system test fails with the following error: > {quote} > * Consumer failed to consume up to offsets > {quote} > Affected test: > * {{test_client_ssl_endpoint_validation_failure}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]
kirktrue commented on PR #15737: URL: https://github.com/apache/kafka/pull/15737#issuecomment-2070963749 @lucasbru—Can you review this change to the consumer system test harness? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]
kirktrue commented on code in PR #15737: URL: https://github.com/apache/kafka/pull/15737#discussion_r1575365134 ## tests/kafkatest/services/verifiable_consumer.py: ## @@ -140,22 +150,32 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler): def __init__(self, node, verify_offsets, idx): super().__init__(node, verify_offsets, idx) -def handle_partitions_revoked(self, event): +def handle_partitions_revoked(self, event, node, logger): self.revoked_count += 1 self.state = ConsumerState.Rebalancing self.position = {} +revoked = [] + for topic_partition in event["partitions"]: -topic = topic_partition["topic"] -partition = topic_partition["partition"] -self.assignment.remove(TopicPartition(topic, partition)) +tp = _create_partition_from_dict(topic_partition) -def handle_partitions_assigned(self, event): +if tp in self.assignment: +self.assignment.remove(tp) +revoked.append(tp) +else: +logger.warn("Could not remove topic partition %s from assignment as it was not previously assigned to %s" % (tp, node.account.hostname)) Review Comment: @lianetm—I changed the logging to an `assert` that provides useful information for troubleshooting: ```python tp = _create_partition_from_dict(topic_partition) assert tp in self.assignment, \ "Topic partition %s cannot be revoked from %s as it was not previously assigned to that consumer" % \ (tp, node.account.hostname) self.assignment.remove(tp) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close
[ https://issues.apache.org/jira/browse/KAFKA-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839850#comment-17839850 ] A. Sophie Blee-Goldman commented on KAFKA-16600: Thanks [~aleung181] – definitely sounds like a bug. Hard to diagnose without more context though – can you share a bit more of the logs? A minute or so leading up to the error message should hopefully be enough > Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state > is PENDING_SHUTDOWN" during streams close > --- > > Key: KAFKA-16600 > URL: https://issues.apache.org/jira/browse/KAFKA-16600 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Alex Leung >Priority: Minor > > From time to time, we observe the following ERROR message during streams > close: > {code:java} > 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client > [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING > to PENDING_SHUTDOWN > 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client > [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to > PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN > {code} > These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have > not changed any code related to streams shutdown. > When the problem does not occur (most of the time), it looks like the > following: > {code:java} > 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client > [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING > to PENDING_SHUTDOWN > 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client > [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in > PENDING_SHUTDOWN, all resources are being closed and the client will be > stopped. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
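The symptom above — a second caller finding the state already in PENDING_SHUTDOWN and logging an ERROR — is consistent with two threads racing on a guarded state transition. A minimal sketch (plain Python, not the KafkaStreams state machine; class and method names are hypothetical):

```python
import threading

# Two concurrent close() calls race on the same transition: the first
# wins RUNNING -> PENDING_SHUTDOWN, the second sees the state already
# changed and the transition is rejected. Whether that rejection is
# logged as an error or treated as a benign no-op is the crux of the bug.

class StateMachine:
    def __init__(self):
        self.state = "RUNNING"
        self._lock = threading.Lock()

    def set_state(self, new_state):
        with self._lock:
            if self.state == new_state:
                return False  # already there; caller decides how to report it
            self.state = new_state
            return True

sm = StateMachine()
print(sm.set_state("PENDING_SHUTDOWN"))  # True: first close() wins
print(sm.set_state("PENDING_SHUTDOWN"))  # False: second close() loses the race
```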
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1575354663 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] KAFKA-16461: New consumer fails to consume records in security_test.py system test [kafka]
kirktrue commented on PR #15746: URL: https://github.com/apache/kafka/pull/15746#issuecomment-2070928518 @lucasbru—Can you review? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16103) AsyncKafkaConsumer: Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16103: -- Summary: AsyncKafkaConsumer: Always await async commit callbacks in commitSync and close (was: Review client logic for triggering offset commit callbacks) > AsyncKafkaConsumer: Always await async commit callbacks in commitSync and > close > --- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync -- This message was sent by Atlassian Jira (v8.20.10#820010)
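The invariant described in this ticket — commitSync (and close) must not return while async commit callbacks are still pending — can be sketched with plain JDK concurrency primitives. This is an illustrative model only, not the AsyncKafkaConsumer code; `commitAsync`/`commitSync` here are hypothetical stand-ins for the real consumer API:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitCallbacksDemo {
    private final ExecutorService background = Executors.newSingleThreadExecutor();
    private final ConcurrentLinkedQueue<Runnable> pendingCallbacks = new ConcurrentLinkedQueue<>();
    final AtomicInteger callbacksFired = new AtomicInteger();

    // Stand-in for commitAsync: the commit completes in the background and its
    // callback is queued to run on the application thread later.
    void commitAsync() {
        Future<?> f = background.submit(() -> { /* simulated network round trip */ });
        pendingCallbacks.add(() -> {
            try {
                f.get(); // wait for the background commit to finish
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            callbacksFired.incrementAndGet();
        });
    }

    // The invariant under discussion: commitSync drains every queued async
    // callback before it returns, so no callback can fire after close().
    // (This demo also shuts the executor down, so it is single-use.)
    void commitSync() {
        Runnable cb;
        while ((cb = pendingCallbacks.poll()) != null) {
            cb.run();
        }
        background.shutdown();
    }

    public static void main(String[] args) {
        AwaitCallbacksDemo demo = new AwaitCallbacksDemo();
        demo.commitAsync();
        demo.commitAsync();
        demo.commitSync();
        System.out.println("callbacks fired: " + demo.callbacksFired.get());
    }
}
```

The key design point is that the synchronous path owns the callback queue: whichever thread calls `commitSync` is guaranteed to observe and run every callback enqueued before it, rather than leaving them for a later poll that may never happen.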
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1575308633 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]
TaiJuWu commented on PR #15782: URL: https://github.com/apache/kafka/pull/15782#issuecomment-2070867568 > @TaiJuWu thanks for your contribution. Could you fix https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java#L776 also? Sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]
chia7712 commented on PR #15782: URL: https://github.com/apache/kafka/pull/15782#issuecomment-2070865680 @TaiJuWu thanks for your contribution. Could you fix https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java#L776 also? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1575290259 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } -public BrokerNode build( -String baseDirectory, -Uuid clusterId, -boolean combined -) { +public Builder setClusterId(Uuid clusterId) { +this.clusterId = clusterId; +return this; +} + +public Builder setBaseDirectory(String baseDirectory) { +this.baseDirectory = baseDirectory; +return this; +} + +public Builder setCombined(boolean combined) { +this.combined = combined; +return this; +} + +public Builder setPropertyOverrides(Map propertyOverrides) { +this.propertyOverrides = Collections.unmodifiableMap(propertyOverrides); Review Comment: Maybe we need to do a copy in order to avoid changes produced from outer. ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -97,14 +96,31 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { if (!brokerNodeBuilders.isEmpty()) { nextId = brokerNodeBuilders.lastKey() + 1; } -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() +BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder() .setId(nextId) .setNumLogDirectories(disksPerBroker); brokerNodeBuilders.put(nextId, brokerNodeBuilder); } return this; } +/** + * Set per broker properties overrides, this setter must be invoked after setBrokerNodes which + * setup broker id and broker builder. 
+ * @param perBrokerPropertiesOverrides properties to override in each broker + * @return Builder + */ +public Builder setPerBrokerPropertiesOverrides(Map> perBrokerPropertiesOverrides) { +perBrokerPropertiesOverrides.forEach((brokerId, properties) -> { +if (!brokerNodeBuilders.containsKey(brokerId)) { +throw new RuntimeException("Broker id " + brokerId + " does not exist"); +} +Map propertiesOverride = new HashMap<>(properties); Review Comment: the deep copy should be addressed by `setPropertyOverrides` ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -97,14 +96,32 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { if (!brokerNodeBuilders.isEmpty()) { nextId = brokerNodeBuilders.lastKey() + 1; } -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() +BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder() .setId(nextId) .setNumLogDirectories(disksPerBroker); brokerNodeBuilders.put(nextId, brokerNodeBuilder); } return this; } +/** + * Set per broker properties overrides, this setter must be invoked after setBrokerNodes which Review Comment: It seems `TestKitNodes` need to be refactor also. In short, it should verify/apply all settings when building. ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -126,8 +138,12 @@ public MetadataVersion metadataVersion() { return metadataVersion; } -public Properties brokerServerProperties(int brokerId) { -return perBrokerOverrideProperties.computeIfAbsent(brokerId, __ -> new Properties()); +public Map brokerServerProperties(int brokerId) { Review Comment: This helper is used by `ZkClusterInvocationContext` only. Maybe we can remove this one to simplify `ClusterConfig` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
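The review point above about making a copy "to avoid changes produced from outer" is the classic defensive-copy rule for builders: `Collections.unmodifiableMap` alone only wraps the caller's map, so later caller-side mutations still leak through the wrapper. A minimal self-contained sketch (class and method names are illustrative, not the actual `BrokerNode.Builder`):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class BuilderCopyDemo {
    private Map<String, String> propertyOverrides = Collections.emptyMap();

    // Copy first, then wrap: unmodifiableMap(overrides) alone would still
    // reflect any mutation the caller makes to its own map afterwards.
    public BuilderCopyDemo setPropertyOverrides(Map<String, String> overrides) {
        this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(overrides));
        return this;
    }

    public Map<String, String> propertyOverrides() {
        return propertyOverrides;
    }

    // Demonstrates that caller-side mutation after the setter has no effect.
    static int demoSizeAfterCallerMutation() {
        Map<String, String> callerMap = new HashMap<>();
        callerMap.put("offsets.topic.num.partitions", "1");
        BuilderCopyDemo builder = new BuilderCopyDemo().setPropertyOverrides(callerMap);
        callerMap.put("offsets.topic.replication.factor", "1"); // caller keeps mutating
        return builder.propertyOverrides().size();              // builder is unaffected
    }

    public static void main(String[] args) {
        System.out.println("size after caller mutation: " + demoSizeAfterCallerMutation());
    }
}
```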
[PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]
TaiJuWu opened a new pull request, #15782: URL: https://github.com/apache/kafka/pull/15782 As title ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
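For context, the two forms this PR swaps are equivalent in result: `System.lineSeparator()` (available since Java 7) always returns the initial value of the `line.separator` property from a cached field, avoiding a property-map lookup on every call. A minimal demonstration:

```java
public class LineSeparatorDemo {
    // Old style: consults the system property map on each call.
    static String oldStyle() {
        return System.getProperty("line.separator");
    }

    // New style: returns the cached, platform-dependent separator
    // ("\n" on Unix-like systems, "\r\n" on Windows).
    static String newStyle() {
        return System.lineSeparator();
    }

    public static void main(String[] args) {
        System.out.println("equivalent: " + oldStyle().equals(newStyle()));
    }
}
```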
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1575295376 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
chia7712 commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2070801312 @m1a2st Please add the following server properties:

```java
@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
    @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
    @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
})
```

The default replication factor of the offsets topic is 3, and the new infra creates a single broker. Hence, creating the offsets topic fails, and you can't find a coordinator because the offsets partition can't become ready. Also, please note @lianetm's comment that bootstrap servers need to be updated manually if you kill any broker. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close
Alex Leung created KAFKA-16600: -- Summary: Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close Key: KAFKA-16600 URL: https://issues.apache.org/jira/browse/KAFKA-16600 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.3.2 Reporter: Alex Leung From time to time, we observe the following ERROR message during streams close: {code:java} 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING to PENDING_SHUTDOWN 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN {code} These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have not changed any code related to streams shutdown. When the problem does not occur (most of the time), it looks like the following: {code:java} 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING to PENDING_SHUTDOWN 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in PENDING_SHUTDOWN, all resources are being closed and the client will be stopped. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
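The reported ERROR is consistent with two threads racing to shut down the same client: the losing thread finds the state already at PENDING_SHUTDOWN. A simplified model of a compare-and-set state transition shows the shape of the race (this is an illustration, not the actual `KafkaStreams#setState` code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class StateTransitionDemo {
    enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    static final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    // Only a RUNNING client may move to PENDING_SHUTDOWN. A second concurrent
    // caller loses the compare-and-set and gets false back — the point at
    // which real code would log "Failed to transition to PENDING_SHUTDOWN,
    // current state is PENDING_SHUTDOWN".
    static boolean transitionToPendingShutdown() {
        return state.compareAndSet(State.RUNNING, State.PENDING_SHUTDOWN);
    }

    public static void main(String[] args) {
        System.out.println("first close wins:  " + transitionToPendingShutdown());
        System.out.println("second close loses: " + transitionToPendingShutdown());
    }
}
```

Under this model the second attempt is harmless — the client is already shutting down — which is why such a message is arguably better logged at a lower severity than ERROR.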
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 commented on PR #15772: URL: https://github.com/apache/kafka/pull/15772#issuecomment-2070748890 Should we move following configs to `SocketServerConfigs`? 1. `controller.listener.names` https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/main/scala/kafka/server/KafkaConfig.scala#L118 2. `inter.broker.listener.name` https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java#L117 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
lianetm commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2070678792 Hey @m1a2st , the way to get the servers' addresses is `cluster.bootstrapServers()`, you got it right, but I see you're not inheriting from `ConsumerGroupCommandTest` anymore, so that removes the creation of the brokers to run the test against (that happens in the base setup [here](https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala#L121)). Could that be what you're missing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on PR #15305: URL: https://github.com/apache/kafka/pull/15305#issuecomment-2070326565 @showuon, I made some changes to the test so that we just heartbeat and don't have to rely upon FindCoordinator requests. Let me know what you think. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
vamossagar12 commented on PR #15762: URL: https://github.com/apache/kafka/pull/15762#issuecomment-2070321267 Thanks @chia7712, for now I have made the constructor added as part of https://github.com/apache/kafka/pull/13909 private. Regarding > I guess we need a KIP to deprecate that method and add comments to encourage users to use it that would mostly mean deprecating the other constructor first and eventually making it private. I don't think we can remove the constructor because `define` eventually depends upon it. I will create a new ticket for the KIP to track it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
vamossagar12 commented on code in PR #15762: URL: https://github.com/apache/kafka/pull/15762#discussion_r1575112513 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -1256,6 +1256,16 @@ public static class ConfigKey { public final boolean internalConfig; public final String alternativeString; +// This constructor is present for backward compatibility reasons. +public ConfigKey(String name, Type type, Object defaultValue, Validator validator, + Importance importance, String documentation, String group, + int orderInGroup, Width width, String displayName, + List dependents, Recommender recommender, + boolean internalConfig) { +this(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, +dependents, recommender, internalConfig, null); +} + public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1575101365 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -173,63 +199,104 @@ public static class Builder { private String listenerName; private File trustStoreFile; private MetadataVersion metadataVersion; +private Map<String, String> serverProperties = Collections.unmodifiableMap(new HashMap<>()); Review Comment: Yes. The code here is redundant. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16526; Add quorum state v1 [kafka]
jsancio opened a new pull request, #15781: URL: https://github.com/apache/kafka/pull/15781 DRAFT - needs https://github.com/apache/kafka/pull/15671 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor
[ https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839791#comment-17839791 ] Stanislav Spiridonov commented on KAFKA-16585: -- If you check the example on [https://developer.confluent.io/tutorials/kafka-streams-schedule-operations/kstreams.html] you will see the same situation *transform* vs {*}transformValues{*}. {noformat} I used transform in this tutorial as it makes for a better example because you can use the ProcessorContext.forward method.{noformat} > No way to forward message from punctuation method in the FixedKeyProcessor > -- > > Key: KAFKA-16585 > URL: https://issues.apache.org/jira/browse/KAFKA-16585 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Stanislav Spiridonov >Priority: Major > > The FixedKeyProcessorContext can forward only FixedKeyRecord. This class > doesn't have a public constructor and can be created based on existing > records. But such record usually is absent in the punctuation method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1575079922 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
[jira] [Comment Edited] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor
[ https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838979#comment-17838979 ] Stanislav Spiridonov edited comment on KAFKA-16585 at 4/22/24 4:47 PM: --- The case is relatively simple. I have a KTable with entities that have to be enriched with an icon attribute from a side service. The processor maintains an internal store with the entity keys and periodically asks the service for updates for the registered ids. If an icon has changed, it forwards a message with the new icon. The key of the record is the entity key (String); the value is the icon (String). was (Author: foal): The case is relatively simple. I have a KTable with entities that have to be enriched with an icon attribute from a side service. The processor maintains an internal store with the entity keys and periodically asks the service for updates for the registered ids. If an icon has changed, it forwards a message with the new icon. The key of the record is the entity key (String); the value is the icon (String). BTW, I ran into strange behaviour: if I forward a new record from another thread, it arrives at the wrong processor. So now I just update the store from the icon KTable instead of forwarding the record. > No way to forward message from punctuation method in the FixedKeyProcessor > -- > > Key: KAFKA-16585 > URL: https://issues.apache.org/jira/browse/KAFKA-16585 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Stanislav Spiridonov >Priority: Major > > The FixedKeyProcessorContext can forward only a FixedKeyRecord. This class > doesn't have a public constructor and can only be created from an existing > record. But such a record is usually absent in the punctuation method. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1575053494 ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -173,63 +199,104 @@ public static class Builder { private String listenerName; private File trustStoreFile; private MetadataVersion metadataVersion; +private Map<String, String> serverProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map<String, String> producerProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map<String, String> consumerProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map<String, String> adminClientProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map<String, String> saslServerProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map<String, String> saslClientProperties = Collections.unmodifiableMap(new HashMap<>()); +private Map<Integer, Map<String, String>> perBrokerOverrideProperties = Collections.unmodifiableMap(new HashMap<>()); -Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol, MetadataVersion metadataVersion) { -this.type = type; -this.brokers = brokers; -this.controllers = controllers; -this.autoStart = autoStart; -this.securityProtocol = securityProtocol; -this.metadataVersion = metadataVersion; -} +private Builder() {} -public Builder type(Type type) { +public Builder setType(Type type) { this.type = type; return this; } -public Builder brokers(int brokers) { +public Builder setBrokers(int brokers) { this.brokers = brokers; return this; } -public Builder controllers(int controllers) { +public Builder setControllers(int controllers) { this.controllers = controllers; return this; } -public Builder name(String name) { +public Builder setName(String name) { this.name = name; return this; } -public Builder autoStart(boolean autoStart) { +public Builder setAutoStart(boolean autoStart) { this.autoStart = autoStart; return this; } -public Builder securityProtocol(SecurityProtocol securityProtocol) { +public Builder setSecurityProtocol(SecurityProtocol
securityProtocol) { this.securityProtocol = securityProtocol; return this; } -public Builder listenerName(String listenerName) { +public Builder setListenerName(String listenerName) { this.listenerName = listenerName; return this; } -public Builder trustStoreFile(File trustStoreFile) { +public Builder setTrustStoreFile(File trustStoreFile) { this.trustStoreFile = trustStoreFile; return this; } -public Builder metadataVersion(MetadataVersion metadataVersion) { +public Builder setMetadataVersion(MetadataVersion metadataVersion) { this.metadataVersion = metadataVersion; return this; } +public Builder setServerProperties(Map<String, String> serverProperties) { +this.serverProperties = Collections.unmodifiableMap(serverProperties); +return this; +} + +public Builder setConsumerProperties(Map<String, String> consumerProperties) { +this.consumerProperties = Collections.unmodifiableMap(consumerProperties); +return this; +} + +public Builder setProducerProperties(Map<String, String> producerProperties) { +this.producerProperties = Collections.unmodifiableMap(producerProperties); +return this; +} + +public Builder setAdminClientProperties(Map<String, String> adminClientProperties) { +this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties); +return this; +} + +public Builder setSaslServerProperties(Map<String, String> saslServerProperties) { +this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties); +return this; +} + +public Builder setSaslClientProperties(Map<String, String> saslClientProperties) { +this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties); +return this; +} + +public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerOverrideProperties) { +this.perBrokerOverrideProperties = Collections.unmodifiableMap( Review Comment: ```java this.perBrokerOverrideProperties = Collections.unmodifiableMap( perBrokerOverrideProperties.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()))))); ``` ## core/s
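For context, the deep-copy-then-wrap pattern suggested in the review above can be sketched in isolation with plain JDK collections (the class and method names here are illustrative, not part of Kafka's test framework): each inner map is copied before being wrapped, so neither the outer nor the inner maps can be mutated through the returned snapshot, and later mutation of the caller's maps cannot leak in.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class DeepCopyExample {

    // Returns a deeply unmodifiable snapshot of a per-broker property map:
    // the outer map is wrapped, and every inner map is copied and wrapped.
    public static Map<Integer, Map<String, String>> deepUnmodifiable(
            Map<Integer, Map<String, String>> perBroker) {
        return Collections.unmodifiableMap(
                perBroker.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, String>> src = new HashMap<>();
        src.put(0, new HashMap<>(Map.of("log.dir", "/tmp/a")));

        Map<Integer, Map<String, String>> snapshot = deepUnmodifiable(src);

        // Mutating the source after the copy does not affect the snapshot,
        // because the inner map was copied at snapshot time.
        src.get(0).put("log.dir", "/tmp/b");
        System.out.println(snapshot.get(0).get("log.dir")); // prints /tmp/a
    }
}
```

Without the `new HashMap<>(...)` copy, `Collections.unmodifiableMap` would only be a read-only *view*: the caller could still mutate the inner maps it handed in, which is exactly what the suggested change guards against.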
[jira] [Comment Edited] (KAFKA-7878) Connect Task already exists in this worker when failed to create consumer
[ https://issues.apache.org/jira/browse/KAFKA-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839784#comment-17839784 ] John Hawkins edited comment on KAFKA-7878 at 4/22/24 4:42 PM: -- I see this quite often when I'm loading the JDBC connector. V. Annoying ! this happens on both standalone and distributed herders. was (Author: JIRAUSER304684): I see this quite often when I'm loading the JDBC connector. V. Annoying ! > Connect Task already exists in this worker when failed to create consumer > - > > Key: KAFKA-7878 > URL: https://issues.apache.org/jira/browse/KAFKA-7878 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 1.0.1, 2.0.1, 3.7.0 >Reporter: Loïc Monney >Priority: Major > > *Assumption* > 1. DNS is not available during a few minutes > 2. Consumer group rebalances > 3. Client is not able to resolve DNS entries anymore and fails > 4. Task seems already registered, so at next rebalance the task will fail due > to *Task already exists in this worker* and the only way to recover is to > restart the connect process > *Real log entries* > * Distributed cluster running one connector on top of Kubernetes > * Connect 2.0.1 > * kafka-connect-hdfs 5.0.1 > {noformat} > [2019-01-28 13:31:25,914] WARN Removing server kafka.xxx.net:9093 from > bootstrap.servers as DNS resolution failed for kafka.xxx.net > (org.apache.kafka.clients.ClientUtils:56) > [2019-01-28 13:31:25,915] ERROR WorkerSinkTask\{id=xxx-22} Task failed > initialization and will not be started. 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:142) > org.apache.kafka.connect.errors.ConnectException: Failed to create consumer > at > org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:476) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.initialize(WorkerSinkTask.java:139) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:452) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka > consumer > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:474) > ... 10 more > Caused by: org.apache.kafka.common.config.ConfigException: No resolvable > bootstrap urls given in bootstrap.servers > at > org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:66) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709) > ... 
13 more > [2019-01-28 13:31:25,925] INFO Finished starting connectors and tasks > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868) > [2019-01-28 13:31:25,926] INFO Rebalance started > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1239) > [2019-01-28 13:31:25,927] INFO Stopping task xxx-22 > (org.apache.kafka.connect.runtime.Worker:555) > [2019-01-28 13:31:26,021] INFO Finished stopping tasks in preparation for > rebalance > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1269) > [2019-01-28 13:31:26,021] INFO [Worker clientId=connect-1, > groupId=xxx-cluster] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509) > [2019-01-28 13:31:30,746] INFO [Worker clientId=connect-1, > groupId=xxx-cluster] Successfully joined group with generation 29 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473) > [2019-01-28 13:31:30,746] INFO Joined group and got assignment: > Assignment\{error=0, leader='connect-1-05961f03-52a7-4c02-acc2-0f1fb021692e', > leaderUrl='http://192.168.46.59:8083/', offset=32, connectorIds=[], > taskIds=[xxx-22]} > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217) > [2019-01-28 13:31:30,747] INFO Starting connectors and tasks using config > offset 32 (org.apache.kafk
[jira] [Commented] (KAFKA-7878) Connect Task already exists in this worker when failed to create consumer
[ https://issues.apache.org/jira/browse/KAFKA-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839784#comment-17839784 ] John Hawkins commented on KAFKA-7878: - I see this quite often when I'm loading the JDBC connector. V. Annoying ! > Connect Task already exists in this worker when failed to create consumer > - > > Key: KAFKA-7878 > URL: https://issues.apache.org/jira/browse/KAFKA-7878 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 1.0.1, 2.0.1 >Reporter: Loïc Monney >Priority: Major
[jira] [Updated] (KAFKA-7878) Connect Task already exists in this worker when failed to create consumer
[ https://issues.apache.org/jira/browse/KAFKA-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Hawkins updated KAFKA-7878: Affects Version/s: 3.7.0 > Connect Task already exists in this worker when failed to create consumer > - > > Key: KAFKA-7878 > URL: https://issues.apache.org/jira/browse/KAFKA-7878 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 1.0.1, 2.0.1, 3.7.0 >Reporter: Loïc Monney >Priority: Major
Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]
ilyazr commented on PR #15701: URL: https://github.com/apache/kafka/pull/15701#issuecomment-2070177669 @soarez Hi! Thanks for pointing out the problems with the tests. I didn't know that. P.S.: added the suggested changes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]
ilyazr commented on code in PR #15701: URL: https://github.com/apache/kafka/pull/15701#discussion_r1575054055 ## metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java: ## @@ -174,10 +180,18 @@ public void testUnexpectedBaseOffsetExceptionInfo() { public void testUnexpectedBaseOffsetFailureMessage() { assertEquals("event failed with UnexpectedBaseOffsetException (treated as " + "NotControllerException) at epoch 123 in 90 microseconds. Renouncing leadership " + -"and reverting to the last committed offset 456.", +"and reverting to the last committed offset 456. Detailed exception message: Wanted base offset 3, but the next offset was 4", UNEXPECTED_END_OFFSET.failureMessage(123, OptionalLong.of(90L), true, 456L)); } +@Test +public void testKafkaExceptionFailureMessage() { Review Comment: It definitely would be better to have a clear name for the test. I've changed it to the suggested one
Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]
ilyazr commented on code in PR #15701: URL: https://github.com/apache/kafka/pull/15701#discussion_r1575051333 ## metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java: ## @@ -72,6 +73,11 @@ public class EventHandlerExceptionInfoTest { new TimeoutException(), () -> OptionalInt.of(1)); +private static final EventHandlerExceptionInfo KAFKA_EXCEPTION = +EventHandlerExceptionInfo.fromInternal( +new KafkaException("Custom kafka exception message"), +() -> OptionalInt.of(1)); + Review Comment: Initially I thought it would be better to keep all `EventHandlerExceptionInfo` declarations in the same place. But as you mentioned, it would be even better to have it at method level as it is used only once. Moved `TIMEOUT` to the method too
Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]
ilyazr commented on code in PR #15701: URL: https://github.com/apache/kafka/pull/15701#discussion_r1575046814 ## metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java: ## @@ -191,6 +191,10 @@ public String failureMessage( } } bld.append("."); +if (!isFault && internalException.getMessage() != null) { +bld.append(" Detailed exception message: "); Review Comment: Removed "Detailed" from the message
[jira] [Resolved] (KAFKA-16103) Review client logic for triggering offset commit callbacks
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy resolved KAFKA-16103. Resolution: Fixed > Review client logic for triggering offset commit callbacks > -- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync
[jira] [Commented] (KAFKA-16103) Review client logic for triggering offset commit callbacks
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839765#comment-17839765 ] Lucas Brutschy commented on KAFKA-16103: changes in legacy consumer in https://issues.apache.org/jira/browse/KAFKA-16599 > Review client logic for triggering offset commit callbacks > -- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync
[jira] [Updated] (KAFKA-16599) LegacyConsumer: Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16599: --- Summary: LegacyConsumer: Always await async commit callbacks in commitSync and close (was: Always await async commit callbacks in commitSync and close) > LegacyConsumer: Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The javadoc for KafkaConsumer.commitSync says: > {code:java} > Note that asynchronous offset commits sent previously with the {@link > #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {code} > This is not always true in the legacy consumer: when the set of offsets is > empty, the execution of the commit callback is not always awaited. There are > also various races possible that can prevent callback handler execution. > Similarly, there is code in the legacy consumer to await the completion of > the commit callback before closing; however, the code doesn't cover all cases > and the behavior is therefore inconsistent. While the Javadoc doesn't > explicitly promise callback execution, it promises "completing commits", > which one would reasonably expect to include callback execution. Either way, > the current behavior of the legacy consumer is inconsistent.
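The commitSync contract described in this ticket can be modelled with a small plain-JDK sketch (a toy analogue, not Kafka's implementation; all names here are illustrative): commitSync must run every callback queued by earlier commitAsync calls before it returns, unconditionally, including the empty-offsets case the ticket calls out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the javadoc contract: commitSync() invokes all callbacks
// registered by earlier commitAsync(...) calls before returning.
public class CommitOrderSketch {
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    public void commitAsync(Runnable callback) {
        // In the real consumer the callback fires when the broker responds;
        // here we just queue it to model "not yet invoked".
        pendingCallbacks.add(callback);
    }

    public void commitSync() {
        // Drain pending callbacks first, unconditionally -- even if there
        // is nothing new to commit (the empty-offsets case).
        Runnable cb;
        while ((cb = pendingCallbacks.poll()) != null) {
            cb.run();
        }
        // ... then perform the synchronous commit itself.
    }

    public static void main(String[] args) {
        CommitOrderSketch consumer = new CommitOrderSketch();
        consumer.commitAsync(() -> System.out.println("async callback"));
        consumer.commitSync();
        System.out.println("commitSync returned"); // always after the callback
    }
}
```

The bug described above is, in this model, equivalent to commitSync skipping the drain loop on some paths: the method then returns while callbacks are still queued, breaking the ordering the javadoc promises.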
Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on PR #15777: URL: https://github.com/apache/kafka/pull/15777#issuecomment-2070018194 @chia7712 Rebased. PTAL :)
[jira] [Updated] (KAFKA-16599) Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16599: --- Component/s: clients consumer > Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major
[jira] [Updated] (KAFKA-16599) Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy updated KAFKA-16599: --- Description: The javadoc for KafkaConsumer.commitSync says: {code:java} Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)} (or similar) are guaranteed to have their callbacks invoked prior to completion of this method. {code} This is not always true in the legacy consumer, when the set of offsets is empty, the execution of the commit callback is not always awaited. There are also various races possible that can avoid callback handler execution. Similarly, there is code in the legacy consumer to await the completion of the commit callback before closing, however, the code doesn't cover all cases and the behavior is therefore inconsistent. While the Javadoc doesn't explicitly promise callback execution, it promises "completing commits", which one would reasonably expect to include callback execution. Either way, the current behavior of the legacy consumer is inconsistent. > Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task >Reporter: Lucas Brutschy >Priority: Major > > The javadoc for KafkaConsumer.commitSync says: > {code:java} > Note that asynchronous offset commits sent previously with the {@link > #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {code} > This is not always true in the legacy consumer, when the set of offsets is > empty, the execution of the commit callback is not always awaited. There are > also various races possible that can avoid callback handler execution. 
> Similarly, there is code in the legacy consumer to await the completion of > the commit callback before closing, however, the code doesn't cover all cases > and the behavior is therefore inconsistent. While the Javadoc doesn't > explicitly promise callback execution, it promises "completing commits", > which one would reasonably expect to include callback execution. Either way, > the current behavior of the legacy consumer is inconsistent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
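The ordering contract under discussion in KAFKA-16599 -- callbacks from earlier commitAsync calls completing before commitSync returns -- can be sketched with a toy model. This is plain Java, not Kafka source: the class and method names are illustrative only, and it deliberately ignores networking, offsets, and threading to show just the drain-before-return step the ticket says the legacy consumer sometimes skips.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model (NOT Kafka code) of the javadoc guarantee: callbacks registered
// by earlier commitAsync calls must be invoked before commitSync returns,
// even when commitSync itself commits an empty offset set.
class CallbackAwaitingCommitter {
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    // Queue the callback; in the real consumer it would fire once the
    // broker acknowledges the asynchronous commit.
    void commitAsync(Runnable callback) {
        pendingCallbacks.add(callback);
    }

    // Drain every pending callback before returning -- the step that,
    // per the ticket, is skipped in some empty-offset and racy paths.
    void commitSync() {
        Runnable cb;
        while ((cb = pendingCallbacks.poll()) != null) {
            cb.run();
        }
        // ...the synchronous commit itself would happen here.
    }

    boolean hasPendingCallbacks() {
        return !pendingCallbacks.isEmpty();
    }
}

public class CommitSyncModelDemo {
    public static void main(String[] args) {
        CallbackAwaitingCommitter committer = new CallbackAwaitingCommitter();
        boolean[] invoked = {false};
        committer.commitAsync(() -> invoked[0] = true);
        committer.commitSync();
        // The callback has run by the time commitSync returns.
        System.out.println("callback ran before commitSync returned: " + invoked[0]);
    }
}
```

A caller that invokes commitAsync(cb) and then commitSync() can rely on cb having been invoked by the time commitSync returns -- which is exactly the behavior the quoted javadoc promises and the ticket says is not consistently delivered.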
Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on PR #15777: URL: https://github.com/apache/kafka/pull/15777#issuecomment-2070011423 #15776 is merged so please rebase code :)
[jira] [Assigned] (KAFKA-16599) Always await async commit callbacks in commitSync and close
[ https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16599: -- Assignee: Lucas Brutschy > Always await async commit callbacks in commitSync and close > --- > > Key: KAFKA-16599 > URL: https://issues.apache.org/jira/browse/KAFKA-16599 > Project: Kafka > Issue Type: Task >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > The javadoc for KafkaConsumer.commitSync says: > {code:java} > Note that asynchronous offset commits sent previously with the {@link > #commitAsync(OffsetCommitCallback)} > (or similar) are guaranteed to have their callbacks invoked prior to > completion of this method. > {code} > This is not always true in the legacy consumer, when the set of offsets is > empty, the execution of the commit callback is not always awaited. There are > also various races possible that can avoid callback handler execution. > Similarly, there is code in the legacy consumer to await the completion of > the commit callback before closing, however, the code doesn't cover all cases > and the behavior is therefore inconsistent. While the Javadoc doesn't > explicitly promise callback execution, it promises "completing commits", > which one would reasonably expect to include callback execution. Either way, > the current behavior of the legacy consumer is inconsistent. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes
[ https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-16559: --- Fix Version/s: 3.7.1 > allow defining number of disks per broker in TestKitNodes > - > > Key: KAFKA-16559 > URL: https://issues.apache.org/jira/browse/KAFKA-16559 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Gaurav Narula >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409 > That allow us to run the reassignment tests. > Also, we should enhance setNumBrokerNodes > (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81) > to accept extra argument to define the number of folders (by > setLogDirectories) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16559 allow defining number of disks per broker in TestKitNodes [kafka]
chia7712 merged PR #15776: URL: https://github.com/apache/kafka/pull/15776
[jira] [Created] (KAFKA-16599) Always await async commit callbacks in commitSync and close
Lucas Brutschy created KAFKA-16599: -- Summary: Always await async commit callbacks in commitSync and close Key: KAFKA-16599 URL: https://issues.apache.org/jira/browse/KAFKA-16599 Project: Kafka Issue Type: Task Reporter: Lucas Brutschy
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1575000110 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import 
static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] KAFKA-16463 System test for reverting migration to ZK [kafka]
soarez commented on code in PR #15754: URL: https://github.com/apache/kafka/pull/15754#discussion_r1574994375 ## tests/kafkatest/tests/core/zookeeper_migration_test.py: ## @@ -86,10 +87,35 @@ def do_migration(self, roll_controller = False, downgrade_to_zk = False): controller.stop_node(node) controller.start_node(node) +if downgrade_to_zk: +self.logger.info("Shutdown brokers to avoid waiting on unclean shutdown") +for node in self.kafka.nodes: +self.kafka.stop_node(node) +metadata_log_dir = KafkaService.METADATA_LOG_DIR + "/__cluster_metadata-0" +for node in self.kafka.nodes: +assert path_exists(node, metadata_log_dir), "Should still have a metadata log on the brokers." + +self.logger.info("Shutdown KRaft quorum") +for node in controller.nodes: +controller.stop_node(node) + +self.logger.info("Deleting controller ZNode") +self.zk.delete(path="/controller", recursive=True) + +self.logger.info("Rolling brokers back to ZK mode") +self.kafka.downgrade_kraft_broker_to_zk(controller) +for node in self.kafka.nodes: +self.kafka.start_node(node) + +# This blocks until all brokers have a full ISR +self.wait_until_rejoin() Review Comment: The test name is `test_online_migration` but we're shutting every broker down to apply the downgrade, it seems like a contradiction? Does the downgrade require a full shutdown of the cluster? 
## tests/kafkatest/services/kafka/kafka.py: ## @@ -463,6 +463,18 @@ def reconfigure_zk_for_migration(self, kraft_quorum): # This is not added to "advertised.listeners" because of configured_for_zk_migration=True self.port_mappings[kraft_quorum.controller_listener_names] = kraft_quorum.port_mappings.get(kraft_quorum.controller_listener_names) +def downgrade_kraft_broker_to_zk(self, kraft_quorum): +self.configured_for_zk_migration = True +self.quorum_info = quorum.ServiceQuorumInfo(quorum.zk, self) +self.controller_quorum = kraft_quorum + +# Set the migration properties +self.server_prop_overrides.extend([ +["zookeeper.metadata.migration.enable", "true"], +["controller.quorum.voters", kraft_quorum.controller_quorum_voters], +["controller.listener.names", kraft_quorum.controller_listener_names] +]) Review Comment: In https://kafka.apache.org/documentation/#kraft_zk_migration under "Reverting to ZooKeeper mode During the Migration" we say: > On each broker, remove the zookeeper.metadata.migration.enable, controller.listener.names, and controller.quorum.voters configurations. And before this method is called we log `"Rolling brokers back to ZK mode"`. I'm guessing I'm probably missing something, shouldn't these three properties be removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
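For reference, the three broker properties the reviewer quotes from the "Reverting to ZooKeeper mode During the Migration" documentation -- the ones the docs say to remove from each broker during the revert, yet which `downgrade_kraft_broker_to_zk` sets -- would appear in `server.properties` roughly as below. The values are placeholders, not defaults:

```properties
# Per the migration-revert docs, remove these from each broker's
# server.properties when rolling back to ZK mode:
zookeeper.metadata.migration.enable=true
controller.quorum.voters=1@controller1:9093
controller.listener.names=CONTROLLER
```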
Re: [PR] MINOR: Reduce the time taken to execute the TieredStorage tests. [kafka]
kamalcph commented on PR #15780: URL: https://github.com/apache/kafka/pull/15780#issuecomment-2069948134 This PR is a follow-up of #15719.
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1574974316 ## core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala: ## @@ -18,41 +18,58 @@ package kafka.server import java.net.Socket import java.util.Collections - import kafka.api.{KafkaSasl, SaslSetup} -import kafka.test.annotation.{ClusterTest, Type} +import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, kafkaServerSaslMechanisms} +import kafka.test.annotation.{ClusterTemplate, Type} import kafka.test.junit.ClusterTestExtensions -import kafka.test.{ClusterConfig, ClusterInstance} +import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} import kafka.utils.JaasTestUtils +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.message.SaslHandshakeRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse} import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.server.config.KafkaSecurityConfigs import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{AfterEach, BeforeEach} import scala.jdk.CollectionConverters._ +object SaslApiVersionsRequestTest { + val kafkaClientSaslMechanism = "PLAIN" + val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN") + val controlPlaneListenerName = "CONTROL_PLANE" + val securityProtocol = SecurityProtocol.SASL_PLAINTEXT + + def saslApiVersionsRequestClusterConfig(clusterGenerator: ClusterGenerator): Unit = { +clusterGenerator.accept(ClusterConfig.defaultBuilder + .securityProtocol(securityProtocol) + .`type`(Type.ZK) + .putSaslServerProperty(KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, kafkaClientSaslMechanism) + 
.putSaslServerProperty(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(",")) + .putSaslClientProperty(SaslConfigs.SASL_MECHANISM, kafkaClientSaslMechanism) + // Configure control plane listener to make sure we have separate listeners for testing. + .putServerProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListenerName) + .putServerProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol") + .putServerProperty("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") + .putServerProperty(KafkaConfig.AdvertisedListenersProp, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") + .build()) + } +} @ExtendWith(value = Array(classOf[ClusterTestExtensions])) class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { - - val kafkaClientSaslMechanism = "PLAIN" - val kafkaServerSaslMechanisms = List("PLAIN") - private var sasl: SaslSetup = _ @BeforeEach - def setupSasl(config: ClusterConfig): Unit = { + def setupSasl(): Unit = { sasl = new SaslSetup() {} Review Comment: Currently, no. Since we clear `java.security.auth.login.config` in [QuorumTestHarness#tearDown](https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala#L440). So if we initialize sasl in `@BeforeAll`, first test passed, second one raise error as below. ```text Could not find a 'KafkaServer' or 'control_plane.KafkaServer' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set ``` This would require some effort if we want to make this happen. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
OmniaGM commented on code in PR #15772: URL: https://github.com/apache/kafka/pull/15772#discussion_r1574953974 ## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.network; + +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.server.config.ReplicationConfigs; +import java.util.Arrays; +import java.util.stream.Collectors; + +public class SocketServerConfigs { +public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = "listener.security.protocol.map"; +public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = Arrays.stream(SecurityProtocol.values()) +.collect(Collectors.toMap(ListenerName::forSecurityProtocol, sp -> sp)) +.entrySet() +.stream() +.map(entry -> entry.getKey().value() + ":" + entry.getValue().name()) +.collect(Collectors.joining(",")); +public static final String LISTENER_SECURITY_PROTOCOL_MAP_DOC = "Map between listener names and security protocols. This must be defined for " + +"the same security protocol to be usable in more than one port or IP. 
For example, internal and " + +"external traffic can be separated even if SSL is required for both. Concretely, the user could define listeners " + +"with names INTERNAL and EXTERNAL and this property as: INTERNAL:SSL,EXTERNAL:SSL. As shown, key and value are " + +"separated by a colon and map entries are separated by commas. Each listener name should only appear once in the map. " + +"Different security (SSL and SASL) settings can be configured for each listener by adding a normalised " + +"prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " + +"INTERNAL listener, a config with name listener.name.internal.ssl.keystore.location would be set. " + +"If the config for the listener name is not set, the config will fallback to the generic config (i.e. ssl.keystore.location). " + +"Note that in KRaft a default mapping from the listener names defined by controller.listener.names to PLAINTEXT " + +"is assumed if no explicit mapping is provided and no other security protocol is in use."; + +public static final String LISTENERS_CONFIG = "listeners"; +public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092"; +public static final String LISTENERS_DOC = "Listener List - Comma-separated list of URIs we will listen on and the listener names." 
+ +String.format(" If the listener name is not a security protocol, %s must also be set.%n", LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) + +" Listener names and port numbers must be unique unless %n" + +" one listener is an IPv4 address and the other listener is %n" + +" an IPv6 address (for the same port).%n" + +" Specify hostname as 0.0.0.0 to bind to all interfaces.%n" + +" Leave hostname empty to bind to default interface.%n" + +" Examples of legal listener lists:%n" + +" PLAINTEXT://myhost:9092,SSL://:9091%n" + +" CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093%n" + +" PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092%n"; + +public static final String ADVERTISED_LISTENERS_CONFIG = "advertised.listeners"; +public static final String ADVERTISED_LISTENERS_DOC = String.format( +"Listeners to publish to ZooKeeper for clients to use, if different than the %s config property." + +" In IaaS environments, this may need to be different from the interface to which the broker binds." + +" If this is not set, the value for %1$1s will be used." + +" Unlike %1$1s, it is not valid to advertise the 0.0.0.0 meta-address.%n" + +" Also unlike %1$1s, there can be duplicated ports in this property," + +" so that one listener can
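The mapping described in the quoted doc strings can be illustrated with a small configuration fragment. The listener names INTERNAL and EXTERNAL, hostnames, ports, and keystore path below are examples taken from or modeled on the doc text, not defaults:

```properties
listeners=INTERNAL://:9092,EXTERNAL://:9093
advertised.listeners=INTERNAL://broker1.internal:9092,EXTERNAL://broker1.example.com:9093
# Key and value separated by a colon, entries by commas; each listener
# name appears exactly once in the map.
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL
# Per-listener override: lowercased listener name as the config prefix.
listener.name.internal.ssl.keystore.location=/path/to/internal.keystore.jks
```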
[jira] [Commented] (KAFKA-15089) Consolidate all the group coordinator configs
[ https://issues.apache.org/jira/browse/KAFKA-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839744#comment-17839744 ] Omnia Ibrahim commented on KAFKA-15089: --- I can have a look once I finish KAFKA-15853 > Consolidate all the group coordinator configs > - > > Key: KAFKA-15089 > URL: https://issues.apache.org/jira/browse/KAFKA-15089 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major > > The group coordinator configurations are defined in KafkaConfig at the > moment. As KafkaConfig is defined in the core module, we can't pass it to the > new java modules to pass the configurations along. > A suggestion here is to centralize all the configurations of a module in the > module itself similarly to what we have do for RemoteLogManagerConfig and > RaftConfig. We also need a mechanism to add all the properties defined in the > module to the KafkaConfig's ConfigDef. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Reduce the time taken to execute the TieredStorage tests. [kafka]
kamalcph opened a new pull request, #15780: URL: https://github.com/apache/kafka/pull/15780

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16559 allow defining number of disks per broker in TestKitNodes [kafka]
chia7712 commented on PR #15776: URL: https://github.com/apache/kafka/pull/15776#issuecomment-2069843217 @soarez Any concerns? The failed streams tests can get fixed by backporting #14983. Hence, I'd like to merge it :)
Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st commented on PR #15779: URL: https://github.com/apache/kafka/pull/15779#issuecomment-2069841257 @chia7712
[PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]
m1a2st opened a new pull request, #15779: URL: https://github.com/apache/kafka/pull/15779 Migrate `ResetConsumerGroupOffsetTest` to the new test framework as a first step.
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
OmniaGM commented on code in PR #15774: URL: https://github.com/apache/kafka/pull/15774#discussion_r1574929931 ## server-common/src/main/java/org/apache/kafka/server/config/ServerQuotaConfigs.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import java.util.Collections; +import java.util.List; + +public class ServerQuotaConfigs { Review Comment: Just pushed an update now. - Renamed `ServerQuotaConfigs` to `QuotaConfigs` - Moved all quota related configs, methods, config def, etc to `QuotaConfigs` in `server-common` and out of `client` and `server` - Deleted `org.apache.kafka.common.config.internals.QuotaConfigs` and `DynamicConfig.QuotaConfigs`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close
[ https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-16589:
--------------------------------------
    Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close
>
> Key: KAFKA-16589
> URL: https://issues.apache.org/jira/browse/KAFKA-16589
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: PoAn Yang
> Priority: Minor
>
> Sometimes we close the admin created by `createAdminClient`, and sometimes we don't. That is not a real problem, since `ClusterInstance` calls `close` when stopping.
> However, it causes a lot of inconsistent code, and in fact it does not save much time, since creating an Admin is not hard work. We can get `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.
>
> {code:java}
> // before
> try (Admin admin = cluster.createAdminClient()) { }
>
> // after v0
> try (Admin admin = Admin.create(Collections.singletonMap(
>     CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
>     cluster.bootstrapServers()))) { }
> {code}
> Personally, the `after` version is not verbose, but we can have alternatives such as a `Map<String, Object> clientConfigs`:
>
> {code:java}
> // after v1
> try (Admin admin = Admin.create(cluster.clientConfigs())) { }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
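As a runnable illustration of the "after v0" pattern from the ticket, the sketch below builds the same single-entry config map in plain Java; since creating a real client needs a running cluster and kafka-clients on the classpath, the `Admin.create` call is shown only as a comment:

```java
// Self-contained sketch: build the admin config map directly from a
// bootstrap-server string, as in the ticket's "after v0" example.
import java.util.Collections;
import java.util.Map;

public class AdminConfigExample {
    // "bootstrap.servers" is the literal value of
    // CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG in kafka-clients.
    public static Map<String, Object> adminConfigs(String bootstrapServers) {
        return Collections.singletonMap("bootstrap.servers", bootstrapServers);
    }

    // Against a running cluster this map would be passed straight to the client:
    //   try (Admin admin = Admin.create(adminConfigs(cluster.bootstrapServers()))) { }
}
```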
[jira] [Assigned] (KAFKA-16598) Migrate `ResetConsumerGroupOffsetTest` to new test infra
[ https://issues.apache.org/jira/browse/KAFKA-16598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai reassigned KAFKA-16598:
--------------------------------------
    Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> Migrate `ResetConsumerGroupOffsetTest` to new test infra
>
> Key: KAFKA-16598
> URL: https://issues.apache.org/jira/browse/KAFKA-16598
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: 黃竣陽
> Priority: Major
>
> as title.
[jira] [Commented] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close
[ https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839733#comment-17839733 ]

PoAn Yang commented on KAFKA-16589:
-----------------------------------
Got it. I'm interested in the issue. May I take it? Thank you.
[jira] [Commented] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close
[ https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839731#comment-17839731 ]

Chia-Ping Tsai commented on KAFKA-16589:
----------------------------------------
{quote}
is this ticket about removing `createAdminClient` from `ClusterInstance` to encourage developer to create admin by `Admin.create` directly? Thanks.
{quote}
Yep, I'd like to make the creation more consistent. For example, why doesn't `ClusterInstance` have a `createConsumer` that manages all created consumers? However, having `ClusterInstance` close the consumers in teardown could cause errors, since the consumers don't leave the group while the test case is running. Furthermore, it would produce a lot of false warnings that the created admins don't get closed.
[jira] [Commented] (KAFKA-16598) Migrate `ResetConsumerGroupOffsetTest` to new test infra
[ https://issues.apache.org/jira/browse/KAFKA-16598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839730#comment-17839730 ]

黃竣陽 commented on KAFKA-16598:
-----------------------------
I am able to handle this issue.
Re: [PR] MINOR: Add junit properties to display parameterized test names [kafka]
chia7712 commented on PR #14983: URL: https://github.com/apache/kafka/pull/14983#issuecomment-2069776278

When testing the 3.7 branch for #15776, I encountered the following error:
```
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidTopicException: Topic name is invalid: the length of 'appId_1713797371220_HighAvailabilityTaskAssignorIntegrationTestshouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance-storeHighAvailabilityTaskAssignorIntegrationTestshouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance-changelog' is longer than the max allowed length 249
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
	at org.apache.kafka.streams.integration.utils.KafkaEmbedded.createTopic(KafkaEmbedded.java:178)
	... 5 more
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Topic name is invalid: the length of 'appId_1713797371220_HighAvailabilityTaskAssignorIntegrationTestshouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance-storeHighAvailabilityTaskAssignorIntegrationTestshouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance-changelog' is longer than the max allowed length 249
```
Hence, I'd like to backport this PR to 3.7. WDYT?
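The failure above comes from Kafka's 249-character limit on topic names: long parameterized-test display names get folded into generated changelog topic names and push them past the limit. As a hedged illustration (not code from the PR or from Kafka Streams), one way a test harness could keep generated names within bounds is to truncate and append a short hash:

```java
// Hypothetical helper: cap a generated internal topic name at Kafka's
// 249-character limit, preserving uniqueness with a short hash suffix.
public class TopicNameLengthGuard {
    public static final int MAX_TOPIC_NAME_LENGTH = 249;

    public static String capped(String name) {
        if (name.length() <= MAX_TOPIC_NAME_LENGTH) {
            return name;
        }
        // Integer.toHexString of a hash code is at most 8 characters.
        String suffix = Integer.toHexString(name.hashCode());
        return name.substring(0, MAX_TOPIC_NAME_LENGTH - suffix.length() - 1)
            + "-" + suffix;
    }
}
```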
[jira] [Comment Edited] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close
[ https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839724#comment-17839724 ]

PoAn Yang edited comment on KAFKA-16589 at 4/22/24 2:56 PM:
------------------------------------------------------------
Hi [~chia7712], is this ticket about removing `createAdminClient` from `ClusterInstance` to encourage developer to create admin by `Admin.create` directly? Thanks.

was (Author: JIRAUSER300229):
Hi [~chia7712], is this ticket try to `createAdminClient` from `ClusterInstance` to encourage developer to create admin by `Admin.create` directly? Thanks.
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2069788585

> @FrankYang0529 thanks for updated PR. two minor comments left. PTAL

Hi @chia7712, I addressed the last comments. Thanks for the review.
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dongnuo123 commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1574897141

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
## @@ -10331,6 +10333,461 @@ public void testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
                 .setErrorCode(NOT_COORDINATOR.code()),
             pendingMemberSyncResult.syncFuture.get());
     }

+    @Test
+    public void testLastClassicProtocolMemberLeavingConsumerGroup() {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Uuid zarTopicId = Uuid.randomUuid();
+        String zarTopicName = "zar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        List protocols = Collections.singletonList(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName("range")
+                .setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
+                    Arrays.asList(fooTopicName, barTopicName),
+                    null,
+                    Arrays.asList(
+                        new TopicPartition(fooTopicName, 0),
+                        new TopicPartition(fooTopicName, 1),
+                        new TopicPartition(fooTopicName, 2),
+                        new TopicPartition(barTopicName, 0),
+                        new TopicPartition(barTopicName, 1)
+                    )
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                .setSupportedProtocols(protocols))
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            // Use zar only here to ensure that metadata needs to be recomputed.
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+            .setServerAssignorName("range")
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and member 2 uses the consumer protocol.
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addTopic(zarTopicId, zarTopicName, 1)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        context.commit();
+        ConsumerGroup consumerGroup = context.groupMetadataManager.consumerGroup(groupId);
+
+        // Member 2 leaves the consumer group, triggering the downgrade.
+        CoordinatorResult result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
[jira] [Created] (KAFKA-16598) Migrate `ResetConsumerGroupOffsetTest` to new test infra
Chia-Ping Tsai created KAFKA-16598:
-----------------------------------
Summary: Migrate `ResetConsumerGroupOffsetTest` to new test infra
Key: KAFKA-16598
URL: https://issues.apache.org/jira/browse/KAFKA-16598
Project: Kafka
Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai

as title.
[jira] [Commented] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close
[ https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839724#comment-17839724 ]

PoAn Yang commented on KAFKA-16589:
-----------------------------------
Hi [~chia7712], is this ticket try to `createAdminClient` from `ClusterInstance` to encourage developer to create admin by `Admin.create` directly? Thanks.
Re: [PR] KAFKA-16528: Client HB timing fix [kafka]
lianetm commented on PR #15698: URL: https://github.com/apache/kafka/pull/15698#issuecomment-2069725394

Hi @cadonna, thanks for the comments!
- the unit test I added initially fails on [this](https://github.com/apache/kafka/blob/d242c444562fe4be41a2bc53d47ab2a6f523b955/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L276) assertion without the changes on this PR, I guess that's what you were looking for with your first comment?
- agreed, we did not have test coverage to make sure that the `canSendRequest` and `nextHeartbeat` move along consistently. I added a func to validate them together, included it in the existing test that covers responses with errors, and added a new test to include it in the successful path too.
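To illustrate the invariant being tested — that whether a heartbeat may be sent and the time until the next heartbeat "move along consistently" — here is a minimal self-contained sketch (an assumption for illustration, not the actual `HeartbeatRequestManager` code) in which both answers derive from a single deadline, so they can never disagree:

```java
// Sketch: one deadline drives both "can we send now?" and "how long until
// the next heartbeat?", making the two checks consistent by construction.
public class HeartbeatTimer {
    private long nextHeartbeatMs;

    public HeartbeatTimer(long nowMs, long intervalMs) {
        this.nextHeartbeatMs = nowMs + intervalMs;
    }

    public long timeToNextHeartbeatMs(long nowMs) {
        return Math.max(0L, nextHeartbeatMs - nowMs);
    }

    // Sending is allowed exactly when the remaining time has reached zero.
    public boolean canSendRequest(long nowMs) {
        return timeToNextHeartbeatMs(nowMs) == 0L;
    }

    // A response (success or retriable error) resets the same deadline.
    public void onResponse(long nowMs, long intervalMs) {
        this.nextHeartbeatMs = nowMs + intervalMs;
    }
}
```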