[jira] [Commented] (KAFKA-15709) KRaft support in ServerStartupTest
[ https://issues.apache.org/jira/browse/KAFKA-15709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838640#comment-17838640 ]

Matthew de Detrich commented on KAFKA-15709:
--------------------------------------------

[~linzihao1999] are you still working on this, or can I take over?

> KRaft support in ServerStartupTest
> ----------------------------------
>
> Key: KAFKA-15709
> URL: https://issues.apache.org/jira/browse/KAFKA-15709
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Assignee: Zihao Lin
> Priority: Minor
> Labels: kraft, kraft-test, newbie
>
> The following tests in ServerStartupTest in core/src/test/scala/unit/kafka/server/ServerStartupTest.scala need to be updated to support KRaft:
> 38 : def testBrokerCreatesZKChroot(): Unit = {
> 51 : def testConflictBrokerStartupWithSamePort(): Unit = {
> 65 : def testConflictBrokerRegistration(): Unit = {
> 82 : def testBrokerSelfAware(): Unit = {
> 93 : def testBrokerStateRunningAfterZK(): Unit = {
> Scanned 107 lines. Found 0 KRaft tests out of 5 tests.
[jira] [Comment Edited] (KAFKA-15737) KRaft support in ConsumerBounceTest
[ https://issues.apache.org/jira/browse/KAFKA-15737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838639#comment-17838639 ]

Matthew de Detrich edited comment on KAFKA-15737 at 4/18/24 12:39 PM:
----------------------------------------------------------------------

[~k-raina] are you working on this, or can I have a look into it?

was (Author: JIRAUSER297606):
I'll have a look at this

> KRaft support in ConsumerBounceTest
> -----------------------------------
>
> Key: KAFKA-15737
> URL: https://issues.apache.org/jira/browse/KAFKA-15737
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Assignee: Kaushik Raina
> Priority: Minor
> Labels: kraft, kraft-test, newbie
>
> The following tests in ConsumerBounceTest in core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala need to be updated to support KRaft:
> 81 : def testConsumptionWithBrokerFailures(): Unit = consumeWithBrokerFailures(10)
> 122 : def testSeekAndCommitWithBrokerFailures(): Unit = seekAndCommitWithBrokerFailures(5)
> 161 : def testSubscribeWhenTopicUnavailable(): Unit = {
> 212 : def testClose(): Unit = {
> 297 : def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
> 337 : def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = {
> 370 : def testCloseDuringRebalance(): Unit = {
> Scanned 535 lines. Found 0 KRaft tests out of 7 tests.
[jira] [Commented] (KAFKA-15737) KRaft support in ConsumerBounceTest
[ https://issues.apache.org/jira/browse/KAFKA-15737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838639#comment-17838639 ]

Matthew de Detrich commented on KAFKA-15737:
--------------------------------------------

I'll have a look at this

> KRaft support in ConsumerBounceTest
> -----------------------------------
>
> Key: KAFKA-15737
> URL: https://issues.apache.org/jira/browse/KAFKA-15737
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Assignee: Kaushik Raina
> Priority: Minor
> Labels: kraft, kraft-test, newbie
>
> The following tests in ConsumerBounceTest in core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala need to be updated to support KRaft:
> 81 : def testConsumptionWithBrokerFailures(): Unit = consumeWithBrokerFailures(10)
> 122 : def testSeekAndCommitWithBrokerFailures(): Unit = seekAndCommitWithBrokerFailures(5)
> 161 : def testSubscribeWhenTopicUnavailable(): Unit = {
> 212 : def testClose(): Unit = {
> 297 : def testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(): Unit = {
> 337 : def testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(): Unit = {
> 370 : def testCloseDuringRebalance(): Unit = {
> Scanned 535 lines. Found 0 KRaft tests out of 7 tests.
[jira] [Commented] (KAFKA-14547) Be able to run kafka KRaft Server in tests without needing to run a storage setup script
[ https://issues.apache.org/jira/browse/KAFKA-14547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838637#comment-17838637 ]

Matthew de Detrich commented on KAFKA-14547:
--------------------------------------------

I'll have a look into this

> Be able to run kafka KRaft Server in tests without needing to run a storage setup script
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-14547
> URL: https://issues.apache.org/jira/browse/KAFKA-14547
> Project: Kafka
> Issue Type: Improvement
> Components: kraft
> Affects Versions: 3.3.1
> Reporter: Natan Silnitsky
> Priority: Major
>
> Currently the Kafka KRaft server requires running kafka-storage.sh in order to start properly.
> This makes setup much more cumbersome for build tools like Bazel to work properly.
> One way to mitigate this is to configure the paths via kafkaConfig...
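For context on how Kafka's own tests sidestep the storage script: the internal `kafka.testkit.KafkaClusterTestKit` harness (the same one mentioned in the KAFKA-14569 thread below) formats storage programmatically. A minimal sketch, assuming the harness from Kafka's core test sources is on the classpath; it is an internal API, not a supported public one:

```java
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;

public class KRaftWithoutStorageScript {
    public static void main(String[] args) throws Exception {
        // Build a one-broker, one-controller KRaft cluster entirely in-process.
        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
                new TestKitNodes.Builder()
                        .setNumBrokerNodes(1)
                        .setNumControllerNodes(1)
                        .build()).build()) {
            cluster.format();              // the programmatic equivalent of the kafka-storage.sh step
            cluster.startup();
            cluster.waitForReadyBrokers();
            System.out.println("bootstrap.servers=" + cluster.bootstrapServers());
        }
    }
}
```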
[jira] [Commented] (KAFKA-16378) Under tiered storage, deleting local logs does not free disk space
[ https://issues.apache.org/jira/browse/KAFKA-16378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838635#comment-17838635 ]

Ivan Yurchenko commented on KAFKA-16378:
----------------------------------------

This was a problem in the RemoteStorageManager. I'll close this issue. [~jianbin] feel free to reopen if you think this needs more attention.

> Under tiered storage, deleting local logs does not free disk space
> -------------------------------------------------------------------
>
> Key: KAFKA-16378
> URL: https://issues.apache.org/jira/browse/KAFKA-16378
> Project: Kafka
> Issue Type: Bug
> Components: Tiered-Storage
> Affects Versions: 3.7.0
> Reporter: Jianbin Chen
> Priority: Major
> Attachments: image-2024-03-15-09-33-13-903.png
>
> This is an occasional phenomenon: whenever a tiered-storage topic triggers deletion of a local log, there is a chance of residual file references, even though the files can no longer be found on the local disk!
> I use this implementation: [Aiven-Open/tiered-storage-for-apache-kafka: RemoteStorageManager for Apache Kafka® Tiered Storage (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka]
> I also filed an issue in their community, which contains a full description of the problem:
> [Disk space not released · Issue #513 · Aiven-Open/tiered-storage-for-apache-kafka (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/513]
> !image-2024-03-15-09-33-13-903.png!
> You can clearly see in this figure that Kafka has already logged the operation that deleted the log, but the file is still referenced and the disk space has not been released.
[jira] [Closed] (KAFKA-16378) Under tiered storage, deleting local logs does not free disk space
[ https://issues.apache.org/jira/browse/KAFKA-16378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Yurchenko closed KAFKA-16378.
----------------------------------

This was a problem in the remote storage manager plugin.

> Under tiered storage, deleting local logs does not free disk space
> -------------------------------------------------------------------
>
> Key: KAFKA-16378
> URL: https://issues.apache.org/jira/browse/KAFKA-16378
> Project: Kafka
> Issue Type: Bug
> Components: Tiered-Storage
> Affects Versions: 3.7.0
> Reporter: Jianbin Chen
> Priority: Major
> Attachments: image-2024-03-15-09-33-13-903.png
>
> This is an occasional phenomenon: whenever a tiered-storage topic triggers deletion of a local log, there is a chance of residual file references, even though the files can no longer be found on the local disk!
> I use this implementation: [Aiven-Open/tiered-storage-for-apache-kafka: RemoteStorageManager for Apache Kafka® Tiered Storage (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka]
> I also filed an issue in their community, which contains a full description of the problem:
> [Disk space not released · Issue #513 · Aiven-Open/tiered-storage-for-apache-kafka (github.com)|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/issues/513]
> !image-2024-03-15-09-33-13-903.png!
> You can clearly see in this figure that Kafka has already logged the operation that deleted the log, but the file is still referenced and the disk space has not been released.
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
chia7712 commented on PR #15634:
URL: https://github.com/apache/kafka/pull/15634#issuecomment-2063727744

Sorry, the story I mentioned above seems to be another issue. Let me summarize my thoughts.

1. `log-start-offset-checkpoint` is missing and remote storage is enabled. The `logStartOffset` will be set to zero, and that seems to be a potential issue, since a `ListOffsetRequest` could get an incorrect result.
2. `replication-offset-checkpoint` is missing and remote storage is enabled. This is what you described. The HWM points to the middle of tiered storage, and so it causes errors when fetching records from local segments.
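For readers skimming the thread, the fix the PR title describes amounts to clamping a recovered high watermark into the valid local range. A minimal sketch of that invariant only; the method name and standalone form are mine, not Kafka's API:

```java
public class HighWatermarkBound {
    // Clamp a recovered HWM into [localLogStartOffset, logEndOffset] so it can never
    // point below the first local segment (case 2 above) or beyond the log end.
    static long boundHighWatermark(long recoveredHwm, long localLogStartOffset, long logEndOffset) {
        return Math.min(Math.max(recoveredHwm, localLogStartOffset), logEndOffset);
    }

    public static void main(String[] args) {
        // A missing replication-offset-checkpoint makes the recovered HWM default to 0,
        // but with tiered storage the local log may start at, say, offset 500.
        System.out.println(boundHighWatermark(0L, 500L, 800L)); // prints 500
    }
}
```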
Re: [PR] KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors [kafka]
showuon commented on code in PR #15732:
URL: https://github.com/apache/kafka/pull/15732#discussion_r1570596642

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
@@ -391,13 +391,16 @@ void enqueueMetadataChangeEvent(
     // Events handled by Migration Driver.
     abstract class MigrationEvent implements EventQueue.Event {
+        // Use no-op handler by default because the retryHandler will be overridden if needed
+        public void retryHandler() { }
         @SuppressWarnings("ThrowableNotThrown")
         @Override
         public void handleException(Throwable e) {
             if (e instanceof MigrationClientAuthException) {
                 KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
             } else if (e instanceof MigrationClientException) {
                 log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
+                retryHandler();

Review Comment:
@chia7712, I took your suggestion to add `RecoverMigrationStateFromZKEvent` so that we don't need to worry about retries anymore. I was checking whether this change causes any side effects, and here are my findings:

1. `recoverMigrationStateFromZK` is expected to run before the driver starts the state machine.
2. In `recoverMigrationStateFromZK`, we do these things:
   a. create a ZNode for migration and the initial migration state
   b. install this class as a metadata publisher
   c. transition to the INACTIVE state
3. If `recoverMigrationStateFromZK` keeps failing, the log will keep outputting errors and we keep retrying. Once it succeeds, the metadata publisher will be installed, and `onControllerChange` and `onMetadataUpdate` will be triggered to start the process.

That means that if we change `recoverMigrationStateFromZK` into an event, it won't affect anything, because all we need to do in this state is wait for operations (a), (b), and (c) to complete. So I'm +1 on this suggestion. Thank you.
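To make the retry-by-re-enqueue idea concrete, here is a self-contained toy in Java: a recovery task that appends itself back onto its queue on a retriable failure, so no separate retryHandler is needed. Everything here is an illustrative stand-in, not the actual KRaftMigrationDriver, its MigrationEvent class, or its EventQueue:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class RetryByReenqueueSketch {
    interface Event { void run(); }

    public static void main(String[] args) {
        LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        int[] attempts = {0};

        Event recover = new Event() {
            @Override public void run() {
                attempts[0]++;
                if (attempts[0] < 3) {               // simulate two retriable ZooKeeper failures
                    System.out.println("recovery failed, re-enqueueing (attempt " + attempts[0] + ")");
                    queue.add(this);                 // retry until it succeeds
                } else {
                    System.out.println("recovery succeeded; publisher installed, state INACTIVE");
                }
            }
        };

        queue.add(recover);
        Event e;
        while ((e = queue.poll()) != null) e.run();  // single-threaded drain, like the driver's queue
    }
}
```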
Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]
mdedetrich commented on PR #15341:
URL: https://github.com/apache/kafka/pull/15341#issuecomment-2063692520

@OmniaGM So the structure of the PR/tests is now finalized. I ended up entirely deleting `KafkaEmbedded`, so the tests are now a lot closer to idiomatic testing with KRaft/`KafkaClusterTestKit`. Regarding using a global `EmbeddedKafkaCluster`, I think it makes sense to do that in a separate PR, or at least once I manage to get the tests to pass in this PR.

On that note, the current issue with the PR is that not all of the tests are passing, e.g. `HandlingSourceTopicDeletionIntegrationTest`, to name one off the top of my head. I am currently debugging to figure out why, but any help would be appreciated; it's likely some config/prop is not properly set.
[jira] [Created] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor
Stanislav Spiridonov created KAFKA-16585:
-----------------------------------------

Summary: No way to forward message from punctuation method in the FixedKeyProcessor
Key: KAFKA-16585
URL: https://issues.apache.org/jira/browse/KAFKA-16585
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.6.2
Reporter: Stanislav Spiridonov

The FixedKeyProcessorContext can forward only a FixedKeyRecord. This class doesn't have a public constructor and can only be created from an existing record, but such a record is usually absent in the punctuation method.
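A common workaround, sketched below against the public Streams processor API: when a punctuator must emit, switch from FixedKeyProcessor to the plain Processor API, whose Record, unlike FixedKeyRecord, has a public constructor. The key/value strings and the pass-through behavior are placeholders for illustration:

```java
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class PunctuateForwardProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp ->
            // Record has a public constructor, unlike FixedKeyRecord
            context.forward(new Record<>("synthetic-key", "heartbeat", timestamp)));
    }

    @Override
    public void process(Record<String, String> record) {
        context.forward(record); // pass-through for regular records
    }
}
```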
Re: [PR] MINOR:Add hint for `resumeDeletions` in `TopicDeletionManager` [kafka]
hudeqi commented on PR #15543:
URL: https://github.com/apache/kafka/pull/15543#issuecomment-2063665249

Hi @jlprat, could you help review this PR?
Re: [PR] MINOR:Optimize the log output when cleaning up offsets for groups with a generation less than or equal to 0 [kafka]
hudeqi commented on PR #15726:
URL: https://github.com/apache/kafka/pull/15726#issuecomment-2063663065

@dajac Hi, does this PR make sense for the group's offset cleanup log output?
[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838609#comment-17838609 ]

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 11:11 AM:
-------------------------------------------------------------

[~kirktrue], I am running the system tests for Connect using the test suite on my local machine, and this exact test *test_exactly_once_source* fails regularly for me with a different config.

{code:java}
test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 7 minutes 59.232 seconds

InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
    self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes
    self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc
    allocated = self.do_alloc(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc
    good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec
    raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0
{code}

If you want, I can revert the change as part of my PR. It should be ready for review by today or tomorrow.

was (Author: sagarrao):
[~kirktrue], I am working on modifying the system tests for Connect, and this exact test *test_exactly_once_source* fails regularly for me with a different config.

{code:java}
test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 7 minutes 59.232 seconds

InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in
[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838609#comment-17838609 ]

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 11:10 AM:
-------------------------------------------------------------

[~kirktrue], I am working on modifying the system tests for Connect, and this exact test *test_exactly_once_source* fails regularly for me with a different config.

{code:java}
test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 7 minutes 59.232 seconds

InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
    self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes
    self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc
    allocated = self.do_alloc(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc
    good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec
    raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0
{code}

If you want, I can revert the change as part of my PR. It should be ready for review by today or tomorrow.

was (Author: sagarrao):
[~kirktrue], I am working on modifying the system tests for Connect, and this exact test `test_exactly_once_source` fails regularly for me with a different config.

```
test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 7 minutes 59.232 seconds

InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
[jira] [Commented] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838609#comment-17838609 ]

Sagar Rao commented on KAFKA-16578:
-----------------------------------

[~kirktrue], I am working on modifying the system tests for Connect, and this exact test `test_exactly_once_source` fails regularly for me with a different config.

```
test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 7 minutes 59.232 seconds

InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
    self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes
    self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc
    allocated = self.do_alloc(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc
    good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec
    raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0
```

If you want, I can revert the change as part of my PR. It should be ready for review by today or tomorrow.

> Revert changes to connect_distributed_test.py for the new async Consumer
> -------------------------------------------------------------------------
>
> Key: KAFKA-16578
> URL: https://issues.apache.org/jira/browse/KAFKA-16578
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer, system tests
> Affects Versions: 3.8.0
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Critical
> Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
> To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated a slew of system tests to run both the "old" and "new" implementations. KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it could test the new consumer with Connect. However, we are not supporting Connect with the new consumer in 3.8.0.
> Unsurprisingly, when we run the Connect system tests with the new {{AsyncKafkaConsumer}}, we get errors like the following:
> {code}
> test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time: 6 minutes 3.899 seconds
>
> InsufficientResourcesError('Not enough nodes available to allocate. linux nodes requested: 1. linux nodes available: 0')
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
>     data = self.run_test()
>   File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
>     return self.test_context.function(self.test)
>   File
Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]
mimaison commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2063586815

Feel free to merge once CI completes. Thanks
Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]
jlprat commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2063587938

Thanks @mimaison. Will do!
Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
brandboat commented on PR #15719:
URL: https://github.com/apache/kafka/pull/15719#issuecomment-2063574931

Thanks for the reminder, soarez! Already fixed the error.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
-      }
+    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    // the metrics tags still contain "future", so we have to remove it.
+    // we will add metrics back after sourceLog remove the metrics
+    destLog.removeLogMetrics()
+    if (updateHighWatermark && sourceLog.isDefined) {
+      destLog.updateHighWatermark(sourceLog.get.highWatermark)
+    }
-      try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+    // Now that future replica has been successfully renamed to be the current replica
+    // Update the cached map and log cleaner as appropriate.
+    futureLogs.remove(topicPartition)
+    currentLogs.put(topicPartition, destLog)
+    if (cleaner != null) {
+      cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment:
   Addressed in [d02fc0b](https://github.com/apache/kafka/pull/15136/commits/d02fc0b9abeacf95096d4275232ea0cf829d836a)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570449270

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment:
   Addressed this and other comments in [31412a9](https://github.com/apache/kafka/pull/15136/commits/31412a9485b63731a49b5d44d6dc6fbeaf52dd0f)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570449270

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment:
   Addressed this and other comments in [d089929](https://github.com/apache/kafka/pull/15136/commits/d089929b5a92c0be308f685980e2f787b652ed86)
[jira] [Comment Edited] (KAFKA-8041) Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll
[ https://issues.apache.org/jira/browse/KAFKA-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838583#comment-17838583 ]

Omnia Ibrahim edited comment on KAFKA-8041 at 4/18/24 10:14 AM:
----------------------------------------------------------------

[~soarez] Yes, sorry for the late response. I believe this should be fixed now after the merge of [https://github.com/apache/kafka/pull/15335]. It has been passing for the last couple of weeks with no flakiness:
[https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D]

was (Author: omnia_h_ibrahim):
[~soarez] I believe this should be fixed now after the merge of [https://github.com/apache/kafka/pull/15335]. It has been passing for the last couple of weeks with no flakiness:
[https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D]

> Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll
> ----------------------------------------------------------
>
> Key: KAFKA-8041
> URL: https://issues.apache.org/jira/browse/KAFKA-8041
> Project: Kafka
> Issue Type: Bug
> Components: core, unit tests
> Affects Versions: 2.0.1, 2.3.0
> Reporter: Matthias J. Sax
> Assignee: Bob Barrett
> Priority: Critical
> Labels: flaky-test
> Fix For: 2.4.0
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/236/tests]
> {quote}java.lang.AssertionError: Expected some messages
> at kafka.utils.TestUtils$.fail(TestUtils.scala:357)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:787)
> at kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:189)
> at kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:63){quote}
> STDOUT
> {quote}[2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-6 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-10 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-4 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-8 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-2 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:45:00,248] ERROR Error while rolling log segment for topic-0 in dir /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216 (kafka.server.LogDirFailureChannel:76)
> java.io.FileNotFoundException: /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216/topic-0/.index (Not a directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
> at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:121)
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at
[jira] [Commented] (KAFKA-8041) Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll
[ https://issues.apache.org/jira/browse/KAFKA-8041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838583#comment-17838583 ]

Omnia Ibrahim commented on KAFKA-8041:
--------------------------------------

[~soarez] I believe this should be fixed now after the merge of [https://github.com/apache/kafka/pull/15335]. It has been passing for the last couple of weeks with no flakiness:
[https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=kafka.server.LogDirFailureTest=testIOExceptionDuringLogRoll(String)%5B2%5D]

> Flaky Test LogDirFailureTest#testIOExceptionDuringLogRoll
> ----------------------------------------------------------
>
> Key: KAFKA-8041
> URL: https://issues.apache.org/jira/browse/KAFKA-8041
> Project: Kafka
> Issue Type: Bug
> Components: core, unit tests
> Affects Versions: 2.0.1, 2.3.0
> Reporter: Matthias J. Sax
> Assignee: Bob Barrett
> Priority: Critical
> Labels: flaky-test
> Fix For: 2.4.0
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/236/tests]
> {quote}java.lang.AssertionError: Expected some messages
> at kafka.utils.TestUtils$.fail(TestUtils.scala:357)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:787)
> at kafka.server.LogDirFailureTest.testProduceAfterLogDirFailureOnLeader(LogDirFailureTest.scala:189)
> at kafka.server.LogDirFailureTest.testIOExceptionDuringLogRoll(LogDirFailureTest.scala:63){quote}
> STDOUT
> {quote}[2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-6 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,614] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-10 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-4 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-8 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:44:58,615] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-2 at offset 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
> [2019-03-05 03:45:00,248] ERROR Error while rolling log segment for topic-0 in dir /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216 (kafka.server.LogDirFailureChannel:76)
> java.io.FileNotFoundException: /home/jenkins/jenkins-slave/workspace/kafka-2.0-jdk8/core/data/kafka-3869208920357262216/topic-0/.index (Not a directory)
> at java.io.RandomAccessFile.open0(Native Method)
> at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
> at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
> at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:121)
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at kafka.log.AbstractIndex.resize(AbstractIndex.scala:115)
> at kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:184)
> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
> at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:184)
> at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:501)
> at kafka.log.Log.$anonfun$roll$8(Log.scala:1520)
> at kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:1520)
> at scala.Option.foreach(Option.scala:257)
> at kafka.log.Log.$anonfun$roll$2(Log.scala:1520)
> at
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1570406354

## server/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
@@ -0,0 +1,170 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server.config;

import org.apache.kafka.common.config.TopicConfig;
import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;

/**
 * Common home for broker-side log configs which need to be accessible from the libraries shared
 * between the broker and the multiple modules in Kafka.
 *
 * Note this is an internal API and subject to change without notice.
 */
public class KafkaLogConfigs {
    public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
    public static final int NUM_PARTITIONS_DEFAULT = 1;
    public static final String NUM_PARTITIONS_DOC = "The default number of log partitions per topic";

    public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
    public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
    public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
    public static final String LOG_DIR_DOC = "The directory in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
    public static final String LOG_DIRS_DOC = "A comma-separated list of the directories where the log data is stored. If not set, the value in " + LOG_DIR_CONFIG + " is used.";

    public static final String LOG_SEGMENT_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
    public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a single log file";

    public static final String LOG_ROLL_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG);
    public static final String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + "roll.hours";
    public static final String LOG_ROLL_TIME_MILLIS_DOC = "The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in " + LOG_ROLL_TIME_HOURS_CONFIG + " is used";
    public static final String LOG_ROLL_TIME_HOURS_DOC = "The maximum time before a new log segment is rolled out (in hours), secondary to " + LOG_ROLL_TIME_MILLIS_CONFIG + " property";

    public static final String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
    public static final String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX + "roll.jitter.hours";
    public static final String LOG_ROLL_TIME_JITTER_MILLIS_DOC = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LOG_ROLL_TIME_JITTER_HOURS_CONFIG + " is used";
    public static final String LOG_ROLL_TIME_JITTER_HOURS_DOC = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LOG_ROLL_TIME_JITTER_MILLIS_CONFIG + " property";

    public static final String LOG_RETENTION_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG);
    public static final String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX + "retention.minutes";
    public static final String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + "retention.hours";
    public static final String LOG_RETENTION_TIME_MILLIS_DOC = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LOG_RETENTION_TIME_MINUTES_CONFIG + " is used. If set to -1, no time limit is applied.";
    public static final String LOG_RETENTION_TIME_MINUTES_DOC = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LOG_RETENTION_TIME_MILLIS_CONFIG + " property. If not set, the value in " + LOG_RETENTION_TIME_HOURS_CONFIG + " is used";
    public static final String LOG_RETENTION_TIME_HOURS_DOC = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LOG_RETENTION_TIME_MILLIS_CONFIG + " property";

    public static final String LOG_RETENTION_BYTES_CONFIG =
Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]
jlprat commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2063483812

@mimaison Rebased
Re: [PR] KAFKA-16552: Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests [kafka]
soarez commented on PR #15719:
URL: https://github.com/apache/kafka/pull/15719#issuecomment-2063477565

Thanks for the changes. You have some compilation errors:

```
[2024-04-17T15:27:16.786Z] > Task :jmh-benchmarks:compileJava
[2024-04-17T15:27:16.786Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15719/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java:110: error: method createLogManager in class TestUtils cannot be applied to given types;
[2024-04-17T15:27:16.786Z] this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
[2024-04-17T15:27:16.786Z]    ^
[2024-04-17T15:27:16.786Z]   required: Seq,LogConfig,ConfigRepository,CleanerConfig,MockTime,MetadataVersion,int,boolean,Option,boolean,long
[2024-04-17T15:27:16.786Z]   found: Buffer,LogConfig,MockConfigRepository,CleanerConfig,MockTime,MetadataVersion,int,boolean,Option,boolean
[2024-04-17T15:27:16.786Z]   reason: actual and formal argument lists differ in length
[2024-04-17T15:27:16.786Z] 1 error
```

Can you address these?
Re: [PR] MINOR: Use Parametrized types correctly in RemoteLogMetadataSerde [kafka]
mimaison commented on PR #13824:
URL: https://github.com/apache/kafka/pull/13824#issuecomment-2063460572

@jlprat Can you rebase to resolve the conflict? Thanks
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570371982

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@
     }
   }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment:
> I guess cleaner.abortAndPauseCleaning is added because we call resumeCleaning later, and it will cause an error if we don't call abortAndPauseCleaning here?

That's correct if the cleaning operation hasn't started. The cleaning operation is scheduled on a separate thread, so we cannot be sure whether the `inProgress` map in `LogCleanerManager` has a key for the given topicPartition at the time we iterate over these logs.
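A toy model of the pairing discussed above, not LogCleanerManager's actual code: resume must always be preceded by a defensive abort-and-pause, because resuming a partition that was never paused is a state error. The class, method bodies, and exception message below are illustrative:

```java
import java.util.HashSet;
import java.util.Set;

public class PauseResumePairing {
    private final Set<String> paused = new HashSet<>();

    void abortAndPauseCleaning(String tp) {
        paused.add(tp); // idempotent: safe even if cleaning never actually started
    }

    void resumeCleaning(String tp) {
        if (!paused.remove(tp)) {
            throw new IllegalStateException("Cleaning for partition " + tp + " cannot be resumed: not paused");
        }
    }

    public static void main(String[] args) {
        PauseResumePairing mgr = new PauseResumePairing();
        mgr.abortAndPauseCleaning("topic-0");
        mgr.resumeCleaning("topic-0");     // ok: paired with the pause above
        try {
            mgr.resumeCleaning("topic-0"); // throws: no matching pause
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```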
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
mimaison commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1570367941

## server/src/main/java/org/apache/kafka/server/config/KafkaLogConfigs.java:
@@ -0,0 +1,170 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.server.config;

import org.apache.kafka.common.config.TopicConfig;
import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;

/**
 * Common home for broker-side log configs which need to be accessible from the libraries shared
 * between the broker and the multiple modules in Kafka.
 *
 * Note this is an internal API and subject to change without notice.
 */
public class KafkaLogConfigs {
    public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
    public static final int NUM_PARTITIONS_DEFAULT = 1;
    public static final String NUM_PARTITIONS_DOC = "The default number of log partitions per topic";

    public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
    public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
    public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
    public static final String LOG_DIR_DOC = "The directory in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
    public static final String LOG_DIRS_DOC = "A comma-separated list of the directories where the log data is stored. If not set, the value in " + LOG_DIR_CONFIG + " is used.";

    public static final String LOG_SEGMENT_BYTES_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_BYTES_CONFIG);
    public static final String LOG_SEGMENT_BYTES_DOC = "The maximum size of a single log file";

    public static final String LOG_ROLL_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_MS_CONFIG);
    public static final String LOG_ROLL_TIME_HOURS_CONFIG = LOG_PREFIX + "roll.hours";
    public static final String LOG_ROLL_TIME_MILLIS_DOC = "The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in " + LOG_ROLL_TIME_HOURS_CONFIG + " is used";
    public static final String LOG_ROLL_TIME_HOURS_DOC = "The maximum time before a new log segment is rolled out (in hours), secondary to " + LOG_ROLL_TIME_MILLIS_CONFIG + " property";

    public static final String LOG_ROLL_TIME_JITTER_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.SEGMENT_JITTER_MS_CONFIG);
    public static final String LOG_ROLL_TIME_JITTER_HOURS_CONFIG = LOG_PREFIX + "roll.jitter.hours";
    public static final String LOG_ROLL_TIME_JITTER_MILLIS_DOC = "The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in " + LOG_ROLL_TIME_JITTER_HOURS_CONFIG + " is used";
    public static final String LOG_ROLL_TIME_JITTER_HOURS_DOC = "The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to " + LOG_ROLL_TIME_JITTER_MILLIS_CONFIG + " property";

    public static final String LOG_RETENTION_TIME_MILLIS_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.RETENTION_MS_CONFIG);
    public static final String LOG_RETENTION_TIME_MINUTES_CONFIG = LOG_PREFIX + "retention.minutes";
    public static final String LOG_RETENTION_TIME_HOURS_CONFIG = LOG_PREFIX + "retention.hours";
    public static final String LOG_RETENTION_TIME_MILLIS_DOC = "The number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in " + LOG_RETENTION_TIME_MINUTES_CONFIG + " is used. If set to -1, no time limit is applied.";
    public static final String LOG_RETENTION_TIME_MINUTES_DOC = "The number of minutes to keep a log file before deleting it (in minutes), secondary to " + LOG_RETENTION_TIME_MILLIS_CONFIG + " property. If not set, the value in " + LOG_RETENTION_TIME_HOURS_CONFIG + " is used";
    public static final String LOG_RETENTION_TIME_HOURS_DOC = "The number of hours to keep a log file before deleting it (in hours), tertiary to " + LOG_RETENTION_TIME_MILLIS_CONFIG + " property";

    public static final String LOG_RETENTION_BYTES_CONFIG =
Re: [PR] debug for 15679 [kafka]
chia7712 closed pull request #15741: debug for 15679 URL: https://github.com/apache/kafka/pull/15741 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16452: Bound high-watermark offset to range between LLSO and LEO [kafka]
chia7712 commented on PR #15634: URL: https://github.com/apache/kafka/pull/15634#issuecomment-2063435567 > HWM is set to localLogStartOffset in [UnifiedLog#updateLocalLogStartOffset](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L162), then we load the HWM from the checkpoint file in [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495). If the HWM checkpoint file is missing / does not contain the entry for the partition, then the default value of 0 is taken. If 0 < LogStartOffset (LSO), then LSO is assumed as HWM. Thus, the non-monotonic update of the high watermark from LLSO to LSO can happen. Pardon me, I'm a bit confused about this. Please feel free to correct me to help me catch up :smile: ### case 0: the checkpoint file is missing and remote storage is **disabled** The LSO is initialized to the LLSO https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogLoader.scala#L180 so I can't understand why the non-monotonic update happens. After all, LLSO and LSO are the same in this scenario. ### case 1: the checkpoint file is missing and remote storage is **enabled** The LSO is initialized to `logStartOffsetCheckpoint`, which is 0 since there are no checkpoint files. https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogLoader.scala#L178 And then the HWM will be updated to the LLSO, which is larger than zero. https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/UnifiedLog.scala#L172 And this could be a problem when [Partition#createLog](https://sourcegraph.com/github.com/apache/kafka@f895ab5145077c5efa10a4a898628d901b01e2c2/-/blob/core/src/main/scala/kafka/cluster/Partition.scala?L495) gets called, since the HWM is changed from the LLSO (non-zero) to the LSO (zero). Also, the incorrect HWM causes errors in `convertToOffsetMetadataOrThrow`. If I understand correctly, it seems the root cause is that "when the checkpoint files are not working, we initialize a `UnifiedLog` with an incorrect LSO", and so could we fix that by re-building `logStartOffsets` according to remote storage when the checkpoint is not working (https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/core/src/main/scala/kafka/log/LogManager.scala#L459)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
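To make the invariant in the PR title concrete, here is a hedged Java sketch of bounding a checkpointed high watermark into the [LLSO, LEO] range. The class and method names are illustrative assumptions, not Kafka's actual code.

{code:java}
final class HighWatermarkBoundSketch {
    // Clamp a (possibly missing or stale) checkpointed HWM into [LLSO, LEO].
    // A missing checkpoint entry typically surfaces as 0, which can be far below
    // the local log start offset once remote storage has moved the LLSO forward.
    static long boundHighWatermark(long checkpointedHwm, long localLogStartOffset, long logEndOffset) {
        return Math.min(Math.max(checkpointedHwm, localLogStartOffset), logEndOffset);
    }

    public static void main(String[] args) {
        // Checkpoint lost (0), LLSO = 500, LEO = 800: the HWM is bounded to 500
        // instead of regressing to 0 as in the scenario described above.
        System.out.println(boundHighWatermark(0L, 500L, 800L)); // prints 500
    }
}
{code}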
[jira] [Commented] (KAFKA-15089) Consolidate all the group coordinator configs
[ https://issues.apache.org/jira/browse/KAFKA-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838562#comment-17838562 ] Omnia Ibrahim commented on KAFKA-15089: --- Hi [~dajac], should we mark this as resolved now that we merged [https://github.com/apache/kafka/pull/15684], or is there anything left to do here? > Consolidate all the group coordinator configs > - > > Key: KAFKA-15089 > URL: https://issues.apache.org/jira/browse/KAFKA-15089 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major > > The group coordinator configurations are defined in KafkaConfig at the > moment. As KafkaConfig is defined in the core module, we can't pass it to the > new Java modules to share the configurations. > A suggestion here is to centralize all the configurations of a module in the > module itself, similar to what we have done for RemoteLogManagerConfig and > RaftConfig. We also need a mechanism to add all the properties defined in the > module to KafkaConfig's ConfigDef. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16298: Ensure rebalance listener exceptions are propagated to the user on consumer poll [kafka]
lucasbru commented on code in PR #15742: URL: https://github.com/apache/kafka/pull/15742#discussion_r1570342805 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -228,13 +228,16 @@ private void process(final ErrorEvent event) { } private void process(final ConsumerRebalanceListenerCallbackNeededEvent event) { -ApplicationEvent invokedEvent = invokeRebalanceCallbacks( +ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent = invokeRebalanceCallbacks( rebalanceListenerInvoker, event.methodName(), event.partitions(), event.future() ); applicationEventHandler.add(invokedEvent); +if (invokedEvent.error().isPresent()) { +throw invokedEvent.error().get(); Review Comment: Looking at the reconciliation logic, I think if `onPartitionsRevoked` throws, we won't execute `onPartitionsAssigned`. And the call to `onPartitionsLost` seems to be independent of reconciliation, so I'm not sure how we'd end up with two exceptions. You are right that there is a behavioral difference around finishing the reconciliation. The old consumer throws _after_ finishing the reconciliation, while the new consumer throws on a different thread, so there is no strict time ordering between finishing the reconciliation and throwing. But I'm struggling to see how one can observe the difference. The reconciliation will have finished the next time the background thread processes any events, so in a sense you cannot observe the difference based on the queue architecture. The difference may only be observable through shared state that breaks the queue-based architecture. SubscriptionState comes to mind here. Thinking of something like:
1. application thread enters poll, fails during rebalance listener execution and throws
2. application thread somehow reads subscription state
3. background thread updates subscription state as part of reconciliation
Now the application thread has observed an "incomplete reconciliation". But after a listener execution has failed, we don't seem to update the subscription state in the reconciliation. So, in summary, I'm not sure we are going to notice the different behaviors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
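A brief hedged sketch of the user-visible contract being discussed: an exception raised by a `ConsumerRebalanceListener` callback surfaces from `poll()` on the application thread. The wrapper class below is illustrative; only the consumer API calls are real.

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;

final class PollLoopSketch {
    // One iteration of a typical poll loop. If a rebalance listener callback
    // threw during the rebalance, the exception propagates out of poll() here.
    static void pollOnce(Consumer<byte[], byte[]> consumer) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> { /* process record */ });
        } catch (KafkaException e) {
            // May originate from onPartitionsRevoked/onPartitionsAssigned/onPartitionsLost.
            // As discussed above, the background thread may or may not have finished
            // the reconciliation by the time the exception is observed here.
            System.err.println("rebalance listener failed: " + e);
        }
    }
}
{code}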
Re: [PR] KAFKA-16555: Consumer's RequestState has incorrect logic to determine if inflight [kafka]
lucasbru commented on PR #15723: URL: https://github.com/apache/kafka/pull/15723#issuecomment-2063300823 @lianetm did you want to make another pass or good to go? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
showuon commented on PR #15748: URL: https://github.com/apache/kafka/pull/15748#issuecomment-2063302914 @kamalcph , the tests are still failing: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15748/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
dajac commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1570226001 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.Integer.max; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 1) +@Measurement(iterations = 0) Review Comment: We should use better defaults than this. I think that we usually use 5 and 10/15. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
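For readers unfamiliar with the shorthand, "5 and 10/15" most plausibly refers to warmup and measurement iteration counts. A hedged sketch of what such defaults look like on a JMH benchmark class (the class name and trivial benchmark body are placeholders):

{code:java}
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)        // warm up the JIT before measuring
@Measurement(iterations = 15)  // enough samples for a stable average
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class AssignorBenchmarkSketch {
    @Benchmark
    public int noop() {
        return 42; // placeholder benchmark body
    }
}
{code}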
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
showuon commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1570226078 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + Review Comment: What I want to say is: with your current change, this test works like your earlier version, which doesn't send a heartbeat response at all and forces the session timeout directly. That works, but it's not readable and doesn't reflect the normal flow (i.e. not the normal experience). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
showuon commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1570217037 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + Review Comment: Hmm.. I see. The reason is that we didn't make sure the heartbeat is sent after `time.sleep(sessionTimeoutMs - 1);`. Do you think we could make sure the heartbeat request or response is sent before the next sleep? With that, we can make sure the session timeout won't happen, because we send out a heartbeat every `sessionTimeoutMs - 1`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
showuon commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1570111991 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + +// prepare 3 heartBeatResponses because we will trigger 3 heartBeat requests until rebalanceTimeout, +// that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs +client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); +client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); +client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); + +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { +coordinator.ensureActiveGroup(); +coordinator.poll(0, () -> { +return null; +}); + +// We keep the heartbeat thread running behind the scenes and poll frequently so that eventually +// the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry. +TestUtils.waitForCondition(() -> { +// sleep until sessionTimeoutMs to trigger a heartBeat request to avoid session timeout. +// Not sure if this will be flaky in CI because the heartbeat thread might not send out the heartBeat request in time. Review Comment: Should we remove this line? ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ## @@ -533,6 +537,45 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } +@Test +public void testPollTimeoutExpiry() throws InterruptedException { + +when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + +client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), +Collections.singletonList(taskId1x0), Errors.NONE)); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + Review Comment: Hmm.. I see. The reason is that we don't make sure the heartbeat is sent after `time.sleep(sessionTimeoutMs - 1);`. Do you think we could make sure the heartbeat request or response is sent before the next sleep? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
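To illustrate the ordering requested above (drain each heartbeat response before advancing the mock clock), here is a small self-contained Java sketch. It deliberately uses plain stand-ins rather than the real MockClient/Time test fixtures, so all names here are illustrative assumptions.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

final class HeartbeatOrderingSketch {
    static final long SESSION_TIMEOUT_MS = 3000;

    public static void main(String[] args) {
        Deque<String> preparedResponses = new ArrayDeque<>();
        long clockMs = 0;
        long lastHeartbeatMs = 0;

        for (int round = 0; round < 3; round++) {
            preparedResponses.add("heartbeat-response-" + round);
            // Drain the response BEFORE advancing the clock, so the heartbeat is
            // accounted for at the current time; if we slept first, the session
            // could expire before the response was ever processed.
            preparedResponses.poll();
            lastHeartbeatMs = clockMs;
            clockMs += SESSION_TIMEOUT_MS - 1; // mimics time.sleep(sessionTimeoutMs - 1)
            boolean sessionExpired = (clockMs - lastHeartbeatMs) >= SESSION_TIMEOUT_MS;
            System.out.println("round " + round + ": sessionExpired=" + sessionExpired); // always false
        }
    }
}
{code}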
[PR] KAFKA-16583: fix PartitionRegistration#toRecord directory check under metadata version 3_7_IV2 [kafka]
superhx opened a new pull request, #15751: URL: https://github.com/apache/kafka/pull/15751 fix https://issues.apache.org/jira/browse/KAFKA-16583 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16584) Make log processing summary configurable or debug
Andras Hatvani created KAFKA-16584: -- Summary: Make log processing summary configurable or debug Key: KAFKA-16584 URL: https://issues.apache.org/jira/browse/KAFKA-16584 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.6.2 Reporter: Andras Hatvani Currently, statistics are logged at INFO level *every two minutes for every stream thread*: {code:log} 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update {code} This is unnecessary and even harmful, since it fills the logs, and thus storage space, with useless data. The other INFO logs are useful and helpful, so raising the log level to WARN is not an option. Please either * change the logProcessingSummary to a DEBUG-level log, or * make it configurable so that it can be disabled. This is the relevant code: https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
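Until the log level changes upstream, a common workaround is a per-logger override. This sketch assumes a log4j 1.x style properties file (as Kafka ships by default) and that the summary is emitted by the StreamThread logger, as the log snippet above suggests:

{code}
# Suppress the two-minute processing summary (logged at INFO) without
# losing INFO logs from the rest of Kafka Streams.
log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN
{code}

The trade-off is that all other INFO-level messages from that one class are suppressed as well.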
[jira] [Updated] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andras Hatvani updated KAFKA-16584: --- Description: Currently, statistics are logged at INFO level *every two minutes for every stream thread*: {code} 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update {code} This is unnecessary and even harmful, since it fills the logs, and thus storage space, with useless data. The other INFO logs are useful and helpful, so raising the log level to WARN is not an option. Please either * change the logProcessingSummary to a DEBUG-level log, or * make it configurable so that it can be disabled. This is the relevant code: https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 was: Currently, statistics are logged at INFO level *every two minutes for every stream thread*: {code:log} 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update {code} This is unnecessary and even harmful, since it fills the logs, and thus storage space, with useless data. The other INFO logs are useful and helpful, so raising the log level to WARN is not an option. Please either * change the logProcessingSummary to a DEBUG-level log, or * make it configurable so that it can be disabled. This is the relevant code: https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Priority: Major > > Currently, statistics are logged at INFO level *every two minutes for every stream thread*: > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is unnecessary and even harmful, since it fills the logs, and thus > storage space, with useless data. The other INFO logs are useful and > helpful, so raising the log level to WARN is not an option. > Please either > * change the logProcessingSummary to a DEBUG-level log, or > * make it configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16574) The metrics of LogCleaner disappear after reconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-16574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16574: -- Assignee: Chia Chuan Yu (was: Chia-Ping Tsai) > The metrics of LogCleaner disappear after reconfiguration > - > > Key: KAFKA-16574 > URL: https://issues.apache.org/jira/browse/KAFKA-16574 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia Chuan Yu >Priority: Minor > > see > [https://github.com/apache/kafka/blob/a3dcbd4e28a35f79f75ec1bf316ef0b39c0df164/core/src/main/scala/kafka/log/LogCleaner.scala#L227] > We don't rebuild the metrics after calling shutdown. The following test can > prove that. > {code:java} > @Test > def testMetricsAfterReconfiguration(): Unit = { > val logCleaner = new LogCleaner(new CleanerConfig(true), > logDirs = Array(TestUtils.tempDir()), > logs = new Pool[TopicPartition, UnifiedLog](), > logDirFailureChannel = new LogDirFailureChannel(1), > time = time) > def check(): Unit = > LogCleaner.MetricNames.foreach(name => > assertNotNull(KafkaYammerMetrics.defaultRegistry.allMetrics().get(logCleaner.metricsGroup > .metricName(name, java.util.Collections.emptyMap())), s"$name is > gone?")) > try { > check() > logCleaner.reconfigure(new KafkaConfig(TestUtils.createBrokerConfig(1, > "localhost:2181")), > new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))) > check() > } finally logCleaner.shutdown() > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1570135556 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +String groupId = "benchmark-group"; + +private static final int groupEpoch = 0; + +private PartitionAssignor partitionAssignor; + +private Map subscriptionMetadata = Collections.emptyMap(); + +private TargetAssignmentBuilder 
targetAssignmentBuilder; + +private AssignmentSpec assignmentSpec; + +private static final int numberOfRacks = 3; + +private final List allTopicNames = new ArrayList<>(topicCount); + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +// For this benchmark we will use the Uniform Assignor +// and a group that has a homogeneous subscription model. +partitionAssignor = new UniformAssignor(); +subscriptionMetadata = generateMockSubscriptionMetadata(); +Map members = generateMockMembers(); +Map existingTargetAssignment = generateMockInitialTargetAssignment(); + +// Add a new member to trigger a rebalance. +Set subscribedTopics = new HashSet<>(subscriptionMetadata.keySet()); +String rackId = isRackAware ? "rack" + (memberCount - 1) % numberOfRacks : ""; Review Comment: removed rack aware now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1570135213 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +String groupId = "benchmark-group"; + +private static final int groupEpoch = 0; + +private PartitionAssignor partitionAssignor; + +private Map subscriptionMetadata = Collections.emptyMap(); + +private TargetAssignmentBuilder 
targetAssignmentBuilder; + +private AssignmentSpec assignmentSpec; + +private static final int numberOfRacks = 3; + +private final List allTopicNames = new ArrayList<>(topicCount); + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +// For this benchmark we will use the Uniform Assignor +// and a group that has a homogeneous subscription model. +partitionAssignor = new UniformAssignor(); +subscriptionMetadata = generateMockSubscriptionMetadata(); +Map members = generateMockMembers(); +Map existingTargetAssignment = generateMockInitialTargetAssignment(); + +// Add a new member to trigger a rebalance. +Set subscribedTopics = new HashSet<>(subscriptionMetadata.keySet()); +String rackId = isRackAware ? "rack" + (memberCount - 1) % numberOfRacks : ""; +ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("new-member") +.setSubscribedTopicNames(new ArrayList<>(subscribedTopics)) +.setRackId(rackId) +.build(); + +targetAssignmentBuilder = new TargetAssignmentBuilder(groupId, groupEpoch, partitionAssignor) +.withMembers(members) +.withSubscriptionMetadata(subscriptionMetadata) +
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1570128341 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; Review Comment: okay -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1570122759 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java: ## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.consumer.VersionedMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +String groupId = "benchmark-group"; + +private static final int groupEpoch = 0; + +private PartitionAssignor partitionAssignor; + +private Map subscriptionMetadata = Collections.emptyMap(); + +private TargetAssignmentBuilder 
targetAssignmentBuilder; + +private AssignmentSpec assignmentSpec; + +private static final int numberOfRacks = 3; + +private final List allTopicNames = new ArrayList<>(topicCount); + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +// For this benchmark we will use the Uniform Assignor +// and a group that has a homogeneous subscription model. +partitionAssignor = new UniformAssignor(); +subscriptionMetadata = generateMockSubscriptionMetadata(); +Map members = generateMockMembers(); +Map existingTargetAssignment = generateMockInitialTargetAssignment(); + +// Add a new member to trigger a rebalance. +Set subscribedTopics = new HashSet<>(subscriptionMetadata.keySet()); +String rackId = isRackAware ? "rack" + (memberCount - 1) % numberOfRacks : ""; Review Comment: It is defined with the Optional return value -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HanXu updated KAFKA-16583: -- Description: How to reproduce: 1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in KRaft mode; 2. Create a topic with 1 partition; 3. Launch a 3.4.0 broker (Broker B) in KRaft mode and reassign the partition from step 2 to Broker B; 4. Upgrade Broker B to 3.7.0; Broker B will keep logging the following error: {code:java}
[2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled error initializing new publishers (org.apache.kafka.server.fault.LoggingFaultHandler) org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been lost because the following could not be represented in metadata version 3.4-IV0: the directory assignment state of one or more replicas
at org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
at org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base/java.lang.Thread.run(Thread.java:840)
{code} Bug: - When reassigning a partition, PartitionRegistration#merge sets the new replicas' directories to UNASSIGNED; - But in metadata version 3.4.0, PartitionRegistration#toRecord only allows the MIGRATING directory; {code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
    record.setDirectories(Uuid.toList(directories));
} else {
    for (Uuid directory : directories) {
        if (!DirectoryId.MIGRATING.equals(directory)) {
            options.handleLoss("the directory assignment state of one or more replicas");
            break;
        }
    }
}
{code} Solution: - PartitionRegistration#toRecord should allow both MIGRATING and UNASSIGNED directories was: How to reproduce: 1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in KRaft mode; 2. Create a topic with 1 partition; 3. Launch a 3.4.0 broker (Broker B) in KRaft mode and reassign the partition from step 2 to Broker B; 4. Upgrade Broker B to 3.7.0; Broker B will keep logging the following error: {code:java}
[2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled error initializing new publishers (org.apache.kafka.server.fault.LoggingFaultHandler) org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been lost because the following could not be represented in metadata version 3.4-IV0: the directory assignment state of one or more replicas
at org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
at org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base/java.lang.Thread.run(Thread.java:840)
{code} Bug: - When reassigning a partition, PartitionRegistration#merge sets the new replicas' directories to UNASSIGNED; - But in metadata version 3.4.0, PartitionRegistration#toRecord only allows the MIGRATING directory; {code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
    record.setDirectories(Uuid.toList(directories));
} else {
    for (Uuid directory : directories) {
        if (!DirectoryId.MIGRATING.equals(directory)) {
            options.handleLoss("the directory assignment state of one or more replicas");
            break;
[jira] [Created] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
HanXu created KAFKA-16583: - Summary: Update from 3.4.0 to 3.7.0 image write failed in Kraft mode Key: KAFKA-16583 URL: https://issues.apache.org/jira/browse/KAFKA-16583 Project: Kafka Issue Type: Bug Components: kraft Affects Versions: 3.7.0 Reporter: HanXu Assignee: HanXu How to reproduce: 1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in KRaft mode; 2. Create a topic with 1 partition; 3. Launch a 3.4.0 broker (Broker B) in KRaft mode and reassign the partition from step 2 to Broker B; 4. Upgrade Broker B to 3.7.0; Broker B will keep logging the following error: {code:java}
[2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled error initializing new publishers (org.apache.kafka.server.fault.LoggingFaultHandler) org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been lost because the following could not be represented in metadata version 3.4-IV0: the directory assignment state of one or more replicas
at org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
at org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
at org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
at org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
at org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base/java.lang.Thread.run(Thread.java:840)
{code} Bug: - When reassigning a partition, PartitionRegistration#merge sets the new replicas' directories to UNASSIGNED; - But in metadata version 3.4.0, PartitionRegistration#toRecord only allows the MIGRATING directory; {code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
    record.setDirectories(Uuid.toList(directories));
} else {
    for (Uuid directory : directories) {
        if (!DirectoryId.MIGRATING.equals(directory)) {
            options.handleLoss("the directory assignment state of one or more replicas");
            break;
        }
    }
}
{code} Solution: - PartitionRegistration#toRecord should allow both MIGRATING and UNASSIGNED directories -- This message was sent by Atlassian Jira (v8.20.10#820010)
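A hedged sketch of the proposed fix: when the metadata version cannot represent directory assignments, treat both MIGRATING and UNASSIGNED as lossless, since UNASSIGNED is what PartitionRegistration#merge assigns to newly reassigned replicas. The enum and method below are simplified stand-ins for Kafka's Uuid-based DirectoryId constants and the real toRecord logic.

{code:java}
import java.util.Arrays;

final class ToRecordDirectoryCheckSketch {
    enum DirectoryId { MIGRATING, UNASSIGNED, ASSIGNED }

    // Returns true when writing this partition at an old metadata version would
    // actually lose directory assignment state. Before the fix, any id other
    // than MIGRATING (including UNASSIGNED) was reported as a loss.
    static boolean losesDirectoryState(DirectoryId[] directories) {
        return Arrays.stream(directories)
            .anyMatch(d -> d != DirectoryId.MIGRATING && d != DirectoryId.UNASSIGNED);
    }

    public static void main(String[] args) {
        // A freshly reassigned replica (UNASSIGNED) no longer trips handleLoss(...).
        System.out.println(losesDirectoryState(
            new DirectoryId[]{DirectoryId.MIGRATING, DirectoryId.UNASSIGNED})); // false
    }
}
{code}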
Re: [PR] KAFKA-15396:Add a metric indicating the version of the current running kafka server [kafka]
hudeqi closed pull request #14284: KAFKA-15396:Add a metric indicating the version of the current running kafka server URL: https://github.com/apache/kafka/pull/14284 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15134:Enrich the prompt reason in CommitFailedException [kafka]
hudeqi closed pull request #13930: KAFKA-15134:Enrich the prompt reason in CommitFailedException URL: https://github.com/apache/kafka/pull/13930 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception [kafka]
hudeqi closed pull request #13421: KAFKA-14824: ReplicaAlterLogDirsThread may cause serious disk growing in case of potential exception URL: https://github.com/apache/kafka/pull/13421 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics [kafka]
hudeqi closed pull request #13852: KAFKA-15086:Set a reasonable segment size upper limit for MM2 internal topics URL: https://github.com/apache/kafka/pull/13852 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14906:Extract the coordinator service log from server log [kafka]
hudeqi closed pull request #13568: KAFKA-14906:Extract the coordinator service log from server log URL: https://github.com/apache/kafka/pull/13568 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector [kafka]
hudeqi closed pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector URL: https://github.com/apache/kafka/pull/13913 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org