Re: [VOTE] 3.7.1 RC1
Hi Edoardo, It is late, but not too late. I have cherry-picked your change to the 3.7 branch and I'll build a second release candidate. In the meantime, please have a look at the first RC and let me know if you spot any issues that can be avoided in the next one. Thanks, -- Igor
[jira] [Created] (KAFKA-16941) Flaky test - testDynamicBrokerConfigUpdateUsingKraft [1] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – kafka.admin.ConfigCommandIntegrationTest
Igor Soarez created KAFKA-16941: --- Summary: Flaky test - testDynamicBrokerConfigUpdateUsingKraft [1] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – kafka.admin.ConfigCommandIntegrationTest Key: KAFKA-16941 URL: https://issues.apache.org/jira/browse/KAFKA-16941 Project: Kafka Issue Type: Test Reporter: Igor Soarez https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16077/4/tests/ {code:java} org.opentest4j.AssertionFailedError: Condition not met within timeout 5000. [listener.name.internal.ssl.keystore.location] are not updated ==> expected: but was: at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:367) at kafka.admin.ConfigCommandIntegrationTest.verifyConfigDefaultValue(ConfigCommandIntegrationTest.java:519) at kafka.admin.ConfigCommandIntegrationTest.deleteAndVerifyConfig(ConfigCommandIntegrationTest.java:514) at kafka.admin.ConfigCommandIntegrationTest.testDynamicBrokerConfigUpdateUsingKraft(ConfigCommandIntegrationTest.java:237) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16940) Flaky test "testNoConsumeWithDescribeAclViaSubscribe(String).quorum=kraft" – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest
Igor Soarez created KAFKA-16940: --- Summary: Flaky test "testNoConsumeWithDescribeAclViaSubscribe(String).quorum=kraft" – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest Key: KAFKA-16940 URL: https://issues.apache.org/jira/browse/KAFKA-16940 Project: Kafka Issue Type: Test Reporter: Igor Soarez In [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16077/4/tests/] {code:java} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) at kafka.api.DelegationTokenEndToEndAuthorizationTest.createDelegationTokens(DelegationTokenEndToEndAuthorizationTest.scala:164) at kafka.api.DelegationTokenEndToEndAuthorizationTest.createDelegationTokens(DelegationTokenEndToEndAuthorizationTest.scala:156) at kafka.api.DelegationTokenEndToEndAuthorizationTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationTest.scala:99) at kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.configureSecurityAfterServersStart(DelegationTokenEndToEndAuthorizationWithOwnerTest.scala:77) at kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:129) at kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:141) at kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:121) at kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:167) at kafka.api.DelegationTokenEndToEndAuthorizationTest.setUp(DelegationTokenEndToEndAuthorizationTest.scala:134) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16934) Clean up and refactor release.py
Igor Soarez created KAFKA-16934: --- Summary: Clean up and refactor release.py Key: KAFKA-16934 URL: https://issues.apache.org/jira/browse/KAFKA-16934 Project: Kafka Issue Type: Improvement Reporter: Igor Soarez Assignee: Igor Soarez The current release script has a couple of issues: * It's a single long file with duplicated logic, which makes it difficult to understand and change * When a command fails, the user is forced to start over from the beginning, which lengthens feedback loops, e.g. when the publishing step fails because credentials were set incorrectly in ~/.gradle/gradle.properties -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] 3.7.1 RC1
Now attaching the container build reports, which I seem to have forgotten to include before. -- Igor kafka/test:test (alpine 3.19.1) === Total: 0 (HIGH: 0, CRITICAL: 0)
[VOTE] 3.7.1 RC1
Hello Kafka users, developers and client-developers, This is the first candidate for release of Apache Kafka 3.7.1. This is a bugfix release with several fixes. Release notes for the 3.7.1 release: https://home.apache.org/~soarez/kafka-3.7.1-rc1/RELEASE_NOTES.html *** Please download, test and vote by Tuesday June 18th, 11am UTC. Kafka's KEYS file containing PGP keys we use to sign the release: https://kafka.apache.org/KEYS * Release artifacts to be voted upon (source and binary): https://home.apache.org/~soarez/kafka-3.7.1-rc1/ * Docker release artifact to be voted upon: apache/kafka:3.7.1-rc1 * Maven artifacts to be voted upon: https://repository.apache.org/content/groups/staging/org/apache/kafka/ * Javadoc: https://home.apache.org/~soarez/kafka-3.7.1-rc1/javadoc/ * Tag to be voted upon (off 3.7 branch) is the 3.7.1 tag: https://github.com/apache/kafka/releases/tag/3.7.1-rc1 * Documentation: https://kafka.apache.org/37/documentation.html * Protocol: https://kafka.apache.org/37/protocol.html * Successful Jenkins builds for the 3.7 branch: Unit/integration tests: https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.7/175/ The run shows some flaky tests, but I have confirmed they all pass locally. One of them, kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe, has an issue with the sequential run of the different test parameters, but all of the variants pass when run independently. System tests: I don't have access to the Confluent URL for system tests: https://jenkins.confluent.io/job/system-test-kafka/job/3.7 I am still working on finishing a full run using resources available to me. If anyone with access is able to check them, please reply with details. * Successful Docker Image Github Actions Pipeline for 3.7 branch: Docker Build Test Pipeline: https://github.com/apache/kafka/actions/runs/9455339546 /** Thanks, -- Igor Soarez
[jira] [Created] (KAFKA-16920) Flaky test testControlledShutdown() - kafka.server.BrokerLifecycleManagerTest
Igor Soarez created KAFKA-16920: --- Summary: Flaky test testControlledShutdown() - kafka.server.BrokerLifecycleManagerTest Key: KAFKA-16920 URL: https://issues.apache.org/jira/browse/KAFKA-16920 Project: Kafka Issue Type: Test Reporter: Igor Soarez Several tests failed in this pipeline because `testControlledShutdown` did not clean up properly: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16232/3/tests] Stack trace {code:java} org.opentest4j.AssertionFailedError: Found 1 unexpected threads during @BeforeAll: `controlled-shutdown-lifecycle-manager-event-handler` ==> expected: but was: at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at kafka.utils.TestUtils$.verifyNoUnexpectedThreads(TestUtils.scala:1917) at kafka.server.QuorumTestHarness$.setUpClass(QuorumTestHarness.scala:489) at kafka.server.QuorumTestHarness.setUpClass(QuorumTestHarness.scala) {code} Note that the thread name {{controlled-shutdown-lifecycle-manager-event-handler}} originates from the {{BrokerLifecycleManager}} instance created in {{{}kafka.server.BrokerLifecycleManagerTest#testControlledShutdown{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16919) Flaky test testNoCheckpointsIfNoRecordsAreMirrored() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest
Igor Soarez created KAFKA-16919: --- Summary: Flaky test testNoCheckpointsIfNoRecordsAreMirrored() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest Key: KAFKA-16919 URL: https://issues.apache.org/jira/browse/KAFKA-16919 Project: Kafka Issue Type: Test Reporter: Igor Soarez Source [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-16251/1/tests] {code:java} java.lang.AssertionError: Workers of backup-connect-cluster did not start in time. at org.apache.kafka.connect.util.clusters.ConnectAssertions.assertAtLeastNumWorkersAreUp(ConnectAssertions.java:74) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:230) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:150) at org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.startClusters(MirrorConnectorsIntegrationSSLTest.java:64) at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base/java.lang.reflect.Method.invoke(Method.java:580) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16583. - Resolution: Fixed > Update from 3.4.0 to 3.7.0 image write failed in Kraft mode > --- > > Key: KAFKA-16583 > URL: https://issues.apache.org/jira/browse/KAFKA-16583 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0 >Reporter: HanXu >Assignee: HanXu >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > Original Estimate: 6h > Remaining Estimate: 6h > > How to reproduce: > 1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in Kraft mode; > 2. Create a topic with 1 partition; > 3. Launch a 3.4.0 broker (Broker B) in Kraft mode and reassign the step 2 > partition to Broker B; > 4. Upgrade Broker B to 3.7.0; > Broker B will keep logging the following error: > {code:java} > [2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled > error initializing new publishers > (org.apache.kafka.server.fault.LoggingFaultHandler) > org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been > lost because the following could not be represented in metadata version > 3.4-IV0: the directory assignment state of one or more replicas > at > org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94) > at > org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391) > at org.apache.kafka.image.TopicImage.write(TopicImage.java:71) > at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84) > at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155) > at > org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295) > at > org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266) > at > org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at >
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base/java.lang.Thread.run(Thread.java:840) > {code} > Bug: > - When reassigning partition, PartitionRegistration#merge will set the new > replicas with UNASSIGNED directory; > - But in metadata version 3.4.0 PartitionRegistration#toRecord only allows > MIGRATING directory; > {code:java} > if (options.metadataVersion().isDirectoryAssignmentSupported()) { > record.setDirectories(Uuid.toList(directories)); > } else { > for (Uuid directory : directories) { > if (!DirectoryId.MIGRATING.equals(directory)) { > options.handleLoss("the directory assignment state of one > or more replicas"); > break; > } > } > } > {code} > Solution: > - PartitionRegistration#toRecord allows both MIGRATING and UNASSIGNED -- This message was sent by Atlassian Jira (v8.20.10#820010)
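As an aside on the solution described above (PartitionRegistration#toRecord accepting both MIGRATING and UNASSIGNED), the intended check can be sketched roughly as follows. This is a simplified stand-in, not Kafka's actual code: the constants and the losesMetadata helper are hypothetical, while the real logic lives in org.apache.kafka.metadata.PartitionRegistration and org.apache.kafka.common.DirectoryId.

```java
import java.util.Arrays;
import java.util.List;

public class DirectoryLossCheckSketch {
    // Hypothetical stand-ins for Kafka's DirectoryId sentinel values.
    static final String MIGRATING = "MIGRATING";
    static final String UNASSIGNED = "UNASSIGNED";

    // Sketch of the fixed check: on a MetadataVersion without directory
    // assignment support, only a concrete directory id counts as metadata
    // loss; both MIGRATING and UNASSIGNED sentinels are representable.
    static boolean losesMetadata(List<String> directories, boolean directoryAssignmentSupported) {
        if (directoryAssignmentSupported) {
            return false; // newer MV: directories are written as-is
        }
        for (String dir : directories) {
            if (!MIGRATING.equals(dir) && !UNASSIGNED.equals(dir)) {
                return true; // a real dir id cannot be represented pre-3.7-IV2
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // A reassigned replica starts UNASSIGNED: no longer treated as loss.
        System.out.println(losesMetadata(Arrays.asList(MIGRATING, UNASSIGNED), false));
        // A concrete directory id on an old MetadataVersion still is.
        System.out.println(losesMetadata(Arrays.asList("RuDIAGGJrTG2NU6tEOkbHw"), false));
    }
}
```

Under this reading, the bug was that UNASSIGNED (set by PartitionRegistration#merge during reassignment) tripped the loss handler even though it carries no directory information worth preserving.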
[jira] [Reopened] (KAFKA-16886) KRaft partition reassignment failed after upgrade to 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-16886: - > KRaft partition reassignment failed after upgrade to 3.7.0 > --- > > Key: KAFKA-16886 > URL: https://issues.apache.org/jira/browse/KAFKA-16886 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen > Assignee: Igor Soarez >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > > Before upgrade, the topic image doesn't have dirID for the assignment. After > upgrade, the assignment has the dirID. So in the > {{{}ReplicaManager#applyDelta{}}}, we'll have directoryId changes in > {{{}localChanges{}}}, which will invoke {{AssignmentEvent}} > [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2748]. > With that, we'll get the unexpected {{NOT_LEADER_OR_FOLLOWER}} error. > Reproduce steps: > # Launch a 3.6.0 controller and a 3.6.0 broker (Broker A) in Kraft mode; > # Create a topic with 1 partition; > # Upgrade Broker A, B, Controllers to 3.7.0 > # Upgrade MV to 3.7: ./bin/kafka-features.sh --bootstrap-server > localhost:9092 upgrade --metadata 3.7 > # Reassign the step 2 partition to Broker B > > The logs in broker B: > {code:java} > [2024-05-31 15:33:25,763] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions Set(t1-0) (kafka.server.ReplicaFetcherManager) > [2024-05-31 15:33:25,837] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions Set(t1-0) (kafka.server.ReplicaFetcherManager) > [2024-05-31 15:33:25,837] INFO [ReplicaAlterLogDirsManager on broker 2] > Removed fetcher for partitions Set(t1-0) > (kafka.server.ReplicaAlterLogDirsManager) > [2024-05-31 15:33:25,853] INFO Log for partition t1-0 is renamed to > /tmp/kraft-broker-logs/t1-0.3e6d8bebc1c04f3186ad6cf63145b6fd-delete and is > scheduled for deletion (kafka.log.LogManager) > [2024-05-31 15:33:26,279] ERROR Controller returned error >
NOT_LEADER_OR_FOLLOWER for assignment of partition > PartitionData(partitionIndex=0, errorCode=6) into directory > oULBCf49aiRXaWJpO3I-GA (org.apache.kafka.server.AssignmentsManager) > [2024-05-31 15:33:26,280] WARN Re-queueing assignments: > [Assignment\{timestampNs=26022187148625, partition=t1:0, > dir=/tmp/kraft-broker-logs, reason='Applying metadata delta'}] > (org.apache.kafka.server.AssignmentsManager) > [2024-05-31 15:33:26,786] ERROR Controller returned error > NOT_LEADER_OR_FOLLOWER for assignment of partition > PartitionData(partitionIndex=0, errorCode=6) into directory > oULBCf49aiRXaWJpO3I-GA (org.apache.kafka.server.AssignmentsManager) > [2024-05-31 15:33:27,296] WARN Re-queueing assignments: > [Assignment\{timestampNs=26022187148625, partition=t1:0, > dir=/tmp/kraft-broker-logs, reason='Applying metadata delta'}] > (org.apache.kafka.server.AssignmentsManager) > ...{{}} > {code} > > Logs in controller: > {code:java} > [2024-05-31 15:33:25,727] INFO [QuorumController id=1] Successfully altered 1 > out of 1 partition reassignment(s). > (org.apache.kafka.controller.ReplicationControlManager) > [2024-05-31 15:33:25,727] INFO [QuorumController id=1] Replayed partition > assignment change PartitionChangeRecord(partitionId=0, > topicId=tMiJOQznTLKtOZ8rLqdgqw, isr=null, leader=-2, replicas=[6, 2], > removingReplicas=[2], addingReplicas=[6], leaderRecoveryState=-1, > directories=[RuDIAGGJrTG2NU6tEOkbHw, AA], > eligibleLeaderReplicas=null, lastKnownElr=null) for topic t1 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-05-31 15:33:25,802] INFO [QuorumController id=1] AlterPartition request > from node 2 for t1-0 completed the ongoing partition reassignment and > triggered a leadership change. Returning NEW_LEADER_ELECTED. 
> (org.apache.kafka.controller.ReplicationControlManager) > [2024-05-31 15:33:25,802] INFO [QuorumController id=1] UNCLEAN partition > change for t1-0 with topic ID tMiJOQznTLKtOZ8rLqdgqw: replicas: [6, 2] -> > [6], directories: [RuDIAGGJrTG2NU6tEOkbHw, AA] -> > [RuDIAGGJrTG2NU6tEOkbHw], isr: [2] -> [6], removingReplicas: [2] -> [], > addingReplicas: [6] -> [], leader: 2 -> 6, leaderEpoch: 3 -> 4, > partitionEpoch: 5 -> 6 (org.apache.kafka.controller.ReplicationControlManager) > [2024-05-31 15:33:25,802] INFO [QuorumController id=1] Replayed partition > assignment change PartitionChangeRecord(partitionId=0, > topicId=tMiJOQznTLKtOZ8rLqdgqw, isr=[6], leade
[jira] [Resolved] (KAFKA-16886) KRaft partition reassignment failed after upgrade to 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16886. - Resolution: Fixed > KRaft partition reassignment failed after upgrade to 3.7.0 > --- > > Key: KAFKA-16886 > URL: https://issues.apache.org/jira/browse/KAFKA-16886 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen > Assignee: Igor Soarez >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > > Before upgrade, the topic image doesn't have dirID for the assignment. After > upgrade, the assignment has the dirID. So in the > {{{}ReplicaManager#applyDelta{}}}, we'll have directoryId changes in > {{{}localChanges{}}}, which will invoke {{AssignmentEvent}} > [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2748]. > With that, we'll get the unexpected {{NOT_LEADER_OR_FOLLOWER}} error. > Reproduce steps: > # Launch a 3.6.0 controller and a 3.6.0 broker (Broker A) in Kraft mode; > # Create a topic with 1 partition; > # Upgrade Broker A, B, Controllers to 3.7.0 > # Upgrade MV to 3.7: ./bin/kafka-features.sh --bootstrap-server > localhost:9092 upgrade --metadata 3.7 > # Reassign the step 2 partition to Broker B > > The logs in broker B: > {code:java} > [2024-05-31 15:33:25,763] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions Set(t1-0) (kafka.server.ReplicaFetcherManager) > [2024-05-31 15:33:25,837] INFO [ReplicaFetcherManager on broker 2] Removed > fetcher for partitions Set(t1-0) (kafka.server.ReplicaFetcherManager) > [2024-05-31 15:33:25,837] INFO [ReplicaAlterLogDirsManager on broker 2] > Removed fetcher for partitions Set(t1-0) > (kafka.server.ReplicaAlterLogDirsManager) > [2024-05-31 15:33:25,853] INFO Log for partition t1-0 is renamed to > /tmp/kraft-broker-logs/t1-0.3e6d8bebc1c04f3186ad6cf63145b6fd-delete and is > scheduled for deletion (kafka.log.LogManager) > [2024-05-31 15:33:26,279] ERROR Controller returned
error > NOT_LEADER_OR_FOLLOWER for assignment of partition > PartitionData(partitionIndex=0, errorCode=6) into directory > oULBCf49aiRXaWJpO3I-GA (org.apache.kafka.server.AssignmentsManager) > [2024-05-31 15:33:26,280] WARN Re-queueing assignments: > [Assignment\{timestampNs=26022187148625, partition=t1:0, > dir=/tmp/kraft-broker-logs, reason='Applying metadata delta'}] > (org.apache.kafka.server.AssignmentsManager) > [2024-05-31 15:33:26,786] ERROR Controller returned error > NOT_LEADER_OR_FOLLOWER for assignment of partition > PartitionData(partitionIndex=0, errorCode=6) into directory > oULBCf49aiRXaWJpO3I-GA (org.apache.kafka.server.AssignmentsManager) > [2024-05-31 15:33:27,296] WARN Re-queueing assignments: > [Assignment\{timestampNs=26022187148625, partition=t1:0, > dir=/tmp/kraft-broker-logs, reason='Applying metadata delta'}] > (org.apache.kafka.server.AssignmentsManager) > ...{{}} > {code} > > Logs in controller: > {code:java} > [2024-05-31 15:33:25,727] INFO [QuorumController id=1] Successfully altered 1 > out of 1 partition reassignment(s). > (org.apache.kafka.controller.ReplicationControlManager) > [2024-05-31 15:33:25,727] INFO [QuorumController id=1] Replayed partition > assignment change PartitionChangeRecord(partitionId=0, > topicId=tMiJOQznTLKtOZ8rLqdgqw, isr=null, leader=-2, replicas=[6, 2], > removingReplicas=[2], addingReplicas=[6], leaderRecoveryState=-1, > directories=[RuDIAGGJrTG2NU6tEOkbHw, AA], > eligibleLeaderReplicas=null, lastKnownElr=null) for topic t1 > (org.apache.kafka.controller.ReplicationControlManager) > [2024-05-31 15:33:25,802] INFO [QuorumController id=1] AlterPartition request > from node 2 for t1-0 completed the ongoing partition reassignment and > triggered a leadership change. Returning NEW_LEADER_ELECTED. 
> (org.apache.kafka.controller.ReplicationControlManager) > [2024-05-31 15:33:25,802] INFO [QuorumController id=1] UNCLEAN partition > change for t1-0 with topic ID tMiJOQznTLKtOZ8rLqdgqw: replicas: [6, 2] -> > [6], directories: [RuDIAGGJrTG2NU6tEOkbHw, AA] -> > [RuDIAGGJrTG2NU6tEOkbHw], isr: [2] -> [6], removingReplicas: [2] -> [], > addingReplicas: [6] -> [], leader: 2 -> 6, leaderEpoch: 3 -> 4, > partitionEpoch: 5 -> 6 (org.apache.kafka.controller.ReplicationControlManager) > [2024-05-31 15:33:25,802] INFO [QuorumController id=1] Replayed partition > assignment change PartitionChangeRecord(partitionId=0, > topicId=tMiJOQznTLKtOZ8rLqd
Re: [DISCUSS] Apache Kafka 3.7.1 release
Hi Justine, I'm sorry this release is delayed. A few new blockers have come up and we're working through them. Here's the release plan: https://cwiki.apache.org/confluence/display/KAFKA/Release+plan+3.7.1 Best, -- Igor
[jira] [Resolved] (KAFKA-16757) Fix broker re-registration issues around MV 3.7-IV2
[ https://issues.apache.org/jira/browse/KAFKA-16757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16757. - Resolution: Fixed > Fix broker re-registration issues around MV 3.7-IV2 > --- > > Key: KAFKA-16757 > URL: https://issues.apache.org/jira/browse/KAFKA-16757 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.7.1, 3.8 > > > When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend > the broker registration, so that the controller can record the storage > directories. The current code for doing this has several problems, however. > One is that it tends to trigger even in cases where we don't actually need > it. Another is that when re-registering the broker, the broker is marked as > fenced. > This PR moves the handling of the re-registration case out of > BrokerMetadataPublisher and into BrokerRegistrationTracker. The > re-registration code there will only trigger in the case where the broker > sees an existing registration for itself with no directories set. This is > much more targetted than the original code. > Additionally, in ClusterControlManager, when re-registering the same broker, > we now preserve its fencing and shutdown state, rather than clearing those. > (There isn't any good reason re-registering the same broker should clear > these things... this was purely an oversight.) Note that we can tell the > broker is "the same" because it has the same IncarnationId. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-1044: A proposal to change idempotent producer -- server implementation
Hi Claude, MappedByteBuffer is the fastest, but allows for this global pause. RandomAccessFile or FileChannel (without .map) will be slower, but involve a syscall for IO. Because threads get marked safe before entering a syscall, any delays in the operation affect just that thread, not the whole JVM. So either of these is preferable. Best, -- Igor
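To make the suggestion concrete, here is a minimal sketch (mine, not from the KIP) of doing positional reads through FileChannel rather than through a MappedByteBuffer. The I/O happens inside a read(2) syscall, during which the JVM already treats the thread as safe for Safepointing purposes:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChannelIoSketch {
    // Positional read via FileChannel: unlike touching a mapped page, any
    // delay here blocks only this thread, not a JVM-wide Safepoint.
    static byte[] readAt(FileChannel ch, long position, int length) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(length);
        while (buf.hasRemaining()) {
            int n = ch.read(buf, position + buf.position());
            if (n < 0) break; // EOF before we filled the buffer
        }
        buf.flip();
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return out;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("sketch", ".dat");
        try {
            Files.write(tmp, "hello-kafka".getBytes());
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
                System.out.println(new String(readAt(ch, 6, 5))); // prints "kafka"
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

The positional read(ByteBuffer, long) overload also avoids sharing a channel position between threads, which fits a producer-state lookup pattern; the cost is one syscall per access instead of a (usually cached, occasionally catastrophic) memory access.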
Re: [DISCUSS] KIP-1044: A proposal to change idempotent producer -- server implementation
Hi Claude, Thanks for writing this KIP. This issue seems particularly thorny, and I appreciate everyone's effort to address it. I want to share my concern with the KIP's proposed use of memory mapped files – mmap is Java's achilles heel, and Kafka should make less use of it, not more. The JVM often needs to stop all application threads (aka mutator threads) before some operations, such as GC, optimizations, redefinitions, internal cleanups and various other internal reasons. This is known as Safepointing. Because the JVM cannot forcefully stop threads, it must instead wait for each thread to observe the Safepointing request, mark itself as safe, and stop. A single delayed thread can leave the whole JVM hanging, waiting. Reads and writes to memory mapped files can trigger page faults, which can block on IO for prolonged amounts of time. One particularly bad example is hitting the page cache dirty ratio and having to flush all of the page cache, on a potentially large (high RAM) system, into a potentially slow filesystem. I have seen pauses as extreme as 1 minute, and there are other public reports of this. [1][2] Safepointing in the JVM is designed with mechanisms to prevent having to wait for a single busy thread: threads mark themselves as safe before waiting on locks, before system calls, before doing JNI, etc., and upon returning they check if a Safepoint is ongoing. So if a read or write syscall takes a bit longer, that's fine: the JVM won't halt for Safepointing, it will proceed knowing that any thread stuck on a syscall will stop if necessary when it returns. But there's no such protection against long page faults. From the JVM's perspective, the use of mmap is just a simple memory access, so there's no Safepointing protection around it. The kernel neither knows nor cares about Java's Safepointing, and halting a single unsuspecting thread for a longer period of time carries none of the severity for the kernel that it implies during a JVM Safepoint. So for this reason, I urge you to consider alternatives to the use of memory mapped files. Best, -- Igor [1] https://groups.google.com/g/mechanical-sympathy/c/LFrJPhyVOJ4 [2] https://groups.google.com/g/mechanical-sympathy/c/tepoA7PRFRU/m/7HbSINaFBgAJ
[jira] [Resolved] (KAFKA-16645) CVEs in 3.7.0 docker image
[ https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16645. - Resolution: Resolved > CVEs in 3.7.0 docker image > -- > > Key: KAFKA-16645 > URL: https://issues.apache.org/jira/browse/KAFKA-16645 > Project: Kafka > Issue Type: Task >Affects Versions: 3.7.0 >Reporter: Mickael Maison > Assignee: Igor Soarez >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > > Our [Docker Image CVE > Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub > action reports 2 high CVEs in our base image: > apache/kafka:3.7.0 (alpine 3.19.1) > == > Total: 2 (HIGH: 2, CRITICAL: 0) > > Library: libexpat, installed version 2.5.0-r2 > * CVE-2023-52425 (HIGH, fixed in 2.6.0-r0): expat: parsing large tokens can trigger a denial of service > (https://avd.aquasec.com/nvd/cve-2023-52425) > * CVE-2024-28757 (HIGH, fixed in 2.6.2-r0): expat: XML Entity Expansion > (https://avd.aquasec.com/nvd/cve-2024-28757) > > Looking at the > [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?] > that introduced the docker images, it seems we should release a bugfix when > high CVEs are detected. It would be good to investigate and assess whether > Kafka is impacted or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-16645) CVEs in 3.7.0 docker image
[ https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-16645: - Need to re-open to change the resolution; release_notes.py doesn't like the one I picked > CVEs in 3.7.0 docker image > -- > > Key: KAFKA-16645 > URL: https://issues.apache.org/jira/browse/KAFKA-16645 > Project: Kafka > Issue Type: Task >Affects Versions: 3.7.0 >Reporter: Mickael Maison > Assignee: Igor Soarez >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > > Our [Docker Image CVE > Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub > action reports 2 high CVEs in our base image: > apache/kafka:3.7.0 (alpine 3.19.1) > == > Total: 2 (HIGH: 2, CRITICAL: 0) > > Library: libexpat, installed version 2.5.0-r2 > * CVE-2023-52425 (HIGH, fixed in 2.6.0-r0): expat: parsing large tokens can trigger a denial of service > (https://avd.aquasec.com/nvd/cve-2023-52425) > * CVE-2024-28757 (HIGH, fixed in 2.6.2-r0): expat: XML Entity Expansion > (https://avd.aquasec.com/nvd/cve-2024-28757) > > Looking at the > [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?] > that introduced the docker images, it seems we should release a bugfix when > high CVEs are detected. It would be good to investigate and assess whether > Kafka is impacted or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-16692: - Re-opening as the 3.6 backport is still missing > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > Fix For: 3.7.1, 3.8 > > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION.
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this : > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in we sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId, potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}.
[jira] [Resolved] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16692. - Resolution: Fixed > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > Fix For: 3.7.1, 3.8 > > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. 
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this: > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue, and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId, potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging this issue. Happy to > provide any additional information needed. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
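To make the suspected failure mode concrete, here is a simplified, hypothetical sketch (these are not Kafka's actual classes or signatures): if no advertised API versions are known for a node, and the client never performs version discovery, nothing caps the request version, so the newest version is sent even to an older broker.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of the suspected gap: when no ApiVersions entry
// exists for a node and discoverBrokerVersions is disabled, the client may
// fall back to its own latest version instead of one the peer supports.
public class VersionNegotiationSketch {
    static final short LATEST_ADD_PARTITIONS_TO_TXN_VERSION = 4;

    // nodeApiVersions: nodeId -> max ADD_PARTITIONS_TO_TXN version advertised
    static short chooseRequestVersion(Map<Integer, Short> nodeApiVersions, int nodeId) {
        Short advertised = nodeApiVersions.get(nodeId);
        if (advertised == null) {
            // No version handshake happened: nothing caps the version,
            // so the newest one is used.
            return LATEST_ADD_PARTITIONS_TO_TXN_VERSION;
        }
        return (short) Math.min(advertised, LATEST_ADD_PARTITIONS_TO_TXN_VERSION);
    }

    public static void main(String[] args) {
        Map<Integer, Short> known = new HashMap<>();
        known.put(1043, (short) 3); // a 3.5.x broker that only supports up to v3

        // With a known node the version is capped correctly...
        System.out.println(chooseRequestVersion(known, 1043)); // 3
        // ...but an unknown node gets the uncapped latest version,
        // which an old broker rejects with InvalidRequestException.
        System.out.println(chooseRequestVersion(known, 9999)); // 4
    }
}
```

This matches the observed symptom: the 3.5.2 broker rejects a v4 request it never advertised support for.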
[jira] [Resolved] (KAFKA-16645) CVEs in 3.7.0 docker image
[ https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16645. - Assignee: Igor Soarez Resolution: Won't Fix The vulnerability has already been addressed in the base image, under the same image tag, so the next published Kafka images will not ship the vulnerability. We do not republish previous releases, so we're not taking any action here. > CVEs in 3.7.0 docker image > -- > > Key: KAFKA-16645 > URL: https://issues.apache.org/jira/browse/KAFKA-16645 > Project: Kafka > Issue Type: Task >Affects Versions: 3.7.0 >Reporter: Mickael Maison > Assignee: Igor Soarez >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > > Our [Docker Image CVE > Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub > action reports 2 high CVEs in our base image: > apache/kafka:3.7.0 (alpine 3.19.1) > == > Total: 2 (HIGH: 2, CRITICAL: 0) > * libexpat CVE-2023-52425 (HIGH, fixed): installed 2.5.0-r2, fixed in 2.6.0-r0. expat: parsing large tokens can trigger a denial of service. https://avd.aquasec.com/nvd/cve-2023-52425 > * libexpat CVE-2024-28757 (HIGH, fixed): installed 2.5.0-r2, fixed in 2.6.2-r0. expat: XML Entity Expansion. https://avd.aquasec.com/nvd/cve-2024-28757 > Looking at the > [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?] > that introduced the docker images, it seems we should release a bugfix when > high CVEs are detected. It would be good to investigate and assess whether > Kafka is impacted or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16688) SystemTimer leaks resources on close
[ https://issues.apache.org/jira/browse/KAFKA-16688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16688. - Resolution: Fixed > SystemTimer leaks resources on close > > > Key: KAFKA-16688 > URL: https://issues.apache.org/jira/browse/KAFKA-16688 > Project: Kafka > Issue Type: Test >Affects Versions: 3.8.0 >Reporter: Gaurav Narula >Assignee: Gaurav Narula >Priority: Major > > We observe some thread leaks with thread name {{executor-client-metrics}}. > This may happen because {{SystemTimer}} doesn't attempt to shutdown its > executor service properly. > Refer: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15885/1/tests > and tests with {{initializationError}} in them for stacktrace -- This message was sent by Atlassian Jira (v8.20.10#820010)
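The leak described above is the classic pattern of an ExecutorService that is never shut down. As a generic illustration (not Kafka's actual SystemTimer, and the thread name and timeout are assumptions), a close() that avoids leaking threads typically does a two-phase shutdown:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Generic sketch of the close() pattern that avoids leaking executor
// threads: stop accepting tasks, wait briefly, then force-interrupt.
public class TimerWithCleanShutdown implements AutoCloseable {
    private final ExecutorService executor =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "executor-client-metrics");
            t.setDaemon(true);
            return t;
        });

    public void submit(Runnable task) {
        executor.submit(task);
    }

    @Override
    public void close() {
        executor.shutdown(); // stop accepting new tasks
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // interrupt stragglers
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public boolean isShutdown() {
        return executor.isShutdown();
    }

    public static void main(String[] args) {
        TimerWithCleanShutdown timer = new TimerWithCleanShutdown();
        timer.submit(() -> { });
        timer.close();
        System.out.println(timer.isShutdown()); // true
    }
}
```

Without the shutdown call, each instance leaves its worker thread alive, which is exactly what surfaces as leaked "executor-client-metrics" threads in test runs.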
[jira] [Resolved] (KAFKA-16624) Don't generate useless PartitionChangeRecord on older MV
[ https://issues.apache.org/jira/browse/KAFKA-16624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16624. - Resolution: Fixed > Don't generate useless PartitionChangeRecord on older MV > > > Key: KAFKA-16624 > URL: https://issues.apache.org/jira/browse/KAFKA-16624 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Minor > > Fix a case where we could generate useless PartitionChangeRecords on metadata > versions older than 3.6-IV0. This could happen in the case where we had an > ISR with only one broker in it, and we were trying to go down to a fully > empty ISR. In this case, PartitionChangeBuilder would block the record to > going down to a fully empty ISR (since that is not valid in these pre-KIP-966 > metadata versions), but it would still emit the record, even though it had no > effect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
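The bug shape described above can be sketched with a small, hypothetical model (this is not Kafka's actual PartitionChangeBuilder; the method names and record type are illustrative): the invalid shrink to an empty ISR is vetoed, but a record is still emitted even though it changes nothing.

```java
import java.util.List;
import java.util.Optional;

// Illustrative sketch of the no-op record bug: a vetoed ISR change still
// produced a record. The fix is to emit a record only when something changed.
public class NoOpRecordSketch {
    record PartitionChangeRecord(List<Integer> newIsr) {}

    // Before the fix: always emit, even when the ISR change was blocked.
    static Optional<PartitionChangeRecord> buildBeforeFix(
            List<Integer> isr, List<Integer> targetIsr, boolean emptyIsrSupported) {
        List<Integer> effective =
            (targetIsr.isEmpty() && !emptyIsrSupported) ? isr : targetIsr;
        return Optional.of(new PartitionChangeRecord(effective));
    }

    // After the fix: emit only if the effective ISR actually differs.
    static Optional<PartitionChangeRecord> buildAfterFix(
            List<Integer> isr, List<Integer> targetIsr, boolean emptyIsrSupported) {
        List<Integer> effective =
            (targetIsr.isEmpty() && !emptyIsrSupported) ? isr : targetIsr;
        return effective.equals(isr)
            ? Optional.empty()
            : Optional.of(new PartitionChangeRecord(effective));
    }

    public static void main(String[] args) {
        List<Integer> isr = List.of(1);       // single-broker ISR
        List<Integer> target = List.of();     // shrink to empty (invalid pre-KIP-966)
        System.out.println(buildBeforeFix(isr, target, false).isPresent()); // true (useless record)
        System.out.println(buildAfterFix(isr, target, false).isPresent());  // false
    }
}
```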
Re: [DISCUSS] KIP-936 Throttle number of active PIDs
Hi Omnia, Hi Claude, Thanks for putting this KIP together. This is an important unresolved issue in Kafka, which I have witnessed several times in production. Please see my questions below: 10. Given the goal is to prevent OOMs, do we also need to limit the number of KafkaPrincipals in use? 11. How would an operator know or decide to change the configuration for the number of layers – producer.id.quota.cache.layer.count – e.g. increasing from 4 to 5; and why? Do we need a new metric to indicate that change could be useful? 12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a guaranteed interval, or rather simply a delay between cleanups? How did you decide on the default value of 10ms? 13. Under "New ProducerIdQuotaManagerCache", the documentation for the constructor params for ProducerIDQuotaManagerCache does not match the constructor signature. 14. Under "New ProducerIdQuotaManagerCache": public boolean track(KafkaPrincipal principal, int producerIdRate, long pid) How is producerIdRate used? The reference implementation Claude shared does not use it. https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java 15. I could not find a description or definition for TimestampedBloomFilter, could we add that to the KIP? 16. LayeredBloomFilter will have a fixed size (right?), but some users (KafkaPrincipal) might only use a small number of PIDs. Is it worth having a dual strategy, where we simply keep a Set of PIDs until we reach a certain size where it pays off to use the LayeredBloomFilter? 17. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests", the KIP states: a. INIT_PRODUCER_ID for idempotent producer request PIDs from random controller every time so if a client got throttled on one controller doesn't guarantee it will not go through on next controller causing OOM at the leader later. 
Is the INIT_PRODUCER_ID request really sent to a "random controller"? From a quick look at Sender.maybeSendAndPollTransactionalRequest, for an idempotent producer, targetNode is set to the broker with the fewest outstanding requests. Am I looking at the wrong place? 18. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests", the KIP states: This solution might look simple however throttling the INIT_PRODUCER_ID doesn't guarantee the OOM wouldn't happened as (...) b. The problem happened on the activation of the PID when it produce and not at the initialisation. Which means Kafka wouldn't have OOM problem if the producer got assigned PID but crashed before producing anything. Point b. does not seem to support the claim above? 19. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests", the KIP states: c. Throttling producers that crash between initialisation and producing could slow them down when they recover/fix the problem that caused them to crash right after initialising PID. Doesn't it depend on the back-off time or how quotas are enforced? I’m not sure this would necessarily be a problem? 20. If the allocation of PIDs for idempotent producers was centralized, or otherwise the targetNode for that request was predictable, would that make throttling INIT_PRODUCER_ID a viable solution? Best, -- Igor
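[Editorial sketch of the dual strategy suggested in question 16 above. Everything here is hypothetical: the class name, the switch threshold, and the simple BitSet-backed filter standing in for the KIP's LayeredBloomFilter are illustrative assumptions, not taken from the KIP or the reference implementation.]

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

// Hypothetical dual-strategy PID tracker: exact Set while a principal uses
// few PIDs, switching to a simple Bloom filter once the set grows large.
public class DualStrategyPidTracker {
    private static final int SWITCH_THRESHOLD = 1024; // illustrative sizing
    private static final int FILTER_BITS = 1 << 20;   // 1 Mbit filter

    private Set<Long> exact = new HashSet<>();
    private BitSet filter; // null until we switch strategies

    /** Returns true if the pid was not seen before (approximate once filtering). */
    public boolean track(long pid) {
        if (filter == null) {
            boolean added = exact.add(pid);
            if (exact.size() > SWITCH_THRESHOLD) {
                // Migrate the exact set into the filter and drop it.
                filter = new BitSet(FILTER_BITS);
                for (long p : exact) set(p);
                exact = null;
            }
            return added;
        }
        boolean seen = get(pid);
        set(pid);
        return !seen;
    }

    private void set(long pid) {
        for (int i = 0; i < 3; i++) filter.set(index(pid, i));
    }

    private boolean get(long pid) {
        for (int i = 0; i < 3; i++) if (!filter.get(index(pid, i))) return false;
        return true;
    }

    // Three cheap hash functions derived from a mixed multiply.
    private static int index(long pid, int seed) {
        long h = pid * 0x9E3779B97F4A7C15L + seed * 0xC2B2AE3D27D4EB4FL;
        return (int) ((h ^ (h >>> 32)) & (FILTER_BITS - 1));
    }

    public static void main(String[] args) {
        DualStrategyPidTracker tracker = new DualStrategyPidTracker();
        System.out.println(tracker.track(42L)); // true: first sighting
        System.out.println(tracker.track(42L)); // false: already tracked
    }
}
```

The upside of the dual strategy is exact answers and minimal memory for the common case of low-cardinality principals, paying the Bloom filter's false-positive rate only once a principal is busy enough to justify it.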
[jira] [Created] (KAFKA-16636) Flaky test - testStickyTaskAssignorLargePartitionCount – org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest
Igor Soarez created KAFKA-16636: --- Summary: Flaky test - testStickyTaskAssignorLargePartitionCount – org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest Key: KAFKA-16636 URL: https://issues.apache.org/jira/browse/KAFKA-16636 Project: Kafka Issue Type: Test Reporter: Igor Soarez Attachments: log (1).txt testStickyTaskAssignorLargePartitionCount – org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest {code:java} java.lang.AssertionError: The first assignment took too long to complete at 131680ms. at org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.completeLargeAssignment(StreamsAssignmentScaleTest.java:220) at org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.testStickyTaskAssignorLargePartitionCount(StreamsAssignmentScaleTest.java:102) {code} [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15816/1/tests/] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16635) Flaky test "shouldThrottleOldSegments(String).quorum=kraft" – kafka.server.ReplicationQuotasTest
Igor Soarez created KAFKA-16635: --- Summary: Flaky test "shouldThrottleOldSegments(String).quorum=kraft" – kafka.server.ReplicationQuotasTest Key: KAFKA-16635 URL: https://issues.apache.org/jira/browse/KAFKA-16635 Project: Kafka Issue Type: Test Reporter: Igor Soarez "shouldThrottleOldSegments(String).quorum=kraft" – kafka.server.ReplicationQuotasTest {code:java} org.opentest4j.AssertionFailedError: Throttled replication of 2203ms should be > 3600.0ms ==> expected: but was: at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)at app//kafka.server.ReplicationQuotasTest.shouldThrottleOldSegments(ReplicationQuotasTest.scala:260) {code} https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15816/1/tests/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16634) Flaky test - testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest
Igor Soarez created KAFKA-16634: --- Summary: Flaky test - testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest Key: KAFKA-16634 URL: https://issues.apache.org/jira/browse/KAFKA-16634 Project: Kafka Issue Type: Test Reporter: Igor Soarez Attachments: output.txt testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest Error: {code:java} java.util.concurrent.TimeoutException: testFenceMultipleBrokers() timed out after 40 seconds {code} Test logs in attached output.txt https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15816/1/tests/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16633) Flaky test - testDescribeExistingGroupWithNoMembers(String, String).quorum=kraft+kip848.groupProtocol=consumer – org.apache.kafka.tools.consumer.group.DescribeConsumerGr
Igor Soarez created KAFKA-16633: --- Summary: Flaky test - testDescribeExistingGroupWithNoMembers(String, String).quorum=kraft+kip848.groupProtocol=consumer – org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest Key: KAFKA-16633 URL: https://issues.apache.org/jira/browse/KAFKA-16633 Project: Kafka Issue Type: Test Reporter: Igor Soarez testDescribeExistingGroupWithNoMembers(String, String).quorum=kraft+kip848.groupProtocol=consumer – org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest {code:java} org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Expected no active member in describe group results with describe type --offsets ==> expected: but was: at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:396) at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:393) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:350) at app//org.apache.kafka.tools.consumer.group.DescribeConsumerGroupTest.testDescribeExistingGroupWithNoMembers(DescribeConsumerGroupTest.java:430) {code} https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15816/1/tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16632) Flaky test testDeleteOffsetsOfStableConsumerGroupWithTopicPartition [1] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer
Igor Soarez created KAFKA-16632: --- Summary: Flaky test testDeleteOffsetsOfStableConsumerGroupWithTopicPartition [1] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest Key: KAFKA-16632 URL: https://issues.apache.org/jira/browse/KAFKA-16632 Project: Kafka Issue Type: Test Reporter: Igor Soarez testDeleteOffsetsOfStableConsumerGroupWithTopicPartition [1] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest {code:java} org.opentest4j.AssertionFailedError: expected: not equal but was: <0> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:277) at app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:94) at app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:86) at app//org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:1981) at app//org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest.withConsumerGroup(DeleteOffsetsConsumerGroupCommandIntegrationTest.java:213) at app//org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest.testWithConsumerGroup(DeleteOffsetsConsumerGroupCommandIntegrationTest.java:179) at app//org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(DeleteOffsetsConsumerGroupCommandIntegrationTest.java:96) {code} https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15816/1/tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16631) Flaky test - testDeleteOffsetsOfStableConsumerGroupWithTopicOnly [1] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.gr
Igor Soarez created KAFKA-16631: --- Summary: Flaky test - testDeleteOffsetsOfStableConsumerGroupWithTopicOnly [1] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest Key: KAFKA-16631 URL: https://issues.apache.org/jira/browse/KAFKA-16631 Project: Kafka Issue Type: Test Reporter: Igor Soarez testDeleteOffsetsOfStableConsumerGroupWithTopicOnly [1] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest {code:java} org.opentest4j.AssertionFailedError: expected: not equal but was: <0> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:277) at app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:94) at app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:86) at app//org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:1981) at app//org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest.withConsumerGroup(DeleteOffsetsConsumerGroupCommandIntegrationTest.java:213) at app//org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest.testWithConsumerGroup(DeleteOffsetsConsumerGroupCommandIntegrationTest.java:179) at app//org.apache.kafka.tools.consumer.group.DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsOfStableConsumerGroupWithTopicOnly(DeleteOffsetsConsumerGroupCommandIntegrationTest.java:105) {code} https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15816/1/tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16630) Flaky test "testPollReturnsRecords(GroupProtocol).groupProtocol=CLASSIC" – org.apache.kafka.clients.consumer.KafkaConsumerTest
Igor Soarez created KAFKA-16630: --- Summary: Flaky test "testPollReturnsRecords(GroupProtocol).groupProtocol=CLASSIC" – org.apache.kafka.clients.consumer.KafkaConsumerTest Key: KAFKA-16630 URL: https://issues.apache.org/jira/browse/KAFKA-16630 Project: Kafka Issue Type: Test Reporter: Igor Soarez "testPollReturnsRecords(GroupProtocol).groupProtocol=CLASSIC" – org.apache.kafka.clients.consumer.KafkaConsumerTest {code:java} org.opentest4j.AssertionFailedError: expected: <0> but was: <5> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531) at app//org.apache.kafka.clients.consumer.KafkaConsumerTest.testPollReturnsRecords(KafkaConsumerTest.java:289) {code} [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15816/1/tests] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [ANNOUNCE] New committer: Igor Soarez
Thanks everyone, I'm very honoured to join! -- Igor
[DISCUSS] Apache Kafka 3.7.1 release
Hi everyone, I'd like to volunteer to be the release manager for a 3.7.1 release. Please keep in mind, this would be my first release, so I might have some questions, and it might also take me a bit longer to work through the release process. So I'm thinking a good target would be toward the end of May. Please let me know your thoughts and if there are any objections or concerns. Thanks, -- Igor
[jira] [Resolved] (KAFKA-16610) Replace "Map#entrySet#forEach" by "Map#forEach"
[ https://issues.apache.org/jira/browse/KAFKA-16610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16610. - Resolution: Resolved > Replace "Map#entrySet#forEach" by "Map#forEach" > --- > > Key: KAFKA-16610 > URL: https://issues.apache.org/jira/browse/KAFKA-16610 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > Fix For: 3.8.0 > > > {quote} > Targets > Occurrences of 'entrySet().forEach' in Project > Found occurrences in Project (16 usages found) > Unclassified (16 usages found) > kafka.core.main (9 usages found) > kafka.server (4 usages found) > ControllerApis.scala (2 usages found) > ControllerApis (2 usages found) > handleIncrementalAlterConfigs (1 usage found) > 774 controllerResults.entrySet().forEach(entry => > response.responses().add( > handleLegacyAlterConfigs (1 usage found) > 533 controllerResults.entrySet().forEach(entry => > response.responses().add( > ControllerConfigurationValidator.scala (2 usages found) > ControllerConfigurationValidator (2 usages found) > validate (2 usages found) > 99 config.entrySet().forEach(e => { > 114 config.entrySet().forEach(e => > properties.setProperty(e.getKey, e.getValue)) > kafka.server.metadata (5 usages found) > AclPublisher.scala (1 usage found) > AclPublisher (1 usage found) > onMetadataUpdate (1 usage found) > 73 aclsDelta.changes().entrySet().forEach(e => > ClientQuotaMetadataManager.scala (3 usages found) > ClientQuotaMetadataManager (3 usages found) > handleIpQuota (1 usage found) > 119 quotaDelta.changes().entrySet().forEach { e => > update (2 usages found) > 54 quotasDelta.changes().entrySet().forEach { e => > 99 quotaDelta.changes().entrySet().forEach { e => > KRaftMetadataCache.scala (1 usage found) > KRaftMetadataCache (1 usage found) > getClusterMetadata (1 usage found) > 491 topic.partitions().entrySet().forEach { entry > => > kafka.core.test (1 usage found) > unit.kafka.integration (1 usage 
found) > KafkaServerTestHarness.scala (1 usage found) > KafkaServerTestHarness (1 usage found) > getTopicNames (1 usage found) > 349 > controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach > { > kafka.metadata.main (3 usages found) > org.apache.kafka.controller (2 usages found) > QuorumFeatures.java (1 usage found) > toString() (1 usage found) > 144 localSupportedFeatures.entrySet().forEach(f -> > features.add(f.getKey() + ": " + f.getValue())); > ReplicationControlManager.java (1 usage found) > createTopic(ControllerRequestContext, CreatableTopic, > List, Map, > List, boolean) (1 usage found) > 732 newParts.entrySet().forEach(e -> > assignments.put(e.getKey(), > org.apache.kafka.metadata.properties (1 usage found) > MetaPropertiesEnsemble.java (1 usage found) > toString() (1 usage found) > 610 logDirProps.entrySet().forEach( > kafka.metadata.test (1 usage found) > org.apache.kafka.controller (1 usage found) > ReplicationControlManagerTest.java (1 usage found) > createTestTopic(String, int[][], Map, > short) (1 usage found) > 307 configs.entrySet().forEach(e -> > topic.configs().add( > kafka.streams.main (1 usage found) > org.apache.kafka.streams.processor.internals (1 usage found) > StreamsMetadataState.java (1 usage found) > onChange(Map>, > Map>, Map) (1 > usage f
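[Editorial note: the refactor tracked above is mechanical; a minimal illustration of the before/after forms, with hypothetical map contents:]

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration of the KAFKA-16610 refactor: iterating a Map via
// entrySet().forEach versus the equivalent, more direct Map#forEach.
public class MapForEachSketch {
    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("a", 1);
        counts.put("b", 2);

        // Before: indirect iteration through the entry set.
        counts.entrySet().forEach(e ->
            System.out.println(e.getKey() + "=" + e.getValue()));

        // After: Map#forEach hands key and value to the lambda directly,
        // avoiding the Map.Entry indirection.
        counts.forEach((k, v) ->
            System.out.println(k + "=" + v));
    }
}
```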
[jira] [Reopened] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-16606: - Assignee: Igor Soarez > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz > Assignee: Igor Soarez >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 > 
{"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37",
Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector
Hi Omnia, Thanks for your answers, and I see you've updated the KIP so thanks for the changes too. +1 (binding), thanks for the KIP -- Igor
[jira] [Created] (KAFKA-16602) Flaky test – org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord()
Igor Soarez created KAFKA-16602: --- Summary: Flaky test – org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord() Key: KAFKA-16602 URL: https://issues.apache.org/jira/browse/KAFKA-16602 Project: Kafka Issue Type: Test Components: controller Reporter: Igor Soarez org.apache.kafka.controller.QuorumControllerTest.testBootstrapZkMigrationRecord() failed with: h4. Error {code:java} org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0{code} h4. Stacktrace {code:java} org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0 at app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808) at app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270) at app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547) at app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is larger than the current epoch 0{code} Source: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16601) Flaky test – org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics()
Igor Soarez created KAFKA-16601: --- Summary: Flaky test – org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics() Key: KAFKA-16601 URL: https://issues.apache.org/jira/browse/KAFKA-16601 Project: Kafka Issue Type: Test Components: controller Reporter: Igor Soarez org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics() failed with: h4. Error {code:java} org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0{code} h4. Stacktrace {code:java} org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0 at app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808) at app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270) at app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547) at app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is larger than 
the current epoch 0{code} Source: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/3/tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16597) Flaky test - org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads()
Igor Soarez created KAFKA-16597: --- Summary: Flaky test - org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads() Key: KAFKA-16597 URL: https://issues.apache.org/jira/browse/KAFKA-16597 Project: Kafka Issue Type: Test Components: streams Reporter: Igor Soarez org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads() failed with: {code:java} Error org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The specified partition 1 for store source-table does not exist. Stacktrace org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The specified partition 1 for store source-table does not exist. at app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63) at app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53) at app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStoresMultiStreamThreads(StoreQueryIntegrationTest.java:411) {code} https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15701/2/tests/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16596) Flaky test – org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup()
Igor Soarez created KAFKA-16596: --- Summary: Flaky test – org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup() Key: KAFKA-16596 URL: https://issues.apache.org/jira/browse/KAFKA-16596 Project: Kafka Issue Type: Test Reporter: Igor Soarez org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup() failed in the following way: {code:java} org.opentest4j.AssertionFailedError: Unexpected addresses [93.184.215.14, 2606:2800:21f:cb07:6820:80da:af6b:8b2c] ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at app//org.apache.kafka.clients.ClientUtilsTest.testParseAndValidateAddressesWithReverseLookup(ClientUtilsTest.java:65) {code} As a result of the following assertions: {code:java} // With lookup of example.com, either one or two addresses are expected depending on // whether ipv4 and ipv6 are enabled List<InetSocketAddress> validatedAddresses = checkWithLookup(asList("example.com:1")); assertTrue(validatedAddresses.size() >= 1, "Unexpected addresses " + validatedAddresses); List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName) .collect(Collectors.toList()); List<String> expectedHostNames = asList("93.184.216.34", "2606:2800:220:1:248:1893:25c8:1946"); {code} It seems that the DNS result has changed for example.com. -- This message was sent by Atlassian Jira (v8.20.10#820010)
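[Editorial note: the brittle part of the failing test above is the hardcoded list of resolved addresses, which breaks whenever example.com's A/AAAA records change. A minimal illustrative sketch of the pattern follows; it resolves localhost to stay offline, and the class and method names are invented for this example, not taken from Kafka.]

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: why asserting exact resolved addresses is fragile.
public class DnsAssertionSketch {
    /** Return the distinct addresses a host resolves to, as strings. */
    static List<String> resolve(String host) throws UnknownHostException {
        return Arrays.stream(InetAddress.getAllByName(host))
                .map(InetAddress::getHostAddress)
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws UnknownHostException {
        List<String> addrs = resolve("localhost");
        // Brittle: comparing addrs against literal IPs fails when DNS records change.
        // Robust: assert only properties that hold for any valid answer, e.g. that
        // at least one address came back.
        if (addrs.isEmpty()) {
            throw new AssertionError("Unexpected addresses " + addrs);
        }
        System.out.println(addrs);
    }
}
```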
Re: [VOTE] KIP-1031: Control offset translation in MirrorSourceConnector
Hi Omnia, Thanks for this KIP. 11. These seem to me to be small misspellings, please double-check: s/MM2 main features/MM2's main features s/syncing consumer group offset/syncing consumer group offsets s/relays/relies s/recored's offset/recorded offsets s/clusters without need for/clusters without the need for s/creating internal topic./creating an internal topic. s/This KIP propose that/This KIP proposes that 12. The use of the word "customers" seems a bit odd to me in this context. Did you perhaps mean one of "use-cases", "users" or "operators"? 13. "They still left with part#1 of this feature which add cost to the progress of their replication." I'm unsure what this means. Do you mean to say that MirrorCheckpointConnector is disabled but MirrorSourceConnector is not? Could you also clarify where the additional cost comes from? 14. This is probably more ignorance of mine: it doesn't seem obvious in the KIP how increasing offset.lag.max to INT_MAX helps reduce latency. I'm guessing it's related to KAFKA-14610 but after having a look I still couldn't understand why. -- Igor On Wed, Apr 17, 2024, at 3:22 PM, Omnia Ibrahim wrote: > Thanks Chris and Mickael for the votes. > Can I please get one last +1 binding vote please? > > Thanks > Omnia > > > On 12 Apr 2024, at 13:21, Chris Egerton wrote: > > > > +1 (binding), thanks Omnia! > > > > On Fri, Apr 12, 2024, 03:46 Mickael Maison wrote: > > > >> Hi Omnia, > >> > >> +1 (binding), thanks for the KIP! > >> > >> Mickael > >> > >> On Fri, Apr 12, 2024 at 9:01 AM Omnia Ibrahim > >> wrote: > >>> > >>> Hi everyone, I would like to start a voting thread for KIP-1031: Control > >> offset translation in MirrorSourceConnector > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector > >>> > >>> For comments or feedback please check the discussion thread here > >> https://lists.apache.org/thread/ym6zr0wrhglft5c000x9c8ych098s7h6 > >>> > >>> Thanks > >>> Omnia > >>> > >> > >
[jira] [Reopened] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions
[ https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-15793: - This has come up again: {code:java} [2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 97 > DeleteTopicsRequestTest > testTopicDeletionClusterHasOfflinePartitions(String) > "testTopicDeletionClusterHasOfflinePartitions(String).quorum=zk" STARTED [2024-04-09T21:06:17.307Z] kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7] failed, log available in /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testMigrateTopicDeletions(ClusterInstance)[7].test.stdout [2024-04-09T21:06:17.307Z] [2024-04-09T21:06:17.307Z] Gradle Test Run :core:test > Gradle Test Executor 96 > ZkMigrationIntegrationTest > testMigrateTopicDeletions(ClusterInstance) > testMigrateTopicDeletions [7] Type=ZK, MetadataVersion=3.7-IV4, Security=PLAINTEXT FAILED [2024-04-09T21:06:17.307Z] org.apache.kafka.server.fault.FaultHandlerException: nonFatalFaultHandler: Unhandled error in MetadataChangeEvent: Check op on KRaft Migration ZNode failed. Expected zkVersion = 5. This indicates that another KRaft controller is making writes to ZooKeeper. 
[2024-04-09T21:06:17.307Z] at app//kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001) [2024-04-09T21:06:17.307Z] at app//kafka.zk.KafkaZkClient.unwrapMigrationResponse$1(KafkaZkClient.scala:2027) [2024-04-09T21:06:17.307Z] at app//kafka.zk.KafkaZkClient.$anonfun$retryMigrationRequestsUntilConnected$2(KafkaZkClient.scala:2052) [2024-04-09T21:06:17.307Z] at app//scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100) [2024-04-09T21:06:17.307Z] at app//scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87) [2024-04-09T21:06:17.307Z] at app//scala.collection.mutable.ArrayBuffer.map(ArrayBuffer.scala:43) [2024-04-09T21:06:17.307Z] at app//kafka.zk.KafkaZkClient.retryMigrationRequestsUntilConnected(KafkaZkClient.scala:2052) [2024-04-09T21:06:17.307Z] at app//kafka.zk.migration.ZkTopicMigrationClient.$anonfun$updateTopicPartitions$1(ZkTopicMigrationClient.scala:265) [2024-04-09T21:06:17.307Z] at app//kafka.zk.migration.ZkTopicMigrationClient.updateTopicPartitions(ZkTopicMigrationClient.scala:255) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$20(KRaftMigrationZkWriter.java:334) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.applyMigrationOperation(KRaftMigrationDriver.java:248) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.access$300(KRaftMigrationDriver.java:62) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.lambda$run$1(KRaftMigrationDriver.java:532) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver.lambda$countingOperationConsumer$6(KRaftMigrationDriver.java:845) [2024-04-09T21:06:17.307Z] at 
app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.lambda$handleTopicsDelta$21(KRaftMigrationZkWriter.java:331) [2024-04-09T21:06:17.307Z] at java.base@17.0.7/java.util.HashMap.forEach(HashMap.java:1421) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleTopicsDelta(KRaftMigrationZkWriter.java:297) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationZkWriter.handleDelta(KRaftMigrationZkWriter.java:112) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.metadata.migration.KRaftMigrationDriver$MetadataChangeEvent.run(KRaftMigrationDriver.java:531) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:128) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) [2024-04-09T21:06:17.307Z] at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) [2024-04-09T21:06:17.307Z] at java.base@17.0.7/java.lang.Thread.run(Thread.java:833) [2024-04-09T21:06:17.307Z] [2024-04-09T21:06:17.307Z] Caused by: [2024-04-09T21:06:17.307Z] java.lang.RuntimeException: Check op on KRaft Migration ZNode failed. Expected zkVersion = 5. This indicates that another KRaft controller is making writes to ZooKeeper. [2024-04-09T21:06:17.307Z] at kafka.zk.KafkaZkClient.handleUnwrappedMigrationResult$1(KafkaZkClient.scala:2001) [2024-04-09
[jira] [Created] (KAFKA-16504) Flaky test org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations
Igor Soarez created KAFKA-16504: --- Summary: Flaky test org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations Key: KAFKA-16504 URL: https://issues.apache.org/jira/browse/KAFKA-16504 Project: Kafka Issue Type: Test Components: controller Reporter: Igor Soarez {code:java} [2024-04-09T20:26:55.840Z] Gradle Test Run :metadata:test > Gradle Test Executor 54 > QuorumControllerTest > testConfigurationOperations() STARTED [2024-04-09T20:26:55.840Z] org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations() failed, log available in /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15522/metadata/build/reports/testOutput/org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations().test.stdout [2024-04-09T20:26:55.840Z] [2024-04-09T20:26:55.840Z] Gradle Test Run :metadata:test > Gradle Test Executor 54 > QuorumControllerTest > testConfigurationOperations() FAILED [2024-04-09T20:26:55.840Z] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotControllerException: No controller appears to be active. [2024-04-09T20:26:55.840Z] at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) [2024-04-09T20:26:55.840Z] at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) [2024-04-09T20:26:55.840Z] at org.apache.kafka.controller.QuorumControllerTest.testConfigurationOperations(QuorumControllerTest.java:202) [2024-04-09T20:26:55.840Z] [2024-04-09T20:26:55.840Z] Caused by: [2024-04-09T20:26:55.840Z] org.apache.kafka.common.errors.NotControllerException: No controller appears to be active. {code} [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15522/5/tests/] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
[ https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16403. - Resolution: Not A Bug > Flaky test > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords > - > > Key: KAFKA-16403 > URL: https://issues.apache.org/jira/browse/KAFKA-16403 > Project: Kafka > Issue Type: Bug > Reporter: Igor Soarez >Priority: Major > > {code:java} > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords() > failed, log available in > /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout > Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > > WordCountDemoTest > testCountListOfWords() FAILED > org.apache.kafka.streams.errors.ProcessorStateException: Error opening > store KSTREAM-AGGREGATE-STATE-STORE-03 at location > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03 > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) > at > org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151) > at > 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) > at > org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530) > at > org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:373) > at > org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:300) > at > org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:276) > at > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60) > Caused by: > org.rocksdb.RocksDBException: Corruption: IO error: No such file or > directory: While open a file for random read: > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb: > No such file or directory in file > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05 > at org.rocksdb.RocksDB.open(Native Method) > at org.rocksdb.RocksDB.open(RocksDB.java:307) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) > ... 17 more > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16404) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16404. - Resolution: Not A Bug Same as KAFKA-16403, this only failed once. It was likely the result of a testing infrastructure problem. We can always re-open if we see this again and suspect otherwise. > Flaky test > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig > - > > Key: KAFKA-16404 > URL: https://issues.apache.org/jira/browse/KAFKA-16404 > Project: Kafka > Issue Type: Bug > Reporter: Igor Soarez >Priority: Major > > > {code:java} > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig() > failed, log available in > /home/jenkins/workspace/Kafka_kafka-pr_PR-14903@2/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig().test.stdout > Gradle Test Run :streams:examples:test > Gradle Test Executor 87 > > WordCountDemoTest > testGetStreamsConfig() FAILED > org.apache.kafka.streams.errors.ProcessorStateException: Error opening > store KSTREAM-AGGREGATE-STATE-STORE-03 at location > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03 > at > app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) > at > app//org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69) > at > app//org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) > at > app//org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) > at > app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > app//org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) > at > 
app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151) > at > app//org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) > at > app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151) > at > app//org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > at > app//org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > at > app//org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) > at > app//org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530) > at > app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:373) > at > app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:300) > at > app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:276) > at > app//org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60) > Caused by: > org.rocksdb.RocksDBException: While lock file: > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/LOCK: > Resource temporarily unavailable > at app//org.rocksdb.RocksDB.open(Native Method) > at app//org.rocksdb.RocksDB.open(RocksDB.java:307) > at > app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) > ... 17 more > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16422) Flaky test org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true"
Igor Soarez created KAFKA-16422: --- Summary: Flaky test org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest."testFailingOverIncrementsNewActiveControllerCount(boolean).true" Key: KAFKA-16422 URL: https://issues.apache.org/jira/browse/KAFKA-16422 Project: Kafka Issue Type: Bug Reporter: Igor Soarez {code:java} [2024-03-22T10:39:59.911Z] Gradle Test Run :metadata:test > Gradle Test Executor 92 > QuorumControllerMetricsIntegrationTest > testFailingOverIncrementsNewActiveControllerCount(boolean) > "testFailingOverIncrementsNewActiveControllerCount(boolean).true" FAILED [2024-03-22T10:39:59.912Z] org.opentest4j.AssertionFailedError: expected: <1> but was: <2> [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161) [2024-03-22T10:39:59.912Z] at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:632) [2024-03-22T10:39:59.912Z] at app//org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.lambda$testFailingOverIncrementsNewActiveControllerCount$1(QuorumControllerMetricsIntegrationTest.java:107) [2024-03-22T10:39:59.912Z] at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:444) [2024-03-22T10:39:59.912Z] at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:412) [2024-03-22T10:39:59.912Z] at app//org.apache.kafka.controller.QuorumControllerMetricsIntegrationTest.testFailingOverIncrementsNewActiveControllerCount(QuorumControllerMetricsIntegrationTest.java:105) {code} -- 
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16404) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig
Igor Soarez created KAFKA-16404: --- Summary: Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig Key: KAFKA-16404 URL: https://issues.apache.org/jira/browse/KAFKA-16404 Project: Kafka Issue Type: Bug Reporter: Igor Soarez {code:java} org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig() failed, log available in /home/jenkins/workspace/Kafka_kafka-pr_PR-14903@2/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig().test.stdout Gradle Test Run :streams:examples:test > Gradle Test Executor 87 > WordCountDemoTest > testGetStreamsConfig() FAILED org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-AGGREGATE-STATE-STORE-03 at location /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03 at app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) at app//org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69) at app//org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) at app//org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) at app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) at app//org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) at app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) at app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151) at app//org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) at app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151) at 
app//org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) at app//org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) at app//org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) at app//org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530) at app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:373) at app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:300) at app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:276) at app//org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60) Caused by: org.rocksdb.RocksDBException: While lock file: /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/LOCK: Resource temporarily unavailable at app//org.rocksdb.RocksDB.open(Native Method) at app//org.rocksdb.RocksDB.open(RocksDB.java:307) at app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) ... 17 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
Igor Soarez created KAFKA-16403: --- Summary: Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords Key: KAFKA-16403 URL: https://issues.apache.org/jira/browse/KAFKA-16403 Project: Kafka Issue Type: Bug Reporter: Igor Soarez {code:java} org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords() failed, log available in /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > WordCountDemoTest > testCountListOfWords() FAILED org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-AGGREGATE-STATE-STORE-03 at location /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03 at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69) at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151) at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) at org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530) at org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373) at org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300) at org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276) at org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60) Caused by: org.rocksdb.RocksDBException: Corruption: IO error: No such file or directory: While open a file for random read: /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb: No such file or directory in file /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05 at org.rocksdb.RocksDB.open(Native Method) at org.rocksdb.RocksDB.open(RocksDB.java:307) at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) ... 17 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16402) Flaky test org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad
Igor Soarez created KAFKA-16402: --- Summary: Flaky test org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad Key: KAFKA-16402 URL: https://issues.apache.org/jira/browse/KAFKA-16402 Project: Kafka Issue Type: Bug Reporter: Igor Soarez {code:java} org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad() failed, log available in /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-14903/metadata/build/reports/testOutput/org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad().test.stdout Gradle Test Run :metadata:test > Gradle Test Executor 93 > QuorumControllerTest > testSnapshotSaveAndLoad() FAILED java.lang.IllegalArgumentException: Self-suppression not permitted at java.base/java.lang.Throwable.addSuppressed(Throwable.java:1072) at org.apache.kafka.controller.QuorumControllerTest.testSnapshotSaveAndLoad(QuorumControllerTest.java:645) Caused by: org.apache.kafka.server.fault.FaultHandlerException: fatalFaultHandler: exception while renouncing leadership: Attempt to resign from epoch 1 which is larger than the current epoch 0 at app//org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808) at app//org.apache.kafka.controller.QuorumController.renounce(QuorumController.java:1270) at app//org.apache.kafka.controller.QuorumController.handleEventException(QuorumController.java:547) at app//org.apache.kafka.controller.QuorumController.access$800(QuorumController.java:179) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.complete(QuorumController.java:881) at app//org.apache.kafka.controller.QuorumController$ControllerWriteEvent.handleException(QuorumController.java:871) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.completeWithException(KafkaEventQueue.java:149) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:138) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:211) at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:182) at java.base@17.0.7/java.lang.Thread.run(Thread.java:833) Caused by: java.lang.IllegalArgumentException: Attempt to resign from epoch 1 which is larger than the current epoch 0 at org.apache.kafka.metalog.LocalLogManager.resign(LocalLogManager.java:808) ... 10 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16365) AssignmentsManager mismanages completion notifications
Igor Soarez created KAFKA-16365: --- Summary: AssignmentsManager mismanages completion notifications Key: KAFKA-16365 URL: https://issues.apache.org/jira/browse/KAFKA-16365 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Assignee: Igor Soarez When moving replicas between directories in the same broker, future replica promotion hinges on acknowledgment from the controller of a change in the directory assignment. ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion notification of the directory assignment change. In its current form, under certain assignment scheduling, AssignmentsManager can both miss completion notifications and trigger them prematurely. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16363) Storage crashes if dir is unavailable
Igor Soarez created KAFKA-16363: --- Summary: Storage crashes if dir is unavailable Key: KAFKA-16363 URL: https://issues.apache.org/jira/browse/KAFKA-16363 Project: Kafka Issue Type: Sub-task Components: tools Affects Versions: 3.7.0 Reporter: Igor Soarez The storage tool crashes if one of the configured log directories is unavailable. {code:java} sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID -c server.properties [2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file /data/d2/meta.properties (org.apache.kafka.metadata.properties.MetaPropertiesEnsemble) java.nio.file.AccessDeniedException: /data/d2/meta.properties at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90) at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106) at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) at java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218) at java.base/java.nio.file.Files.newByteChannel(Files.java:380) at java.base/java.nio.file.Files.newByteChannel(Files.java:432) at java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422) at java.base/java.nio.file.Files.newInputStream(Files.java:160) at org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77) at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135) at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431) at kafka.tools.StorageTool$.main(StorageTool.scala:95) at kafka.tools.StorageTool.main(StorageTool.scala) metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, 
directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR}) I/O error trying to read log directory /data/d2. {code} When configured with multiple directories, Kafka tolerates some of them (but not all) being inaccessible, so this tool should be able to handle the same scenarios without crashing. -- This message was sent by Atlassian Jira (v8.20.10#820010)
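The tolerant behavior described above can be sketched as follows. This is a hypothetical, minimal loader (not the actual StorageTool or MetaPropertiesEnsemble code): it records per-directory read failures instead of crashing, and only fails if no configured directory is accessible, mirroring the broker's own JBOD tolerance.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: load meta.properties from each configured log dir,
// tolerating unreadable dirs as long as at least one remains accessible.
public class TolerantMetaLoader {
    public static Map<String, String> loadAll(List<Path> logDirs) throws IOException {
        Map<String, String> loaded = new HashMap<>();
        List<Path> failed = new ArrayList<>();
        for (Path dir : logDirs) {
            try {
                // AccessDeniedException, NoSuchFileException, etc. are
                // per-directory failures, not fatal for the whole tool.
                String contents = Files.readString(dir.resolve("meta.properties"));
                loaded.put(dir.toString(), contents);
            } catch (IOException e) {
                failed.add(dir); // record the dir as ERROR and keep going
            }
        }
        if (loaded.isEmpty()) {
            throw new IOException("All log directories are inaccessible: " + failed);
        }
        return loaded;
    }
}
```

The key design point is that a single unreadable directory yields an ERROR marker in the result rather than an uncaught exception, matching how the broker treats a subset of offline directories.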
[jira] [Created] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.
Igor Soarez created KAFKA-16297: --- Summary: Race condition while promoting future replica can lead to partition unavailability. Key: KAFKA-16297 URL: https://issues.apache.org/jira/browse/KAFKA-16297 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez KIP-858 proposed that when a directory failure occurs after changing the assignment of a replica that's moved between two directories in the same broker, but before the future replica promotion completes, the broker should reassign the replica to inform the controller of its correct status. But this hasn't yet been implemented, and without it this failure may lead to indefinite partition unavailability. Example scenario: # A broker which leads partition P receives a request to alter the replica from directory A to directory B. # The broker creates a future replica in directory B and starts a replica fetcher. # Once the future replica first catches up, the broker queues a reassignment to inform the controller of the directory change. # The next time the replica catches up, the broker briefly blocks appends and promotes the replica. However, before the promotion is attempted, directory A fails. # The controller was informed that P is now in directory B before it received the notification that directory A has failed, so it does not elect a new leader, and as long as the broker is online, partition P remains unavailable. As per KIP-858, the broker should detect this scenario and queue a reassignment of P into directory ID {{{}DirectoryId.LOST{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Viktor, Thanks for pointing this out. I forgot to make this clear in the KIP. I'll update it. ClusterAction on Cluster resource is exactly right, see `ControllerApis.handleAssignReplicasToDirs`. [1] -- Igor [1]: https://github.com/apache/kafka/pull/14863/files#diff-91060c918c99d25342f625c146f14425716eda9d8fcfe1126b2c45feff388362R1073 On Mon, Dec 4, 2023, at 12:28 PM, Viktor Somogyi-Vass wrote: > Hi Igor, > > I'm just reading through your KIP and noticed that the new protocol you > created doesn't say anything about ACLs of the new AssignReplicasToDirs > API. Would it make sense to authorize these requests as other inter-broker > protocol calls are usually authorized, that is ClusterAction on Cluster > resource? > > Thanks, > Viktor > > On Tue, Nov 28, 2023 at 4:18 PM Igor Soarez wrote: > > > Hi everyone, > > > > There have been a number of further changes. > > > > I have updated the KIP to reflect them, but for reference, > > I'd also like to update this thread with a summary. > > > > 1. The reserved Uuids and their names for directories have changed. > > The first 100 Uuids are reserved for future use. > > > > 2. During the ZooKeeper to KRaft migration, if a broker still > > configured in ZK mode has any log directory offline, it will > > shutdown and refuse to startup. The expectation is that this > > escalation from a log directory's unavailability to the entire > > broker's unavailability will be temporary, limited to the migration > > period. And that the simplification will help develop, test and > > support this feature. > > > > 3. The representation of replica directories in metadata records is > > no longer tightly coupled with the respective broker IDs. Instead of > > replacing the int[] replicas field in PartitionRecord and > > PartitionChangeRecord, we are instead introducing a new field Uuid[] > > named directories, that should be kept in the same size and order as > > the existing replicas field. 
See > > https://github.com/apache/kafka/pull/14290 for further details. > > > > 4. Assignments that are respective to failed log directories are no > > longer prioritized. Previously the KIP proposed prioritizing > > assignments that related to failed log directories, aiming to > > synchronize the necessary replica to directory mapping on the > > controller before handling the directory failure. Recently, we have > > decided to remove any prioritization of these assignments, as > > delaying the reporting of directory failures is considered > > detrimental for any reason > > > > 5. Uuids for log directories that failed after startup are always > > included in every broker heartbeat request. Previously the KIP > > proposed sending Uuids for failed directories in the broker > > heartbeat until a successful reply is received. However, due to the > > overload mode handling of broker heartbeats, because broker > > heartbeat requests may receive a successful response without being > > fully processed, it is preferable to always send the cumulative list > > of directory IDs that have failed since startup. In the future, this > > list can be trimmed to remove directory IDs that are seen to be > > removed from the broker registration, as the broker catches up with > > metadata. > > > > 6. The proposal to shutdown the broker log.dir.failure.timeout.ms > > after not being able to communicate that some log directory is > > offline is now more of an open question. It's unclear if this will > > actually be necessary. > > > > Please share if you have any thoughts. > > > > Best, > > > > -- > > Igor > > > > > > On Tue, Oct 10, 2023, at 5:28 AM, Igor Soarez wrote: > > > Hi Colin, > > > > > > Thanks for the renaming suggestions. UNASSIGNED is better then > > > UNKNOWN, MIGRATING is also better than SELECTED and I don't > > > expect it to be used outside of the migration phase. 
> > > LOST can also work instead of OFFLINE, but I think there will > > > be other uses for this value outside of the migration, like > > > in the intra-broker replica movement edge cases described in the KIP. > > > I've updated the KIP and also filed a tiny PR with your suggestion, > > > except I'm keeping the description of LOST more broad than just > > > scoped to the migration. > > > > > > https://github.com/apache/kafka/pull/14517 > > > > > > > > > The KIP already proposes that the broker does not want to unfence > > > until it has confirmed all the assignment
[jira] [Created] (KAFKA-15955) Migrating ZK brokers send dir assignments
Igor Soarez created KAFKA-15955: --- Summary: Migrating ZK brokers send dir assignments Key: KAFKA-15955 URL: https://issues.apache.org/jira/browse/KAFKA-15955 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Brokers in ZooKeeper mode, while in migration mode, should start sending directory assignments to the KRaft Controller using AssignmentsManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi everyone, There have been a number of further changes. I have updated the KIP to reflect them, but for reference, I'd also like to update this thread with a summary. 1. The reserved Uuids and their names for directories have changed. The first 100 Uuids are reserved for future use. 2. During the ZooKeeper to KRaft migration, if a broker still configured in ZK mode has any log directory offline, it will shut down and refuse to start up. The expectation is that this escalation from a log directory's unavailability to the entire broker's unavailability will be temporary, limited to the migration period. And that the simplification will help develop, test and support this feature. 3. The representation of replica directories in metadata records is no longer tightly coupled with the respective broker IDs. Instead of replacing the int[] replicas field in PartitionRecord and PartitionChangeRecord, we are introducing a new field Uuid[] named directories, that should be kept in the same size and order as the existing replicas field. See https://github.com/apache/kafka/pull/14290 for further details. 4. Assignments relating to failed log directories are no longer prioritized. Previously the KIP proposed prioritizing assignments that related to failed log directories, aiming to synchronize the necessary replica to directory mapping on the controller before handling the directory failure. Recently, we have decided to remove any prioritization of these assignments, as delaying the reporting of directory failures for any reason is considered detrimental. 5. Uuids for log directories that failed after startup are always included in every broker heartbeat request. Previously the KIP proposed sending Uuids for failed directories in the broker heartbeat until a successful reply is received. 
However, due to the overload mode handling of broker heartbeats, because broker heartbeat requests may receive a successful response without being fully processed, it is preferable to always send the cumulative list of directory IDs that have failed since startup. In the future, this list can be trimmed to remove directory IDs that are seen to be removed from the broker registration, as the broker catches up with metadata. 6. The proposal to shutdown the broker log.dir.failure.timeout.ms after not being able to communicate that some log directory is offline is now more of an open question. It's unclear if this will actually be necessary. Please share if you have any thoughts. Best, -- Igor On Tue, Oct 10, 2023, at 5:28 AM, Igor Soarez wrote: > Hi Colin, > > Thanks for the renaming suggestions. UNASSIGNED is better then > UNKNOWN, MIGRATING is also better than SELECTED and I don't > expect it to be used outside of the migration phase. > LOST can also work instead of OFFLINE, but I think there will > be other uses for this value outside of the migration, like > in the intra-broker replica movement edge cases described in the KIP. > I've updated the KIP and also filed a tiny PR with your suggestion, > except I'm keeping the description of LOST more broad than just > scoped to the migration. > > https://github.com/apache/kafka/pull/14517 > > > The KIP already proposes that the broker does not want to unfence > until it has confirmed all the assignments are communicated > with the controller. And you're right about the interaction > with ReplicaManager, we definitely don't want RPCs coming > out of there. My intention is to introduce a new manager, as you > suggest, with its own event loop, that batches and prioritizes > assignment and dir failure events, called DirectoryEventManager. > There's already an open PR, perhaps you could have a look? 
> > KAFKA-15357: Aggregate and propagate assignments and logdir failures > https://github.com/apache/kafka/pull/14369 > > > > With regard to the failure detection "gap" during hybrid mode: the > > kraft controller sends a full LeaderAndIsrRequest to the brokers > > that are in hybrid mode, right? And there is a per-partition > > response as well. Right now, we don't pay attention to the error > > codes sent back in the response. But we could. Any replica with an > > error could be transitioned from MIGRATING -> LOST, right? That > > would close the failure detection gap. > > Almost. The missing bit is that the controller would also need to > watch the /log_dir_event_notification znode, and on any change > read the broker ID in the value and send a full LeaderAndIsrRequest > to the respective broker. Without this watch, it might be a very long > time between a dir failing and the controller sending > LeaderAndIsrRequests covering every partition hosted in the failed dir. > > Watching the znode is a read-only action, and it doesn't seem like > a big thing to add this watch plus the error handling. > Maybe extending the ZK Controller compatibility functionality in this > way isn't such a bad option after all? > > > Best, > > -- > Igor >
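Point 5 of the summary above (always sending the cumulative list of failed directory ids) can be sketched with a small illustrative class. This is not the actual broker code, just a hedged sketch of the idea: because a heartbeat response can be acknowledged without being fully processed, the broker keeps a monotonically growing set and resends the whole set every time, so nothing is lost.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of cumulative failed-dir reporting in heartbeats.
public class FailedDirsTracker {
    // Grows monotonically from broker startup; never trimmed here for simplicity.
    private final Set<String> failedSinceStartup = ConcurrentHashMap.newKeySet();

    public void onDirFailure(String dirId) {
        failedSinceStartup.add(dirId);
    }

    // Called when building each BrokerHeartbeatRequest: always the full set,
    // so a partially-processed earlier heartbeat loses no information.
    public Set<String> offlineDirsForHeartbeat() {
        return Set.copyOf(failedSinceStartup);
    }
}
```

As the message notes, a future refinement could trim entries once the broker observes them removed from its own registration in the metadata log.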
[jira] [Created] (KAFKA-15893) Bump MetadataVersion for directory assignments
Igor Soarez created KAFKA-15893: --- Summary: Bump MetadataVersion for directory assignments Key: KAFKA-15893 URL: https://issues.apache.org/jira/browse/KAFKA-15893 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15886) Always specify directories for new partition registrations
Igor Soarez created KAFKA-15886: --- Summary: Always specify directories for new partition registrations Key: KAFKA-15886 URL: https://issues.apache.org/jira/browse/KAFKA-15886 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez When creating partition registrations, directories must always be defined. If creating a partition from a PartitionRecord or PartitionChangeRecord from an older version that does not support directory assignments, then DirectoryId.MIGRATING is assumed. If creating a new partition, or triggering a change in assignment, DirectoryId.UNASSIGNED should be specified, unless the target broker has a single online directory registered, in which case the replica should be assigned directly to that single directory. -- This message was sent by Atlassian Jira (v8.20.10#820010)
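The defaulting rules above can be summarized in a few lines. The sketch below is illustrative only: `Uuid` is a stand-in for Kafka's `org.apache.kafka.common.Uuid`, and the concrete sentinel values assigned to MIGRATING and UNASSIGNED are assumptions for the example, not confirmed by this issue.

```java
import java.util.List;

// Hedged sketch of the directory-defaulting rules in the issue description.
public class DirectoryDefaults {
    public record Uuid(long msb, long lsb) {}
    // Sentinel ids; the exact reserved values here are assumptions.
    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
    public static final Uuid MIGRATING  = new Uuid(0L, 2L);

    // - Records from versions without directory support default to MIGRATING.
    // - Otherwise UNASSIGNED, unless the broker registers exactly one online
    //   directory, in which case the replica is assigned directly to it.
    public static Uuid defaultDirectory(boolean recordHasDirSupport, List<Uuid> onlineDirs) {
        if (!recordHasDirSupport) return MIGRATING;
        if (onlineDirs.size() == 1) return onlineDirs.get(0);
        return UNASSIGNED;
    }
}
```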
[jira] [Created] (KAFKA-15858) Broker stays fenced until all assignments are correct
Igor Soarez created KAFKA-15858: --- Summary: Broker stays fenced until all assignments are correct Key: KAFKA-15858 URL: https://issues.apache.org/jira/browse/KAFKA-15858 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Until the broker has caught up with metadata AND corrected any incorrect directory assignments, it should continue to want to stay fenced. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] Should we continue to merge without a green build? No!
Hi all, I think at least one of those is my fault, apologies. I'll try to make sure all my tests are passing from now on. It doesn't help that GitHub always shows that the tests have failed, even when they have not. I suspect this is because Jenkins always marks the builds as unstable, even when all tests pass, because the "Archive JUnit-formatted test results" step seems to persistently fail with "[Checks API] No suitable checks publisher found.". e.g. https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14770/1/pipeline/ Can we get rid of that persistent failure and actually mark successful test runs as green? -- Igor
[jira] [Created] (KAFKA-15650) Data-loss on leader shutdown right after partition creation?
Igor Soarez created KAFKA-15650: --- Summary: Data-loss on leader shutdown right after partition creation? Key: KAFKA-15650 URL: https://issues.apache.org/jira/browse/KAFKA-15650 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez As per KIP-858, when a replica is created, the broker selects a log directory to host the replica and queues the propagation of the directory assignment to the controller. The replica becomes active immediately; it isn't blocked on the controller confirming the metadata change. If the replica is the leader replica, it can immediately start accepting writes. Consider the following scenario: # A partition is created in some selected log directory, and some produce traffic is accepted # Before the broker is able to notify the controller of the directory assignment, the broker shuts down # Upon coming back online, the broker has an offline directory, the same directory which was chosen to host the replica # The broker assumes leadership for the replica, but cannot find it in any available directory and has no way of knowing it was already created because the directory assignment is still missing # The replica is created and the previously produced records are lost Step 4 may seem unlikely due to ISR membership gating leadership, but even assuming acks=all and replicas>1, if all other replicas are also offline the broker may still gain leadership. Perhaps KIP-966 is relevant here. We may need to delay new replica activation until the assignment is propagated successfully. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15649) Handle directory failure timeout
Igor Soarez created KAFKA-15649: --- Summary: Handle directory failure timeout Key: KAFKA-15649 URL: https://issues.apache.org/jira/browse/KAFKA-15649 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez If a broker with an offline log directory continues to fail to notify the controller of either: * the fact that the directory is offline; or * any replica assignment into a failed directory, then the controller will not check if a leadership change is required, and this may lead to partitions remaining indefinitely offline. KIP-858 proposes that the broker should shut down after a configurable timeout to force a leadership change. Alternatively, the broker could also request to be fenced, as long as there's a path for it to later become unfenced. While this unavailability is possible in theory, in practice it's not easy to entertain a scenario where a broker continues to appear healthy to the controller, but fails to send this information. So it's not clear if this is a real problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-15355) Message schema changes
[ https://issues.apache.org/jira/browse/KAFKA-15355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reopened KAFKA-15355: - closed by mistake > Message schema changes > -- > > Key: KAFKA-15355 > URL: https://issues.apache.org/jira/browse/KAFKA-15355 > Project: Kafka > Issue Type: Sub-task > Reporter: Igor Soarez > Assignee: Igor Soarez >Priority: Major > Fix For: 3.7.0 > > > Metadata records changes: > * BrokerRegistrationChangeRecord > * PartitionChangeRecord > * PartitionRecord > * RegisterBrokerRecord > New RPCs > * AssignReplicasToDirsRequest > * AssignReplicasToDirsResponse > RPC changes: > * BrokerHeartbeatRequest > * BrokerRegistrationRequest -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Colin, > I would call #2 LOST. It was assigned in the past, but we don't know where. > I see that you called this OFFLINE). This is not really normal... > it should happen only when we're migrating from ZK mode to KRaft mode, > or going from an older KRaft release with multiple directories to a > post-JBOD release. What you refer to as #2 LOST is actually what I named SELECTED, as in: a directory has already been _selected_ sometime before, we just don't know which one yet. In the meantime this change has already been merged, but let me know if you feel strongly about the naming here as I'm happy to rename SELECTED_DIR to LOST_DIR in a new PR. https://github.com/apache/kafka/pull/14291 > As for the third state -- I'm not sure why SELECTED_DIR needs to exist. The third state (actually it is ordered second) - OFFLINE_DIR - conveys that a replica is assigned to an unspecified offline directory. This can be used by the broker in the following way: * When catching up with metadata, seeing that one of its partitions is mapped to SELECTED_DIR, and it cannot find that partition in any of the online log directories, and at least one log dir is offline, then the broker sends AssignReplicasToDirs to converge the assignment to OFFLINE_DIR * If a log directory failure happens during an intra-broker (across dir) replica movement, after sending AssignReplicasToDirs with the new UUID, and before the future replica catches up again. (there's a section in the KIP about this). We could just use a random UUID, as if a replica is assigned to a dir that is not in the broker's registration online dirs set then it is considered offline by controllers and metadata cache, but using a reserved UUID feels cleaner. > I think we need a general mechanism for checking that replicas are > in the directories we expect and sending an RPC to the controller > if they are not. 
A mechanism like this will automatically get rid > of the LOST replicas just as part of normal operation -- nothing > special required. Thanks for pointing this out, I forgot to put in the notes in my previous email that we discussed this too. The KIP proposes this is done when catching up with metadata, but you also suggested we extend the stray replica detection mechanism to also check for these inconsistencies. I think this is a good idea, and we'll look into that as well. Best, -- Igor
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi David, Thanks for shedding light on migration goals, makes sense. Your preference for option a) makes it even more attractive. We'll keep that as the preferred approach, thanks for the advice. > One question with this approach is how the KRaft controller learns about > the multiple log directories after the broker is restarted in KRaft mode. > If I understand the design correctly, this would be similar to a single > directory kraft broker being reconfigured as a multiple directory broker. > That is, the broker sees that the PartitionRecords are missing the > directory assignments and then sends AssignReplicasToDirs to the controller. It is not similar to a single-dir KRaft broker transitioning to multi-dir mode. In single-dir mode AssignReplicasToDirs is not sent, but the dir assignment is still written in the partition records, as the Controller knows the UUID of the only log dir that can be selected. But you're right about the rest. The multiple log dirs are indicated in the broker registration request. The first registration in a migrating ZK broker will include dir UUIDs. Directory assignments are communicated via the new RPC AssignReplicasToDirs. We have debated whether the broker should start sending these while still in mixed (migration) mode, or if it should wait until it is restarted into full KRaft mode, and stay fenced until all dir assignments are sent to the controller. Any advice for us on this one? Best, -- Igor
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi everyone, Earlier today Colin, Ron, Proven and I had a chat about this work. We discussed several aspects which I’d like to share here. ## A new reserved UUID We'll reserve a third UUID to indicate an unspecified dir, but one that is known to be selected. As opposed to the default UNKNOWN_DIR (ZERO_UUID) which is used for new replicas, which may or may not have been placed in some directory, this new UUID can disambiguate transition scenarios where we previously did not have any directory assignment information — e.g. single-logdir KRaft into JBOD KRaft, or ZK JBOD to KRaft JBOD mode. Unless anyone has a better suggestion for naming or any objection, I'll update the KIP to designate the following reserved UUIDs: * UNKNOWN_DIR - new Uuid(0L, 0L) * OFFLINE_DIR - new Uuid(0L, 1L) * SELECTED_DIR - new Uuid(0L, 2L) <-- new When transitioning to the directory assignment feature, without any previous directory assignment state, the controller can assign all existing partitions in the broker to SELECTED_DIR, to distinguish them from new partitions. When a log directory is offline, it is important that the broker does not replace the offline partitions in the remaining online directories. So, if some directory is offline, and some partition is missing the directory assignment, then it is important to distinguish new partitions from old ones. Old partitions may already exist in the offline dirs, but new partitions can safely be placed in the available (online) dirs. In ZK mode, the `isNew` flag in the LeaderAndIsr request serves this purpose. And for KRaft this KIP proposes keeping the broker in fenced mode until all initial assignments are known. But this additional UUID serves as an additional signal and covers a gap in the ZK->KRaft migration stage, where ZK brokers do not support fencing state. SELECTED_DIR is always a temporary transition state, which is due to be resolved by the broker. 
When catching up with metadata, for any partitions associated with SELECTED_DIR: * If the partition is found in some directory, AssignReplicasToDirs is used to correct the assignment to the actual dir. * If the partition is not found, and no directory is offline, a directory is selected, and AssignReplicasToDirs is used to correct the assignment to the chosen directory. * If the partition is not found and some directory is offline, the broker assumes that the partition must be in one of the offline dirs and AssignReplicasToDirs is used to converge the state to OFFLINE_DIR. This contrasts with UNKNOWN_DIR, for which brokers always select a directory, regardless of the online/offline state of any log dirs. ## Reserving a pool of non-designated UUIDs for future use It’s probably a good idea to reserve a bunch of UUIDs for future use in directory assignments. The decision is essentially costless right now, and it may prove to be useful in the future. The first 100 UUIDs (including the 3 already designated above) will be reserved for future use. ## Dir failures during ZK->KRaft migration The KRaft controller's ZK compatibility functionality does not currently implement dir failure handling. So we need to select a strategy to deal with dir failures during the migration. We discussed different options: a) If a migrating ZK mode broker encounters a directory failure, it will shut down. While this degrades failure handling during the temporary migration window, it is a useful simplification. This is an attractive option, and it isn't ruled out, but it is also not clear that it is necessary at this point. b) Extending the ZK Controller compatibility functionality in KRaft controllers to watch the /log_dir_event_notification znode, and rely on LeaderAndIsr requests for dir failure handling, same as ZK Controllers do. As there is a desire to limit the scope of the compatibility functionality, this option looks less attractive. 
c) Extending the ZK mode broker functionality during the migration to both send AssignReplicasToDirs requests, populating the assignment state earlier, and propagate dir failures in the heartbeat, in the same way the KIP proposes regular KRaft brokers do. There are several phases in the ZK->KRaft migration, and part of the process already requires ZK mode brokers to send BrokerRegistration and BrokerHeartbeat requests, so this doesn't look like a big change, and it seems to be the preferred option. If you have any thoughts or questions on any of these matters, please let me know. Best, -- Igor
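The SELECTED_DIR resolution steps described above can be sketched as a pure decision function. All names here are illustrative, not Kafka's actual broker code:

```java
import java.util.Optional;
import java.util.UUID;

// Sketch of how a broker might resolve a SELECTED_DIR assignment while
// catching up with metadata, following the three cases in the email.
public class SelectedDirResolver {
    static final UUID OFFLINE_DIR = new UUID(0L, 1L);

    /**
     * @param foundIn       the dir the partition was actually found in, if any
     * @param anyDirOffline whether any configured log dir is currently offline
     * @param pickDir       the dir the broker would select for a new placement
     * @return the dir UUID to send back via AssignReplicasToDirs
     */
    static UUID resolve(Optional<UUID> foundIn, boolean anyDirOffline, UUID pickDir) {
        if (foundIn.isPresent()) {
            return foundIn.get();  // correct the assignment to the actual dir
        } else if (!anyDirOffline) {
            return pickDir;        // safe to place: no offline dir could be hiding it
        } else {
            return OFFLINE_DIR;    // assume it lives in one of the failed dirs
        }
    }
}
```

For UNKNOWN_DIR, by contrast, the broker would take the `pickDir` branch unconditionally, since new replicas are always safe to place.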
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi everyone, After a conversation with Colin McCabe and Proven Provenzano yesterday, we decided that the benefits outweigh the concerns with the overhead of associating a directory UUID to every replica in the metadata partition records. i.e. we prefer to always associate the log dir UUID even when only one log dir is configured in the broker. This reduces complexity in several ways: * We no longer need to distinguish between JBOD and non-JBOD modes when changing or interpreting partition metadata. * Determining whether a replica is online through the metadata cache no longer depends on the number of registered log directories. * We can get rid of edge cases in the transition in or out of a multi-log-dir configuration, where the Controller would have to update a lot of replica assignments. * The OfflineLogDirs field in the broker registration is no longer necessary. So I'm updating the KIP with the following changes: - "Directory" is no longer a tagged field in PartitionRecord and PartitionChangeRecord - "OfflineLogDirs" is removed from BrokerRegistrationRequest, RegisterBrokerRecord and BrokerRegistrationChangeRecord - "OnlineLogDirs" is renamed to "LogDirs" in BrokerRegistrationRequest, RegisterBrokerRecord and BrokerRegistrationChangeRecord -- Igor
[jira] [Created] (KAFKA-15514) Controller-side replica management changes
Igor Soarez created KAFKA-15514: --- Summary: Controller-side replica management changes Key: KAFKA-15514 URL: https://issues.apache.org/jira/browse/KAFKA-15514 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez The new "Assignments" field replaces the "Replicas" field in PartitionRecord and PartitionChangeRecord. On the controller side, any changes to partitions need to consider both fields. * ISR updates * Partition reassignments & reverts * Partition creation -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Ron, I think we can generalize the deconfigured directory scenario in your last question to address this situation too. When handling a broker registration request, the controller can check if OfflineLogDirs=false and any UUIDs are missing in OnlineLogDirs, compared with the previous registration, and assign replicas in the missing dirs to Uuid.OfflineDir. -- Igor
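The registration check described above might look roughly like this on the controller side. The names are hypothetical; this is a sketch, not the actual controller code:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Sketch: when a broker re-registers with OfflineLogDirs=false, any dir UUID
// present in the previous registration but absent from the new one has been
// deconfigured, and replicas assigned to it should move to Uuid.OfflineDir.
public class RegistrationDiff {
    static Set<UUID> deconfiguredDirs(Set<UUID> previousDirs,
                                      Set<UUID> currentDirs,
                                      boolean offlineLogDirs) {
        if (offlineLogDirs) {
            // Some dirs are offline: a missing UUID may simply be offline,
            // so we cannot conclude it was removed from log.dirs.
            return Set.of();
        }
        Set<UUID> missing = new HashSet<>(previousDirs);
        missing.removeAll(currentDirs);
        return missing;
    }
}
```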
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi everyone, I believe we can close this voting thread now, as there were three +1 binding votes from Ziming, Mickael and Ron. With that, this vote passes. Thanks to everyone who participated in reviewing, and/or taking the time to vote on this KIP! Best, -- Igor
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Ron, Thanks for pointing this out. I was assuming this case was already handled in current KRaft operation with a single log directory — I wouldn't expect a broker restarting with an empty disk to cause data loss in a current KRaft system. But your question made me go look again, so here's my understanding: When unclean leader election (ULE) is disabled (the default), a broker can only be elected as a leader if both of these are true: * It is part of the ISR - see PartitionChangeBuilder.isValidNewLeader(r) * It is either + Not fenced and not in a controlled shutdown; or + Currently being unfenced Unless of course unclean leader election is enabled, in which case yes, it looks like you could have full data loss. But I don't think we want to address this case. If the broker is coming back with an empty disk, it would still need to join the ISR to become the leader. So as far as I can tell, this doesn't seem to be a problem. If we do always add the assignment, even for single log dir uses, there could potentially be some smarter and safer choices we can make in the broker, but AFAICT this doesn't seem to be a hard requirement right now. And as Colin pointed out before, adding the assignment even when only a single dir is configured may unnecessarily penalize non-JBOD use cases. Does this make sense? Did I understand your point correctly? WDYT? -- Igor On Fri, Sep 22, 2023, at 2:44 PM, Ron Dagostino wrote: > Hi Igor. Someone just asked about the case where a broker with a > single log directory restarts with a blank disk. I looked at the > "Metadata caching" section, and I don't think it covers it as > currently written. The PartitionRecord will not have an explicit UUID > for brokers that have just a single log directory (the idea was to > save space), but the UUID is inferred to be the UUID associated with > the single log directory that the broker had registered. Assume a > broker restarts with an empty disk. 
That disk will get a new UUID, > and upon broker registration the controller will see the UUID mismatch > between what the broker is presenting now and what it had presented > the last time it registered. So we need to deal with this possibility > even for the case where a broker has a single log directory. WDYT? > > Ron > > On Tue, Sep 19, 2023 at 10:04 AM Ron Dagostino wrote: > > > > Ok, great, that makes sense, Igor. Thanks. +1 (binding) on the KIP from > > me. > > > > Ron > > > > > On Sep 13, 2023, at 11:58 AM, Igor Soarez > > > wrote: > > > > > > Hi Ron, > > > > > > Thanks for drilling down on this. I think the KIP isn't really clear here, > > > and the metadata caching section you quoted needs clarification. > > > > > > The "hosting broker's latest registration" refers to the previous, > > > not the current registration. The registrations are only compared by > > > the controller, when handling the broker registration request. > > > > > > Suppose broker b1 hosts two partitions, t-1 and t-2, in two > > > directories, d1 and d2. The broker is registered, and the > > > metadata correlates the replicas to their respective directories. > > > i.e. OnlineLogDirs=[d1,d2] and OfflineLogDirs=false > > > > > > The broker is then reconfigured to remove t-2 from log.dirs, and at > > > startup, > > > the registration request shows OnlineLogDirs=[d1] and > > > OfflineLogDirs=false. > > > The previous registration will only be replaced after a new successful > > > registration, regardless of how quickly or how often b1 restarts. > > > The controller compares the previous registration, and notices > > > that one of the directories has been removed. > > > So for any replica hosted in the broker that is assigned to that > > > missing log directory, a logical metadata update takes place > > > that assigned them to Uuid.OfflineDir, so Assignment.Directory > > > is updated for t-2. 
This value indicates that the replica > > > is offline — I have updated the section you quoted to address this. > > > > > > Once the broker catches up with metadata, it will select the only > > > configured log directory — d1 — for any partitions assigned to > > > Uuid.OfflineDir, and update the assignment. > > > > > > Best, > > > > > > -- > > > Igor > > > > > > > > > >
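The leader-eligibility conditions discussed above (ISR membership plus the fencing/shutdown state) can be condensed into a simple predicate. This is a simplified illustration of the rules as described in the email, not the actual `PartitionChangeBuilder` implementation:

```java
import java.util.Set;

// Simplified sketch of the leader-eligibility rules discussed above,
// with unclean leader election (ULE) disabled.
public class LeaderEligibility {
    static boolean canLead(int brokerId,
                           Set<Integer> isr,
                           boolean fenced,
                           boolean inControlledShutdown,
                           boolean beingUnfenced) {
        boolean inIsr = isr.contains(brokerId);                        // must be in the ISR
        boolean usable = (!fenced && !inControlledShutdown)            // either fully available
                || beingUnfenced;                                      // or currently unfencing
        return inIsr && usable;
    }
}
```

A broker restarting with an empty disk fails the first condition until it rejoins the ISR, which is why data loss is avoided without any directory-specific handling.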
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Ron, Thanks for drilling down on this. I think the KIP isn't really clear here, and the metadata caching section you quoted needs clarification. The "hosting broker's latest registration" refers to the previous, not the current registration. The registrations are only compared by the controller, when handling the broker registration request. Suppose broker b1 hosts two partitions, t-1 and t-2, in two directories, d1 and d2. The broker is registered, and the metadata correlates the replicas to their respective directories, i.e. OnlineLogDirs=[d1,d2] and OfflineLogDirs=false. The broker is then reconfigured to remove d2 from log.dirs, and at startup, the registration request shows OnlineLogDirs=[d1] and OfflineLogDirs=false. The previous registration will only be replaced after a new successful registration, regardless of how quickly or how often b1 restarts. The controller compares the previous registration, and notices that one of the directories has been removed. So for any replica hosted in the broker that is assigned to that missing log directory, a logical metadata update takes place that assigns them to Uuid.OfflineDir, so Assignment.Directory is updated for t-2. This value indicates that the replica is offline — I have updated the section you quoted to address this. Once the broker catches up with metadata, it will select the only configured log directory — d1 — for any partitions assigned to Uuid.OfflineDir, and update the assignment. Best, -- Igor
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Ron, Thank you for having a look at this KIP. Indeed, the log directory UUID should always be generated and loaded. I have corrected the wording in the KIP to clarify. It is a bit of a pain to replace the field, but I agree that is the best approach, for the same reason you pointed out. I have updated the log.dir.failure.timeout.ms config documentation to make it clear that it only applies when there are partitions being led from the failed directory. Your understanding is correct regarding the snapshot result after the logical update when a broker transitions to multiple log directories. I have updated the KIP to clarify that. > I wonder about the corner case where a broker that previously > had multiple log dirs is restarted with a new config that specifies > just a single log directory. What would happen here? If the broker > were not the leader then perhaps it would replicate the data into the > single log directory. What would happen if it were the leader of a > partition that had been marked as offline? Would we have data loss > even if other replicas still had data? There would be no data loss. After the configuration change, the broker would register indicating a single log directory and OfflineLogDirs==false. This indicates to the controller that any replicas in this broker that referenced a different (and non null / default) log directory require a leadership update, which would prevent this broker from becoming a leader for those partitions. Those partitions are then created by the broker in the single configured log directory, and streamed from the new leaders. Does this make sense? Thanks, -- Igor
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Ziming, Thank you for having a look and taking the time to vote. I have already opened some PRs, see: https://issues.apache.org/jira/browse/KAFKA-14127 Best, -- Igor
[jira] [Created] (KAFKA-15451) Include offline dirs in BrokerHeartbeatRequest
Igor Soarez created KAFKA-15451: --- Summary: Include offline dirs in BrokerHeartbeatRequest Key: KAFKA-15451 URL: https://issues.apache.org/jira/browse/KAFKA-15451 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Assignee: Igor Soarez -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15426) Process and persist directory assignments
Igor Soarez created KAFKA-15426: --- Summary: Process and persist directory assignments Key: KAFKA-15426 URL: https://issues.apache.org/jira/browse/KAFKA-15426 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez * Handle AssignReplicasToDirsRequest * Persist metadata changes -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15368) Test ZK JBOD to KRaft migration
Igor Soarez created KAFKA-15368: --- Summary: Test ZK JBOD to KRaft migration Key: KAFKA-15368 URL: https://issues.apache.org/jira/browse/KAFKA-15368 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez A ZK cluster running JBOD should be able to migrate to KRaft mode without issues -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15367) Test KRaft JBOD enabling migration
Igor Soarez created KAFKA-15367: --- Summary: Test KRaft JBOD enabling migration Key: KAFKA-15367 URL: https://issues.apache.org/jira/browse/KAFKA-15367 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez A cluster running in KRaft without JBOD should be able to transition into JBOD mode without issues -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15366) Log directory failure integration test
Igor Soarez created KAFKA-15366: --- Summary: Log directory failure integration test Key: KAFKA-15366 URL: https://issues.apache.org/jira/browse/KAFKA-15366 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15365) Replica management changes
Igor Soarez created KAFKA-15365: --- Summary: Replica management changes Key: KAFKA-15365 URL: https://issues.apache.org/jira/browse/KAFKA-15365 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15364) Handle log directory failure in the Controller
Igor Soarez created KAFKA-15364: --- Summary: Handle log directory failure in the Controller Key: KAFKA-15364 URL: https://issues.apache.org/jira/browse/KAFKA-15364 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15363) Broker log directory failure changes
Igor Soarez created KAFKA-15363: --- Summary: Broker log directory failure changes Key: KAFKA-15363 URL: https://issues.apache.org/jira/browse/KAFKA-15363 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15362) Resolve offline replicas in metadata cache
Igor Soarez created KAFKA-15362: --- Summary: Resolve offline replicas in metadata cache Key: KAFKA-15362 URL: https://issues.apache.org/jira/browse/KAFKA-15362 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Considering broker's offline log directories and replica to dir assignments -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15361) Process and persist dir info with broker registration
Igor Soarez created KAFKA-15361: --- Summary: Process and persist dir info with broker registration Key: KAFKA-15361 URL: https://issues.apache.org/jira/browse/KAFKA-15361 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Controllers should process and persist directory information from the broker registration request -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15360) Include directory info in BrokerRegistration
Igor Soarez created KAFKA-15360: --- Summary: Include directory info in BrokerRegistration Key: KAFKA-15360 URL: https://issues.apache.org/jira/browse/KAFKA-15360 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Assignee: Igor Soarez Brokers should correctly set the OnlineLogDirs and OfflineLogDirs fields in each BrokerRegistrationRequest. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15359) log.dir.failure.timeout.ms configuration
Igor Soarez created KAFKA-15359: --- Summary: log.dir.failure.timeout.ms configuration Key: KAFKA-15359 URL: https://issues.apache.org/jira/browse/KAFKA-15359 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez If the broker repeatedly fails to communicate a log directory failure after a configurable amount of time — {{log.dir.failure.timeout.ms}} — and it is the leader for any replicas in the failed log directory, the broker will shut down, as that is the only other way to guarantee that the controller will elect a new leader for those partitions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15358) QueuedReplicaToDirAssignments metric
Igor Soarez created KAFKA-15358: --- Summary: QueuedReplicaToDirAssignments metric Key: KAFKA-15358 URL: https://issues.apache.org/jira/browse/KAFKA-15358 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Assignee: Igor Soarez -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15357) Propagates assignments and logdir failures to controller
Igor Soarez created KAFKA-15357: --- Summary: Propagates assignments and logdir failures to controller Key: KAFKA-15357 URL: https://issues.apache.org/jira/browse/KAFKA-15357 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Assignee: Igor Soarez LogDirEventManager accumulates, batches, and sends assignments or failure events to the Controller, prioritizing assignments to ensure the Controller has the correct assignment view before processing log dir failures. Assignments are sent via AssignReplicasToDirs, logdir failures are sent via BrokerHeartbeat. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15356) Generate and persist log directory UUIDs
Igor Soarez created KAFKA-15356: --- Summary: Generate and persist log directory UUIDs Key: KAFKA-15356 URL: https://issues.apache.org/jira/browse/KAFKA-15356 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Assignee: Igor Soarez * The StorageTool format command should ensure each dir has a generated UUID * BrokerMetadataCheckpoint should parse directory.id from meta.properties, or generate and persist if missing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15355) Update metadata records
Igor Soarez created KAFKA-15355: --- Summary: Update metadata records Key: KAFKA-15355 URL: https://issues.apache.org/jira/browse/KAFKA-15355 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez Includes: * BrokerRegistrationChangeRecord * PartitionChangeRecord * PartitionRecord * RegisterBrokerRecord -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Mickael, Thanks for voting, and for pointing out the mistake. I've corrected it in the KIP now. The proposed name is "QueuedReplicaToDirAssignments". Best, -- Igor
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Ismael, I believe I have addressed all concerns. Please have a look, and consider a vote on this KIP. Thank you, -- Igor
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi everyone, Following a face-to-face discussion with Ron and Colin, I have just made further improvements to this KIP: 1. Every log directory gets a random UUID assigned, even if just one log dir is configured in the Broker. 2. All online log directories are registered, even if just one is configured. 3. Partition-to-directory assignments are only performed if more than one log directory is configured/registered. 4. A successful reply from the Controller to an AssignReplicasToDirsRequest is taken as a guarantee that the metadata changes are successfully persisted. 5. Replica assignments that refer to log directories pending a failure notification are prioritized, to guarantee the Controller and Broker agree on the assignments before acting on the failure notification. 6. The transition from one log directory to multiple log directories relies on a logical update to efficiently update directory assignments to the previously registered single log directory when that's possible. I have also introduced a configuration for the maximum time the broker will keep trying to send a log directory failure notification before shutting down. https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft Best, -- Igor
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Colin, Thanks for your questions. Please have a look at my answers below. > In the previous email I asked, "who is responsible for assigning replicas to > broker directories?" Can you clarify what the answer is to that? If the > answer is the controller, there is no need for an "unknown" state for > assignments, since the controller can simply choose an assignment immediately > when it creates a replica. Apologies, I thought I had made this clear in my previous email and in the KIP. It is the Broker who is responsible for assigning replicas to log directories. > The broker has access to the same metadata, of course, and already knows what > directory a replica is supposed to be in. If that directory doesn't exist, > then the replica is offline. There is no need to modify replicas to have an > "unknown" assignment state. You are correct, we can avoid this intermediate metadata update by the Controller. The Controller doesn't have to reset assignments to Uuid.ZERO; instead, the Broker can just select a new directory for replicas assigned to other UUIDs when there are no offline dirs. I have updated the KIP with this change. However, I still think we need reserved UUIDs. a) If there are any offline log directories, the broker cannot distinguish between replicas that are assigned to offline dirs, vs replicas that are assigned to deconfigured directories and need new placement. Since new replicas are created with UUID.Zero, it is safe for the broker to select a directory for them, even if some are offline. This replaces the use of the `isNew` flag in ZK mode. b) As described in the KIP, under "Handling log directory failures", in a race between syncing assignments and failing log directories, the Broker might exceptionally need to convey to the Controller that some replica that was in the failed directory had an incorrect assignment in the metadata, and the Broker can do that with an AssignReplicasToDirs request using a reserved UUID value. 
c) During a migration from a ZK JBOD cluster, the Controller can enforce fencing of a Broker until all replica assignments are known (i.e. not Uuid.ZERO). I've updated the KIP to name our use of Uuid.ZERO as Uuid.UnknownDir, and I've introduced Uuid.OfflineDir to deal with case b). Let me know what you think. Best, -- Igor
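The migration fencing rule described above amounts to a simple check over a broker's replica assignments. This is an illustrative sketch with hypothetical names, not the actual controller code:

```java
import java.util.Collection;
import java.util.UUID;

// Sketch: during ZK->KRaft migration the controller could keep a broker
// fenced until none of its replicas still carry the all-zero "unknown"
// directory assignment (Uuid.ZERO / Uuid.UnknownDir in the email).
public class MigrationFencing {
    static final UUID UNKNOWN_DIR = new UUID(0L, 0L);

    static boolean mayUnfence(Collection<UUID> replicaDirAssignments) {
        // Unfence only once every replica has a concrete assignment.
        return replicaDirAssignments.stream().noneMatch(UNKNOWN_DIR::equals);
    }
}
```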
Re: [VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Colin, Thanks for your support with getting this over the line and that’s great re the preliminary pass! Thanks also for sharing your thoughts; I've had a careful look at each of these and I'm sharing my comments below. I agree, it is important to avoid a perf hit on non-JBOD. I've opted for tagged fields in: - Assignment.Directory in PartitionRecord and PartitionChangeRecord - OnlineLogDirs in RegisterBrokerRecord, BrokerRegistrationChangeRecord and BrokerRegistrationRequest - OfflineLogDirs in BrokerHeartbeatRequest I don't think we should use UUID.Zero to refer to the first log directory because this value also indicates "unknown, or no log dir yet assigned". We can work with the default value by gating on JBOD configuration — determined by log.dirs (Broker side) and BrokerRegistration (Controller side). In non-JBOD: - The single logdir won't have a UUID - BrokerRegistration doesn't list any log dir UUIDs - AssignReplicasToDirs is never used Directory reassignment will work the same way as in ZK mode, but with the difference that the promotion of the future replica requires an AssignReplicasToDirs request to update the assignment. I've tried to improve the description of this operation and included a diagram to illustrate it. I've renamed LogDirsOfflined to OfflineLogDirs in the BrokerHeartbeatRequest. This field was named differently because it's only used for log directories that have become offline but are not yet represented as offline in the metadata, from the Broker's point of view — as opposed to always listing the full set of offline log dirs. I don't think we should identify log directories using system paths, because those may be arbitrary. A set of storage devices may be mounted and re-mounted on the same set of system paths using a different order every time. Kafka only cares about the content, not the location of the log directories. I think I have overcomplicated this by trying to identify offline log directories. 
In ZK mode we don't care to do this, and we shouldn't do it in KRaft either. What we need to know is whether there are any offline log directories, to prevent re-streaming the offline replicas into the remaining online log dirs. In ZK mode, the 'isNew' flag is used to prevent the Broker from creating partitions when any logdir is offline unless they're new. In KRaft, the Controller can reset the assignment to UUID.Zero for replicas in log dirs not listed in the broker registration, only when the broker registration indicates no offline log dirs. So I've updated the KIP to: - Remove directory.ids from meta.properties - Change OfflineLogDirs in RegisterBrokerRecord, BrokerRegistrationChangeRecord and BrokerRegistrationRequest to a boolean - Describe this behavior in the Controller and Broker It’s been a lot of work to get here and I’m similarly excited to get this released soon! The vote has been open over the last week and I'm happy to give it another two weeks to get any other thoughts without rushing. Thanks again for the support and input! Best, -- Igor
Re: [ANNOUNCE] New committer: Divij Vaidya
Congratulations Divij! -- Igor
[VOTE] KIP-858: Handle JBOD broker disk failure in KRaft
Hi everyone, We're getting closer to dropping ZooKeeper support, and JBOD in KRaft mode is one of the outstanding big missing features. It's been a while since there was new feedback on KIP-858 [1] which aims to address this gap, so I'm calling for a vote. A huge thank you to everyone who has reviewed this KIP, and participated in the discussion thread! [2] I'd also like to thank you in advance for taking the time to vote. Best, -- Igor [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft [2] https://lists.apache.org/thread/8dqvfhzcyy87zyy12837pxx9lgsdhvft
Re: [VOTE] KIP-938: Add more metrics for measuring KRaft performance
Thanks for the KIP. Seems straightforward, LGTM. Non binding +1. -- Igor
Re: [DISCUSS] KIP-858: Handle JBOD broker disk failure in KRaft
Hi all, We just had a video call to discuss this KIP and I just wanted to update this thread with a note on the meeting. Attendees: - Igor - Christo - Divij - Colt Items discussed: - Context, motivation and overview of the proposal. - How log directories are identified by each Broker. - How old vs new log directories are dealt with by a restarting Broker. - TLA+ specification, and its role in ironing out edge cases. - Edge cases in the synchronization of assignments, their consequences and how they are currently addressed in the proposal. - Consequences and benefits of being able to move partitions in offline Brokers. Some questions were raised but left unanswered: - Can partition epochs be useful in ruling out edge cases in the synchronization of assignments between Broker and Controller? - How to attract more feedback from committers for this proposal? - Should this be a recurring meeting? Best, -- Igor
Re: [DISCUSS] KIP-928: Making Kafka resilient to log directories becoming full
Hi Christo, Thank you for the KIP. Kafka is very sensitive to filesystem errors, and at the first IO error the whole log directory is permanently considered offline. It seems your proposal aims to increase the robustness of Kafka, and that's a positive improvement. I have some questions: 11. "Instead of sending a delete topic request only to replicas we know to be online, we will allow a delete topic request to be sent to all replicas regardless of their state. Previously a controller did not send delete topic requests to brokers because it knew they would fail. In the future, topic deletions for saturated topics will succeed, but topic deletions for the offline scenario will continue to fail." It seems you're describing ZK mode behavior? In KRaft mode the Controller does not send requests to Brokers. Instead, the Controller persists new metadata records which all online Brokers then fetch. Since it's too late to be proposing design changes for ZK mode, is this change necessary? Is there a difference in how the metadata records should be processed by Brokers? 12. "We will add a new state to the broker state machines of a log directory (saturated) and a partition replica (saturated)." How are log directories and partition replicas in these states represented in the Admin API? e.g. `DescribeReplicaLogDirs` 13. Should there be any metrics indicating the new saturated state for log directories and replicas? 14. "If an IOException due to No space left on device is raised (we will check the remaining space at that point in time rather than the exception message) the broker will stop all operations on logs located in that directory, remove all fetchers and stop compaction. Retention will continue to be respected. The same node as the current state will be written to in Zookeeper. All other IOExceptions will continue to be treated the same way they are treated now and will result in a log directory going offline." 
Does a log directory in this "saturated" state transition back to online if more storage space becomes available, e.g. due to retention policy enforcement or due to topic deletion, or does the Broker still require a restart to bring the log directory back to full operation? Best, -- Igor
Re: [DISCUSS] KIP-858: Handle JBOD broker disk failure in KRaft
Hi all, I have created a TLA+ specification for this KIP, available here: https://github.com/soarez/kafka/blob/kip-858-tla-plus/tla/Kip858.tla If there are no further comments I'll start a voting thread next week. -- Igor
Re: [DISCUSS] KIP-858: Handle JBOD broker disk failure in KRaft
Hi Alexandre, Thank you for having a look at this KIP, and thank you for pointing this out. I like the idea of expanding the health status of a log directory beyond just online/offline status. This KIP currently proposes a single logdir state transition, from online to offline, conveyed in a list of logdir UUIDs sent in the new field `LogDirsOfflined` as part of the broker heartbeat request. It's nice that the request schema itself doesn't allow for a heartbeat request to convey a state transition for a logdir from offline to online, as that transition is not (currently) valid: brokers need to be restarted for logdirs to be allowed to come back online. We could make changes now to accommodate further logdir states in the future, by instead conveying new state and logdir pairs in the heartbeat request, for any logdir which had a state change. But right now, that would look a bit strange, since there's only one state we'd allow to be represented in the request – offline. Since creating new request versions/schemas is relatively easy now, and since this logdir QoS feature would merit a KIP anyway, I'm a bit more inclined to keep things simple for now. Best, -- Igor