Re: [PR] KAFKA-13152: Kip 770 buffer size fix [kafka]

2024-04-22 Thread via GitHub


vamossagar12 commented on PR #13283:
URL: https://github.com/apache/kafka/pull/13283#issuecomment-2071431998

   No worries, and thanks for the offer to help! There are a few things I need 
to wrap up first, but I do hope to close this out. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-22 Thread via GitHub


showuon commented on PR #15773:
URL: https://github.com/apache/kafka/pull/15773#issuecomment-2071375745

   > Yeah, I tried to use a unit test. However, I didn't find a good way to check 
whether the kafka-delete-logs task has run or not. I also tried to check whether 
the scheduler can be stopped. However, the mock scheduler can still add new tasks 
during scheduler shutdown.
   
   Could we use `KafkaScheduler` instead?
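
   If `KafkaScheduler` refuses new tasks once shutdown has begun (the property 
the mock reportedly lacks), the hang described in this PR cannot occur. A 
stdlib-only sketch of that behaviour — illustrative only, not Kafka's actual 
implementation:

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SchedulerShutdownSketch {
    // A task (think kafka-delete-logs) that keeps rescheduling itself with zero delay.
    static boolean runAndShutdown() throws InterruptedException {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        Runnable[] task = new Runnable[1];
        task[0] = () -> {
            try {
                scheduler.schedule(task[0], 0, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // Once shutdown starts, rescheduling is rejected, so the
                // self-perpetuating task chain ends and shutdown can complete.
            }
        };
        scheduler.schedule(task[0], 0, TimeUnit.MILLISECONDS);
        Thread.sleep(50);               // let the zero-delay chain spin for a bit
        scheduler.shutdown();           // new schedule() calls are now rejected
        return scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated=" + runAndShutdown());
    }
}
```

   A scheduler that silently accepts tasks during shutdown would instead keep the 
chain alive and never terminate.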





[PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-22 Thread via GitHub


rreddy-22 opened a new pull request, #15785:
URL: https://github.com/apache/kafka/pull/15785

   When all the subscriptions are equal, we iterate through every member and 
check equality of the topic lists, which is very expensive when there are many members
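
   The general idea — tracking a homogeneity marker incrementally instead of 
comparing every member's topic list — can be sketched as follows. Names are 
hypothetical and the PR's actual data structure may differ:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class SubscriptionModelSketch {
    // Count of members per distinct subscribed-topic set, updated incrementally.
    private final Map<Set<String>, Integer> subscriptionCounts = new HashMap<>();

    public void addMember(Set<String> topics) {
        subscriptionCounts.merge(topics, 1, Integer::sum);
    }

    public void removeMember(Set<String> topics) {
        // Drop the entry entirely when its last member leaves.
        subscriptionCounts.computeIfPresent(topics, (k, v) -> v == 1 ? null : v - 1);
    }

    // Homogeneous iff all members share one subscription: O(1), no per-member scan.
    public boolean isHomogeneous() {
        return subscriptionCounts.size() == 1;
    }

    public static void main(String[] args) {
        SubscriptionModelSketch group = new SubscriptionModelSketch();
        group.addMember(Set.of("orders", "payments"));
        group.addMember(Set.of("orders", "payments"));
        System.out.println(group.isHomogeneous()); // true
        group.addMember(Set.of("orders"));
        System.out.println(group.isHomogeneous()); // false
        group.removeMember(Set.of("orders"));
        System.out.println(group.isHomogeneous()); // true
    }
}
```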
   





Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]

2024-04-22 Thread via GitHub


github-actions[bot] commented on PR #15231:
URL: https://github.com/apache/kafka/pull/15231#issuecomment-2071344453

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-22 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575591635


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends 
BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new 
TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = 
sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all 
partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+createTopic(topic, partitionNum, 1, topicProperties)
+assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+// send enough records to trigger log rolling
+(0 until 20).foreach { _ =>
+  TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+}
+TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new 
TopicPartition(topic, 0)).get.numberOfSegments > 1,
+  "timed out waiting for log segment to roll")
+
+// Wait for log segment retention. Override initialTaskDelayMs as 5 
seconds.
+// The first retention task is executed after 5 seconds, so waiting for 10 
seconds should be enough.

Review Comment:
   This comment is no longer correct.



##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends 
BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new 
TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = 
sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all 
partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")

Review Comment:
   I see, makes sense. It's for L158's assertion, right? Could you add a comment 
here to explain it?






[jira] [Updated] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-04-22 Thread Anil Dasari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Dasari updated KAFKA-16603:

Description: 
We are experiencing data loss when the Kafka source connector fails to send 
data to the Kafka topic and the offset topic. 

Kafka cluster and Kafka Connect details:
 # Kafka Connect version (client): Confluent community version 7.3.1, i.e. 
Kafka 3.3.1
 # Kafka version: 0.11.0 (server)
 # Cluster size: 3 brokers
 # Number of partitions in all topics = 3
 # Replication factor = 3
 # Min ISR = 2
 # No transformations in the Kafka connector
 # Default error tolerance, i.e. none

Our connector checkpoints the offset info received in SourceTask#commitRecord 
and resumes data processing from the persisted checkpoint.

The data loss is noticed when the broker is unresponsive for a few minutes due 
to high load and the Kafka connector is restarted. Also, the Kafka connector's 
graceful shutdown failed.

Logs:

 
{code:java}
[Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] 
Discovered group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
attempted.
Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
Apr 22, 2024 @ 15:56:16.708 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
disconnected.
Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 disconnected.
Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
attempted.
Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log **)
Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
leaderUrl='http://10.75.100.46:8083/', offset=4, 
connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the 
group
Apr 22, 2024 @ 15:56:19.866 Stopping connector d094a5d7bbb046b99d62398cb84d648c
Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' failed 
to properly shut down, has become unresponsive, and may be consuming external 
resources. Correct the configuration for this connector or remove the 
connector. After fixing the connector, it may be necessary to restart this 
worker to release any consumed resources.
Apr 22, 2024 @ 15:56:24.110 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
Kafka producer with timeoutMillis = 0 ms.
Apr 22, 2024 @ 15:56:24.110 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
force close the producer since pending requests could not be completed within 
timeout 0 ms.
Apr 22, 2024 @ 15:56:24.112 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning 
shutdown of Kafka producer I/O thread, sending remaining records.
Apr 22, 2024 @ 15:56:24.112 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting 
incomplete batches due to forced shutdown
Apr 22, 2024 @ 15:56:24.113 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
Committing offsets
Apr 22, 2024 @ 15:56:24.113 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Either 
no records were produced by the task since the last offset commit, or every 
record has been filtered out by a transformation or dropped due to 
transformation or conversion errors.
Apr 22, 2024 @ 15:56:24.146 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Finished stopping tasks in 
preparation for rebalance
Apr 22, 2024 @ 15:56:24.165 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
Finished commitO


Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-22 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575583127


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends 
BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new 
TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = 
sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all 
partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")

Review Comment:
   The default value is 1 minute. We want some files to get the `.deleted` 
suffix but not be removed too quickly, because we want to wait for the dir 
movement to happen.
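
   For readers following along: the mechanism under test is Kafka's two-phase 
segment deletion — a segment is first renamed with a `.deleted` suffix and only 
physically removed after `file.delete.delay.ms`. A stdlib-only sketch of the 
pattern (illustrative; not Kafka's code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TwoPhaseDeleteSketch {
    // Phase 1: rename the segment so it is visibly scheduled for deletion.
    static Path markDeleted(Path segment) throws IOException {
        Path marked = segment.resolveSibling(segment.getFileName() + ".deleted");
        return Files.move(segment, marked);
    }

    // Phase 2: after file.delete.delay.ms, physically remove the file.
    static void reapAfterDelay(Path marked, long delayMs) throws Exception {
        Thread.sleep(delayMs);
        Files.deleteIfExists(marked);
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("log");
        Path segment = Files.createFile(dir.resolve("00000000000000000000.log"));
        Path marked = markDeleted(segment);
        System.out.println(Files.exists(marked)); // true: renamed, still on disk
        reapAfterDelay(marked, 1);                // tiny delay, as in the test
        System.out.println(Files.exists(marked)); // false: physically removed
    }
}
```

   Setting the delay to 1 ms keeps the window between the two phases short 
without collapsing it to zero.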









[jira] [Created] (KAFKA-16603) Data loss when kafka connect sending data to Kafka

2024-04-22 Thread Anil Dasari (Jira)
Anil Dasari created KAFKA-16603:
---

 Summary: Data loss when kafka connect sending data to Kafka
 Key: KAFKA-16603
 URL: https://issues.apache.org/jira/browse/KAFKA-16603
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Affects Versions: 3.3.1
Reporter: Anil Dasari


We are experiencing data loss when the Kafka source connector fails to send 
data to the Kafka topic and the offset topic. 

Kafka cluster and Kafka Connect details:
 # Kafka version: Confluent community version 7.3.1, i.e. Kafka 3.3.1
 # Cluster size: 3 brokers
 # Number of partitions in all topics = 3
 # Replication factor = 3
 # Min ISR = 2
 # No transformations in the Kafka connector
 # Default error tolerance, i.e. none

Our connector checkpoints the offset info received in SourceTask#commitRecord 
and resumes data processing from the persisted checkpoint.

The data loss is noticed when the broker is unresponsive for a few minutes due 
to high load and the Kafka connector is restarted. However, the Kafka 
connector's graceful shutdown failed.

Logs:

 
{code:java}
[Worker clientId=connect-1, groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] 
Discovered group coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
attempted.
Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
Apr 22, 2024 @ 15:56:16.708 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
disconnected.
Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 disconnected.
Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
attempted.
Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log **)
Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
leaderUrl='http://10.75.100.46:8083/', offset=4, 
connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
revokedTaskIds=[], delay=0} to avoid running tasks while not being a member the 
group
Apr 22, 2024 @ 15:56:19.866 Stopping connector d094a5d7bbb046b99d62398cb84d648c
Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' failed 
to properly shut down, has become unresponsive, and may be consuming external 
resources. Correct the configuration for this connector or remove the 
connector. After fixing the connector, it may be necessary to restart this 
worker to release any consumed resources.
Apr 22, 2024 @ 15:56:24.110 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
Kafka producer with timeoutMillis = 0 ms.
Apr 22, 2024 @ 15:56:24.110 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
force close the producer since pending requests could not be completed within 
timeout 0 ms.
Apr 22, 2024 @ 15:56:24.112 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning 
shutdown of Kafka producer I/O thread, sending remaining records.
Apr 22, 2024 @ 15:56:24.112 [Producer 
clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting 
incomplete batches due to forced shutdown
Apr 22, 2024 @ 15:56:24.113 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
Committing offsets
Apr 22, 2024 @ 15:56:24.113 
WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} Either 
no records were produced by the task since the last offset commit, or every 
record has been filtered out by a transformation or dropped due to 
transformation or conversion errors.
Apr 22, 2024 @ 15:56:24.146 [Worker clientId=connect-1, 
groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Finished stopping tasks in 
p

Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-22 Thread via GitHub


showuon commented on code in PR #15748:
URL: https://github.com/apache/kafka/pull/15748#discussion_r1575561675


##
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##
@@ -121,9 +121,9 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
 // Set the last modified time to an old value to force deletion of old 
segments
 val endOffset = log.logEndOffset
 log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * 
retentionMs))
-TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset,
+TestUtils.waitUntilTrue(() => 1 == log.numberOfSegments,
   "Timed out waiting for deletion of old segments")
-assertEquals(1, log.numberOfSegments)
+assertEquals(log.logStartOffset, endOffset)

Review Comment:
   I'm re-triggering the CI since 2 jobs didn't finish. Meanwhile, could you 
help explain why we need these 2 changes in v3.6 but not in v3.7/trunk?






Re: [PR] MINOR: Reduce the time taken to execute the TieredStorage tests. [kafka]

2024-04-22 Thread via GitHub


showuon merged PR #15780:
URL: https://github.com/apache/kafka/pull/15780





Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-22 Thread via GitHub


showuon commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575558298


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends 
BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new 
TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = 
sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all 
partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")

Review Comment:
   Why 10 seconds here?






Re: [PR] MINOR: Reduce the time taken to execute the TieredStorage tests. [kafka]

2024-04-22 Thread via GitHub


kamalcph commented on PR #15780:
URL: https://github.com/apache/kafka/pull/15780#issuecomment-2071280063

   The test failures are unrelated. 3 remote storage tests failed; those are 
existing flaky tests.





Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-22 Thread via GitHub


kamalcph commented on PR #15748:
URL: https://github.com/apache/kafka/pull/15748#issuecomment-2071274526

   @showuon 
   
   The test failures are fixed! Please take another look. Let me know if we have 
to fix the tests in 3.7 and trunk as well (they weren't flaky there).





[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839910#comment-17839910
 ] 

Matthias J. Sax commented on KAFKA-16585:
-

Well, the use case makes sense, but the question is: how can the runtime ensure 
that the key is not changed? The idea of `FixedKeyProcessor` is to guarantee 
that the key is not changed, but if we allow setting a key, you could set 
anything and the runtime cannot ensure that the key is "correct". It would be 
up to the user code to do the right thing... What is unclear from your use-case 
description is: why can't you use a regular `Processor`?
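
To make the suggestion concrete, a regular `Processor` can forward a newly 
constructed record from a punctuator, since it has no fixed-key constraint. A 
minimal sketch against the Kafka Streams Processor API (assumes kafka-streams 
on the classpath; the heartbeat key/value are made up for illustration):

```java
import java.time.Duration;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical example: pass records through, and also emit a synthetic
// "heartbeat" record from a wall-clock punctuator -- something a
// FixedKeyProcessor cannot do, because it has no existing record to reuse.
public class HeartbeatProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.forward(new Record<>("heartbeat", "alive", timestamp)));
    }

    @Override
    public void process(final Record<String, String> record) {
        context.forward(record); // regular traffic is forwarded unchanged
    }
}
```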

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only a FixedKeyRecord. This class 
> doesn't have a public constructor and can only be created from an existing 
> record. But such a record is usually absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839907#comment-17839907
 ] 

Matthias J. Sax commented on KAFKA-16567:
-

Thanks Bruno – makes sense to me – should we move K16336 out of the 4.0 ticket 
and into the 5.0 one? If we enable the state updater in 3.8, we need to honor 
our deprecation period and can only remove the metrics in 5.0, right?

For this ticket, I'll link it to the KIP and update it accordingly. It's not a 
blocker for 4.0.

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate





[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Priority: Major  (was: Blocker)

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Major
>  Labels: kip
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate





[jira] [Updated] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16567:

Fix Version/s: (was: 4.0.0)

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Priority: Blocker
>  Labels: kip
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate





[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839905#comment-17839905
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Well, in general we can, but the internal flag / config we set on the consumer 
is immutable – configs are in general immutable, so it's not something we can 
just change. Thus, to get some flexibility into `close()` we need a KIP to 
change the consumer.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





[jira] [Commented] (KAFKA-16466) QuorumController is swallowing some exception messages

2024-04-22 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839902#comment-17839902
 ] 

Igor Soarez commented on KAFKA-16466:
-

Cherry-picked to 3.7

> QuorumController is swallowing some exception messages
> --
>
> Key: KAFKA-16466
> URL: https://issues.apache.org/jira/browse/KAFKA-16466
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.7.0
>Reporter: David Arthur
>Assignee: Ilya Zakharov
>Priority: Major
>  Labels: good-first-issue
> Fix For: 3.8.0, 3.7.1
>
>
> In some cases in QuorumController, we throw exceptions from the control 
> manager methods. Unless these are explicitly caught and handled, they will 
> eventually bubble up to the ControllerReadEvent/ControllerWriteEvent and hit 
> the generic error handler.
> In the generic error handler of QuorumController, we examine the exception to 
> determine if it is a fault or not. In the case where it is not a fault, we 
> log the error like:
> {code:java}
>  log.info("{}: {}", name, failureMessage);
> {code}
> which results in messages like
> {code:java}
> [2024-04-02 16:08:38,078] INFO [QuorumController id=3000] registerBroker: 
> event failed with UnsupportedVersionException in 167 microseconds. 
> (org.apache.kafka.controller.QuorumController:544)
> {code}
> In this case, the exception actually has more details in its own message
> {code:java}
> Unable to register because the broker does not support version 8 of 
> metadata.version. It wants a version between 20 and 20, inclusive.
> {code}
> We should include the exception's message in the log output for non-fault 
> errors as it includes very useful debugging info.
> This was found while writing an integration test for KRaft migration where 
> the brokers and controllers have a mismatched MetadataVersion.
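The actual change belongs in the Java QuorumController; purely as an illustrative sketch (the function name and message texts below are hypothetical, not Kafka code), the before/after logging difference can be modeled in Python like this:

```python
import logging

logging.basicConfig(format="%(levelname)s [%(name)s] %(message)s")
log = logging.getLogger("QuorumController")

def describe_failure(name, exc, include_exception_message):
    # Before the fix: only the exception class name appears in the line,
    # so the detail carried in the exception's own message is lost.
    failure = "%s: event failed with %s" % (name, type(exc).__name__)
    if include_exception_message:
        # After the fix: append the exception's message so the root cause
        # is visible in the log output.
        failure += ": %s" % exc
    log.info(failure)
    return failure

exc = ValueError("Unable to register because the broker does not support "
                 "version 8 of metadata.version.")
before = describe_failure("registerBroker", exc, include_exception_message=False)
after = describe_failure("registerBroker", exc, include_exception_message=True)
```

With the message appended, the operator sees the actionable detail instead of just the exception class name.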





Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]

2024-04-22 Thread via GitHub


soarez merged PR #15701:
URL: https://github.com/apache/kafka/pull/15701





Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]

2024-04-22 Thread via GitHub


soarez commented on PR #15701:
URL: https://github.com/apache/kafka/pull/15701#issuecomment-2071198673

   Failing tests are unrelated, except for 
`org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()`
 ([KAFKA-16602](https://issues.apache.org/jira/browse/KAFKA-16602)) which 
didn't reproduce locally.





[jira] [Created] (KAFKA-16602) Flaky test – org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16602:
---

 Summary: Flaky test – 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
 Key: KAFKA-16602
 URL: https://issues.apache.org/jira/browse/KAFKA-16602
 Project: Kafka
  Issue Type: Test
  Components: controller
Reporter: Igor Soarez


 
org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
 failed with:

 
h4. Error
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
h4. Stacktrace
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0  at 
app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808)  
at 
app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270)
  at 
app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547)
  at 
app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
  at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: 
java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
 

Source: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests/





[jira] [Created] (KAFKA-16601) Flaky test – org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()

2024-04-22 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16601:
---

 Summary: Flaky test – 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()
 Key: KAFKA-16601
 URL: https://issues.apache.org/jira/browse/KAFKA-16601
 Project: Kafka
  Issue Type: Test
  Components: controller
Reporter: Igor Soarez


 
org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()
 failed with:
h4. Error
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
h4. Stacktrace
{code:java}
org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: 
exception while renouncing leadership: Attempt to resign from epoch 1 which is 
larger than the current epoch 0  at 
app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808)  
at 
app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270)
  at 
app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547)
  at 
app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881)
  at 
app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182)
  at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: 
java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is 
larger than the current epoch 0{code}
 

Source:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests





[PR] KAFKA-16577: New consumer fails with stop within allotted timeout in consumer_test.py system test [kafka]

2024-04-22 Thread via GitHub


kirktrue opened a new pull request, #15784:
URL: https://github.com/apache/kafka/pull/15784

   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-22 Thread via GitHub


kirktrue commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2071054599

   @lucasbru, @lianetm, @philipnee—I have re-introduced the `lastSentMs` 
instance variable, not as a driver for logic, but for informational purposes in 
the logs.
   
   This is ready for re-review. Thanks!





[jira] [Commented] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close

2024-04-22 Thread Alex Leung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839871#comment-17839871
 ] 

Alex Leung commented on KAFKA-16600:


Hi [~ableegoldman] – added a log with more context!

> Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state 
> is PENDING_SHUTDOWN" during streams close
> ---
>
> Key: KAFKA-16600
> URL: https://issues.apache.org/jira/browse/KAFKA-16600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Alex Leung
>Priority: Minor
> Attachments: kafka-16600.log
>
>
> From time to time, we observe the following ERROR message during streams 
> close:
> {code:java}
> 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client 
> [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING 
> to PENDING_SHUTDOWN
> 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client 
> [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to 
> PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN
> {code}
> These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have 
> not changed any code related to streams shutdown.
> When the problem does not occur (most of the time), it looks like the 
> following:
> {code:java}
> 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client 
> [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING 
> to PENDING_SHUTDOWN
> 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client 
> [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in 
> PENDING_SHUTDOWN, all resources are being closed and the client will be 
> stopped. {code}
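The symptom is consistent with two threads racing to drive the same shutdown transition; a minimal Python model (a toy sketch, deliberately far simpler than Kafka's actual `KafkaStreams.setState`) shows how the second caller loses the race and reports the error even though shutdown still proceeds:

```python
import threading

class StateMachine:
    """Toy model of a guarded state transition; NOT Kafka's implementation."""

    def __init__(self):
        self.state = "RUNNING"
        self._lock = threading.Lock()
        self.errors = []

    def set_state(self, new_state):
        with self._lock:
            if self.state == new_state:
                # The caller that loses the race lands here and records the
                # same ERROR text seen in the report above.
                self.errors.append("Failed to transition to %s, current state is %s"
                                   % (new_state, self.state))
                return False
            self.state = new_state
            return True

sm = StateMachine()
first = sm.set_state("PENDING_SHUTDOWN")   # wins the transition
second = sm.set_state("PENDING_SHUTDOWN")  # loses: already PENDING_SHUTDOWN
```

In this model the error is cosmetic: the losing caller finds the target state already reached, which matches the observation that close otherwise completes normally.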





[jira] [Updated] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close

2024-04-22 Thread Alex Leung (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Leung updated KAFKA-16600:
---
Attachment: kafka-16600.log

> Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state 
> is PENDING_SHUTDOWN" during streams close
> ---
>
> Key: KAFKA-16600
> URL: https://issues.apache.org/jira/browse/KAFKA-16600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Alex Leung
>Priority: Minor
> Attachments: kafka-16600.log
>
>
> From time to time, we observe the following ERROR message during streams 
> close:
> {code:java}
> 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client 
> [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING 
> to PENDING_SHUTDOWN
> 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client 
> [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to 
> PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN
> {code}
> These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have 
> not changed any code related to streams shutdown.
> When the problem does not occur (most of the time), it looks like the 
> following:
> {code:java}
> 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client 
> [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING 
> to PENDING_SHUTDOWN
> 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client 
> [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in 
> PENDING_SHUTDOWN, all resources are being closed and the client will be 
> stopped. {code}





[PR] MINOR: Fix logging of KafkaStreams close [kafka]

2024-04-22 Thread via GitHub


appchemist opened a new pull request, #15783:
URL: https://github.com/apache/kafka/pull/15783

   Remove an unnecessary "}" in the logging of KafkaStreams close
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16565) IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned

2024-04-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16565:
--
   Reviewer: Lucas Brutschy
Description: 
In {{verifiable_consumer.py}}, the 
{{IncrementalAssignmentConsumerEventHandler}} contains this logic:

{code:python}
def handle_partitions_revoked(self, event):
self.revoked_count += 1
self.state = ConsumerState.Rebalancing
self.position = {}
for topic_partition in event["partitions"]:
topic = topic_partition["topic"]
partition = topic_partition["partition"]
self.assignment.remove(TopicPartition(topic, partition))
{code}

If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that 
isn't in the list, a generic Python list error is thrown. For now, we should 
first check that the {{TopicPartition}} is in the list with an {{assert}} that 
provides better information.

  was:
In {{verifiable_consumer.py}}, the Incremental

 
{code:java}
def handle_partitions_revoked(self, event):
self.revoked_count += 1
self.state = ConsumerState.Rebalancing
self.position = {}
for topic_partition in event["partitions"]:
topic = topic_partition["topic"]
partition = topic_partition["partition"]
self.assignment.remove(TopicPartition(topic, partition))
 {code}
If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that 
isn't in the list, an error is thrown. For now, we should first check that the 
{{TopicPartition}} is in the list, and if not, log a warning or something.


> IncrementalAssignmentConsumerEventHandler throws error when attempting to 
> remove a partition that isn't assigned
> 
>
> Key: KAFKA-16565
> URL: https://issues.apache.org/jira/browse/KAFKA-16565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848-client-support, 
> system-tests
> Fix For: 3.8.0
>
>
> In {{verifiable_consumer.py}}, the 
> {{IncrementalAssignmentConsumerEventHandler}} contains this logic:
> {code:python}
> def handle_partitions_revoked(self, event):
> self.revoked_count += 1
> self.state = ConsumerState.Rebalancing
> self.position = {}
> for topic_partition in event["partitions"]:
> topic = topic_partition["topic"]
> partition = topic_partition["partition"]
> self.assignment.remove(TopicPartition(topic, partition))
> {code}
> If the {{self.assignment.remove()}} call is passed a {{TopicPartition}} that 
> isn't in the list, a generic Python list error is thrown. For now, we should 
> first check that the {{TopicPartition}} is in the list with an {{assert}} 
> that provides better information.
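The quoted proposal can be sketched as a self-contained snippet (names simplified for illustration; the real handler is `IncrementalAssignmentConsumerEventHandler` in `verifiable_consumer.py`):

```python
from collections import namedtuple

# Stand-in for the kafkatest TopicPartition class.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

def revoke(assignment, topic_partition):
    """Remove a revoked partition, asserting (as the ticket proposes) with a
    descriptive message instead of letting list.remove raise a bare ValueError."""
    tp = TopicPartition(topic_partition["topic"], topic_partition["partition"])
    assert tp in assignment, \
        "Topic partition %s cannot be revoked as it was not previously assigned" % (tp,)
    assignment.remove(tp)

assignment = [TopicPartition("foo", 0), TopicPartition("foo", 1)]
revoke(assignment, {"topic": "foo", "partition": 0})
```

Revoking a partition that is not in `assignment` now fails the test with a message naming the offending partition, rather than an opaque list error.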





[jira] [Updated] (KAFKA-16461) New consumer fails to consume records in security_test.py system test

2024-04-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16461:
--
Reviewer: Lucas Brutschy

> New consumer fails to consume records in security_test.py system test
> -
>
> Key: KAFKA-16461
> URL: https://issues.apache.org/jira/browse/KAFKA-16461
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> The {{security_test.py}} system test fails with the following error:
> {quote}
> * Consumer failed to consume up to offsets
> {quote}
> Affected test:
> * {{test_client_ssl_endpoint_validation_failure}}





Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]

2024-04-22 Thread via GitHub


kirktrue commented on PR #15737:
URL: https://github.com/apache/kafka/pull/15737#issuecomment-2070963749

   @lucasbru—Can you review this change to the consumer system test harness? 
Thanks!





Re: [PR] KAFKA-16565: IncrementalAssignmentConsumerEventHandler throws error when attempting to remove a partition that isn't assigned [kafka]

2024-04-22 Thread via GitHub


kirktrue commented on code in PR #15737:
URL: https://github.com/apache/kafka/pull/15737#discussion_r1575365134


##
tests/kafkatest/services/verifiable_consumer.py:
##
@@ -140,22 +150,32 @@ class 
IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
 def __init__(self, node, verify_offsets, idx):
 super().__init__(node, verify_offsets, idx)
 
-def handle_partitions_revoked(self, event):
+def handle_partitions_revoked(self, event, node, logger):
 self.revoked_count += 1
 self.state = ConsumerState.Rebalancing
 self.position = {}
+revoked = []
+
 for topic_partition in event["partitions"]:
-topic = topic_partition["topic"]
-partition = topic_partition["partition"]
-self.assignment.remove(TopicPartition(topic, partition))
+tp = _create_partition_from_dict(topic_partition)
 
-def handle_partitions_assigned(self, event):
+if tp in self.assignment:
+self.assignment.remove(tp)
+revoked.append(tp)
+else:
+logger.warn("Could not remove topic partition %s from 
assignment as it was not previously assigned to %s" % (tp, 
node.account.hostname))

Review Comment:
   @lianetm—I changed the logging to an `assert` that provides useful 
information for troubleshooting:
   
   ```python
   tp = _create_partition_from_dict(topic_partition)
   assert tp in self.assignment, \
   "Topic partition %s cannot be revoked from %s as it was not previously 
assigned to that consumer" % \
   (tp, node.account.hostname)
   self.assignment.remove(tp)
   ```






[jira] [Commented] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close

2024-04-22 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839850#comment-17839850
 ] 

A. Sophie Blee-Goldman commented on KAFKA-16600:


Thanks [~aleung181] – definitely sounds like a bug. Hard to diagnose without 
more context though – can you share a bit more of the logs? A minute or so 
leading up to the error message should hopefully be enough 

> Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state 
> is PENDING_SHUTDOWN" during streams close
> ---
>
> Key: KAFKA-16600
> URL: https://issues.apache.org/jira/browse/KAFKA-16600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Alex Leung
>Priority: Minor
>
> From time to time, we observe the following ERROR message during streams 
> close:
> {code:java}
> 2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client 
> [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING 
> to PENDING_SHUTDOWN
> 2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client 
> [testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to 
> PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN
> {code}
> These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have 
> not changed any code related to streams shutdown.
> When the problem does not occur (most of the time), it looks like the 
> following:
> {code:java}
> 2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client 
> [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING 
> to PENDING_SHUTDOWN
> 2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client 
> [testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in 
> PENDING_SHUTDOWN, all resources are being closed and the client will be 
> stopped. {code}





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-22 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1575354663


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
  

Re: [PR] KAFKA-16461: New consumer fails to consume records in security_test.py system test [kafka]

2024-04-22 Thread via GitHub


kirktrue commented on PR #15746:
URL: https://github.com/apache/kafka/pull/15746#issuecomment-2070928518

   @lucasbru—Can you review? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16103) AsyncKafkaConsumer: Always await async commit callbacks in commitSync and close

2024-04-22 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16103:
--
Summary: AsyncKafkaConsumer: Always await async commit callbacks in 
commitSync and close  (was: Review client logic for triggering offset commit 
callbacks)

> AsyncKafkaConsumer: Always await async commit callbacks in commitSync and 
> close
> ---
>
> Key: KAFKA-16103
> URL: https://issues.apache.org/jira/browse/KAFKA-16103
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: kip-848-client-support, offset
> Fix For: 3.8.0
>
>
> Review logic for triggering commit callbacks, ensuring that all callbacks are 
> triggered before returning from commitSync



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-22 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1575308633


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
  


Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-22 Thread via GitHub


TaiJuWu commented on PR #15782:
URL: https://github.com/apache/kafka/pull/15782#issuecomment-2070867568

   > @TaiJuWu thanks for your contribution. Could you fix 
https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java#L776
 also?
   
   Sure.





Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on PR #15782:
URL: https://github.com/apache/kafka/pull/15782#issuecomment-2070865680

   @TaiJuWu thanks for your contribution. Could you fix 
https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java#L776
 also?





Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1575290259


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) {
 return this;
 }
 
-public BrokerNode build(
-String baseDirectory,
-Uuid clusterId,
-boolean combined
-) {
+public Builder setClusterId(Uuid clusterId) {
+this.clusterId = clusterId;
+return this;
+}
+
+public Builder setBaseDirectory(String baseDirectory) {
+this.baseDirectory = baseDirectory;
+return this;
+}
+
+public Builder setCombined(boolean combined) {
+this.combined = combined;
+return this;
+}
+
+public Builder setPropertyOverrides(Map 
propertyOverrides) {
+this.propertyOverrides = 
Collections.unmodifiableMap(propertyOverrides);

Review Comment:
   Maybe we need to make a defensive copy here, to avoid changes made by the caller after the map is handed over.
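
   For illustration, a minimal sketch (hypothetical `PropsBuilder`, not the actual testkit builder) of the defensive-copy suggestion. `Collections.unmodifiableMap` alone is only a read-only view of the caller's map, so without a copy the caller can still mutate what the builder sees:

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   class PropsBuilder {
       private Map<String, String> propertyOverrides = Collections.emptyMap();

       PropsBuilder setPropertyOverrides(Map<String, String> overrides) {
           // Copy first: new HashMap<>(overrides) takes a snapshot, then
           // unmodifiableMap prevents mutation through the builder itself.
           this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(overrides));
           return this;
       }

       Map<String, String> propertyOverrides() {
           return propertyOverrides;
       }
   }

   public class DefensiveCopyDemo {
       public static void main(String[] args) {
           Map<String, String> outer = new HashMap<>();
           outer.put("a", "1");
           PropsBuilder builder = new PropsBuilder().setPropertyOverrides(outer);
           outer.put("b", "2"); // mutation after the hand-off
           System.out.println(builder.propertyOverrides().size()); // prints 1: the snapshot is unaffected
       }
   }
   ```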



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -97,14 +96,31 @@ public Builder setBrokerNodes(int numBrokerNodes, int 
disksPerBroker) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
+BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder()
 .setId(nextId)
 .setNumLogDirectories(disksPerBroker);
 brokerNodeBuilders.put(nextId, brokerNodeBuilder);
 }
 return this;
 }
 
+/**
+ * Set per broker properties overrides, this setter must be invoked 
after setBrokerNodes which
+ * setup broker id and broker builder.
+ * @param perBrokerPropertiesOverrides properties to override in each 
broker
+ * @return Builder
+ */
+public Builder setPerBrokerPropertiesOverrides(Map> perBrokerPropertiesOverrides) {
+perBrokerPropertiesOverrides.forEach((brokerId, properties) -> {
+if (!brokerNodeBuilders.containsKey(brokerId)) {
+throw new RuntimeException("Broker id " + brokerId + " 
does not exist");
+}
+Map propertiesOverride = new 
HashMap<>(properties);

Review Comment:
   the deep copy should be addressed by `setPropertyOverrides`



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -97,14 +96,32 @@ public Builder setBrokerNodes(int numBrokerNodes, int 
disksPerBroker) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
+BrokerNode.Builder brokerNodeBuilder = BrokerNode.builder()
 .setId(nextId)
 .setNumLogDirectories(disksPerBroker);
 brokerNodeBuilders.put(nextId, brokerNodeBuilder);
 }
 return this;
 }
 
+/**
+ * Set per broker properties overrides, this setter must be invoked 
after setBrokerNodes which

Review Comment:
   It seems `TestKitNodes` needs to be refactored as well. In short, it should verify/apply all settings when building.



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -126,8 +138,12 @@ public MetadataVersion metadataVersion() {
 return metadataVersion;
 }
 
-public Properties brokerServerProperties(int brokerId) {
-return perBrokerOverrideProperties.computeIfAbsent(brokerId, __ -> new 
Properties());
+public Map brokerServerProperties(int brokerId) {

Review Comment:
   This helper is used by `ZkClusterInvocationContext` only. Maybe we can 
remove this one to simplify `ClusterConfig`






[PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-22 Thread via GitHub


TaiJuWu opened a new pull request, #15782:
URL: https://github.com/apache/kafka/pull/15782

   As title
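
   For context, a small sketch of the equivalence relied on here: `System.lineSeparator()` returns the same platform-specific separator that the `"line.separator"` system property holds at JVM startup, without the string-keyed property lookup:

   ```java
   public class LineSeparatorDemo {
       public static void main(String[] args) {
           String viaProperty = System.getProperty("line.separator");
           String viaMethod = System.lineSeparator(); // cached at startup, no property lookup
           // On a JVM whose properties have not been tampered with, both agree:
           System.out.println(viaMethod.equals(viaProperty)); // prints true
       }
   }
   ```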
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-22 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1575295376


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
  

Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2070801312

   @m1a2st Please add the following server properties:
   ```java
   @ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
   @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
   @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
   })
   ```
   The default replication factor of the offsets topic is 3, and the new infra creates a single broker. Hence, creating the offsets topic fails, and you can't find a coordinator because the offsets partitions never become ready.
   
   Also, please note @lianetm's comment that bootstrap servers need to be updated manually if you kill any broker. 
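
   As a toy illustration (not Kafka code) of the constraint behind that failure: each replica of a partition must be placed on a distinct broker, so a replication factor larger than the broker count is unsatisfiable:

   ```java
   public class ReplicaPlacementDemo {
       // Each replica needs its own broker, so placement is possible
       // only when 1 <= replicationFactor <= number of brokers.
       static boolean canAssign(int brokers, int replicationFactor) {
           return replicationFactor >= 1 && replicationFactor <= brokers;
       }

       public static void main(String[] args) {
           System.out.println(canAssign(1, 3)); // prints false: default offsets-topic RF of 3, one broker
           System.out.println(canAssign(1, 1)); // prints true: hence the override to "1"
       }
   }
   ```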
   





[jira] [Created] (KAFKA-16600) Periodically receive "Failed to transition to PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close

2024-04-22 Thread Alex Leung (Jira)
Alex Leung created KAFKA-16600:
--

 Summary: Periodically receive "Failed to transition to 
PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN" during streams close
 Key: KAFKA-16600
 URL: https://issues.apache.org/jira/browse/KAFKA-16600
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.2
Reporter: Alex Leung


From time to time, we observe the following ERROR message during streams close:
{code:java}
2024-04-13 07:40:16,222 INFO o.a.k.s.KafkaStreams [32] stream-client 
[testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] State transition from RUNNING to 
PENDING_SHUTDOWN
2024-04-13 07:40:16,222 ERROR o.a.k.s.KafkaStreams [55] stream-client 
[testapp-83aa0e09-a6ef-45e4-9393-69dfba4928bf] Failed to transition to 
PENDING_SHUTDOWN, current state is PENDING_SHUTDOWN
{code}
These ERRORs started showing up when we moved from 2.7.2 to 3.3.2 and we have 
not changed any code related to streams shutdown.



When the problem does not occur (most of the time), it looks like the following:
{code:java}
2024-04-13 07:40:11,333 INFO o.a.k.s.KafkaStreams [55] stream-client 
[testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] State transition from RUNNING to 
PENDING_SHUTDOWN
2024-04-13 07:40:11,341 INFO o.a.k.s.KafkaStreams [32] stream-client 
[testapp-e2212b31-e9c2-4c75-92f6-1e557e27bf66] Streams client is in 
PENDING_SHUTDOWN, all resources are being closed and the client will be 
stopped. {code}
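
A toy model (not the actual `KafkaStreams` implementation) of the kind of race the log lines suggest: two threads both attempt the RUNNING to PENDING_SHUTDOWN transition, and the loser logs the "Failed to transition" error because the state is already PENDING_SHUTDOWN:
{code:java}
import java.util.concurrent.atomic.AtomicReference;

public class StateRaceDemo {
    enum State { RUNNING, PENDING_SHUTDOWN }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    boolean setPendingShutdown() {
        // compareAndSet succeeds for exactly one caller; a second close
        // attempt finds the state already changed and reports a failure.
        return state.compareAndSet(State.RUNNING, State.PENDING_SHUTDOWN);
    }

    public static void main(String[] args) {
        StateRaceDemo demo = new StateRaceDemo();
        System.out.println(demo.setPendingShutdown()); // prints true  (first close call wins)
        System.out.println(demo.setPendingShutdown()); // prints false (second call: already PENDING_SHUTDOWN)
    }
}
{code}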





Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on PR #15772:
URL: https://github.com/apache/kafka/pull/15772#issuecomment-2070748890

   Should we move the following configs to `SocketServerConfigs`?
   
   1. `controller.listener.names`
   
https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/main/scala/kafka/server/KafkaConfig.scala#L118
   2. `inter.broker.listener.name`
   
https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java#L117





Re: [PR] KAFKA-16598 Mirgrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-22 Thread via GitHub


lianetm commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2070678792

   Hey @m1a2st , the way to get the servers' addresses is 
`cluster.bootstrapServers()`, you got it right, but I see you're not inheriting 
from `ConsumerGroupCommandTest` anymore, so that removes the creation of the 
brokers to run the test against (that happens in the base setup 
[here](https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala#L121)).
 Could that be what you're missing? 
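
   For illustration, a self-contained sketch of how a migrated test typically wires the extension-provided address into client configs. The address is stubbed here so the snippet stands alone; a real test would pass the value returned by `cluster.bootstrapServers()` instead:

   ```java
   import java.util.Properties;

   public class BootstrapWiringDemo {
       static Properties consumerProps(String bootstrapServers, String group) {
           Properties props = new Properties();
           props.put("bootstrap.servers", bootstrapServers); // from cluster.bootstrapServers() in a real test
           props.put("group.id", group);
           return props;
       }

       public static void main(String[] args) {
           Properties p = consumerProps("localhost:9092", "test.group"); // stub address
           System.out.println(p.getProperty("bootstrap.servers")); // prints localhost:9092
       }
   }
   ```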





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-22 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2070326565

   @showuon, I made some changes to the test so that we just heartbeat and don't have to rely upon FindCoordinator requests. Let me know what you think. Thanks!





Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]

2024-04-22 Thread via GitHub


vamossagar12 commented on PR #15762:
URL: https://github.com/apache/kafka/pull/15762#issuecomment-2070321267

   Thanks @chia7712, for now I have made the constructor added as part of https://github.com/apache/kafka/pull/13909 private. 
   Regarding  
   
   > I guess we need a KIP to deprecate that method and add comments to encourage users to use
   
   it would mostly involve first deprecating the other constructor and eventually making it private. I don't think we can remove the constructor entirely, because `define` eventually depends upon it. I will create a new ticket for the KIP to track it.
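
   A generic sketch of the migration pattern under discussion (illustrative names, not the actual `ConfigDef.ConfigKey` signature): keep the old constructor for compatibility, mark it deprecated as a first step, and delegate to the new one with a default for the added parameter:

   ```java
   public class Key {
       final String name;
       final String alternativeString;

       @Deprecated // step 1 of the KIP: discourage use before eventually making it private
       public Key(String name) {
           this(name, null); // delegate with a default for the newly added field
       }

       public Key(String name, String alternativeString) {
           this.name = name;
           this.alternativeString = alternativeString;
       }

       public static void main(String[] args) {
           Key k = new Key("max.poll.records"); // old call site keeps compiling
           System.out.println(k.alternativeString == null); // prints true: default applied
       }
   }
   ```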





Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]

2024-04-22 Thread via GitHub


vamossagar12 commented on code in PR #15762:
URL: https://github.com/apache/kafka/pull/15762#discussion_r1575112513


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -1256,6 +1256,16 @@ public static class ConfigKey {
 public final boolean internalConfig;
 public final String alternativeString;
 
+// This constructor is present for backward compatibility reasons.
+public ConfigKey(String name, Type type, Object defaultValue, 
Validator validator,
+ Importance importance, String documentation, String 
group,
+ int orderInGroup, Width width, String displayName,
 List<String> dependents, Recommender recommender,
+ boolean internalConfig) {
+this(name, type, defaultValue, validator, importance, 
documentation, group, orderInGroup, width, displayName,
+dependents, recommender, internalConfig, null);
+}
+
 public ConfigKey(String name, Type type, Object defaultValue, 
Validator validator,

Review Comment:
   Done






Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-22 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1575101365


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -173,63 +199,104 @@ public static class Builder {
 private String listenerName;
 private File trustStoreFile;
 private MetadataVersion metadataVersion;
private Map<String, String> serverProperties = Collections.unmodifiableMap(new HashMap<>());

Review Comment:
   Yes. The code here is redundant. Thanks






[PR] KAFKA-16526; Add quorum state v1 [kafka]

2024-04-22 Thread via GitHub


jsancio opened a new pull request, #15781:
URL: https://github.com/apache/kafka/pull/15781

   DRAFT - needs https://github.com/apache/kafka/pull/15671
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-22 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839791#comment-17839791
 ] 

Stanislav Spiridonov commented on KAFKA-16585:
--

If you check the example on 
[https://developer.confluent.io/tutorials/kafka-streams-schedule-operations/kstreams.html]
 you will see the same situation *transform* vs {*}transformValues{*}. 
{noformat}
I used transform in this tutorial as it makes for a better example because you 
can use the ProcessorContext.forward method.{noformat}

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1575079922


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
 

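The rewritten tests above assert on console output grabbed via `ToolsTestUtils.grabConsoleOutput`. A minimal standalone sketch of that capture technique (the helper here is illustrative, not Kafka's actual implementation): temporarily swap `System.out` for an in-memory stream, run the action, restore stdout, and return whatever was printed.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

public class GrabConsoleOutput {
    // Swap System.out for an in-memory buffer, run the action, restore
    // stdout, and return whatever the action printed.
    static String grabConsoleOutput(Runnable action) {
        PrintStream original = System.out;
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            System.setOut(new PrintStream(buffer));
            action.run();
        } finally {
            System.setOut(original); // restore even if the action throws
        }
        return buffer.toString();
    }

    public static void main(String[] args) {
        String output = grabConsoleOutput(() ->
                System.out.println("Group 'missing.group' could not be deleted due to: GROUP_ID_NOT_FOUND"));
        // The captured text can now be asserted on, as the tests above do.
        System.out.print(output.contains("could not be deleted"));
    }
}
```

Restoring the original stream in a `finally` block matters: otherwise a throwing action would leave stdout redirected for every later test.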
[jira] [Comment Edited] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-22 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838979#comment-17838979
 ] 

Stanislav Spiridonov edited comment on KAFKA-16585 at 4/22/24 4:47 PM:
---

The case is relatively simple. I have a KTable with entities that have to be 
enriched with an icon attribute from a side service. So the processor maintains 
an internal store of entity keys and periodically asks the service for updates 
for the registered ids. If the icon has changed, it forwards a message with the 
new icon. The key of the record is the entity key (String); the value is an icon (String).


was (Author: foal):
The case is relatively simple. I have a KTable with entities that have to be 
enriched with an icon attribute from a side service. So the processor maintains 
an internal store of entity keys and periodically asks the service for updates 
for the registered ids. If the icon has changed, it forwards a message with the 
new icon. The key of the record is the entity key (String); the value is an icon (String).

 

BTW I ran into strange behaviour - if I forward a new record from another 
thread, it arrives at the wrong processor. So now I just update the store from the icon 
KTable instead of forwarding the record.   

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.
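
A self-contained analogue of the constraint this issue describes, using a hypothetical `Record` type rather than Kafka's actual `FixedKeyRecord`: with no public constructor, a new instance can only be derived from an existing one, which is exactly what a punctuation callback lacks.

```java
public class FixedKeyRecordSketch {
    // Analogue of a fixed-key record: no public constructor, so code
    // outside this class can only derive new instances from existing ones.
    public static final class Record {
        public final String key;
        public final String value;
        private Record(String key, String value) {
            this.key = key;
            this.value = value;
        }
        public Record withValue(String newValue) {
            return new Record(key, newValue); // key stays fixed
        }
    }

    // Stand-in for the framework handing a record to process(); this is
    // the only source of Record instances outside the class itself.
    static Record sourceRecord(String key, String value) {
        return new Record(key, value);
    }

    public static void main(String[] args) {
        Record incoming = sourceRecord("entity-1", "old-icon");
        Record forwarded = incoming.withValue("new-icon"); // fine inside process()
        System.out.println(forwarded.key + "=" + forwarded.value);
        // A punctuation callback has no `incoming` record to derive from,
        // so it has no way to build a Record to forward - the reported gap.
    }
}
```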





Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1575053494


##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -173,63 +199,104 @@ public static class Builder {
 private String listenerName;
 private File trustStoreFile;
 private MetadataVersion metadataVersion;
private Map<String, String> serverProperties = Collections.unmodifiableMap(new HashMap<>());
+private Map<String, String> producerProperties = Collections.unmodifiableMap(new HashMap<>());
+private Map<String, String> consumerProperties = Collections.unmodifiableMap(new HashMap<>());
+private Map<String, String> adminClientProperties = Collections.unmodifiableMap(new HashMap<>());
+private Map<String, String> saslServerProperties = Collections.unmodifiableMap(new HashMap<>());
+private Map<String, String> saslClientProperties = Collections.unmodifiableMap(new HashMap<>());
+private Map<Integer, Map<String, String>> perBrokerOverrideProperties = Collections.unmodifiableMap(new HashMap<>());
 
-Builder(Type type, int brokers, int controllers, boolean autoStart, 
SecurityProtocol securityProtocol, MetadataVersion metadataVersion) {
-this.type = type;
-this.brokers = brokers;
-this.controllers = controllers;
-this.autoStart = autoStart;
-this.securityProtocol = securityProtocol;
-this.metadataVersion = metadataVersion;
-}
+private Builder() {}
 
-public Builder type(Type type) {
+public Builder setType(Type type) {
 this.type = type;
 return this;
 }
 
-public Builder brokers(int brokers) {
+public Builder setBrokers(int brokers) {
 this.brokers = brokers;
 return this;
 }
 
-public Builder controllers(int controllers) {
+public Builder setControllers(int controllers) {
 this.controllers = controllers;
 return this;
 }
 
-public Builder name(String name) {
+public Builder setName(String name) {
 this.name = name;
 return this;
 }
 
-public Builder autoStart(boolean autoStart) {
+public Builder setAutoStart(boolean autoStart) {
 this.autoStart = autoStart;
 return this;
 }
 
-public Builder securityProtocol(SecurityProtocol securityProtocol) {
+public Builder setSecurityProtocol(SecurityProtocol securityProtocol) {
 this.securityProtocol = securityProtocol;
 return this;
 }
 
-public Builder listenerName(String listenerName) {
+public Builder setListenerName(String listenerName) {
 this.listenerName = listenerName;
 return this;
 }
 
-public Builder trustStoreFile(File trustStoreFile) {
+public Builder setTrustStoreFile(File trustStoreFile) {
 this.trustStoreFile = trustStoreFile;
 return this;
 }
 
-public Builder metadataVersion(MetadataVersion metadataVersion) {
+public Builder setMetadataVersion(MetadataVersion metadataVersion) {
 this.metadataVersion = metadataVersion;
 return this;
 }
 
+public Builder setServerProperties(Map<String, String> serverProperties) {
+this.serverProperties = Collections.unmodifiableMap(serverProperties);
+return this;
+}
+
+public Builder setConsumerProperties(Map<String, String> consumerProperties) {
+this.consumerProperties = Collections.unmodifiableMap(consumerProperties);
+return this;
+}
+
+public Builder setProducerProperties(Map<String, String> producerProperties) {
+this.producerProperties = Collections.unmodifiableMap(producerProperties);
+return this;
+}
+
+public Builder setAdminClientProperties(Map<String, String> adminClientProperties) {
+this.adminClientProperties = Collections.unmodifiableMap(adminClientProperties);
+return this;
+}
+
+public Builder setSaslServerProperties(Map<String, String> saslServerProperties) {
+this.saslServerProperties = Collections.unmodifiableMap(saslServerProperties);
+return this;
+}
+
+public Builder setSaslClientProperties(Map<String, String> saslClientProperties) {
+this.saslClientProperties = Collections.unmodifiableMap(saslClientProperties);
+return this;
+}
+
+public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
+this.perBrokerOverrideProperties = Collections.unmodifiableMap(

Review Comment:
   ```java
   this.perBrokerOverrideProperties = Collections.unmodifiableMap(
   perBrokerOverrideProperties.entrySet().stream()
   .collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
   ```



##
core/s
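
The suggestion above deep-copies the nested per-broker map before wrapping it as unmodifiable. A standalone sketch of why the inner copy matters (class and property names are made up for illustration): without copying each inner map, later mutation of the caller's maps would leak into the built config.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class DeepUnmodifiableCopy {
    // Copy the outer map and each inner map, wrapping both levels as
    // unmodifiable, so the result is isolated from the caller's maps.
    static Map<Integer, Map<String, String>> deepCopy(Map<Integer, Map<String, String>> src) {
        return Collections.unmodifiableMap(
            src.entrySet().stream().collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
    }

    public static void main(String[] args) {
        Map<String, String> broker0 = new HashMap<>();
        broker0.put("log.dirs", "/tmp/a");
        Map<Integer, Map<String, String>> overrides = new HashMap<>();
        overrides.put(0, broker0);

        Map<Integer, Map<String, String>> copy = deepCopy(overrides);
        broker0.put("log.dirs", "/tmp/b"); // caller mutates its own map...
        System.out.println(copy.get(0).get("log.dirs")); // ...the copy is unaffected
    }
}
```

Note that `Collections.unmodifiableMap` alone only wraps a view over the backing map; the `new HashMap<>(...)` copy is what actually severs the aliasing.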

[jira] [Comment Edited] (KAFKA-7878) Connect Task already exists in this worker when failed to create consumer

2024-04-22 Thread John Hawkins (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839784#comment-17839784
 ] 

John Hawkins edited comment on KAFKA-7878 at 4/22/24 4:42 PM:
--

I see this quite often when I'm loading the JDBC connector. Very annoying! This 
happens with both the standalone and distributed herders.


was (Author: JIRAUSER304684):
I see this quite often when I'm loading the JDBC connector. V. Annoying !

> Connect Task already exists in this worker when failed to create consumer
> -
>
> Key: KAFKA-7878
> URL: https://issues.apache.org/jira/browse/KAFKA-7878
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 1.0.1, 2.0.1, 3.7.0
>Reporter: Loïc Monney
>Priority: Major
>
> *Assumption*
> 1. DNS is not available during a few minutes
> 2. Consumer group rebalances
> 3. Client is not able to resolve DNS entries anymore and fails
> 4. Task seems already registered, so at next rebalance the task will fail due 
> to *Task already exists in this worker* and the only way to recover is to 
> restart the connect process
> *Real log entries*
> * Distributed cluster running one connector on top of Kubernetes
> * Connect 2.0.1
> * kafka-connect-hdfs 5.0.1
> {noformat}
> [2019-01-28 13:31:25,914] WARN Removing server kafka.xxx.net:9093 from 
> bootstrap.servers as DNS resolution failed for kafka.xxx.net 
> (org.apache.kafka.clients.ClientUtils:56)
> [2019-01-28 13:31:25,915] ERROR WorkerSinkTask\{id=xxx-22} Task failed 
> initialization and will not be started. 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:142)
> org.apache.kafka.connect.errors.ConnectException: Failed to create consumer
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:476)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.initialize(WorkerSinkTask.java:139)
>  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:452)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> consumer
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:474)
>  ... 10 more
> Caused by: org.apache.kafka.common.config.ConfigException: No resolvable 
> bootstrap urls given in bootstrap.servers
>  at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:66)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
>  ... 13 more
> [2019-01-28 13:31:25,925] INFO Finished starting connectors and tasks 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
> [2019-01-28 13:31:25,926] INFO Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1239)
> [2019-01-28 13:31:25,927] INFO Stopping task xxx-22 
> (org.apache.kafka.connect.runtime.Worker:555)
> [2019-01-28 13:31:26,021] INFO Finished stopping tasks in preparation for 
> rebalance 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1269)
> [2019-01-28 13:31:26,021] INFO [Worker clientId=connect-1, 
> groupId=xxx-cluster] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
> [2019-01-28 13:31:30,746] INFO [Worker clientId=connect-1, 
> groupId=xxx-cluster] Successfully joined group with generation 29 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
> [2019-01-28 13:31:30,746] INFO Joined group and got assignment: 
> Assignment\{error=0, leader='connect-1-05961f03-52a7-4c02-acc2-0f1fb021692e', 
> leaderUrl='http://192.168.46.59:8083/', offset=32, connectorIds=[], 
> taskIds=[xxx-22]} 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217)
> [2019-01-28 13:31:30,747] INFO Starting connectors and tasks using config 
> offset 32 (org.apache.kafk

[jira] [Commented] (KAFKA-7878) Connect Task already exists in this worker when failed to create consumer

2024-04-22 Thread John Hawkins (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839784#comment-17839784
 ] 

John Hawkins commented on KAFKA-7878:
-

I see this quite often when I'm loading the JDBC connector. Very annoying!

> Connect Task already exists in this worker when failed to create consumer
> -
>
> Key: KAFKA-7878
> URL: https://issues.apache.org/jira/browse/KAFKA-7878
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 1.0.1, 2.0.1
>Reporter: Loïc Monney
>Priority: Major
>
> *Assumption*
> 1. DNS is not available during a few minutes
> 2. Consumer group rebalances
> 3. Client is not able to resolve DNS entries anymore and fails
> 4. Task seems already registered, so at next rebalance the task will fail due 
> to *Task already exists in this worker* and the only way to recover is to 
> restart the connect process
> *Real log entries*
> * Distributed cluster running one connector on top of Kubernetes
> * Connect 2.0.1
> * kafka-connect-hdfs 5.0.1
> {noformat}
> [2019-01-28 13:31:25,914] WARN Removing server kafka.xxx.net:9093 from 
> bootstrap.servers as DNS resolution failed for kafka.xxx.net 
> (org.apache.kafka.clients.ClientUtils:56)
> [2019-01-28 13:31:25,915] ERROR WorkerSinkTask\{id=xxx-22} Task failed 
> initialization and will not be started. 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:142)
> org.apache.kafka.connect.errors.ConnectException: Failed to create consumer
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:476)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.initialize(WorkerSinkTask.java:139)
>  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:452)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> consumer
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:474)
>  ... 10 more
> Caused by: org.apache.kafka.common.config.ConfigException: No resolvable 
> bootstrap urls given in bootstrap.servers
>  at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:66)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
>  ... 13 more
> [2019-01-28 13:31:25,925] INFO Finished starting connectors and tasks 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
> [2019-01-28 13:31:25,926] INFO Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1239)
> [2019-01-28 13:31:25,927] INFO Stopping task xxx-22 
> (org.apache.kafka.connect.runtime.Worker:555)
> [2019-01-28 13:31:26,021] INFO Finished stopping tasks in preparation for 
> rebalance 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1269)
> [2019-01-28 13:31:26,021] INFO [Worker clientId=connect-1, 
> groupId=xxx-cluster] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
> [2019-01-28 13:31:30,746] INFO [Worker clientId=connect-1, 
> groupId=xxx-cluster] Successfully joined group with generation 29 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
> [2019-01-28 13:31:30,746] INFO Joined group and got assignment: 
> Assignment\{error=0, leader='connect-1-05961f03-52a7-4c02-acc2-0f1fb021692e', 
> leaderUrl='http://192.168.46.59:8083/', offset=32, connectorIds=[], 
> taskIds=[xxx-22]} 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217)
> [2019-01-28 13:31:30,747] INFO Starting connectors and tasks using config 
> offset 32 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:858)
> [2019-01-28 13:31:30,747] INFO Starting task xxx-22 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:872)
> [2019-01-28 13:31:30,747] INFO Creating

[jira] [Updated] (KAFKA-7878) Connect Task already exists in this worker when failed to create consumer

2024-04-22 Thread John Hawkins (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Hawkins updated KAFKA-7878:

Affects Version/s: 3.7.0

> Connect Task already exists in this worker when failed to create consumer
> -
>
> Key: KAFKA-7878
> URL: https://issues.apache.org/jira/browse/KAFKA-7878
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 1.0.1, 2.0.1, 3.7.0
>Reporter: Loïc Monney
>Priority: Major
>
> *Assumption*
> 1. DNS is not available during a few minutes
> 2. Consumer group rebalances
> 3. Client is not able to resolve DNS entries anymore and fails
> 4. Task seems already registered, so at next rebalance the task will fail due 
> to *Task already exists in this worker* and the only way to recover is to 
> restart the connect process
> *Real log entries*
> * Distributed cluster running one connector on top of Kubernetes
> * Connect 2.0.1
> * kafka-connect-hdfs 5.0.1
> {noformat}
> [2019-01-28 13:31:25,914] WARN Removing server kafka.xxx.net:9093 from 
> bootstrap.servers as DNS resolution failed for kafka.xxx.net 
> (org.apache.kafka.clients.ClientUtils:56)
> [2019-01-28 13:31:25,915] ERROR WorkerSinkTask\{id=xxx-22} Task failed 
> initialization and will not be started. 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:142)
> org.apache.kafka.connect.errors.ConnectException: Failed to create consumer
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:476)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.initialize(WorkerSinkTask.java:139)
>  at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:452)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:873)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:111)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:888)
>  at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:884)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka 
> consumer
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:596)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.createConsumer(WorkerSinkTask.java:474)
>  ... 10 more
> Caused by: org.apache.kafka.common.config.ConfigException: No resolvable 
> bootstrap urls given in bootstrap.servers
>  at 
> org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:66)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709)
>  ... 13 more
> [2019-01-28 13:31:25,925] INFO Finished starting connectors and tasks 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:868)
> [2019-01-28 13:31:25,926] INFO Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1239)
> [2019-01-28 13:31:25,927] INFO Stopping task xxx-22 
> (org.apache.kafka.connect.runtime.Worker:555)
> [2019-01-28 13:31:26,021] INFO Finished stopping tasks in preparation for 
> rebalance 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1269)
> [2019-01-28 13:31:26,021] INFO [Worker clientId=connect-1, 
> groupId=xxx-cluster] (Re-)joining group 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
> [2019-01-28 13:31:30,746] INFO [Worker clientId=connect-1, 
> groupId=xxx-cluster] Successfully joined group with generation 29 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:473)
> [2019-01-28 13:31:30,746] INFO Joined group and got assignment: 
> Assignment\{error=0, leader='connect-1-05961f03-52a7-4c02-acc2-0f1fb021692e', 
> leaderUrl='http://192.168.46.59:8083/', offset=32, connectorIds=[], 
> taskIds=[xxx-22]} 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1217)
> [2019-01-28 13:31:30,747] INFO Starting connectors and tasks using config 
> offset 32 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:858)
> [2019-01-28 13:31:30,747] INFO Starting task xxx-22 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:872)
> [2019-01-28 13:31:30,747] INFO Creating task xxx-22 
> (org.apache.kafka.connect.runtime.Worker:396)
> [2019-01-28 13:31:30,748] ERROR 

Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]

2024-04-22 Thread via GitHub


ilyazr commented on PR #15701:
URL: https://github.com/apache/kafka/pull/15701#issuecomment-2070177669

   @soarez 
   Hi! Thanks for pointing out problems with the tests. I didn't know that.
   p.s: added suggested changes





Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]

2024-04-22 Thread via GitHub


ilyazr commented on code in PR #15701:
URL: https://github.com/apache/kafka/pull/15701#discussion_r1575054055


##
metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java:
##
@@ -174,10 +180,18 @@ public void testUnexpectedBaseOffsetExceptionInfo() {
 public void testUnexpectedBaseOffsetFailureMessage() {
 assertEquals("event failed with UnexpectedBaseOffsetException (treated 
as " +
 "NotControllerException) at epoch 123 in 90 microseconds. 
Renouncing leadership " +
-"and reverting to the last committed offset 456.",
+"and reverting to the last committed offset 456. Detailed 
exception message: Wanted base offset 3, but the next offset was 4",
 UNEXPECTED_END_OFFSET.failureMessage(123, 
OptionalLong.of(90L), true, 456L));
 }
 
+@Test
+public void testKafkaExceptionFailureMessage() {

Review Comment:
   It definitely would be better to have a clear name for the test. I've 
changed it to the suggested one






Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]

2024-04-22 Thread via GitHub


ilyazr commented on code in PR #15701:
URL: https://github.com/apache/kafka/pull/15701#discussion_r1575051333


##
metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java:
##
@@ -72,6 +73,11 @@ public class EventHandlerExceptionInfoTest {
 new TimeoutException(),
 () -> OptionalInt.of(1));
 
+private static final EventHandlerExceptionInfo KAFKA_EXCEPTION =
+EventHandlerExceptionInfo.fromInternal(
+new KafkaException("Custom kafka exception message"),
+() -> OptionalInt.of(1));
+

Review Comment:
   Initially I thought it would be better to keep all 
`EventHandlerExceptionInfo` declarations in the same place. But as you 
mentioned, it would be even better to have it at the method level since it is 
used only once. Moved `TIMEOUT` to the method too.






Re: [PR] KAFKA-16466: add details of an internal exception to the failure message [kafka]

2024-04-22 Thread via GitHub


ilyazr commented on code in PR #15701:
URL: https://github.com/apache/kafka/pull/15701#discussion_r1575046814


##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -191,6 +191,10 @@ public String failureMessage(
 }
 }
 bld.append(".");
+if (!isFault && internalException.getMessage() != null) {
+bld.append(" Detailed exception message: ");

Review Comment:
   Removed "Detailed" from the message



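For context, here is a minimal, self-contained sketch of the pattern under review: appending an internal exception's message to the failure string only for non-fault cases, with the "Detailed" prefix removed as requested. Class, method, and message wording are illustrative assumptions, not the exact EventHandlerExceptionInfo code.

```java
// Illustrative sketch only: names and wording are assumptions, not the
// exact EventHandlerExceptionInfo implementation from the PR.
class FailureMessageSketch {
    static String failureMessage(String base, boolean isFault, Throwable internalException) {
        StringBuilder bld = new StringBuilder(base);
        bld.append(".");
        // Per the review: surface the internal message only for non-fault
        // errors that actually carry one, without the "Detailed" prefix.
        if (!isFault && internalException.getMessage() != null) {
            bld.append(" Exception message: ").append(internalException.getMessage());
        }
        return bld.toString();
    }

    public static void main(String[] args) {
        System.out.println(failureMessage("event failed with UnexpectedBaseOffsetException",
                false, new RuntimeException("Wanted base offset 3, but the next offset was 4")));
    }
}
```

The fault case deliberately omits the internal message, matching the `!isFault` guard in the diff above.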



[jira] [Resolved] (KAFKA-16103) Review client logic for triggering offset commit callbacks

2024-04-22 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy resolved KAFKA-16103.

Resolution: Fixed

> Review client logic for triggering offset commit callbacks
> --
>
> Key: KAFKA-16103
> URL: https://issues.apache.org/jira/browse/KAFKA-16103
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: kip-848-client-support, offset
> Fix For: 3.8.0
>
>
> Review logic for triggering commit callbacks, ensuring that all callbacks are 
> triggered before returning from commitSync



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16103) Review client logic for triggering offset commit callbacks

2024-04-22 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839765#comment-17839765
 ] 

Lucas Brutschy commented on KAFKA-16103:


changes in legacy consumer in https://issues.apache.org/jira/browse/KAFKA-16599

> Review client logic for triggering offset commit callbacks
> --
>
> Key: KAFKA-16103
> URL: https://issues.apache.org/jira/browse/KAFKA-16103
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: kip-848-client-support, offset
> Fix For: 3.8.0
>
>
> Review logic for triggering commit callbacks, ensuring that all callbacks are 
> triggered before returning from commitSync





[jira] [Updated] (KAFKA-16599) LegacyConsumer: Always await async commit callbacks in commitSync and close

2024-04-22 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16599:
---
Summary: LegacyConsumer: Always await async commit callbacks in commitSync 
and close  (was: Always await async commit callbacks in commitSync and close)

> LegacyConsumer: Always await async commit callbacks in commitSync and close
> ---
>
> Key: KAFKA-16599
> URL: https://issues.apache.org/jira/browse/KAFKA-16599
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> The javadoc for KafkaConsumer.commitSync says:
> {code:java}
> Note that asynchronous offset commits sent previously with the {@link 
> #commitAsync(OffsetCommitCallback)}
> (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {code}
> This is not always true in the legacy consumer, when the set of offsets is 
> empty, the execution of the commit callback is not always awaited. There are 
> also various races possible that can avoid callback handler execution.
> Similarly, there is code in the legacy consumer to await the completion of 
> the commit callback before closing, however, the code doesn't cover all cases 
> and the behavior is therefore inconsistent. While the Javadoc doesn't 
> explicitly promise callback execution, it promises "completing commits", 
> which one would reasonably expect to include callback execution. Either way, 
> the current behavior of the legacy consumer is inconsistent.





Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-22 Thread via GitHub


gaurav-narula commented on PR #15777:
URL: https://github.com/apache/kafka/pull/15777#issuecomment-2070018194

   @chia7712 Rebased. PTAL :)





[jira] [Updated] (KAFKA-16599) Always await async commit callbacks in commitSync and close

2024-04-22 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16599:
---
Component/s: clients
 consumer

> Always await async commit callbacks in commitSync and close
> ---
>
> Key: KAFKA-16599
> URL: https://issues.apache.org/jira/browse/KAFKA-16599
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> The javadoc for KafkaConsumer.commitSync says:
> {code:java}
> Note that asynchronous offset commits sent previously with the {@link 
> #commitAsync(OffsetCommitCallback)}
> (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {code}
> This is not always true in the legacy consumer, when the set of offsets is 
> empty, the execution of the commit callback is not always awaited. There are 
> also various races possible that can avoid callback handler execution.
> Similarly, there is code in the legacy consumer to await the completion of 
> the commit callback before closing, however, the code doesn't cover all cases 
> and the behavior is therefore inconsistent. While the Javadoc doesn't 
> explicitly promise callback execution, it promises "completing commits", 
> which one would reasonably expect to include callback execution. Either way, 
> the current behavior of the legacy consumer is inconsistent.





[jira] [Updated] (KAFKA-16599) Always await async commit callbacks in commitSync and close

2024-04-22 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-16599:
---
Description: 
The javadoc for KafkaConsumer.commitSync says:

{code:java}
Note that asynchronous offset commits sent previously with the {@link 
#commitAsync(OffsetCommitCallback)}
(or similar) are guaranteed to have their callbacks invoked prior to completion 
of this method.
{code}

This is not always true in the legacy consumer: when the set of offsets is 
empty, the execution of the commit callback is not always awaited. There are 
also various races that can prevent callback handler execution.

Similarly, there is code in the legacy consumer to await the completion of the 
commit callback before closing, however, the code doesn't cover all cases and 
the behavior is therefore inconsistent. While the Javadoc doesn't explicitly 
promise callback execution, it promises "completing commits", which one would 
reasonably expect to include callback execution. Either way, the current 
behavior of the legacy consumer is inconsistent.

> Always await async commit callbacks in commitSync and close
> ---
>
> Key: KAFKA-16599
> URL: https://issues.apache.org/jira/browse/KAFKA-16599
> Project: Kafka
>  Issue Type: Task
>Reporter: Lucas Brutschy
>Priority: Major
>
> The javadoc for KafkaConsumer.commitSync says:
> {code:java}
> Note that asynchronous offset commits sent previously with the {@link 
> #commitAsync(OffsetCommitCallback)}
> (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {code}
> This is not always true in the legacy consumer, when the set of offsets is 
> empty, the execution of the commit callback is not always awaited. There are 
> also various races possible that can avoid callback handler execution.
> Similarly, there is code in the legacy consumer to await the completion of 
> the commit callback before closing, however, the code doesn't cover all cases 
> and the behavior is therefore inconsistent. While the Javadoc doesn't 
> explicitly promise callback execution, it promises "completing commits", 
> which one would reasonably expect to include callback execution. Either way, 
> the current behavior of the legacy consumer is inconsistent.



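The javadoc contract above can be illustrated with a small, self-contained sketch. This is a hypothetical model, not the real KafkaConsumer internals: a commitSync that honors the contract must drain every pending commitAsync callback before returning, even when it has no offsets of its own to commit.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the contract, not Kafka's actual consumer code:
// commitAsync queues a completion callback; commitSync must invoke all
// queued callbacks before it returns, even for an empty offset set.
class CommitContractSketch {
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    void commitAsync(Runnable callback) {
        // In the real client the callback fires once the broker responds;
        // queuing it here models "completed but not yet invoked".
        pendingCallbacks.add(callback);
    }

    void commitSync() {
        // The invariant under discussion: drain all pending callbacks first...
        Runnable cb;
        while ((cb = pendingCallbacks.poll()) != null) {
            cb.run();
        }
        // ...then perform the synchronous commit itself (elided here).
    }
}
```

The bug described in the ticket corresponds to commitSync skipping the drain loop on certain paths (e.g. an empty offset set), so a caller could observe commitSync returning before its earlier commitAsync callbacks ran.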


Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on PR #15777:
URL: https://github.com/apache/kafka/pull/15777#issuecomment-2070011423

   #15776 is merged so please rebase code :)





[jira] [Assigned] (KAFKA-16599) Always await async commit callbacks in commitSync and close

2024-04-22 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16599:
--

Assignee: Lucas Brutschy

> Always await async commit callbacks in commitSync and close
> ---
>
> Key: KAFKA-16599
> URL: https://issues.apache.org/jira/browse/KAFKA-16599
> Project: Kafka
>  Issue Type: Task
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> The javadoc for KafkaConsumer.commitSync says:
> {code:java}
> Note that asynchronous offset commits sent previously with the {@link 
> #commitAsync(OffsetCommitCallback)}
> (or similar) are guaranteed to have their callbacks invoked prior to 
> completion of this method.
> {code}
> This is not always true in the legacy consumer, when the set of offsets is 
> empty, the execution of the commit callback is not always awaited. There are 
> also various races possible that can avoid callback handler execution.
> Similarly, there is code in the legacy consumer to await the completion of 
> the commit callback before closing, however, the code doesn't cover all cases 
> and the behavior is therefore inconsistent. While the Javadoc doesn't 
> explicitly promise callback execution, it promises "completing commits", 
> which one would reasonably expect to include callback execution. Either way, 
> the current behavior of the legacy consumer is inconsistent.





[jira] [Updated] (KAFKA-16559) allow defining number of disks per broker in TestKitNodes

2024-04-22 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-16559:
---
Fix Version/s: 3.7.1

> allow defining number of disks per broker in TestKitNodes
> -
>
> Key: KAFKA-16559
> URL: https://issues.apache.org/jira/browse/KAFKA-16559
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Gaurav Narula
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> from: https://github.com/apache/kafka/pull/15136#discussion_r1565571409
> That allows us to run the reassignment tests.
> Also, we should enhance setNumBrokerNodes 
> (https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/TestKitNodes.java#L81)
>  to accept an extra argument to define the number of folders (by 
> setLogDirectories)





Re: [PR] KAFKA-16559 allow defining number of disks per broker in TestKitNodes [kafka]

2024-04-22 Thread via GitHub


chia7712 merged PR #15776:
URL: https://github.com/apache/kafka/pull/15776





[jira] [Created] (KAFKA-16599) Always await async commit callbacks in commitSync and close

2024-04-22 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-16599:
--

 Summary: Always await async commit callbacks in commitSync and 
close
 Key: KAFKA-16599
 URL: https://issues.apache.org/jira/browse/KAFKA-16599
 Project: Kafka
  Issue Type: Task
Reporter: Lucas Brutschy








Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1575000110


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
 

Re: [PR] KAFKA-16463 System test for reverting migration to ZK [kafka]

2024-04-22 Thread via GitHub


soarez commented on code in PR #15754:
URL: https://github.com/apache/kafka/pull/15754#discussion_r1574994375


##
tests/kafkatest/tests/core/zookeeper_migration_test.py:
##
@@ -86,10 +87,35 @@ def do_migration(self, roll_controller = False, 
downgrade_to_zk = False):
 controller.stop_node(node)
 controller.start_node(node)
 
+if downgrade_to_zk:
+self.logger.info("Shutdown brokers to avoid waiting on unclean 
shutdown")
+for node in self.kafka.nodes:
+self.kafka.stop_node(node)
+metadata_log_dir = KafkaService.METADATA_LOG_DIR + 
"/__cluster_metadata-0"
+for node in self.kafka.nodes:
+assert path_exists(node, metadata_log_dir), "Should still 
have a metadata log on the brokers."
+
+self.logger.info("Shutdown KRaft quorum")
+for node in controller.nodes:
+controller.stop_node(node)
+
+self.logger.info("Deleting controller ZNode")
+self.zk.delete(path="/controller", recursive=True)
+
+self.logger.info("Rolling brokers back to ZK mode")
+self.kafka.downgrade_kraft_broker_to_zk(controller)
+for node in self.kafka.nodes:
+self.kafka.start_node(node)
+
+# This blocks until all brokers have a full ISR
+self.wait_until_rejoin()

Review Comment:
   The test name is `test_online_migration` but we're shutting every broker 
down to apply the downgrade, it seems like a contradiction? Does the downgrade 
require a full shutdown of the cluster?



##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -463,6 +463,18 @@ def reconfigure_zk_for_migration(self, kraft_quorum):
 # This is not added to "advertised.listeners" because of 
configured_for_zk_migration=True
 self.port_mappings[kraft_quorum.controller_listener_names] = 
kraft_quorum.port_mappings.get(kraft_quorum.controller_listener_names)
 
+def downgrade_kraft_broker_to_zk(self, kraft_quorum):
+self.configured_for_zk_migration = True
+self.quorum_info = quorum.ServiceQuorumInfo(quorum.zk, self)
+self.controller_quorum = kraft_quorum
+
+# Set the migration properties
+self.server_prop_overrides.extend([
+["zookeeper.metadata.migration.enable", "true"],
+["controller.quorum.voters", 
kraft_quorum.controller_quorum_voters],
+["controller.listener.names", 
kraft_quorum.controller_listener_names]
+])

Review Comment:
   In https://kafka.apache.org/documentation/#kraft_zk_migration under 
"Reverting to ZooKeeper mode During the Migration" we say:
   
   > On each broker, remove the zookeeper.metadata.migration.enable, 
controller.listener.names, and controller.quorum.voters configurations.
   
   And before this method is called we log `"Rolling brokers back to ZK mode"`.
   
   I'm guessing I'm probably missing something, shouldn't these three 
properties be removed?



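The documented broker-side revert step quoted above (removing the three migration configs from each broker) can be sketched as a plain config-map transformation. The property names come from the quoted documentation; the class and method names are illustrative, not part of the test framework.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the documented revert step: strip the three migration-related
// properties from a broker's configuration. Names other than the property
// keys are illustrative.
class ZkRevertSketch {
    static Map<String, String> revertToZkConfig(Map<String, String> brokerProps) {
        Map<String, String> reverted = new HashMap<>(brokerProps);
        reverted.remove("zookeeper.metadata.migration.enable");
        reverted.remove("controller.quorum.voters");
        reverted.remove("controller.listener.names");
        return reverted;
    }
}
```

This mirrors the asymmetry the reviewer points out: the PR's downgrade helper adds these properties, while the documented revert procedure removes them.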



Re: [PR] MINOR: Reduce the time taken to execute the TieredStorage tests. [kafka]

2024-04-22 Thread via GitHub


kamalcph commented on PR #15780:
URL: https://github.com/apache/kafka/pull/15780#issuecomment-2069948134

   This PR is a follow-up of #15719.





Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-22 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1574974316


##
core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala:
##
@@ -18,41 +18,58 @@ package kafka.server
 
 import java.net.Socket
 import java.util.Collections
-
 import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.test.annotation.{ClusterTest, Type}
+import kafka.server.SaslApiVersionsRequestTest.{kafkaClientSaslMechanism, 
kafkaServerSaslMechanisms}
+import kafka.test.annotation.{ClusterTemplate, Type}
 import kafka.test.junit.ClusterTestExtensions
-import kafka.test.{ClusterConfig, ClusterInstance}
+import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance}
 import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.message.SaslHandshakeRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{ApiVersionsRequest, 
ApiVersionsResponse, SaslHandshakeRequest, SaslHandshakeResponse}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.config.KafkaSecurityConfigs
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 
 import scala.jdk.CollectionConverters._
 
+object SaslApiVersionsRequestTest {
+  val kafkaClientSaslMechanism = "PLAIN"
+  val kafkaServerSaslMechanisms: Seq[String] = List("PLAIN")
+  val controlPlaneListenerName = "CONTROL_PLANE"
+  val securityProtocol = SecurityProtocol.SASL_PLAINTEXT
+
+  def saslApiVersionsRequestClusterConfig(clusterGenerator: ClusterGenerator): 
Unit = {
+clusterGenerator.accept(ClusterConfig.defaultBuilder
+  .securityProtocol(securityProtocol)
+  .`type`(Type.ZK)
+  
.putSaslServerProperty(KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
 kafkaClientSaslMechanism)
+  
.putSaslServerProperty(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
kafkaServerSaslMechanisms.mkString(","))
+  .putSaslClientProperty(SaslConfigs.SASL_MECHANISM, 
kafkaClientSaslMechanism)
+  // Configure control plane listener to make sure we have separate 
listeners for testing.
+  .putServerProperty(KafkaConfig.ControlPlaneListenerNameProp, 
controlPlaneListenerName)
+  .putServerProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
+  .putServerProperty("listeners", 
s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
+  .putServerProperty(KafkaConfig.AdvertisedListenersProp, 
s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
+  .build())
+  }
+}
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class SaslApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {
-
-  val kafkaClientSaslMechanism = "PLAIN"
-  val kafkaServerSaslMechanisms = List("PLAIN")
-
   private var sasl: SaslSetup = _
 
   @BeforeEach
-  def setupSasl(config: ClusterConfig): Unit = {
+  def setupSasl(): Unit = {
 sasl = new SaslSetup() {}

Review Comment:
   Currently, no.  Since we clear `java.security.auth.login.config` in 
[QuorumTestHarness#tearDown](https://github.com/apache/kafka/blob/59c781415fc37c89aa087d7c2999cec7f82f6188/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala#L440).
 So if we initialize sasl in `@BeforeAll`, the first test passes, but the 
second one raises the error below. 
   ```text
   Could not find a 'KafkaServer' or 'control_plane.KafkaServer' entry in the 
JAAS configuration. System property 'java.security.auth.login.config' is not set
   ```
   This would require some effort if we want to make this happen.



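The failure mode described above — one-time `@BeforeAll` initialization combined with a per-test tearDown that clears `java.security.auth.login.config` — can be reproduced with a tiny stdlib sketch. The system property name is real; the class, method names, and file path are made up for illustration.

```java
// Minimal reproduction of the lifecycle mismatch described above.
// Only the property name is real; everything else is illustrative.
class JaasLifecycleSketch {
    static final String JAAS_PROP = "java.security.auth.login.config";

    static void beforeAllSetup() {
        // Runs once, like sasl initialization moved to @BeforeAll would.
        System.setProperty(JAAS_PROP, "/tmp/fake-jaas.conf");
    }

    static boolean testCanSeeJaasConfig() {
        // Stand-in for a test that needs the JAAS config to be present.
        return System.getProperty(JAAS_PROP) != null;
    }

    static void tearDown() {
        // QuorumTestHarness#tearDown clears the property after every test.
        System.clearProperty(JAAS_PROP);
    }
}
```

Running beforeAllSetup once, then test / tearDown / test, shows the second test finding the property unset, matching the "Could not find a 'KafkaServer' ... entry" error quoted above.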



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-22 Thread via GitHub


OmniaGM commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1574953974


##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class SocketServerConfigs {
+public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = 
"listener.security.protocol.map";
+public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = 
Arrays.stream(SecurityProtocol.values())
+.collect(Collectors.toMap(ListenerName::forSecurityProtocol, sp -> 
sp))
+.entrySet()
+.stream()
+.map(entry -> entry.getKey().value() + ":" + 
entry.getValue().name())
+.collect(Collectors.joining(","));
+public static final String LISTENER_SECURITY_PROTOCOL_MAP_DOC = "Map 
between listener names and security protocols. This must be defined for " +
+"the same security protocol to be usable in more than one port or 
IP. For example, internal and " +
+"external traffic can be separated even if SSL is required for 
both. Concretely, the user could define listeners " +
+"with names INTERNAL and EXTERNAL and this property as: 
INTERNAL:SSL,EXTERNAL:SSL. As shown, key and value are " +
+"separated by a colon and map entries are separated by commas. 
Each listener name should only appear once in the map. " +
+"Different security (SSL and SASL) settings can be configured for 
each listener by adding a normalised " +
+"prefix (the listener name is lowercased) to the config name. For 
example, to set a different keystore for the " +
+"INTERNAL listener, a config with name 
listener.name.internal.ssl.keystore.location would be set. " +
+"If the config for the listener name is not set, the config will 
fallback to the generic config (i.e. ssl.keystore.location). " +
+"Note that in KRaft a default mapping from the listener names 
defined by controller.listener.names to PLAINTEXT " +
+"is assumed if no explicit mapping is provided and no other 
security protocol is in use.";
+
+public static final String LISTENERS_CONFIG = "listeners";
+public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092";
+public static final String LISTENERS_DOC = "Listener List - 
Comma-separated list of URIs we will listen on and the listener names." +
+String.format(" If the listener name is not a security protocol, 
%s must also be set.%n", LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) +
+" Listener names and port numbers must be unique unless %n" +
+" one listener is an IPv4 address and the other listener is %n" +
+" an IPv6 address (for the same port).%n" +
+" Specify hostname as 0.0.0.0 to bind to all interfaces.%n" +
+" Leave hostname empty to bind to default interface.%n" +
+" Examples of legal listener lists:%n" +
+" PLAINTEXT://myhost:9092,SSL://:9091%n" +
+" 
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093%n" +
+" PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092%n";
+
+public static final String ADVERTISED_LISTENERS_CONFIG = 
"advertised.listeners";
+public static final String ADVERTISED_LISTENERS_DOC = String.format(
+"Listeners to publish to ZooKeeper for clients to use, if 
different than the %s config property." +
+" In IaaS environments, this may need to be different from 
the interface to which the broker binds." +
+" If this is not set, the value for %1$1s 
will be used." +
+" Unlike %1$1s, it is not valid to advertise 
the 0.0.0.0 meta-address.%n" +
+" Also unlike %1$1s, there can be duplicated 
ports in this property," +
+" so that one listener can

Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-22 Thread via GitHub


OmniaGM commented on code in PR #15772:
URL: https://github.com/apache/kafka/pull/15772#discussion_r1574952989


##
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.network;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+public class SocketServerConfigs {
+public static final String LISTENER_SECURITY_PROTOCOL_MAP_CONFIG = 
"listener.security.protocol.map";
+public static final String LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT = 
Arrays.stream(SecurityProtocol.values())
+.collect(Collectors.toMap(ListenerName::forSecurityProtocol, sp -> 
sp))
+.entrySet()
+.stream()
+.map(entry -> entry.getKey().value() + ":" + 
entry.getValue().name())
+.collect(Collectors.joining(","));
+public static final String LISTENER_SECURITY_PROTOCOL_MAP_DOC = "Map 
between listener names and security protocols. This must be defined for " +
+"the same security protocol to be usable in more than one port or 
IP. For example, internal and " +
+"external traffic can be separated even if SSL is required for 
both. Concretely, the user could define listeners " +
+"with names INTERNAL and EXTERNAL and this property as: 
INTERNAL:SSL,EXTERNAL:SSL. As shown, key and value are " +
+"separated by a colon and map entries are separated by commas. 
Each listener name should only appear once in the map. " +
+"Different security (SSL and SASL) settings can be configured for 
each listener by adding a normalised " +
+"prefix (the listener name is lowercased) to the config name. For 
example, to set a different keystore for the " +
+"INTERNAL listener, a config with name 
listener.name.internal.ssl.keystore.location would be set. " +
+"If the config for the listener name is not set, the config will 
fallback to the generic config (i.e. ssl.keystore.location). " +
+"Note that in KRaft a default mapping from the listener names 
defined by controller.listener.names to PLAINTEXT " +
+"is assumed if no explicit mapping is provided and no other 
security protocol is in use.";
+
+public static final String LISTENERS_CONFIG = "listeners";
+public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092";
+public static final String LISTENERS_DOC = "Listener List - 
Comma-separated list of URIs we will listen on and the listener names." +
+String.format(" If the listener name is not a security protocol, 
%s must also be set.%n", LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) +
+" Listener names and port numbers must be unique unless %n" +
+" one listener is an IPv4 address and the other listener is %n" +
+" an IPv6 address (for the same port).%n" +
+" Specify hostname as 0.0.0.0 to bind to all interfaces.%n" +
+" Leave hostname empty to bind to default interface.%n" +
+" Examples of legal listener lists:%n" +
+" PLAINTEXT://myhost:9092,SSL://:9091%n" +
+" 
CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093%n" +
+" PLAINTEXT://127.0.0.1:9092,SSL://[::1]:9092%n";
+
+public static final String ADVERTISED_LISTENERS_CONFIG = 
"advertised.listeners";
+public static final String ADVERTISED_LISTENERS_DOC = String.format(
+"Listeners to publish to ZooKeeper for clients to use, if 
different than the %s config property." +
+" In IaaS environments, this may need to be different from 
the interface to which the broker binds." +
+" If this is not set, the value for %1$1s 
will be used." +
+" Unlike %1$1s, it is not valid to advertise 
the 0.0.0.0 meta-address.%n" +
+" Also unlike %1$1s, there can be duplicated 
ports in this property," +
+" so that one listener can
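
The `listener.security.protocol.map` doc quoted above describes the value format: key and value separated by a colon, entries separated by commas. As a rough stdlib-only sketch of parsing such a value (illustrative only; not the broker's actual `ListenerName`/`SecurityProtocol` parsing code, and the class name is made up):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProtocolMapSketch {
    // Parse a listener.security.protocol.map style value, e.g.
    // "INTERNAL:SSL,EXTERNAL:SSL": key and value are separated by a
    // colon, and map entries are separated by commas.
    static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            // Limit 2 so a value containing further colons is kept intact.
            String[] parts = entry.split(":", 2);
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> map = parse("INTERNAL:SSL,EXTERNAL:SSL");
        System.out.println(map); // {INTERNAL=SSL, EXTERNAL=SSL}
    }
}
```

This mirrors only the textual format described in the doc string; per-listener config fallback (`listener.name.internal.ssl.keystore.location` and friends) is a separate lookup layer in the broker.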

[jira] [Commented] (KAFKA-15089) Consolidate all the group coordinator configs

2024-04-22 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839744#comment-17839744
 ] 

Omnia Ibrahim commented on KAFKA-15089:
---

I can have a look once I finish KAFKA-15853

> Consolidate all the group coordinator configs
> -
>
> Key: KAFKA-15089
> URL: https://issues.apache.org/jira/browse/KAFKA-15089
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> The group coordinator configurations are defined in KafkaConfig at the 
> moment. As KafkaConfig is defined in the core module, we can't pass it to the 
> new java modules to pass the configurations along.
> A suggestion here is to centralize all the configurations of a module in the 
> module itself similarly to what we have do for RemoteLogManagerConfig and 
> RaftConfig. We also need a mechanism to add all the properties defined in the 
> module to the KafkaConfig's ConfigDef.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Reduce the time taken to execute the TieredStorage tests. [kafka]

2024-04-22 Thread via GitHub


kamalcph opened a new pull request, #15780:
URL: https://github.com/apache/kafka/pull/15780

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16559 allow defining number of disks per broker in TestKitNodes [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on PR #15776:
URL: https://github.com/apache/kafka/pull/15776#issuecomment-2069843217

   @soarez Any concerns? The failed streams tests can get fixed by backporting 
#14983. Hence, I'd like to merge it :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-22 Thread via GitHub


m1a2st commented on PR #15779:
URL: https://github.com/apache/kafka/pull/15779#issuecomment-2069841257

   @chia7712 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16598 Migrate `ResetConsumerGroupOffsetTest` to new test infra [kafka]

2024-04-22 Thread via GitHub


m1a2st opened a new pull request, #15779:
URL: https://github.com/apache/kafka/pull/15779

   Migrate `ResetConsumerGroupOffsetTest` to the new test framework first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-22 Thread via GitHub


OmniaGM commented on code in PR #15774:
URL: https://github.com/apache/kafka/pull/15774#discussion_r1574929931


##
server-common/src/main/java/org/apache/kafka/server/config/ServerQuotaConfigs.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import java.util.Collections;
+import java.util.List;
+
+public class ServerQuotaConfigs {

Review Comment:
   Just pushed an update now. 
   - Renamed `ServerQuotaConfigs` to `QuotaConfigs`
   - Moved all quota related configs, methods, config def, etc to 
`QuotaConfigs` in `server-common` and out of `client` and `server`
   - Deleted `org.apache.kafka.common.config.internals.QuotaConfigs` and 
`DynamicConfig.QuotaConfigs`. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close

2024-04-22 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16589:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Consider removing `ClusterInstance#createAdminClient` since callers are not 
> sure whether they need to call close
> 
>
> Key: KAFKA-16589
> URL: https://issues.apache.org/jira/browse/KAFKA-16589
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> Sometimes we close the admin created by `createAdminClient`, and sometimes we 
> don't. That is not a true problem since the `ClusterInstance` will call 
> `close` when stopping.
> However, that causes a lot of inconsistent code, and in fact it does not save 
> much time since creating an Admin is not hard work. We can get 
> `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.
>  
> {code:java}
> // before
> try (Admin admin = cluster.createAdminClient()) { }
> // after v0
> try (Admin admin = Admin.create(Collections.singletonMap(
> CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
> cluster.bootstrapServers()))) {}
> {code}
> Personally, the `after` version is not verbose, but we can have alternatives: 
> `Map clientConfigs`.
>  
> {code:java}
> // after v1
> try (Admin admin = Admin.create(cluster.clientConfigs())) {}{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
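The "after v0" variant quoted in the issue description above boils down to building a one-entry config map and handing it to `Admin.create` in try-with-resources. A stdlib-only sketch of that map construction (the class name is made up; the literal `bootstrap.servers` is the value behind `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG`, and the server address is hypothetical):

```java
import java.util.Collections;
import java.util.Map;

public class AdminConfigSketch {
    // CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG resolves to this literal.
    static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

    // Build the one-entry config map from the "after v0" snippet; in a real
    // test this map would be passed to Admin.create(...) in try-with-resources.
    static Map<String, Object> adminConfigs(String bootstrapServers) {
        return Collections.singletonMap(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    }

    public static void main(String[] args) {
        // Hypothetical address; a test would use cluster.bootstrapServers().
        System.out.println(adminConfigs("localhost:9092"));
    }
}
```

The "after v1" alternative in the description would hide this map behind a `clientConfigs()` accessor on `ClusterInstance`, so each test site shrinks back to a one-liner.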


[jira] [Assigned] (KAFKA-16598) Migrate `ResetConsumerGroupOffsetTest` to new test infra

2024-04-22 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16598:
--

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> Migrate `ResetConsumerGroupOffsetTest` to new test infra
> -
>
> Key: KAFKA-16598
> URL: https://issues.apache.org/jira/browse/KAFKA-16598
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Major
>
> as title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close

2024-04-22 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839733#comment-17839733
 ] 

PoAn Yang commented on KAFKA-16589:
---

Got it. I'm interested in the issue. May I take it? Thank you.

> Consider removing `ClusterInstance#createAdminClient` since callers are not 
> sure whether they need to call close
> 
>
> Key: KAFKA-16589
> URL: https://issues.apache.org/jira/browse/KAFKA-16589
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Sometimes we close the admin created by `createAdminClient`, and sometimes we 
> don't. That is not a true problem since the `ClusterInstance` will call 
> `close` when stopping.
> However, that causes a lot of inconsistent code, and in fact it does not save 
> much time since creating an Admin is not hard work. We can get 
> `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.
>  
> {code:java}
> // before
> try (Admin admin = cluster.createAdminClient()) { }
> // after v0
> try (Admin admin = Admin.create(Collections.singletonMap(
> CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
> cluster.bootstrapServers()))) {}
> {code}
> Personally, the `after` version is not verbose, but we can have alternatives: 
> `Map clientConfigs`.
>  
> {code:java}
> // after v1
> try (Admin admin = Admin.create(cluster.clientConfigs())) {}{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close

2024-04-22 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839731#comment-17839731
 ] 

Chia-Ping Tsai commented on KAFKA-16589:


{quote}

 is this ticket about removing `createAdminClient` from `ClusterInstance` to 
encourage developer to create admin by `Admin.create` directly? Thanks.

{quote}

yep, I'd like to make the `creation` more consistent. For example, why doesn't 
`ClusterInstance` have a `createConsumer` which can manage all created 
consumers? However, it could cause errors if `ClusterInstance` did close 
consumers in teardown, since the consumers don't leave the group while the test 
case is running.

Furthermore, it will cause a lot of false warnings that the created admins 
don't get closed.

 

 

> Consider removing `ClusterInstance#createAdminClient` since callers are not 
> sure whether they need to call close
> 
>
> Key: KAFKA-16589
> URL: https://issues.apache.org/jira/browse/KAFKA-16589
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Sometimes we close the admin created by `createAdminClient`, and sometimes we 
> don't. That is not a true problem since the `ClusterInstance` will call 
> `close` when stopping.
> However, that causes a lot of inconsistent code, and in fact it does not save 
> much time since creating an Admin is not hard work. We can get 
> `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.
>  
> {code:java}
> // before
> try (Admin admin = cluster.createAdminClient()) { }
> // after v0
> try (Admin admin = Admin.create(Collections.singletonMap(
> CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
> cluster.bootstrapServers()))) {}
> {code}
> Personally, the `after` version is not verbose, but we can have alternatives: 
> `Map clientConfigs`.
>  
> {code:java}
> // after v1
> try (Admin admin = Admin.create(cluster.clientConfigs())) {}{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16598) Migrate `ResetConsumerGroupOffsetTest` to new test infra

2024-04-22 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-16598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839730#comment-17839730
 ] 

黃竣陽 commented on KAFKA-16598:
-

I am able to handle this issue.

> Migrate `ResetConsumerGroupOffsetTest` to new test infra
> -
>
> Key: KAFKA-16598
> URL: https://issues.apache.org/jira/browse/KAFKA-16598
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> as title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add junit properties to display parameterized test names [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on PR #14983:
URL: https://github.com/apache/kafka/pull/14983#issuecomment-2069776278

   When testing the 3.7 branch for #15776, I encountered the following error:
   ```
   java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.InvalidTopicException: Topic name is invalid: 
the length of 
'appId_1713797371220_HighAvailabilityTaskAssignorIntegrationTestshouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance-storeHighAvailabilityTaskAssignorIntegrationTestshouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance-changelog'
 is longer than the max allowed length 249
   at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
   at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
   at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
   at 
org.apache.kafka.streams.integration.utils.KafkaEmbedded.createTopic(KafkaEmbedded.java:178)
   ... 5 more
   
   Caused by:
   org.apache.kafka.common.errors.InvalidTopicException: Topic name 
is invalid: the length of 
'appId_1713797371220_HighAvailabilityTaskAssignorIntegrationTestshouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance-storeHighAvailabilityTaskAssignorIntegrationTestshouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance-changelog'
 is longer than the max allowed length 249
   ```
   Hence, I'd like to backport this PR to 3.7. WDYT?
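   
   The failure is easy to reproduce arithmetically: Kafka caps topic names at 249 characters, and a Streams changelog topic name is derived as `<applicationId>-<storeName>-changelog`. With the parameterized display name embedded in both the application id and the store name, the limit is exceeded. A hedged sketch (class name made up; the two strings are copied from the error message above, and the `<appId>-<store>-changelog` naming is an assumption matching that message):
   
   ```java
   public class TopicNameLengthSketch {
       static final int MAX_NAME_LENGTH = 249; // Kafka's topic-name limit
   
       // Streams derives changelog topics as "<applicationId>-<storeName>-changelog".
       static int changelogNameLength(String applicationId, String storeName) {
           return (applicationId + "-" + storeName + "-changelog").length();
       }
   
       public static void main(String[] args) {
           String appId = "appId_1713797371220_HighAvailabilityTaskAssignorIntegrationTest"
                   + "shouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance";
           String store = "storeHighAvailabilityTaskAssignorIntegrationTest"
                   + "shouldScaleOutWithWarmupTasksAndPersistentStores_3__rackAwareStrategy_balance";
           int length = changelogNameLength(appId, store);
           // Prints the derived length and whether it breaches the limit.
           System.out.println(length + " > " + MAX_NAME_LENGTH + " : " + (length > MAX_NAME_LENGTH));
       }
   }
   ```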


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close

2024-04-22 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839724#comment-17839724
 ] 

PoAn Yang edited comment on KAFKA-16589 at 4/22/24 2:56 PM:


Hi [~chia7712], is this ticket about removing `createAdminClient` from 
`ClusterInstance` to encourage developer to create admin by `Admin.create` 
directly? Thanks.


was (Author: JIRAUSER300229):
Hi [~chia7712], is this ticket try to `createAdminClient` from 
`ClusterInstance` to encourage developer to create admin by `Admin.create` 
directly? Thanks.

> Consider removing `ClusterInstance#createAdminClient` since callers are not 
> sure whether they need to call close
> 
>
> Key: KAFKA-16589
> URL: https://issues.apache.org/jira/browse/KAFKA-16589
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Sometimes we close the admin created by `createAdminClient`, and sometimes we 
> don't. That is not a true problem since the `ClusterInstance` will call 
> `close` when stopping.
> However, that causes a lot of inconsistent code, and in fact it does not save 
> much time since creating an Admin is not hard work. We can get 
> `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.
>  
> {code:java}
> // before
> try (Admin admin = cluster.createAdminClient()) { }
> // after v0
> try (Admin admin = Admin.create(Collections.singletonMap(
> CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
> cluster.bootstrapServers()))) {}
> {code}
> Personally, the `after` version is not verbose, but we can have alternatives: 
> `Map clientConfigs`.
>  
> {code:java}
> // after v1
> try (Admin admin = Admin.create(cluster.clientConfigs())) {}{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-22 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2069788585

   > @FrankYang0529 thanks for updated PR. two minor comments left. PTAL
   
   Hi @chia7712, I addressed last comments. Thanks for the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-22 Thread via GitHub


dongnuo123 commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1574897141


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -10331,6 +10333,461 @@ public void 
testClassicGroupOnUnloadedCompletingRebalance() throws Exception {
 .setErrorCode(NOT_COORDINATOR.code()), 
pendingMemberSyncResult.syncFuture.get());
 }
 
+@Test
+public void testLastClassicProtocolMemberLeavingConsumerGroup() {
+String groupId = "group-id";
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+Uuid zarTopicId = Uuid.randomUuid();
+String zarTopicName = "zar";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = 
Collections.singletonList(
+new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+.setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+Arrays.asList(fooTopicName, barTopicName),
+null,
+Arrays.asList(
+new TopicPartition(fooTopicName, 0),
+new TopicPartition(fooTopicName, 1),
+new TopicPartition(fooTopicName, 2),
+new TopicPartition(barTopicName, 0),
+new TopicPartition(barTopicName, 1)
+)
+
+);
+
+ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setRebalanceTimeoutMs(45000)
+.setClassicMemberMetadata(new 
ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+.setSupportedProtocols(protocols))
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2),
+mkTopicAssignment(barTopicId, 0, 1)))
+.build();
+ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+.setState(MemberState.STABLE)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+// Use zar only here to ensure that metadata needs to be 
recomputed.
+.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+.setServerAssignorName("range")
+.setRebalanceTimeoutMs(45000)
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4, 5),
+mkTopicAssignment(barTopicId, 2)))
+.build();
+
+// Consumer group with two members.
+// Member 1 uses the classic protocol and member 2 uses the consumer 
protocol.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+
.withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.addTopic(barTopicId, barTopicName, 3)
+.addTopic(zarTopicId, zarTopicName, 1)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(member1)
+.withMember(member2)
+.withAssignment(memberId1, mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2),
+mkTopicAssignment(barTopicId, 0, 1)))
+.withAssignment(memberId2, mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4, 5),
+mkTopicAssignment(barTopicId, 2)))
+.withAssignmentEpoch(10))
+.build();
+
+context.commit();
+ConsumerGroup consumerGroup = 
context.groupMetadataManager.consumerGroup(groupId);
+
+// Member 2 leaves the consumer group, triggering the downgrade.
+CoordinatorResult result = 
context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId2)
+.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+   

[jira] [Created] (KAFKA-16598) Migrate `ResetConsumerGroupOffsetTest` to new test infra

2024-04-22 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16598:
--

 Summary: Migrate `ResetConsumerGroupOffsetTest` to new test infra
 Key: KAFKA-16598
 URL: https://issues.apache.org/jira/browse/KAFKA-16598
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


as title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16589) Consider removing `ClusterInstance#createAdminClient` since callers are not sure whether they need to call close

2024-04-22 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839724#comment-17839724
 ] 

PoAn Yang commented on KAFKA-16589:
---

Hi [~chia7712], is this ticket try to `createAdminClient` from 
`ClusterInstance` to encourage developer to create admin by `Admin.create` 
directly? Thanks.

> Consider removing `ClusterInstance#createAdminClient` since callers are not 
> sure whether they need to call close
> 
>
> Key: KAFKA-16589
> URL: https://issues.apache.org/jira/browse/KAFKA-16589
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> Sometimes we close the admin created by `createAdminClient`, and sometimes we 
> don't. That is not a true problem since the `ClusterInstance` will call 
> `close` when stopping.
> However, that causes a lot of inconsistent code, and in fact it does not save 
> much time since creating an Admin is not hard work. We can get 
> `bootstrapServers` and `bootstrapControllers` from `ClusterInstance` easily.
>  
> {code:java}
> // before
> try (Admin admin = cluster.createAdminClient()) { }
> // after v0
> try (Admin admin = Admin.create(Collections.singletonMap(
> CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
> cluster.bootstrapServers()))) {}
> {code}
> Personally, the `after` version is not verbose, but we can have alternatives: 
> `Map clientConfigs`.
>  
> {code:java}
> // after v1
> try (Admin admin = Admin.create(cluster.clientConfigs())) {}{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16528: Client HB timing fix [kafka]

2024-04-22 Thread via GitHub


lianetm commented on PR #15698:
URL: https://github.com/apache/kafka/pull/15698#issuecomment-2069725394

   Hi @cadonna, thanks for the comments!
   - the unit test I added initially fails on 
[this](https://github.com/apache/kafka/blob/d242c444562fe4be41a2bc53d47ab2a6f523b955/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java#L276)
 assertion without the changes on this PR, I guess that's what you were looking 
for with your first comment? 
   -  agreed, we did not have test coverage to make sure that the 
`canSendRequest` and `nextHeartbeat` move along consistently. I added a func to 
validate them together, included it in the existing test that covers responses 
with errors, and added a new test to include it in the successful path too.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


