[jira] [Commented] (KAFKA-15709) KRaft support in ServerStartupTest

2024-04-18 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838640#comment-17838640
 ] 

Matthew de Detrich commented on KAFKA-15709:


[~linzihao1999] are you still working on this, or can I take it over?

> KRaft support in ServerStartupTest
> --
>
> Key: KAFKA-15709
> URL: https://issues.apache.org/jira/browse/KAFKA-15709
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Zihao Lin
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in ServerStartupTest in 
> core/src/test/scala/unit/kafka/server/ServerStartupTest.scala need to be 
> updated to support KRaft
> 38 : def testBrokerCreatesZKChroot(): Unit = {
> 51 : def testConflictBrokerStartupWithSamePort(): Unit = {
> 65 : def testConflictBrokerRegistration(): Unit = {
> 82 : def testBrokerSelfAware(): Unit = {
> 93 : def testBrokerStateRunningAfterZK(): Unit = {
> Scanned 107 lines. Found 0 KRaft tests out of 5 tests
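For reference, migrating one of these tests typically means parameterizing it over the quorum type so the same body runs under both ZooKeeper and KRaft. The sketch below is illustrative only and assumes the usual helpers in Kafka's core test sources (QuorumTestHarness, TestInfoUtils, TestUtils.createBrokerConfig); it is not the actual migrated test.

```scala
import kafka.server.{KafkaConfig, QuorumTestHarness}
import kafka.utils.{TestInfoUtils, TestUtils}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

class ServerStartupTestSketch extends QuorumTestHarness {

  // Runs the same body once with ZooKeeper and once with KRaft.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
  @ValueSource(strings = Array("zk", "kraft"))
  def testBrokerSelfAware(quorum: String): Unit = {
    // zkConnectOrNull is null under KRaft, so the same config works in both modes.
    val props = TestUtils.createBrokerConfig(1, zkConnectOrNull)
    val broker = createBroker(KafkaConfig.fromProps(props))
    try assertEquals(1, broker.config.brokerId)
    finally broker.shutdown()
  }
}
```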



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15737) KRaft support in ConsumerBounceTest

2024-04-18 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838639#comment-17838639
 ] 

Matthew de Detrich edited comment on KAFKA-15737 at 4/18/24 12:39 PM:
--

[~k-raina] are you working on this, or can I have a look into it?


was (Author: JIRAUSER297606):
I'll have a look at this

> KRaft support in ConsumerBounceTest
> ---
>
> Key: KAFKA-15737
> URL: https://issues.apache.org/jira/browse/KAFKA-15737
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Kaushik Raina
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in ConsumerBounceTest in 
> core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala need to be 
> updated to support KRaft
> 81 : def testConsumptionWithBrokerFailures(): Unit = 
> consumeWithBrokerFailures(10)
> 122 : def testSeekAndCommitWithBrokerFailures(): Unit = 
> seekAndCommitWithBrokerFailures(5)
> 161 : def testSubscribeWhenTopicUnavailable(): Unit = {
> 212 : def testClose(): Unit = {
> 297 : def 
> testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): 
> Unit = {
> 337 : def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = {
> 370 : def testCloseDuringRebalance(): Unit = {
> Scanned 535 lines. Found 0 KRaft tests out of 7 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15737) KRaft support in ConsumerBounceTest

2024-04-18 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838639#comment-17838639
 ] 

Matthew de Detrich commented on KAFKA-15737:


I'll have a look at this

> KRaft support in ConsumerBounceTest
> ---
>
> Key: KAFKA-15737
> URL: https://issues.apache.org/jira/browse/KAFKA-15737
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Assignee: Kaushik Raina
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in ConsumerBounceTest in 
> core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala need to be 
> updated to support KRaft
> 81 : def testConsumptionWithBrokerFailures(): Unit = 
> consumeWithBrokerFailures(10)
> 122 : def testSeekAndCommitWithBrokerFailures(): Unit = 
> seekAndCommitWithBrokerFailures(5)
> 161 : def testSubscribeWhenTopicUnavailable(): Unit = {
> 212 : def testClose(): Unit = {
> 297 : def 
> testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): 
> Unit = {
> 337 : def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = {
> 370 : def testCloseDuringRebalance(): Unit = {
> Scanned 535 lines. Found 0 KRaft tests out of 7 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14547) Be able to run kafka KRaft Server in tests without needing to run a storage setup script

2024-04-18 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838637#comment-17838637
 ] 

Matthew de Detrich commented on KAFKA-14547:


I'll have a look into this

> Be able to run kafka KRaft Server in tests without needing to run a storage 
> setup script
> 
>
> Key: KAFKA-14547
> URL: https://issues.apache.org/jira/browse/KAFKA-14547
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft
>Affects Versions: 3.3.1
>Reporter: Natan Silnitsky
>Priority: Major
>
> Currently the Kafka KRaft server requires running kafka-storage.sh in order to 
> start properly.
> This makes setup much more cumbersome for build tools like Bazel.
> One way to mitigate this is to configure the paths via kafkaConfig...
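For context, here is a minimal sketch of how an in-process KRaft cluster can already be started in tests without invoking kafka-storage.sh, assuming the KafkaClusterTestKit/TestKitNodes utilities that ship in Kafka's test sources (exact package and method names may differ between versions):

```scala
import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}

object KRaftWithoutStorageScript {
  def main(args: Array[String]): Unit = {
    val cluster = new KafkaClusterTestKit.Builder(
      new TestKitNodes.Builder()
        .setNumBrokerNodes(1)
        .setNumControllerNodes(1)
        .build()
    ).build()
    try {
      cluster.format()             // stands in for the manual kafka-storage.sh format step
      cluster.startup()
      cluster.waitForReadyBrokers()
      // cluster.bootstrapServers() can now be handed to the clients under test
    } finally {
      cluster.close()
    }
  }
}
```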



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16378) Under tiered storage, deleting local logs does not free disk space

2024-04-18 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838635#comment-17838635
 ] 

Ivan Yurchenko commented on KAFKA-16378:


This was a problem in the RemoteStorageManager plugin, so I'll close this issue. 
[~jianbin], feel free to reopen it if you think this needs more attention.

> Under tiered storage, deleting local logs does not free disk space
> --
>
> Key: KAFKA-16378
> URL: https://issues.apache.org/jira/browse/KAFKA-16378
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Jianbin Chen
>Priority: Major
> Attachments: image-2024-03-15-09-33-13-903.png
>
>
> This is an intermittent issue: whenever a tiered-storage topic triggers the 
> deletion of local log segments, there is a chance that the deleted files remain 
> referenced even though they can no longer be found on the local disk.
> I use this implementation: [Aiven-Open/tiered-storage-for-apache-kafka: 
> RemoteStorageManager for Apache Kafka® Tiered Storage 
> (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka]
> I also filed an issue in their community, which contains a full 
> description of the problem:
> [Disk space not released · Issue #513 · 
> Aiven-Open/tiered-storage-for-apache-kafka 
> (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/513]
> !image-2024-03-15-09-33-13-903.png!
> You can clearly see in this figure that Kafka has already logged the deletion 
> of the segment, but the file is still referenced and its disk space has not 
> been released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-16378) Under tiered storage, deleting local logs does not free disk space

2024-04-18 Thread Ivan Yurchenko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Yurchenko closed KAFKA-16378.
--

This was a problem in the remote storage manager plugin.

> Under tiered storage, deleting local logs does not free disk space
> --
>
> Key: KAFKA-16378
> URL: https://issues.apache.org/jira/browse/KAFKA-16378
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Jianbin Chen
>Priority: Major
> Attachments: image-2024-03-15-09-33-13-903.png
>
>
> This is an intermittent issue: whenever a tiered-storage topic triggers the 
> deletion of local log segments, there is a chance that the deleted files remain 
> referenced even though they can no longer be found on the local disk.
> I use this implementation: [Aiven-Open/tiered-storage-for-apache-kafka: 
> RemoteStorageManager for Apache Kafka® Tiered Storage 
> (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka]
> I also filed an issue in their community, which contains a full 
> description of the problem:
> [Disk space not released · Issue #513 · 
> Aiven-Open/tiered-storage-for-apache-kafka 
> (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/513]
> !image-2024-03-15-09-33-13-903.png!
> You can clearly see in this figure that Kafka has already logged the deletion 
> of the segment, but the file is still referenced and its disk space has not 
> been released.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2063727744

   Sorry, the scenario I mentioned above seems to be another issue. Let me 
summarize my thoughts.
   
   1. `log-start-offset-checkpoint` is missing and remote storage is enabled. 
The `logStartOffset` will be set to zero, which seems to be a potential issue 
since a `ListOffsetRequest` could return an incorrect result.
   2. `replication-offset-checkpoint` is missing and remote storage is enabled. 
This is what you described. The HWM ends up pointing into the middle of tiered 
storage, which causes an error when fetching records from local segments.
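For illustration only (this is not the PR's actual code), the bounding idea in the PR title can be expressed as clamping a recovered high watermark into the local log range, so that a missing checkpoint cannot leave the HWM pointing into tiered storage:

```scala
// Clamp a recovered HWM into [localLogStartOffset, logEndOffset].
def boundHighWatermark(recoveredHwm: Long, localLogStartOffset: Long, logEndOffset: Long): Long =
  math.min(math.max(recoveredHwm, localLogStartOffset), logEndOffset)

// Example: boundHighWatermark(0L, 100L, 250L) == 100L
```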


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]

2024-04-18 Thread via GitHub


showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1570596642


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
 
 // Events handled by Migration Driver.
 abstract class MigrationEvent implements EventQueue.Event {
+// Use no-op handler by default because the retryHandler will be 
overridden if needed
+public void retryHandler() { }
 @SuppressWarnings("ThrowableNotThrown")
 @Override
 public void handleException(Throwable e) {
 if (e instanceof MigrationClientAuthException) {
 
KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper 
authentication in " + this, e);
 } else if (e instanceof MigrationClientException) {
 log.info(String.format("Encountered ZooKeeper error during 
event %s. Will retry.", this), e.getCause());
+retryHandler();

Review Comment:
   @chia7712 , I took your suggestion to add `RecoverMigrationStateFromZKEvent` 
so that we don't need to worry about retries anymore. I was checking whether 
this change would cause any side effects, and here are my findings:
   1. `recoverMigrationStateFromZK` is expected to run before the driver starts 
the state machine.
   2. In `recoverMigrationStateFromZK`, we do these things:
 a. create a ZNode for migration and the initial migration state
 b. install this class as a metadata publisher
 c. transition to the INACTIVE state
   3. If `recoverMigrationStateFromZK` keeps failing, it will keep logging 
errors and retrying. Once it succeeds, the metadata publisher will be installed 
and `onControllerChange` and `onMetadataUpdate` will be triggered to start the 
process. That means that turning `recoverMigrationStateFromZK` into an event 
won't affect anything, because all we need to do at this stage is wait for 
operations (a), (b), and (c) to complete.
   
   So, I'm +1 on this suggestion. Thank you.
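A hedged sketch of the retry-as-event idea discussed above, with all names invented (it is not the PR's code): a retriable ZooKeeper failure simply re-enqueues the recovery step instead of faulting, so the driver just waits until steps (a)-(c) complete.

```scala
final class RecoverFromZkEventSketch(
    recoverMigrationStateFromZk: () => Unit,  // creates the ZNode, installs the publisher, goes INACTIVE
    scheduleRetry: Runnable => Unit           // e.g. re-enqueue on the driver's event queue with a delay
) extends Runnable {
  override def run(): Unit =
    try recoverMigrationStateFromZk()
    catch {
      case _: RuntimeException =>             // stand-in for a retriable ZooKeeper client error
        scheduleRetry(this)
    }
}
```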



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]

2024-04-18 Thread via GitHub


mdedetrich commented on PR #15341:
URL: https://github.com/apache/kafka/pull/15341#issuecomment-2063692520

   @OmniaGM So the structure of the PR/tests is now finalized. I ended up 
entirely deleting `KafkaEmbedded`, so the tests are now a lot closer to 
idiomatic testing with KRaft/`KafkaClusterTestKit`. Regarding using a global 
`EmbeddedKafkaCluster`, I think it makes sense to do that in a separate PR, or 
at least once I manage to get the tests in this PR to pass.
   
   On that note, the current issue with the PR is that not all of the tests are 
passing, e.g. `HandlingSourceTopicDeletionIntegrationTest`, as an example off 
the top of my head. I am currently debugging to figure out why, but any help 
would be appreciated; it's likely some config/prop that is not set properly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-18 Thread Stanislav Spiridonov (Jira)
Stanislav Spiridonov created KAFKA-16585:


 Summary: No way to forward message from punctuation method in the 
FixedKeyProcessor
 Key: KAFKA-16585
 URL: https://issues.apache.org/jira/browse/KAFKA-16585
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.2
Reporter: Stanislav Spiridonov


The FixedKeyProcessorContext can forward only a FixedKeyRecord. This class 
doesn't have a public constructor and can only be created from an existing 
record, but such a record is usually not available in the punctuation method.
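A hedged illustration of the limitation (the processor below is made up for this example): inside process() an existing record can be transformed and forwarded, but inside a punctuator there is no FixedKeyRecord to start from, so there is nothing that can be passed to forward().

```scala
import java.time.Duration
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.{FixedKeyProcessor, FixedKeyProcessorContext, FixedKeyRecord}

class UppercaseProcessor extends FixedKeyProcessor[String, String, String] {
  private var ctx: FixedKeyProcessorContext[String, String] = _

  override def init(context: FixedKeyProcessorContext[String, String]): Unit = {
    ctx = context
    ctx.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp => {
      // No record exists here and FixedKeyRecord has no public constructor,
      // so there is no value that could legally be forwarded.
    })
  }

  override def process(record: FixedKeyRecord[String, String]): Unit =
    // Forwarding works here because the incoming record can be transformed in place.
    ctx.forward(record.withValue(record.value().toUpperCase))
}
```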



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR:Add hint for `resumeDeletions` in `TopicDeletionManager` [kafka]

2024-04-18 Thread via GitHub


hudeqi commented on PR #15543:
URL: https://github.com/apache/kafka/pull/15543#issuecomment-2063665249

   Hi @jlprat, could you help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR:Optimize the log output when cleaning up offsets for groups with a generation less than or equal to 0 [kafka]

2024-04-18 Thread via GitHub


hudeqi commented on PR #15726:
URL: https://github.com/apache/kafka/pull/15726#issuecomment-2063663065

   Hi @dajac, does this PR make sense for the group's offset-cleanup log output?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609
 ] 

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 11:11 AM:
-

[~kirktrue], I am running the Connect system tests locally using the test 
suite, and this exact test *test_exactly_once_source* fails regularly for me 
with a different config.
{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in __init__
self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", 
line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", 
line 37, in do_alloc
good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", 
line 131, in remove_spec
raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes 
requested: 1. linux nodes available: 0
{code}
If you want, I can revert the change as part of my PR. It should be ready for 
review by today or tomorrow.


was (Author: sagarrao):
[~kirktrue], I am working on modifying the system tests for Connect, and this 
exact test *test_exactly_once_source* fails regularly for me with a different 
config.


{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in 

[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609
 ] 

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 11:10 AM:
-

[~kirktrue], I am working on modifying the system tests for Connect, and this 
exact test *test_exactly_once_source* fails regularly for me with a different 
config.


{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in __init__
self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", 
line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", 
line 37, in do_alloc
good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", 
line 131, in remove_spec
raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes 
requested: 1. linux nodes available: 0
{code}



If you want, I can revert the change as part of my PR. It should be ready for 
review by today or tomorrow.


was (Author: sagarrao):
[~kirktrue], I am working on modifying the system tests for Connect, and this 
exact test `test_exactly_once_source` fails regularly for me with a different 
config.
```

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in __init__

[jira] [Commented] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609
 ] 

Sagar Rao commented on KAFKA-16578:
---

[~kirktrue], I am working on modifying the system tests for Connect, and this 
exact test `test_exactly_once_source` fails regularly for me with a different 
config.
```

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in __init__
self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", 
line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", 
line 37, in do_alloc
good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", 
line 131, in remove_spec
raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes 
requested: 1. linux nodes available: 0
```

If you want, I can revert the change as part of my PR. It should be ready for 
review by today or tomorrow.

> Revert changes to connect_distributed_test.py for the new async Consumer
> 
>
> Key: KAFKA-16578
> URL: https://issues.apache.org/jira/browse/KAFKA-16578
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated 
> a slew of system tests to run both the "old" and "new" implementations. 
> KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
> could test the new consumer with Connect. However, we are not supporting 
> Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the 
> Connect system tests with the new {{AsyncKafkaConsumer}}, we get errors like 
> the following:
> {code}
> test_id:
> kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   6 minutes 3.899 seconds
> InsufficientResourcesError('Not enough nodes available to allocate. linux 
> nodes requested: 1. linux nodes available: 0')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> 

Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]

2024-04-18 Thread via GitHub


mimaison commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2063586815

   Feel free to merge once CI completes. Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]

2024-04-18 Thread via GitHub


jlprat commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2063587938

   Thanks @mimaison. Will do!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-18 Thread via GitHub


brandboat commented on PR #15719:
URL: https://github.com/apache/kafka/pull/15719#issuecomment-2063574931

   Thanks for the reminder, soarez! I've already fixed the error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+}
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+val topicPartition = destLog.topicPartition
+info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
 
-  // Now that future replica has been successfully renamed to be the 
current replica
-  // Update the cached map and log cleaner as appropriate.
-  futureLogs.remove(topicPartition)
-  currentLogs.put(topicPartition, destLog)
-  if (cleaner != null) {
-cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, 
destLog.parentDirFile)
-resumeCleaning(topicPartition)
-  }
+destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
+// the metrics tags still contain "future", so we have to remove it.
+// we will add metrics back after sourceLog remove the metrics
+destLog.removeLogMetrics()
+if (updateHighWatermark && sourceLog.isDefined) {
+  destLog.updateHighWatermark(sourceLog.get.highWatermark)
+}
 
-  try {
-sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), 
shouldReinitialize = true)
+// Now that future replica has been successfully renamed to be the current 
replica
+// Update the cached map and log cleaner as appropriate.
+futureLogs.remove(topicPartition)
+currentLogs.put(topicPartition, destLog)
+if (cleaner != null) {
+  cleaner.alterCheckpointDir(topicPartition, 
sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment:
   Addressed in 
[d02fc0b](https://github.com/apache/kafka/pull/15136/commits/d02fc0b9abeacf95096d4275232ea0cf829d836a)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570449270


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+}
+  }
 
-  // Now that future replica has been successfully renamed to be the 
current replica
-  // Update the cached map and log cleaner as appropriate.
-  futureLogs.remove(topicPartition)
-  currentLogs.put(topicPartition, destLog)
-  if (cleaner != null) {
-cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, 
destLog.parentDirFile)
-resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+val topicPartition = destLog.topicPartition
+info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")

Review Comment:
   Addressed this and other comments in 
[31412a9](https://github.com/apache/kafka/pull/15136/commits/31412a9485b63731a49b5d44d6dc6fbeaf52dd0f)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570449270


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
   val sourceLog = currentLogs.get(topicPartition)
   val destLog = futureLogs.get(topicPartition)
 
-  info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")
   if (sourceLog == null)
 throw new KafkaStorageException(s"The current replica for 
$topicPartition is offline")
   if (destLog == null)
 throw new KafkaStorageException(s"The future replica for 
$topicPartition is offline")
 
-  destLog.renameDir(UnifiedLog.logDirName(topicPartition), 
shouldReinitialize = true)
-  // the metrics tags still contain "future", so we have to remove it.
-  // we will add metrics back after sourceLog remove the metrics
-  destLog.removeLogMetrics()
-  destLog.updateHighWatermark(sourceLog.highWatermark)
+  replaceCurrentWithFutureLog(Option(sourceLog), destLog, 
updateHighWatermark = true)
+}
+  }
 
-  // Now that future replica has been successfully renamed to be the 
current replica
-  // Update the cached map and log cleaner as appropriate.
-  futureLogs.remove(topicPartition)
-  currentLogs.put(topicPartition, destLog)
-  if (cleaner != null) {
-cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, 
destLog.parentDirFile)
-resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: 
UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+val topicPartition = destLog.topicPartition
+info(s"Attempting to replace current log $sourceLog with $destLog for 
$topicPartition")

Review Comment:
   Addressed this and other comments in 
[d089929](https://github.com/apache/kafka/pull/15136/commits/d089929b5a92c0be308f685980e2f787b652ed86)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-8041) Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll

2024-04-18 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838583#comment-17838583
 ] 

Omnia Ibrahim edited comment on KAFKA-8041 at 4/18/24 10:14 AM:


[~soarez] Yes, sorry for the late response. I believe this should be fixed now 
after the merge of [https://github.com/apache/kafka/pull/15335] . It has been 
passing for the last couple of weeks with no flakiness 
[https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D
 
|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D]


was (Author: omnia_h_ibrahim):
[~soarez] I believe this should be fixed now after the merge of 
[https://github.com/apache/kafka/pull/15335] . It has been passing for the last 
couple of weeks with no flakiness 
[https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D
 
|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D]

> Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll
> -
>
> Key: KAFKA-8041
> URL: https://issues.apache.org/jira/browse/KAFKA-8041
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.0.1, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Bob Barrett
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/236/tests]
> {quote}java.lang.AssertionError: Expected some messages
> at kafka.utils.TestUtils$.fail(TestUtils.scala:357)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:787)
> at 
> kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:189)
> at 
> kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:63){quote}
> STDOUT
> {quote}[2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, 
> leaderId=0, fetcherId=0] Error for partition topic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-10 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-4 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-8 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:45:00,248] ERROR Error while rolling log segment for topic-0 
> in dir 
> /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216
>  (kafka.server.LogDirFailureChannel:76)
> java.io.FileNotFoundException: 
> /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216/topic-0/.index
>  (Not a directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.(RandomAccessFile.java:243)
> at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:121)
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at 

[jira] [Commented] (KAFKA-8041) Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll

2024-04-18 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838583#comment-17838583
 ] 

Omnia Ibrahim commented on KAFKA-8041:
--

[~soarez] I believe this should be fixed now after the merge of 
[https://github.com/apache/kafka/pull/15335] . It has been passing for the last 
couple of weeks with no flakiness 
[https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D
 
|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D]

> Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll
> -
>
> Key: KAFKA-8041
> URL: https://issues.apache.org/jira/browse/KAFKA-8041
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.0.1, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Bob Barrett
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/236/tests]
> {quote}java.lang.AssertionError: Expected some messages
> at kafka.utils.TestUtils$.fail(TestUtils.scala:357)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:787)
> at 
> kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:189)
> at 
> kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:63){quote}
> STDOUT
> {quote}[2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, 
> leaderId=0, fetcherId=0] Error for partition topic-6 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-10 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-4 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-8 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-2 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-05 03:45:00,248] ERROR Error while rolling log segment for topic-0 
> in dir 
> /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216
>  (kafka.server.LogDirFailureChannel:76)
> java.io.FileNotFoundException: 
> /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216/topic-0/.index
>  (Not a directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.(RandomAccessFile.java:243)
> at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:121)
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at kafka.log.AbstractIndex.resize(AbstractIndex.scala:115)
> at kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:184)
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:184)
> at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:501)
> at kafka.log.Log.$anonfun$roll$8(Log.scala:1520)
> at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1520)
> at scala.Option.foreach(Option.scala:257)
> at kafka.log.Log.$anonfun$roll$2(Log.scala:1520)
> at 

Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-18 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1570406354


##
server/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+/**
+ * Common home for broker-side log configs which need to be accessible from 
the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaLogConfigs {
+public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
+public static final int NUM_PARTITIONS_DEFAULT = 1;
+public static final String NUM_PARTITIONS_DOC = "The default number of log 
partitions per topic";
+
+public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
+public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
+public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
+public static final String LOG_DIR_DOC = "The directory in which the log 
data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
+public static final String LOG_DIRS_DOC = "A comma-separated list of the 
directories where the log data is stored. If not set, the value in " + 
LOG_DIR_CONFIG + " is used.";
+
+public static final String LOG_SEGMENT_BYTES_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
+public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a 
single log file";
+
+public static final String LOG_ROLL_TIME_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG);
+public static final String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + 
"roll.hours";
+public static final String LOG_ROLL_TIME_MILLIS_DOC = "The maximum time 
before a new log segment is rolled out (in milliseconds). If not set, the value 
in " + LOG_ROLL_TIME_HOURS_CONFIG + " is used";
+public static final String LOG_ROLL_TIME_HOURS_DOC = "The maximum time 
before a new log segment is rolled out (in hours), secondary to " + 
LOG_ROLL_TIME_MILLIS_CONFIG + " property";
+
+public static final String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
+public static final String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX 
+ "roll.jitter.hours";
+public static final String LOG_ROLL_TIME_JITTER_MILLIS_DOC = "The maximum 
jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the 
value in " + LOG_ROLL_TIME_JITTER_HOURS_CONFIG + " is used";
+public static final String LOG_ROLL_TIME_JITTER_HOURS_DOC = "The maximum 
jitter to subtract from logRollTimeMillis (in hours), secondary to " + 
LOG_ROLL_TIME_JITTER_MILLIS_CONFIG + " property";
+
+
+public static final String LOG_RETENTION_TIME_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG);
+public static final String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX 
+ "retention.minutes";
+public static final String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + 
"retention.hours";
+public static final String LOG_RETENTION_TIME_MILLIS_DOC = "The number of 
milliseconds to keep a log file before deleting it (in milliseconds), If not 
set, the value in " + LOG_RETENTION_TIME_MINUTES_CONFIG + " is used. If set to 
-1, no time limit is applied.";
+public static final String LOG_RETENTION_TIME_MINUTES_DOC = "The number of 
minutes to keep a log file before deleting it (in minutes), secondary to " + 
LOG_RETENTION_TIME_MILLIS_CONFIG + " property. If not set, the value in " + 
LOG_RETENTION_TIME_HOURS_CONFIG + " is used";
+public static final String LOG_RETENTION_TIME_HOURS_DOC = "The number of 
hours to keep a log file before deleting it (in hours), tertiary to " + 
LOG_RETENTION_TIME_MILLIS_CONFIG + " property";
+
+public static final String LOG_RETENTION_BYTES_CONFIG = 

Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]

2024-04-18 Thread via GitHub


jlprat commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2063483812

   @mimaison Rebased


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]

2024-04-18 Thread via GitHub


soarez commented on PR #15719:
URL: https://github.com/apache/kafka/pull/15719#issuecomment-2063477565

   Thanks for the changes.
   You have some compilation errors:
   
   ```
   [2024-04-17T15:27:16.786Z] > Task :jmh-benchmarks:compileJava
   [2024-04-17T15:27:16.786Z] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15719/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java:110:
 error: method createLogManager in class TestUtils cannot be applied to given 
types;
   [2024-04-17T15:27:16.786Z] this.logManager = 
TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
   [2024-04-17T15:27:16.786Z]^
   [2024-04-17T15:27:16.786Z]   required: 
Seq,LogConfig,ConfigRepository,CleanerConfig,MockTime,MetadataVersion,int,boolean,Option,boolean,long
   [2024-04-17T15:27:16.786Z]   found: 
Buffer,LogConfig,MockConfigRepository,CleanerConfig,MockTime,MetadataVersion,int,boolean,Option,boolean
   [2024-04-17T15:27:16.786Z]   reason: actual and formal argument lists differ 
in length
   [2024-04-17T15:27:16.786Z] 1 error
   ```
   
   Can you address these?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]

2024-04-18 Thread via GitHub


mimaison commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2063460572

   @jlprat Can you rebase to resolve the conflict? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570371982


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
 }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): 
Unit = {
+val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+  val tp = futureLog.topicPartition
+  if (cleaner != null) {

Review Comment:
   > I guess cleaner.abortAndPauseCleaning is added because we call 
resumeCleaning later, and it will cause error if we don't call 
abortAndPauseCleaning here?
   
   That's correct if the cleaning operation hasn't started. The cleaning 
operation is scheduled on a separate thread, so we cannot be sure whether the 
`inProgress` map in `LogCleanerManager` has a key for the given topicPartition 
at the time we iterate over these logs.
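
   To make the ordering concrete, here is a minimal, self-contained Java sketch of
   the pause/resume pairing described above. The `Cleaner` interface and method
   shapes are illustrative stand-ins, not the real `kafka.log.LogCleaner` API
   (which lives in Scala and has different signatures).

   ```java
   import java.util.Set;

   // Illustrative stand-in for kafka.log.LogCleaner; not its real signatures.
   interface Cleaner {
       void abortAndPauseCleaning(String topicPartition);
       void resumeCleaning(Set<String> topicPartitions);
   }

   final class FutureLogRecoverySketch {
       // Pause first: cleaning runs on its own thread, so we cannot tell whether a
       // clean for this partition is already in progress. Pausing up front makes
       // the later resumeCleaning call safe either way.
       static void recoverAbandonedFutureLog(Cleaner cleaner, String topicPartition, Runnable swapLogs) {
           if (cleaner != null) cleaner.abortAndPauseCleaning(topicPartition);
           try {
               swapLogs.run(); // replace the current log with the future log
           } finally {
               if (cleaner != null) cleaner.resumeCleaning(Set.of(topicPartition));
           }
       }
   }
   ```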



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-04-18 Thread via GitHub


mimaison commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1570367941


##
server/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+/**
+ * Common home for broker-side log configs which need to be accessible from 
the libraries shared
+ * between the broker and the multiple modules in Kafka.
+ *
+ * Note this is an internal API and subject to change without notice.
+ */
+public class KafkaLogConfigs {
+public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
+public static final int NUM_PARTITIONS_DEFAULT = 1;
+public static final String NUM_PARTITIONS_DOC = "The default number of log 
partitions per topic";
+
+public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
+public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
+public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
+public static final String LOG_DIR_DOC = "The directory in which the log 
data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
+public static final String LOG_DIRS_DOC = "A comma-separated list of the 
directories where the log data is stored. If not set, the value in " + 
LOG_DIR_CONFIG + " is used.";
+
+public static final String LOG_SEGMENT_BYTES_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
+public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a 
single log file";
+
+public static final String LOG_ROLL_TIME_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG);
+public static final String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + 
"roll.hours";
+public static final String LOG_ROLL_TIME_MILLIS_DOC = "The maximum time 
before a new log segment is rolled out (in milliseconds). If not set, the value 
in " + LOG_ROLL_TIME_HOURS_CONFIG + " is used";
+public static final String LOG_ROLL_TIME_HOURS_DOC = "The maximum time 
before a new log segment is rolled out (in hours), secondary to " + 
LOG_ROLL_TIME_MILLIS_CONFIG + " property";
+
+public static final String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
+public static final String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX 
+ "roll.jitter.hours";
+public static final String LOG_ROLL_TIME_JITTER_MILLIS_DOC = "The maximum 
jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the 
value in " + LOG_ROLL_TIME_JITTER_HOURS_CONFIG + " is used";
+public static final String LOG_ROLL_TIME_JITTER_HOURS_DOC = "The maximum 
jitter to subtract from logRollTimeMillis (in hours), secondary to " + 
LOG_ROLL_TIME_JITTER_MILLIS_CONFIG + " property";
+
+
+public static final String LOG_RETENTION_TIME_MILLIS_CONFIG = 
ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG);
+public static final String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX 
+ "retention.minutes";
+public static final String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + 
"retention.hours";
+public static final String LOG_RETENTION_TIME_MILLIS_DOC = "The number of 
milliseconds to keep a log file before deleting it (in milliseconds), If not 
set, the value in " + LOG_RETENTION_TIME_MINUTES_CONFIG + " is used. If set to 
-1, no time limit is applied.";
+public static final String LOG_RETENTION_TIME_MINUTES_DOC = "The number of 
minutes to keep a log file before deleting it (in minutes), secondary to " + 
LOG_RETENTION_TIME_MILLIS_CONFIG + " property. If not set, the value in " + 
LOG_RETENTION_TIME_HOURS_CONFIG + " is used";
+public static final String LOG_RETENTION_TIME_HOURS_DOC = "The number of 
hours to keep a log file before deleting it (in hours), tertiary to " + 
LOG_RETENTION_TIME_MILLIS_CONFIG + " property";
+
+public static final String LOG_RETENTION_BYTES_CONFIG = 

Re: [PR] debug for 15679 [kafka]

2024-04-18 Thread via GitHub


chia7712 closed pull request #15741: debug for 15679
URL: https://github.com/apache/kafka/pull/15741


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2063435567

   > HWM is set to to localLogStartOffset in 
[UnifiedLog#updateLocalLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L162),
 then we load the HWM from the checkpoint file in 
[Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495).
   If the HWM checkpoint file is missing / does not contain the entry for 
partition, then the default value of 0 is taken. If 0 < LogStartOffset (LSO), 
then LSO is assumed as HWM . Thus, the non-monotonic update of highwatermark 
from LLSO to LSO can happen.
   
   Pardon me. I'm a bit confused about this. Please feel free to correct me to 
help me catch up :smile: 
   
   ### case 0: the checkpoint file is missing and the remote storage is 
**disabled**
   The LSO is initialized to LLSO
   
   
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogLoader.scala#L180
   
   so I can't see why the non-monotonic update would happen here. After all, LLSO 
and LSO are the same in this scenario.
   
   ### case 1: the checkpoint file is missing and the remote storage is 
**enabled**
   The LSO is initialized to `logStartOffsetCheckpoint`, which is 0 since there 
are no checkpoint files.
   
   
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogLoader.scala#L178
   
   And then the HWM will be updated to LLSO, which is larger than zero.
   
   
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/UnifiedLog.scala#L172
   
   And this could be a problem when 
[Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495)
 gets called, since the HWM changes from LLSO (non-zero) to LSO (zero). Also, 
the incorrect HWM causes an error in `convertToOffsetMetadataOrThrow`.
   
   If I understand correctly, it seems the root cause is that "when the 
checkpoint files are not working, we will initialize a `UnifiedLog` with an 
incorrect LSO". 
   
   and so could we fix that by rebuilding `logStartOffsets` according to remote 
storage when the checkpoint is not working 
(https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogManager.scala#L459)?
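
   As a sanity check on case 1, here is a small self-contained Java sketch that
   just replays the sequence described above with made-up offsets; every name is
   illustrative and none of it is the real UnifiedLog/Partition code.

   ```java
   // Replays "case 1": remote storage enabled, checkpoint files missing.
   public class HighWatermarkInitSketch {
       public static void main(String[] args) {
           long localLogStartOffset = 150L;   // LLSO recovered from local segments
           long logStartOffset = 0L;          // LSO defaults to 0: no checkpoint entry

           // UnifiedLog#updateLocalLogStartOffset moves the HWM up to LLSO.
           long highWatermark = localLogStartOffset;

           // Partition#createLog then loads the HWM checkpoint; a missing entry
           // defaults to 0, which is not above the LSO, so the HWM falls back to LSO.
           long checkpointedHwm = 0L;
           highWatermark = Math.max(checkpointedHwm, logStartOffset);

           System.out.printf("LLSO=%d LSO=%d HWM=%d -> non-monotonic drop from %d%n",
                   localLogStartOffset, logStartOffset, highWatermark, localLogStartOffset);
       }
   }
   ```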


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15089) Consolidate all the group coordinator configs

2024-04-18 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838562#comment-17838562
 ] 

Omnia Ibrahim commented on KAFKA-15089:
---

Hi [~dajac], should we mark this as resolved now that we merged 
[https://github.com/apache/kafka/pull/15684], or is there anything left here to 
do?

> Consolidate all the group coordinator configs
> -
>
> Key: KAFKA-15089
> URL: https://issues.apache.org/jira/browse/KAFKA-15089
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> The group coordinator configurations are defined in KafkaConfig at the 
> moment. As KafkaConfig is defined in the core module, we can't pass it to the 
> new java modules to pass the configurations along.
> A suggestion here is to centralize all the configurations of a module in the 
> module itself, similarly to what we have done for RemoteLogManagerConfig and 
> RaftConfig. We also need a mechanism to add all the properties defined in the 
> module to the KafkaConfig's ConfigDef.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]

2024-04-18 Thread via GitHub


lucasbru commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1570342805


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
 }
 
 private void process(final 
ConsumerRebalanceListenerCallbackNeededEvent event) {
-ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = 
invokeRebalanceCallbacks(
 rebalanceListenerInvoker,
 event.methodName(),
 event.partitions(),
 event.future()
 );
 applicationEventHandler.add(invokedEvent);
+if (invokedEvent.error().isPresent()) {
+throw invokedEvent.error().get();

Review Comment:
   Looking at the reconciliation logic, I think if `onPartitionsRevoked` 
throws, we'll not execute `onPartitionsAssigned`. And the call to 
`onPartitionsLost` seems to be independent of reconciliation. So not sure how 
we'd end up with two exceptions.
   
   You are right that there is a behavioral difference around finishing the 
reconciliation. The old consumer throws _after_ finishing the reconciliation, 
while the new consumer throws on a different thread, so there is no strict time 
ordering between finishing the reconciliation and throwing. But I'm struggling 
to see how one can observe the difference. The reconciliation will have 
finished the next time the background thread processes any events, so in a 
sense, you cannot observe the difference based on the queue architecture. The 
difference may only be observable through shared state that breaks the 
queue-based architecture. SubscriptionState comes to mind here. Thinking of 
something like
   
   1. application thread enters poll, fails during rebalance listener execution 
and throws
   2. application thread somehow reads subscription state
   3. background thread updates subscription state as part of reconciliation
   
   Now the application thread has observed an "incomplete reconciliation". But 
after a listener execution has failed, we don't seem to update the subscription 
state in the reconciliation. 
   
   So in summary - not sure if we are going to notice the different behaviors?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]

2024-04-18 Thread via GitHub


lucasbru commented on PR #15723:
URL: https://github.com/apache/kafka/pull/15723#issuecomment-2063300823

   @lianetm did you want to make another pass or good to go?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-18 Thread via GitHub


showuon commented on PR #15748:
URL: https://github.com/apache/kafka/pull/15748#issuecomment-2063302914

   @kamalcph , the tests are still failing: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15748/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570226001


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.Integer.max;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 1)
+@Measurement(iterations = 0)

Review Comment:
   We should use better defaults than this. I think that we usually use 5 and 
10/15.
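
   For illustration, a compilable sketch of what such defaults could look like on
   a benchmark class; the concrete values (5 warmup, 10 measurement) are just one
   reading of "5 and 10/15", not numbers taken from the repository.

   ```java
   import java.util.concurrent.TimeUnit;

   import org.openjdk.jmh.annotations.Benchmark;
   import org.openjdk.jmh.annotations.BenchmarkMode;
   import org.openjdk.jmh.annotations.Fork;
   import org.openjdk.jmh.annotations.Measurement;
   import org.openjdk.jmh.annotations.Mode;
   import org.openjdk.jmh.annotations.OutputTimeUnit;
   import org.openjdk.jmh.annotations.Scope;
   import org.openjdk.jmh.annotations.State;
   import org.openjdk.jmh.annotations.Warmup;

   @State(Scope.Benchmark)
   @Fork(value = 1)
   @Warmup(iterations = 5)        // enough warmup for the JIT to settle
   @Measurement(iterations = 10)  // non-zero measured iterations for a stable average
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public class AssignorBenchmarkDefaultsSketch {
       @Benchmark
       public int noop() {
           return 42; // placeholder body; a real benchmark would exercise the assignor
       }
   }
   ```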



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1570226078


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   What I want to say is that, with your current change, this test will work the 
same way as before: not sending any heartbeat response at all and forcing the 
session timeout directly. That works, but it's not readable and doesn't make 
sense (i.e. it's not the normal experience).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1570217037


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   Hmm.. I see. The reason is that we didn't make sure the heartbeat is sent 
after `time.sleep(sessionTimeoutMs - 1);`. Do you think we could make sure the 
heartbeat request or response is sent before the next sleep? With that, we can 
make sure the session timeout won't happen, because we did send out a heartbeat 
every sessionTimeoutMs - 1. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1570111991


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
+// prepare 3 heartBeatResponses because we will trigger 3 heartBeat 
requests until rebalanceTimeout,
+// that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+client.prepareResponse(new HeartbeatResponse(new 
HeartbeatResponseData()));
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+coordinator.ensureActiveGroup();
+coordinator.poll(0, () -> {
+return null;
+});
+
+// We keep the heartbeat thread running behind the scenes and poll 
frequently so that eventually
+// the time goes past now + rebalanceTimeoutMs which triggers poll 
timeout expiry.
+TestUtils.waitForCondition(() -> {
+// sleep until sessionTimeoutMs to trigger a heartBeat request 
to avoid session timeout.
+// Not sure if this will be flaky in CI because the heartbeat 
thread might not send out the heartBeat request in time.

Review Comment:
   Should we remove this line?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   Hmm.. I see. The reason is that we don't make sure the heartbeat is sent 
after `time.sleep(sessionTimeoutMs - 1);`. Do you think we could make sure the 
heartbeat request or response is sent before the next sleep?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-04-18 Thread via GitHub


showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1570217037


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##
@@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() {
 verify(configStorage).snapshot();
 }
 
+@Test
+public void testPollTimeoutExpiry() throws InterruptedException {
+
+when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+client.prepareResponse(joinGroupFollowerResponse(1, "member", 
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, 
"leader", configState1.offset(), Collections.emptyList(),
+Collections.singletonList(taskId1x0), Errors.NONE));
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node));
+

Review Comment:
   Hmm.. I see. The reason is that we didn't make sure the heartbeat is sent 
after `time.sleep(sessionTimeoutMs - 1);`. Do you think we could make sure the 
heartbeat request or response is sent before the next sleep?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16583: fix PartitionRegistration#toRecord directory check under metadata version 3_7_IV2 [kafka]

2024-04-18 Thread via GitHub


superhx opened a new pull request, #15751:
URL: https://github.com/apache/kafka/pull/15751

   fix https://issues.apache.org/jira/browse/KAFKA-16583
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-18 Thread Andras Hatvani (Jira)
Andras Hatvani created KAFKA-16584:
--

 Summary: Make log processing summary configurable or debug
 Key: KAFKA-16584
 URL: https://issues.apache.org/jira/browse/KAFKA-16584
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.2
Reporter: Andras Hatvani


Currently *every two minutes for every stream thread* statistics will be logged 
on INFO log level. 
{code:log}
2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 total 
records, ran 0 punctuators, and committed 0 total tasks since the last update 
{code}

This is absolutely unnecessary and even harmful since it fills the logs and 
thus storage space with unwanted and useless data. Otherwise the INFO logs are 
useful and helpful, therefore it's not an option to raise the log level to WARN.
Please make the logProcessingSummary 
* either to a DEBUG level log or
* make it configurable so that it can be disabled.
This is the relevant code: 
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073
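
For illustration, a minimal sketch of the first option (demoting the summary to 
DEBUG); the class and method names below are made up for the sketch and this is 
not the actual StreamThread code.
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: emit the periodic processing summary at DEBUG instead of INFO.
final class ProcessingSummarySketch {
    private static final Logger log = LoggerFactory.getLogger(ProcessingSummarySketch.class);

    void maybeLogSummary(long records, long punctuators, long commits) {
        log.debug("Processed {} total records, ran {} punctuators, and committed {} total tasks since the last update",
                records, punctuators, commits);
    }
}
{code}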



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16584) Make log processing summary configurable or debug

2024-04-18 Thread Andras Hatvani (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Hatvani updated KAFKA-16584:
---
Description: 
Currently *every two minutes for every stream thread* statistics will be logged 
on INFO log level. 
{code}
2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 total 
records, ran 0 punctuators, and committed 0 total tasks since the last update 
{code}

This is absolutely unnecessary and even harmful since it fills the logs and 
thus storage space with unwanted and useless data. Otherwise the INFO logs are 
useful and helpful, therefore it's not an option to raise the log level to WARN.
Please make the logProcessingSummary 
* either to a DEBUG level log or
* make it configurable so that it can be disabled.
This is the relevant code: 
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073

  was:
Currently *every two minutes for every stream thread* statistics will be logged 
on INFO log level. 
{code:log}
2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 total 
records, ran 0 punctuators, and committed 0 total tasks since the last update 
{code}

This is absolutely unnecessary and even harmful since it fills the logs and 
thus storage space with unwanted and useless data. Otherwise the INFO logs are 
useful and helpful, therefore it's not an option to raise the log level to WARN.
Please make the logProcessingSummary 
* either to a DEBUG level log or
* make it configurable so that it can be disabled.
This is the relevant code: 
https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073


> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Priority: Major
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16574) The metrics of LogCleaner disappear after reconfiguration

2024-04-18 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16574:
--

Assignee: Chia Chuan Yu  (was: Chia-Ping Tsai)

> The metrics of LogCleaner disappear after reconfiguration
> -
>
> Key: KAFKA-16574
> URL: https://issues.apache.org/jira/browse/KAFKA-16574
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia Chuan Yu
>Priority: Minor
>
> see 
> [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/core/src/main/scala/kafka/log/LogCleaner.scala#L227]
> We don't rebuild the metrics after calling shutdown. The following test can 
> prove that.
> {code:java}
> @Test
> def testMetricsAfterReconfiguration(): Unit = {
>   val logCleaner = new LogCleaner(new CleanerConfig(true),
> logDirs = Array(TestUtils.tempDir()),
> logs = new Pool[TopicPartition, UnifiedLog](),
> logDirFailureChannel = new LogDirFailureChannel(1),
> time = time)
>   def check(): Unit =
> LogCleaner.MetricNames.foreach(name => 
> assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(logCleaner.metricsGroup
>   .metricName(name, java.util.Collections.emptyMap())), s"$name is 
> gone?"))
>   try {
> check()
> logCleaner.reconfigure(new KafkaConfig(TestUtils.createBrokerConfig(1, 
> "localhost:2181")),
>   new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181")))
> check()
>   } finally logCleaner.shutdown()
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570135556


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+String groupId = "benchmark-group";
+
+private static final int groupEpoch = 0;
+
+private PartitionAssignor partitionAssignor;
+
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+private TargetAssignmentBuilder targetAssignmentBuilder;
+
+private AssignmentSpec assignmentSpec;
+
+private static final int numberOfRacks = 3;
+
+private final List<String> allTopicNames = new ArrayList<>(topicCount);
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// For this benchmark we will use the Uniform Assignor
+// and a group that has a homogeneous subscription model.
+partitionAssignor = new UniformAssignor();
+subscriptionMetadata = generateMockSubscriptionMetadata();
+Map<String, ConsumerGroupMember> members = generateMockMembers();
+Map<String, Assignment> existingTargetAssignment = 
generateMockInitialTargetAssignment();
+
+// Add a new member to trigger a rebalance.
+Set<String> subscribedTopics = new HashSet<>(subscriptionMetadata.keySet());
+String rackId = isRackAware ? "rack" + (memberCount - 1) % 
numberOfRacks : "";

Review Comment:
   removed rack aware now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570135213


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+String groupId = "benchmark-group";
+
+private static final int groupEpoch = 0;
+
+private PartitionAssignor partitionAssignor;
+
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+private TargetAssignmentBuilder targetAssignmentBuilder;
+
+private AssignmentSpec assignmentSpec;
+
+private static final int numberOfRacks = 3;
+
+private final List<String> allTopicNames = new ArrayList<>(topicCount);
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// For this benchmark we will use the Uniform Assignor
+// and a group that has a homogeneous subscription model.
+partitionAssignor = new UniformAssignor();
+subscriptionMetadata = generateMockSubscriptionMetadata();
+Map<String, ConsumerGroupMember> members = generateMockMembers();
+Map<String, Assignment> existingTargetAssignment = 
generateMockInitialTargetAssignment();
+
+// Add a new member to trigger a rebalance.
+Set<String> subscribedTopics = new HashSet<>(subscriptionMetadata.keySet());
+String rackId = isRackAware ? "rack" + (memberCount - 1) % 
numberOfRacks : "";
+ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder("new-member")
+.setSubscribedTopicNames(new ArrayList<>(subscribedTopics))
+.setRackId(rackId)
+.build();
+
+targetAssignmentBuilder = new TargetAssignmentBuilder(groupId, 
groupEpoch, partitionAssignor)
+.withMembers(members)
+.withSubscriptionMetadata(subscriptionMetadata)
+

Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570128341


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;

Review Comment:
   okay



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-18 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1570122759


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java:
##
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.Assignment;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.VersionedMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class TargetAssignmentBuilderBenchmark {
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+String groupId = "benchmark-group";
+
+private static final int groupEpoch = 0;
+
+private PartitionAssignor partitionAssignor;
+
+private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
+
+private TargetAssignmentBuilder targetAssignmentBuilder;
+
+private AssignmentSpec assignmentSpec;
+
+private static final int numberOfRacks = 3;
+
+private final List<String> allTopicNames = new ArrayList<>(topicCount);
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+// For this benchmark we will use the Uniform Assignor
+// and a group that has a homogeneous subscription model.
+partitionAssignor = new UniformAssignor();
+subscriptionMetadata = generateMockSubscriptionMetadata();
+Map<String, ConsumerGroupMember> members = generateMockMembers();
+Map<String, Assignment> existingTargetAssignment = 
generateMockInitialTargetAssignment();
+
+// Add a new member to trigger a rebalance.
+Set<String> subscribedTopics = new HashSet<>(subscriptionMetadata.keySet());
+String rackId = isRackAware ? "rack" + (memberCount - 1) % 
numberOfRacks : "";

Review Comment:
   It is defined with the Optional return value
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode

2024-04-18 Thread HanXu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HanXu updated KAFKA-16583:
--
Description: 
How to reproduce:
1. Launch a 3.4.0 controller and a 3.4.0 broker(BrokerA) in Kraft mode;
2. Create a topic with 1 partition;
3. Launch a 3.4.0 broker(Broker B) in Kraft mode and reassign the step 2 
partition to Broker B;
4. Upgrade Broker B to 3.7.0;

Broker B will keep logging the following error:
{code:java}
[2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled 
error initializing new publishers 
(org.apache.kafka.server.fault.LoggingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
lost because the following could not be represented in metadata version 
3.4-IV0: the directory assignment state of one or more replicas
at 
org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
at 
org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
at 
org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
at 
org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base/java.lang.Thread.run(Thread.java:840)
{code}
Bug:
 - When reassigning a partition, PartitionRegistration#merge will set the new 
replicas' directories to UNASSIGNED;
 - But in metadata version 3.4.0, PartitionRegistration#toRecord only allows the 
MIGRATING directory;
{code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
record.setDirectories(Uuid.toList(directories));
} else {
for (Uuid directory : directories) {
if (!DirectoryId.MIGRATING.equals(directory)) {
options.handleLoss("the directory assignment state of one 
or more replicas");
break;
}
}
}
{code}

Solution:
- PartitionRegistration#toRecord allows both MIGRATING and UNASSIGNED
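
A minimal sketch of what that relaxed check could look like, mirroring the 
snippet above (the actual fix in PartitionRegistration#toRecord may differ):
{code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
    record.setDirectories(Uuid.toList(directories));
} else {
    for (Uuid directory : directories) {
        // UNASSIGNED, like MIGRATING, carries no concrete directory placement,
        // so it can be dropped safely when writing at an older metadata version.
        if (!DirectoryId.MIGRATING.equals(directory) && !DirectoryId.UNASSIGNED.equals(directory)) {
            options.handleLoss("the directory assignment state of one or more replicas");
            break;
        }
    }
}
{code}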

  was:
How to reproduce:
1. Launch a 3.4.0 controller and a 3.4.0 broker(BrokerA) in Kraft mode;
2. Create a topic with 1 partition;
3. Launch a 3.4.0 broker(Broker B) in Kraft mode and reassign the step 2 
partition to Broker B;
4. Upgrade Broker B to 3.7.0;

The Broker B will keep log the following error:
{code:java}
[2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled error initializing new publishers (org.apache.kafka.server.fault.LoggingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been lost because the following could not be represented in metadata version 3.4-IV0: the directory assignment state of one or more replicas
    at org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
    at org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
    at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
    at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
    at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
    at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
    at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}

Bug:
- When reassigning a partition, PartitionRegistration#merge sets the new replicas' directories to UNASSIGNED;
- But in metadata version 3.4-IV0, PartitionRegistration#toRecord only allows the MIGRATING directory:
{code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
    record.setDirectories(Uuid.toList(directories));
} else {
    for (Uuid directory : directories) {
        if (!DirectoryId.MIGRATING.equals(directory)) {
            options.handleLoss("the directory assignment state of one or more replicas");
            break;
        }
    }
}
{code}

[jira] [Created] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode

2024-04-18 Thread HanXu (Jira)
HanXu created KAFKA-16583:
-

 Summary: Update from 3.4.0 to 3.7.0 image write failed in Kraft 
mode
 Key: KAFKA-16583
 URL: https://issues.apache.org/jira/browse/KAFKA-16583
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.7.0
Reporter: HanXu
Assignee: HanXu


How to reproduce:
1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in KRaft mode;
2. Create a topic with 1 partition;
3. Launch another 3.4.0 broker (Broker B) in KRaft mode and reassign the partition from step 2 to Broker B;
4. Upgrade Broker B to 3.7.0.

Broker B will then keep logging the following error:
{code:java}
[2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled error initializing new publishers (org.apache.kafka.server.fault.LoggingFaultHandler)
org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been lost because the following could not be represented in metadata version 3.4-IV0: the directory assignment state of one or more replicas
    at org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
    at org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
    at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
    at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
    at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
    at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
    at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
    at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}

Bug:
- When reassigning a partition, PartitionRegistration#merge sets the new replicas' directories to UNASSIGNED;
- But in metadata version 3.4-IV0, PartitionRegistration#toRecord only allows the MIGRATING directory:
{code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
    record.setDirectories(Uuid.toList(directories));
} else {
    for (Uuid directory : directories) {
        if (!DirectoryId.MIGRATING.equals(directory)) {
            options.handleLoss("the directory assignment state of one or more replicas");
            break;
        }
    }
}
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15396:Add a metric indicating the version of the current running kafka server [kafka]

2024-04-18 Thread via GitHub


hudeqi closed pull request #14284: KAFKA-15396:Add a metric indicating the 
version of the current running kafka server
URL: https://github.com/apache/kafka/pull/14284


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15134:Enrich the prompt reason in CommitFailedException [kafka]

2024-04-18 Thread via GitHub


hudeqi closed pull request #13930: KAFKA-15134:Enrich the prompt reason in 
CommitFailedException
URL: https://github.com/apache/kafka/pull/13930


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception [kafka]

2024-04-18 Thread via GitHub


hudeqi closed pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may 
cause serious disk growing in case of potential exception
URL: https://github.com/apache/kafka/pull/13421


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics [kafka]

2024-04-18 Thread via GitHub


hudeqi closed pull request #13852: KAFKA-15086:Set a reasonable segment size 
upper limit for MM2 internal topics
URL: https://github.com/apache/kafka/pull/13852


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14906:Extract the coordinator service log from server log [kafka]

2024-04-18 Thread via GitHub


hudeqi closed pull request #13568: KAFKA-14906:Extract the coordinator service 
log from server log
URL: https://github.com/apache/kafka/pull/13568


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector [kafka]

2024-04-18 Thread via GitHub


hudeqi closed pull request #13913: KAFKA-15119:Support incremental 
syncTopicAcls in MirrorSourceConnector
URL: https://github.com/apache/kafka/pull/13913


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


