[GitHub] [kafka] vcrfxia opened a new pull request, #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return boolean
vcrfxia opened a new pull request, #13554:
URL: https://github.com/apache/kafka/pull/13554

This PR updates the return type of `VersionedKeyValueStore#put(...)` from `void` to `boolean`, where the boolean indicates whether the record that was put is the latest record for the particular key. As part of this change, `VersionedBytesStore` introduces its own `put(key, value, timestamp)` method to avoid a method signature conflict with the existing `put(key, value)` method from `KeyValueStore`, which has a `void` return type. As a result, the previously added `NullableValueAndTimestampSerde` class is no longer needed, so it has also been removed in this PR as cleanup.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
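To make the described return value concrete, here is a minimal sketch of a store whose `put(key, value, timestamp)` reports whether the record is now the latest one for its key. The class `VersionedPutSketch`, its in-memory layout, and the choice to treat an equal timestamp as "latest" are all assumptions made for illustration; this is not the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the Kafka Streams implementation.
class VersionedPutSketch {
    // key -> (timestamp -> value), with versions ordered by timestamp
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    /**
     * Stores the record and returns true if it is now the latest
     * (highest-timestamp) record for the key, false otherwise.
     * Assumption: a put at the current latest timestamp counts as latest.
     */
    boolean put(String key, String value, long timestamp) {
        TreeMap<Long, String> history = versions.computeIfAbsent(key, k -> new TreeMap<>());
        history.put(timestamp, value);
        return history.lastKey() == timestamp;
    }

    public static void main(String[] args) {
        VersionedPutSketch store = new VersionedPutSketch();
        System.out.println(store.put("k", "v1", 10L)); // first record for the key
        System.out.println(store.put("k", "v0", 5L));  // out-of-order, older timestamp
        System.out.println(store.put("k", "v2", 20L)); // newer timestamp
    }
}
```

A caller could use the boolean to decide, for example, whether an out-of-order record should be forwarded downstream as the current value.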
[jira] [Resolved] (KAFKA-14883) Broker state should be "observer" in KRaft quorum
[ https://issues.apache.org/jira/browse/KAFKA-14883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-14883.
---
Fix Version/s: 3.5.0
Resolution: Fixed

> Broker state should be "observer" in KRaft quorum
> -
>
> Key: KAFKA-14883
> URL: https://issues.apache.org/jira/browse/KAFKA-14883
> Project: Kafka
> Issue Type: Improvement
> Components: kraft, metrics
> Affects Versions: 3.4.0
> Reporter: Paolo Patierno
> Assignee: Paolo Patierno
> Priority: Major
> Fix For: 3.5.0
>
>
> Currently, the `current-state` KRaft related metric reports `follower` state
> for a broker while technically it should be reported as an `observer` as the
> `kafka-metadata-quorum` tool does.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Resolved] (KAFKA-9550) RemoteLogManager - copying eligible log segments to remote storage implementation
[ https://issues.apache.org/jira/browse/KAFKA-9550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-9550.
--
Resolution: Fixed

> RemoteLogManager - copying eligible log segments to remote storage
> implementation
> --
>
> Key: KAFKA-9550
> URL: https://issues.apache.org/jira/browse/KAFKA-9550
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Satish Duggana
> Assignee: Satish Duggana
> Priority: Major
> Fix For: 3.5.0
>
>
> Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA
> covers copying segments to remote storage.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]
[GitHub] [kafka] showuon merged pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics
showuon merged PR #13525:
URL: https://github.com/apache/kafka/pull/13525
[GitHub] [kafka] showuon commented on pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics
showuon commented on PR #13525:
URL: https://github.com/apache/kafka/pull/13525#issuecomment-1506342972

Failed tests are unrelated
```
Build / JDK 8 and Scala 2.12 / kafka.security.authorizer.AuthorizerTest.testAllowAllAccess(String).quorum=kraft
Build / JDK 8 and Scala 2.12 / org.apache.kafka.controller.QuorumControllerTest.testDelayedConfigurationOperations()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
Build / JDK 17 and Scala 2.13 / kafka.admin.AddPartitionsTest.testWrongReplicaCount(String).quorum=zk
Build / JDK 17 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testAuthorizeWithPrefixedResource(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testNoAclFound(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / kafka.security.authorizer.AuthorizerTest.testDeleteAclOnPrefixedResource(String).quorum=kraft
```
[GitHub] [kafka] showuon commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics
showuon commented on code in PR #13525:
URL: https://github.com/apache/kafka/pull/13525#discussion_r1164996982

## raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws IOException {
         state.initialize(new OffsetAndEpoch(0L, 0));
         raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
-        assertEquals("unattached", getMetric(metrics, "current-state").metricValue());
+        assertEquals("observer", getMetric(metrics, "current-state").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue());
         assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue());
         state.transitionToFollower(2, 1);
-        assertEquals("follower", getMetric(metrics, "current-state").metricValue());
+        assertEquals("observer", getMetric(metrics, "current-state").metricValue());

Review Comment:
Thanks, @dengziming ! :)
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164980568

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
[jira] [Assigned] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

li xiangyuan reassigned KAFKA-14901:

Assignee: (was: li xiangyuan)

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
>
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
> Issue Type: Test
> Components: KafkaConnect
> Reporter: Greg Harris
> Priority: Blocker
> Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS Source test appears to be very rarely failing (<5% chance) with the
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in
> InitProducerIdResponse; The server experienced an unexpected error when
> processing the request.
> at app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
> at app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
> at app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> at app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
> at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
> at app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
> at app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
> at app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
> at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4,
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5,
> headerVersion=2) --
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
> transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4,
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5,
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46',
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS,
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT,
> clientInformation=ClientInformation(softwareName=apache-kafka-java,
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true,
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
> (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to
> Empty while it already a pending state Ongoing
> at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
> at kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
> at kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
> at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
> at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
> at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
> at scala.util.Either.flatMap(Either.scala:352)
> at kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
> at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
> at java.base/java.lang.Thread.run(Thread.java:829{noformat}
[jira] [Assigned] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

li xiangyuan reassigned KAFKA-14901:

Assignee: li xiangyuan

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
>
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
> Issue Type: Test
> Components: KafkaConnect
> Reporter: Greg Harris
> Assignee: li xiangyuan
> Priority: Blocker
> Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS Source test appears to be very rarely failing (<5% chance) with the
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in
> InitProducerIdResponse; The server experienced an unexpected error when
> processing the request.
> at app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
> at app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
> at app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> at app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
> at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
> at app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
> at app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
> at app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
> at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4,
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5,
> headerVersion=2) --
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
> transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4,
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5,
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46',
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS,
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT,
> clientInformation=ClientInformation(softwareName=apache-kafka-java,
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true,
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
> (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to
> Empty while it already a pending state Ongoing
> at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
> at kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
> at kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
> at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
> at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
> at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
> at scala.util.Either.flatMap(Either.scala:352)
> at kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
> at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
> at java.base/java.lang.Thread.run(Thread.java:829{noformat}
[GitHub] [kafka] dengziming commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics
dengziming commented on code in PR #13525:
URL: https://github.com/apache/kafka/pull/13525#discussion_r1164956809

## raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws IOException {
         state.initialize(new OffsetAndEpoch(0L, 0));
         raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
-        assertEquals("unattached", getMetric(metrics, "current-state").metricValue());
+        assertEquals("observer", getMetric(metrics, "current-state").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue());
         assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue());
         state.transitionToFollower(2, 1);
-        assertEquals("follower", getMetric(metrics, "current-state").metricValue());
+        assertEquals("observer", getMetric(metrics, "current-state").metricValue());

Review Comment:
Yes, `resigned` is a separate issue. In this PR we can try to distinguish `observer` from `follower` to match the terminology we use in the public API.
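As a rough illustration of the distinction discussed in this thread, the sketch below derives the string a `current-state`-style gauge might report: a node that is not a voter is reported as `observer` regardless of its internal state, matching the updated assertions in the quoted test. The class `CurrentStateSketch`, the `RawState` enum, and the `isVoter` flag are hypothetical names chosen for this example and are not taken from `KafkaRaftMetrics`.

```java
import java.util.Locale;

// Hypothetical sketch; names are not taken from KafkaRaftMetrics.
class CurrentStateSketch {
    enum RawState { UNATTACHED, FOLLOWER, CANDIDATE, LEADER }

    /**
     * Returns the state string to expose. Non-voters are always reported
     * as "observer"; voters report their internal state in lower case.
     */
    static String currentState(boolean isVoter, RawState raw) {
        if (!isVoter) {
            return "observer";
        }
        return raw.name().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // A broker (non-voter) that is internally following the leader
        System.out.println(currentState(false, RawState.FOLLOWER));
        // A controller (voter) in the same internal state
        System.out.println(currentState(true, RawState.FOLLOWER));
    }
}
```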
[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164930944

## streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164922362

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1294,4 +1307,11 @@ private KTable doJoinOnForeignKey(final KTable forei
             builder
         );
     }
+
+    private static void maybeSetOutputVersioned(final GraphNode tableNode,

Review Comment:
To avoid this helper, would it be worth rewriting this class a little to ensure that `materializedInternal` is never `null`?
[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164920870

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1281,6 +1293,7 @@ private KTable doJoinOnForeignKey(final KTable forei
             ),
             resultStore
         );
+        resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);

Review Comment:
Should we make `setOutputVersioned` part of the `TableProcessorNode` constructor?
[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164915423

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java:
##
@@ -73,6 +74,7 @@ KTable build(final Map, Aggregator
[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164913480

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##
@@ -236,13 +237,16 @@ public SessionWindowedKStream windowedBy(final SessionWindows windows) {
     private KTable doAggregate(final KStreamAggProcessorSupplier aggregateSupplier, final String functionName, final MaterializedInternal> materializedInternal) {
+        final boolean isOutputVersioned = materializedInternal != null

Review Comment:
as above
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164911085

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Properties are :-
+ *
+ * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is
+ *    less than or equal to the number of partitions for that topic. (Range)
+ * 2) Partitions should be assigned to consumers in a way that facilitates join operations where required. (Range)
+ *    This can only be done if the topics are co-partitioned in the first place
+ *    Co-partitioned:-
+ *    Two streams are co-partitioned if the following conditions are met:-
+ *    ->The keys must have the same schemas
+ *    ->The topics involved must have the same number of partitions
+ * 3) Consumers should retain as much as their previous assignment as possible. (Sticky)
+ *
+ *
+ *
+ * The algorithm works mainly in 5 steps described below
+ *
+ * 1) Get a map of the consumersPerTopic created using the member subscriptions.
+ * 2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND
+ *    get a list of sticky partitions that we want to retain in the new assignment.
+ * 3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition
+ * 4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions
+ * 5) Iterate through unfilled consumers and assign partitions from available partitions
+ *
+ *
+ *
+ */
+public class ServerSideStickyRangeAssignor implements PartitionAssignor {
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    private static <K, V> void putList(Map<K, List<V>> map, K key, V value) {
+        List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
+        list.add(value);
+    }
+
+    private static <K, V> void putSet(Map<K, Set<V>> map, K key, V value) {
+        Set<V> set = map.computeIfAbsent(key, k -> new HashSet<>());
+        set.add(value);
+    }
+
+    static class Pair<T, U> {
+        private final T first;
+        private final U second;
+
+        public Pair(T first, U second) {
+            this.first = first;
+            this.second = second;
+        }
+
+        public T getFirst() {
+            return first;
+        }
+
+        public U getSecond() {
+            return second;
+        }
+
+        @Override
+        public String toString() {
+            return "(" + first + ", " + second + ")";
+        }
+    }
+
+    // Returns a map of the list of consumers per Topic (keyed by topicId)
+    private Map<Uuid, List<String>> consumersPerTopic(AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> mapTopicsToConsumers = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members;
+
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : membersData.entrySet()) {
+            String memberId = memberEntry.getKey();
+            AssignmentMemberSpec memberMetadata = memberEntry.getValue();
+            Collection<Uuid> topics = memberMetadata.subscribedTopics;
+            for (Uuid topicId: topics) {
+                putList(mapTopicsToConsumers, topicId, memberId);
+            }
+        }
+        return mapTopicsToConsumers;
+    }
+
+    private Map> getAvailablePartitionsPerTopic(AssignmentSpec assignmentSpec, Map> assignedStickyPartitionsPerTopic) {
+        Map> availablePartitionsPerTopic = new HashMap<>();
+        Map topicsMetadata =
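For readers following the five steps in the class Javadoc quoted above, here is a minimal single-topic sketch of the quota arithmetic and sticky retention. It is an illustration under stated assumptions, not the code from the PR: the class `StickyRangeSketch`, the method `assign`, and its parameters are hypothetical names, members are assumed to have unique ids, and co-partitioning across topics is not modelled at all.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-topic sketch of the five steps; not the code from the PR.
class StickyRangeSketch {

    // members: ids of the consumers subscribed to the topic (assumed unique).
    // previous: memberId -> partitions owned before the rebalance (may be empty).
    static Map<String, List<Integer>> assign(
            int numPartitions, List<String> members, Map<String, List<Integer>> previous) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result;
        }
        int minQuota = numPartitions / members.size();
        int extrasLeft = numPartitions % members.size();

        Map<String, Integer> quota = new HashMap<>();
        Set<Integer> retained = new HashSet<>();

        // Step 2: keep sticky partitions, up to the member's quota. A member that
        // owned more than minQuota partitions may claim one of the extra slots.
        for (String member : members) {
            List<Integer> owned = previous.getOrDefault(member, Collections.emptyList());
            int target = minQuota;
            if (owned.size() > minQuota && extrasLeft > 0) {
                target++;
                extrasLeft--;
            }
            List<Integer> kept = new ArrayList<>();
            for (int p : owned) {
                if (kept.size() < target && p >= 0 && p < numPartitions && retained.add(p)) {
                    kept.add(p);
                }
            }
            result.put(member, kept);
            quota.put(member, target);
        }

        // Step 3: hand the remaining extra slots to members still at minQuota.
        for (String member : members) {
            if (extrasLeft == 0) {
                break;
            }
            if (quota.get(member) == minQuota) {
                quota.put(member, minQuota + 1);
                extrasLeft--;
            }
        }

        // Step 4: available = all partitions minus the retained sticky ones.
        Deque<Integer> available = new ArrayDeque<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!retained.contains(p)) {
                available.add(p);
            }
        }

        // Step 5: fill every member below its quota from the available partitions.
        for (String member : members) {
            List<Integer> assigned = result.get(member);
            while (assigned.size() < quota.get(member)) {
                assigned.add(available.poll());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> previous = new HashMap<>();
        previous.put("A", Arrays.asList(0, 1, 2));
        previous.put("B", Arrays.asList(3, 4));
        // Member C joins a group that previously had only A and B.
        System.out.println(assign(5, Arrays.asList("A", "B", "C"), previous));
        // prints {A=[0, 1], B=[3, 4], C=[2]}
    }
}
```

With 5 partitions and 3 members, minQuota is 1 and there are 2 extra slots. A and B each owned more than one partition, so each keeps two of its previous partitions, and the newly joined C receives the single partition that A gave up.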
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164908509

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164908245

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164906996

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164906229

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164905279

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164902619

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164901673

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ##
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1164899524 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are :- + * + * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is + *less than or equal to the number of partitions for that topic. (Range) + * 2) Partitions should be assigned to consumers in a way that facilitates join operations where required. 
(Range) + *This can only be done if the topics are co-partitioned in the first place + *Co-partitioned:- + *Two streams are co-partitioned if the following conditions are met:- + * ->The keys must have the same schemas + * ->The topics involved must have the same number of partitions + * 3) Consumers should retain as much as their previous assignment as possible. (Sticky) + * + * + * + * The algorithm works mainly in 5 steps described below + * + * 1) Get a map of the consumersPerTopic created using the member subscriptions. + * 2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * 3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition + * 4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions + * 5) Iterate through unfilled consumers and assign partitions from available partitions + * + * + * + */ +public class ServerSideStickyRangeAssignor implements PartitionAssignor { + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +private static void putList(Map> map, K key, V value) { +List list = map.computeIfAbsent(key, k -> new ArrayList<>()); +list.add(value); +} + +private static void putSet(Map> map, K key, V value) { +Set set = map.computeIfAbsent(key, k -> new HashSet<>()); +set.add(value); +} + +static class Pair { +private final T first; +private final U second; + +public Pair(T first, U second) { +this.first = first; +this.second = second; +} + +public T getFirst() { +return first; +} + +public U getSecond() { +return second; +} + +@Override +public String toString() { +return "(" + first + ", " + second + ")"; +} +} + +// Returns a map of the list of consumers per Topic (keyed by 
topicId) +private Map> consumersPerTopic(AssignmentSpec assignmentSpec) { +Map> mapTopicsToConsumers = new HashMap<>(); +Map membersData = assignmentSpec.members; + +for (Map.Entry memberEntry : membersData.entrySet()) { +String memberId = memberEntry.getKey(); +AssignmentMemberSpec memberMetadata = memberEntry.getValue(); +Collection topics = memberMetadata.subscribedTopics; +for (Uuid topicId: topics) { +putList(mapTopicsToConsumers, topicId, memberId); +} +} +return mapTopicsToConsumers; +} + +private Map> getAvailablePartitionsPerTopic(AssignmentSpec assignmentSpec, Map> assignedStickyPartitionsPerTopic) { Review Comment: ok
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1164894366 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are :- + * + * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is + *less than or equal to the number of partitions for that topic. (Range) + * 2) Partitions should be assigned to consumers in a way that facilitates join operations where required. 
(Range) + *This can only be done if the topics are co-partitioned in the first place + *Co-partitioned:- + *Two streams are co-partitioned if the following conditions are met:- + * ->The keys must have the same schemas + * ->The topics involved must have the same number of partitions + * 3) Consumers should retain as much as their previous assignment as possible. (Sticky) + * + * + * + * The algorithm works mainly in 5 steps described below + * + * 1) Get a map of the consumersPerTopic created using the member subscriptions. + * 2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * 3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition + * 4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions + * 5) Iterate through unfilled consumers and assign partitions from available partitions + * + * + * + */ +public class ServerSideStickyRangeAssignor implements PartitionAssignor { + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +private static void putList(Map> map, K key, V value) { Review Comment: removed ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are :- + * + * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is + *less than or equal to the number of
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1164893597 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are :- + * + * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is + *less than or equal to the number of partitions for that topic. (Range) + * 2) Partitions should be assigned to consumers in a way that facilitates join operations where required. 
(Range) + *This can only be done if the topics are co-partitioned in the first place + *Co-partitioned:- + *Two streams are co-partitioned if the following conditions are met:- + * ->The keys must have the same schemas + * ->The topics involved must have the same number of partitions + * 3) Consumers should retain as much as their previous assignment as possible. (Sticky) + * + * + * + * The algorithm works mainly in 5 steps described below + * + * 1) Get a map of the consumersPerTopic created using the member subscriptions. + * 2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * 3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition + * 4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions + * 5) Iterate through unfilled consumers and assign partitions from available partitions + * + * + * + */ +public class ServerSideStickyRangeAssignor implements PartitionAssignor { + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +private static void putList(Map> map, K key, V value) { +List list = map.computeIfAbsent(key, k -> new ArrayList<>()); +list.add(value); +} + +private static void putSet(Map> map, K key, V value) { +Set set = map.computeIfAbsent(key, k -> new HashSet<>()); +set.add(value); +} + +static class Pair { +private final T first; +private final U second; + +public Pair(T first, U second) { +this.first = first; +this.second = second; +} + +public T getFirst() { +return first; +} + +public U getSecond() { +return second; +} + +@Override +public String toString() { +return "(" + first + ", " + second + ")"; +} +} + +// Returns a map of the list of consumers per Topic (keyed by 
topicId) +private Map> consumersPerTopic(AssignmentSpec assignmentSpec) { +Map> mapTopicsToConsumers = new HashMap<>(); +Map membersData = assignmentSpec.members; + +for (Map.Entry memberEntry : membersData.entrySet()) { Review Comment: changed it
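The quota arithmetic named in step 3 of the quoted Javadoc ("total required quota = minQuota + (if necessary) extraPartition") can be sketched as follows. This is only an illustration, not the PR's code: the class `QuotaSketch` and its method `quotas` are hypothetical, and the sketch assumes that minQuota is the integer quotient of partitions by subscribed members and that the remainder is handed out as one extra partition to each of the first members.

```java
import java.util.ArrayList;
import java.util.List;

final class QuotaSketch {

    // Hypothetical helper (not from the PR): how many partitions each of
    // `numMembers` members should own for a topic with `numPartitions` partitions.
    static List<Integer> quotas(int numPartitions, int numMembers) {
        if (numMembers <= 0) {
            throw new IllegalArgumentException("numMembers must be positive");
        }
        // Assumption: every member gets at least the integer quotient.
        int minQuota = numPartitions / numMembers;
        // Assumption: the remainder is spread as one extra partition per member,
        // starting from the first member.
        int membersWithExtra = numPartitions % numMembers;

        List<Integer> result = new ArrayList<>(numMembers);
        for (int i = 0; i < numMembers; i++) {
            result.add(minQuota + (i < membersWithExtra ? 1 : 0));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(quotas(7, 3)); // prints [3, 2, 2]
    }
}
```

A member whose retained sticky partitions already reach its quota would not need to be added to the unfilled list; only members below the quota receive partitions in step 5.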
[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
mjsax commented on code in PR #13552: URL: https://github.com/apache/kafka/pull/13552#discussion_r1164893058 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -142,6 +145,7 @@ public KTable table(final String topic, .withMaterializedInternal(materialized) .withProcessorParameters(processorParameters) .build(); +tableSourceNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier); Review Comment: Should we make `setOutputVersioned` a `withOutputVersioned` builder method like all the others?
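The shape of the suggestion above, moving the flag from a setter called after `build()` into the builder chain, might look like the sketch below. The class `TableNodeSketch` and its `withNodeName` method are hypothetical stand-ins, not the actual Kafka Streams graph-node classes; only the `withOutputVersioned` name comes from the review comment.

```java
final class TableNodeSketch {

    private final String nodeName;
    private final boolean outputVersioned;

    private TableNodeSketch(Builder builder) {
        this.nodeName = builder.nodeName;
        this.outputVersioned = builder.outputVersioned;
    }

    static Builder builder() {
        return new Builder();
    }

    String nodeName() {
        return nodeName;
    }

    boolean isOutputVersioned() {
        return outputVersioned;
    }

    // Hypothetical builder: the flag is supplied alongside the other with...()
    // calls, so the built node never needs to be mutated afterwards.
    static final class Builder {
        private String nodeName;
        private boolean outputVersioned; // defaults to false when not set

        Builder withNodeName(String nodeName) {
            this.nodeName = nodeName;
            return this;
        }

        Builder withOutputVersioned(boolean outputVersioned) {
            this.outputVersioned = outputVersioned;
            return this;
        }

        TableNodeSketch build() {
            return new TableNodeSketch(this);
        }
    }

    public static void main(String[] args) {
        TableNodeSketch node = TableNodeSketch.builder()
            .withNodeName("table-source")
            .withOutputVersioned(true)
            .build();
        System.out.println(node.isOutputVersioned()); // prints true
    }
}
```

Keeping the field final is one possible benefit of this style; whether the real node classes can do so depends on code not shown in this thread.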
[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
mjsax commented on code in PR #13552: URL: https://github.com/apache/kafka/pull/13552#discussion_r1164891330 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java: ## @@ -136,13 +137,16 @@ public SessionWindowedCogroupedKStream windowedBy(final SessionWindows private KTable doAggregate(final Initializer initializer, final NamedInternal named, final MaterializedInternal> materializedInternal) { +final boolean isOutputVersioned = materializedInternal != null +&& materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier; return aggregateBuilder.build( groupPatterns, initializer, named, new KeyValueStoreMaterializer<>(materializedInternal).materialize(), materializedInternal.keySerde(), materializedInternal.valueSerde(), -materializedInternal.queryableStoreName()); +materializedInternal.queryableStoreName(), +isOutputVersioned); Review Comment: Think we can just pass in `materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier` directly
[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
mjsax commented on code in PR #13552: URL: https://github.com/apache/kafka/pull/13552#discussion_r1164890769 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java: ## @@ -136,13 +137,16 @@ public SessionWindowedCogroupedKStream windowedBy(final SessionWindows private KTable doAggregate(final Initializer initializer, final NamedInternal named, final MaterializedInternal> materializedInternal) { +final boolean isOutputVersioned = materializedInternal != null Review Comment: `materializedInternal` cannot be `null` here.
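The two comments on `doAggregate` (drop the null check, pass the `instanceof` expression directly) combine into the pattern sketched here. All types in the sketch (`StoreSupplier`, `VersionedStoreSupplier`, `Materialized`, `build`) are hypothetical simplifications rather than the Kafka Streams classes. It relies only on standard Java semantics: `instanceof` evaluates to false for a `null` operand, so a missing store supplier needs no separate guard.

```java
final class InlineInstanceofSketch {

    // Hypothetical stand-ins for the store supplier types named in the review.
    interface StoreSupplier { }

    interface VersionedStoreSupplier extends StoreSupplier { }

    static final class Materialized {
        private final StoreSupplier supplier; // may be null when no store was supplied

        Materialized(StoreSupplier supplier) {
            this.supplier = supplier;
        }

        StoreSupplier storeSupplier() {
            return supplier;
        }
    }

    // Hypothetical downstream call that receives the flag as its last argument.
    static String build(String name, boolean isOutputVersioned) {
        return name + (isOutputVersioned ? " (versioned)" : " (unversioned)");
    }

    static String doAggregate(Materialized materialized) {
        // No local variable and no null check on the supplier:
        // `null instanceof X` is simply false.
        return build("agg", materialized.storeSupplier() instanceof VersionedStoreSupplier);
    }

    public static void main(String[] args) {
        System.out.println(doAggregate(new Materialized(null)));
        // prints agg (unversioned)
        System.out.println(doAggregate(new Materialized(new VersionedStoreSupplier() { })));
        // prints agg (versioned)
    }
}
```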
[GitHub] [kafka] mjsax merged pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables
mjsax merged PR #13522: URL: https://github.com/apache/kafka/pull/13522
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1164884866 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are :- + * + * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is + *less than or equal to the number of partitions for that topic. (Range) + * 2) Partitions should be assigned to consumers in a way that facilitates join operations where required. 
(Range) + *This can only be done if the topics are co-partitioned in the first place + *Co-partitioned:- + *Two streams are co-partitioned if the following conditions are met:- + * ->The keys must have the same schemas + * ->The topics involved must have the same number of partitions + * 3) Consumers should retain as much as their previous assignment as possible. (Sticky) + * + * + * + * The algorithm works mainly in 5 steps described below + * + * 1) Get a map of the consumersPerTopic created using the member subscriptions. + * 2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * 3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition + * 4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions + * 5) Iterate through unfilled consumers and assign partitions from available partitions + * + * + * + */ +public class ServerSideStickyRangeAssignor implements PartitionAssignor { + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +private static void putList(Map> map, K key, V value) { +List list = map.computeIfAbsent(key, k -> new ArrayList<>()); +list.add(value); +} + +private static void putSet(Map> map, K key, V value) { +Set set = map.computeIfAbsent(key, k -> new HashSet<>()); +set.add(value); +} + +static class Pair { +private final T first; +private final U second; + +public Pair(T first, U second) { +this.first = first; +this.second = second; +} + +public T getFirst() { +return first; +} + +public U getSecond() { +return second; +} + +@Override +public String toString() { +return "(" + first + ", " + second + ")"; +} +} + +// Returns a map of the list of consumers per Topic (keyed by 
topicId) Review Comment: Removed the whole comment because it was unnecessary.
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
rreddy-22 commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1164884200 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are :- + * + * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is + *less than or equal to the number of partitions for that topic. (Range) + * 2) Partitions should be assigned to consumers in a way that facilitates join operations where required. 
(Range) + *This can only be done if the topics are co-partitioned in the first place + *Co-partitioned:- + *Two streams are co-partitioned if the following conditions are met:- + * ->The keys must have the same schemas + * ->The topics involved must have the same number of partitions + * 3) Consumers should retain as much as their previous assignment as possible. (Sticky) + * + * + * + * The algorithm works mainly in 5 steps described below + * + * 1) Get a map of the consumersPerTopic created using the member subscriptions. + * 2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * 3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition + * 4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions + * 5) Iterate through unfilled consumers and assign partitions from available partitions + * + * + * + */ +public class ServerSideStickyRangeAssignor implements PartitionAssignor { + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +private static void putList(Map> map, K key, V value) { +List list = map.computeIfAbsent(key, k -> new ArrayList<>()); +list.add(value); +} + +private static void putSet(Map> map, K key, V value) { +Set set = map.computeIfAbsent(key, k -> new HashSet<>()); +set.add(value); +} + +static class Pair { +private final T first; +private final U second; + +public Pair(T first, U second) { +this.first = first; +this.second = second; +} + +public T getFirst() { +return first; +} + +public U getSecond() { +return second; +} + +@Override +public String toString() { +return "(" + first + ", " + second + ")"; +} +} + +// Returns a map of the list of consumers per Topic (keyed by 
topicId) +private Map> consumersPerTopic(AssignmentSpec assignmentSpec) { +Map> mapTopicsToConsumers = new HashMap<>(); Review Comment: changed to membersPerTopic
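Step 1 of the quoted Javadoc, inverting the member subscriptions into a per-topic member list, can be sketched with `Map.computeIfAbsent`, the same call the quoted `putList` helper uses. This is a simplified illustration, not the PR's code: topic ids are plain `String`s here instead of `Uuid`, and the input is a hypothetical `Map` from member id to subscribed topic ids rather than an `AssignmentSpec`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MembersPerTopicSketch {

    // Hypothetical simplification: memberId -> subscribed topic ids goes in,
    // topic id -> list of subscribed member ids comes out.
    static Map<String, List<String>> membersPerTopic(Map<String, List<String>> subscriptions) {
        Map<String, List<String>> membersPerTopic = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : subscriptions.entrySet()) {
            String memberId = entry.getKey();
            for (String topicId : entry.getValue()) {
                // Create the list on first sight of the topic, then append the member.
                membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId);
            }
        }
        return membersPerTopic;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the member iteration order deterministic for the demo.
        Map<String, List<String>> subscriptions = new LinkedHashMap<>();
        subscriptions.put("member1", Arrays.asList("topicA", "topicB"));
        subscriptions.put("member2", Arrays.asList("topicA"));

        Map<String, List<String>> result = membersPerTopic(subscriptions);
        System.out.println(result.get("topicA")); // prints [member1, member2]
        System.out.println(result.get("topicB")); // prints [member1]
    }
}
```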
[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables
mjsax commented on code in PR #13522: URL: https://github.com/apache/kafka/pull/13522#discussion_r1164874948 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java: ## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.IntegrationTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category(IntegrationTest.class) +public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKTableForeignKeyJoinIntegrationTest { + +public KTableKTableForeignKeyVersionedJoinIntegrationTest(final boolean leftJoin, + final boolean materialized, + final boolean leftVersioned, + final boolean rightVersioned) { +// optimizations and rejoin are disabled for these tests, as these tests focus on versioning. 
+// see KTableKTableForeignKeyJoinIntegrationTest for test coverage for optimizations and rejoin +super(leftJoin, StreamsConfig.NO_OPTIMIZATION, materialized, false, leftVersioned, rightVersioned); +} + +@Parameterized.Parameters(name = "leftJoin={0}, materialized={1}, leftVersioned={2}, rightVersioned={3}") +public static Collection data() { +final List booleans = Arrays.asList(true, false); +return buildParameters(booleans, booleans, booleans, booleans); +} + +@Test +public void shouldIgnoreOutOfOrderRecordsIffVersioned() { Review Comment: `Iff` -> `If` (or is it `if and only if`)?
[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables
mjsax commented on code in PR #13522: URL: https://github.com/apache/kafka/pull/13522#discussion_r1164874948 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java: ## @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.IntegrationTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category(IntegrationTest.class) +public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKTableForeignKeyJoinIntegrationTest { + +public KTableKTableForeignKeyVersionedJoinIntegrationTest(final boolean leftJoin, + final boolean materialized, + final boolean leftVersioned, + final boolean rightVersioned) { +// optimizations and rejoin are disabled for these tests, as these tests focus on versioning. 
+// see KTableKTableForeignKeyJoinIntegrationTest for test coverage for optimizations and rejoin +super(leftJoin, StreamsConfig.NO_OPTIMIZATION, materialized, false, leftVersioned, rightVersioned); +} + +@Parameterized.Parameters(name = "leftJoin={0}, materialized={1}, leftVersioned={2}, rightVersioned={3}") +public static Collection data() { +final List booleans = Arrays.asList(true, false); +return buildParameters(booleans, booleans, booleans, booleans); +} + +@Test +public void shouldIgnoreOutOfOrderRecordsIffVersioned() { Review Comment: `Iff` -> `If`
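If `Iff` in the test name above is read as "if and only if", the expectation is that out-of-order records are ignored when the table is versioned and processed as usual otherwise. The following is a minimal Java sketch of that expectation, not the Kafka Streams join processor. The class `OutOfOrderFilterSketch`, its `accept` method, and the per-key timestamp map are hypothetical names introduced for illustration, and the sketch assumes that "out-of-order" means a timestamp strictly older than the newest one already seen for the same key.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only (assumption): this is NOT the Kafka Streams implementation.
// It remembers the timestamp of the last accepted record per key and reports
// whether a new record would be kept or dropped.
final class OutOfOrderFilterSketch {
    private final boolean versioned;
    private final Map<String, Long> lastAcceptedTimestampByKey = new HashMap<>();

    OutOfOrderFilterSketch(final boolean versioned) {
        this.versioned = versioned;
    }

    // Returns true if the record is kept, false if it is dropped as out-of-order.
    boolean accept(final String key, final long timestamp) {
        final Long last = lastAcceptedTimestampByKey.get(key);
        // Assumption: equal timestamps are not treated as out-of-order.
        final boolean outOfOrder = last != null && timestamp < last;
        if (versioned && outOfOrder) {
            return false; // versioned table: drop the record, keep state untouched
        }
        // Unversioned table (or in-order record): the new record simply wins.
        lastAcceptedTimestampByKey.put(key, timestamp);
        return true;
    }

    public static void main(final String[] args) {
        final long baseTimestamp = 1_000L;
        final OutOfOrderFilterSketch versionedTable = new OutOfOrderFilterSketch(true);
        final OutOfOrderFilterSketch unversionedTable = new OutOfOrderFilterSketch(false);

        versionedTable.accept("lhs1", baseTimestamp);
        unversionedTable.accept("lhs1", baseTimestamp);

        System.out.println("versioned keeps older record:   "
            + versionedTable.accept("lhs1", baseTimestamp - 1));
        System.out.println("unversioned keeps older record: "
            + unversionedTable.accept("lhs1", baseTimestamp - 1));
    }
}
```

In this toy model the versioned instance rejects the record at `baseTimestamp - 1` while the unversioned instance accepts it, which is the two-sided behavior that "if and only if" would describe.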
[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables
mjsax commented on code in PR #13522: URL: https://github.com/apache/kafka/pull/13522#discussion_r1164864257 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ## @@ -140,9 +163,9 @@ public void doJoinFromLeftThenDeleteLeftEntity() { final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records -right.pipeInput("rhs1", "rhsValue1"); -right.pipeInput("rhs2", "rhsValue2"); -right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results +right.pipeInput("rhs1", "rhsValue1", baseTimestamp); Review Comment: Should we use "auto advance" instead of passing in timestamps explicitly? Could it also be an advantage to keep them explicit for readability?
[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables
mjsax commented on code in PR #13522: URL: https://github.com/apache/kafka/pull/13522#discussion_r1164867385 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ## @@ -249,62 +272,62 @@ public void doJoinFromLeftThenDeleteLeftEntity() { @Test public void doJoinFromRightThenDeleteRightEntity() { -final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin); +final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned); try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records -left.pipeInput("lhs1", "lhsValue1|rhs1"); -left.pipeInput("lhs2", "lhsValue2|rhs2"); -left.pipeInput("lhs3", "lhsValue3|rhs1"); +left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); +left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 1); +left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2); assertThat( outputTopic.readKeyValuesToMap(), is(leftJoin - ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), - mkEntry("lhs2", "(lhsValue2|rhs2,null)"), - mkEntry("lhs3", "(lhsValue3|rhs1,null)")) - : emptyMap() +? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), +mkEntry("lhs2", "(lhsValue2|rhs2,null)"), +mkEntry("lhs3", "(lhsValue3|rhs1,null)")) +: emptyMap() Review Comment: nit: avoid unnecessary reformatting (seems to be an IDE setting that triggered?) 
[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables
mjsax commented on code in PR #13522: URL: https://github.com/apache/kafka/pull/13522#discussion_r1164864257 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java: ## @@ -140,9 +163,9 @@ public void doJoinFromLeftThenDeleteLeftEntity() { final KeyValueStore store = driver.getKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records -right.pipeInput("rhs1", "rhsValue1"); -right.pipeInput("rhs2", "rhsValue2"); -right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results +right.pipeInput("rhs1", "rhsValue1", baseTimestamp); Review Comment: Should we use "auto advance" instead of passing in timestamps explicitly?
[GitHub] [kafka] showuon commented on a diff in pull request #13547: Add note for KIP-894 and DelegationTokenCommand
showuon commented on code in PR #13547: URL: https://github.com/apache/kafka/pull/13547#discussion_r1164858999 ## docs/upgrade.html: ## @@ -27,6 +27,15 @@ Notable changes in 3 See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores">KIP-889</a> for more details. +MirrorMaker now uses incrementalAlterConfigs API to synchronize topic configurations instead of the deprecated alterConfigs API. Review Comment: MirrorMaker now uses incrementalAlterConfigs API [by default] to synchronize topic configurations instead of the deprecated alterConfigs API.
[GitHub] [kafka] showuon merged pull request #13500: MINOR: Install missing iputils-ping for system tests
showuon merged PR #13500: URL: https://github.com/apache/kafka/pull/13500
[GitHub] [kafka] jsancio opened a new pull request, #13553: MINOR; Improve logging to help debug kraft
jsancio opened a new pull request, #13553: URL: https://github.com/apache/kafka/pull/13553 Minor changes to the log message and log levels to help us debug KRaft issues. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction
[ https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-14561. - Fix Version/s: 3.5.0 Resolution: Fixed merged the PR to trunk. > Improve transactions experience for older clients by ensuring ongoing > transaction > - > > Key: KAFKA-14561 > URL: https://issues.apache.org/jira/browse/KAFKA-14561 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > Fix For: 3.5.0 > > > This is part 3 of KIP-890: > 3. *To cover older clients, we will ensure a transaction is ongoing before we > write to a transaction. We can do this by querying the transaction > coordinator and caching the result.* > See KIP-890 for more details: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
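The idea quoted in the issue above (check that a transaction is ongoing before writing, by querying the transaction coordinator and caching the result) can be pictured with a small Java sketch. This is only an illustration under stated assumptions, not the broker code merged for KAFKA-14561: the class `OngoingTransactionCacheSketch`, the `Predicate<String>` standing in for the coordinator query, and the methods `mayWrite` and `transactionEnded` are all hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Toy model only (assumption): the Predicate stands in for a request to the
// transaction coordinator; the real broker logic is not shown here.
final class OngoingTransactionCacheSketch {
    private final Predicate<String> coordinatorLookup;
    private final Set<String> verifiedOngoing = new HashSet<>();
    private int lookups = 0;

    OngoingTransactionCacheSketch(final Predicate<String> coordinatorLookup) {
        this.coordinatorLookup = coordinatorLookup;
    }

    // Returns true if a write for this transactional id would be allowed.
    boolean mayWrite(final String transactionalId) {
        if (verifiedOngoing.contains(transactionalId)) {
            return true; // cached result, no coordinator round trip
        }
        lookups++;
        final boolean ongoing = coordinatorLookup.test(transactionalId);
        if (ongoing) {
            verifiedOngoing.add(transactionalId); // cache only positive answers
        }
        return ongoing;
    }

    // Drops the cached answer once the transaction commits or aborts.
    void transactionEnded(final String transactionalId) {
        verifiedOngoing.remove(transactionalId);
    }

    int lookups() {
        return lookups;
    }

    public static void main(final String[] args) {
        final Set<String> ongoingAtCoordinator = new HashSet<>();
        ongoingAtCoordinator.add("txn-a");
        final OngoingTransactionCacheSketch cache =
            new OngoingTransactionCacheSketch(ongoingAtCoordinator::contains);

        System.out.println("txn-a first write allowed:  " + cache.mayWrite("txn-a"));
        System.out.println("txn-a second write allowed: " + cache.mayWrite("txn-a"));
        System.out.println("txn-b write allowed:        " + cache.mayWrite("txn-b"));
        System.out.println("coordinator lookups:        " + cache.lookups());
    }
}
```

Caching only positive answers is a design choice of this sketch: a transaction that is not yet ongoing may start later, so a negative answer is looked up again on the next write.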
[GitHub] [kafka] jolshan commented on pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on PR #13391: URL: https://github.com/apache/kafka/pull/13391#issuecomment-1506123475 Here are errors on the latest build on trunk I could find: https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/1754/#showFailuresLink Seems to roughly correlate with the failures I see. I think the only suspicious one is [Build / JDK 8 and Scala 2.12 / kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13391/26/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8_and_Scala_2_12___testWithGroupId__/) which was failing before without my fix. This error is slightly different and related to endTxn. I did see this flake one time when I was repeatedly testing on my branch. I can look on trunk as well.
[GitHub] [kafka] mjsax merged pull request #13510: KAFKA-14834: [4/N] Drop out-of-order records from table-table join with versioned tables
mjsax merged PR #13510: URL: https://github.com/apache/kafka/pull/13510
[GitHub] [kafka] junrao merged pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
junrao merged PR #13391: URL: https://github.com/apache/kafka/pull/13391
[GitHub] [kafka] mjsax commented on pull request #13509: KAFKA-14834: [3/N] Timestamped lookups for stream-table joins
mjsax commented on PR #13509: URL: https://github.com/apache/kafka/pull/13509#issuecomment-1506112408 Merged this PR, but as discussed in person, it seems questionable to allow versioned-stores for global-ktables -- the code change to the stream-table join processor that is shared for stream-table and stream-globalTable join is fine, but when we disallow versioned stores for global-tables, we would need to revert the corresponding test cases in a follow up PR.
[GitHub] [kafka] mjsax merged pull request #13509: KAFKA-14834: [3/N] Timestamped lookups for stream-table joins
mjsax merged PR #13509: URL: https://github.com/apache/kafka/pull/13509
[jira] [Commented] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711608#comment-17711608 ] Justine Olshan commented on KAFKA-14901: So many transaction/init producer ID issues lately. I had to check this wasn't the same as https://issues.apache.org/jira/browse/KAFKA-14830 I've seen some init producer ID failures a few times as well (seeming more frequently) when testing. I was also wondering if this change could be related, but it seems like this one caused deadlock and not the same error: [https://github.com/apache/kafka/pull/13267] Maybe this is yet another issue to investigate. Thanks for bringing it to our attention. > Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration > > > Key: KAFKA-14901 > URL: https://issues.apache.org/jira/browse/KAFKA-14901 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Priority: Blocker > Labels: flaky-test > Fix For: 3.5.0 > > Attachments: transaction-flake-trace-0.out, transaction-flake.out > > > The EOS Source test appears to be very rarely failing (<5% chance) with the > following error: > {noformat} > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The server experienced an unexpected error when > processing the request. 
> at > app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) > at > app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) > at > app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) > at > app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) > at > app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) > at > app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} > which appears to be triggered by the following failure inside the broker: > {noformat} > [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2) -- > InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', > transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context > RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', > clientAddress=/127.0.0.1, principal=User:ANONYMOUS, > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, > 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd]) > (kafka.server.KafkaApis:76) > java.lang.IllegalStateException: Preparing transaction state transition to > Empty while it already a pending state Ongoing > at > kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380) > at > kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311) > at > kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150) > at scala.util.Either.flatMap(Either.scala:352) > at > kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145) > at > kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236) > at kafka.server.KafkaApis.handle(KafkaApis.scala:202) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > at
[GitHub] [kafka] mjsax commented on pull request #13477: KAFKA-7499: Handle serialization error in ProductionExceptionHandler
mjsax commented on PR #13477: URL: https://github.com/apache/kafka/pull/13477#issuecomment-1506098834 > I believe I saw something about the class casting handling but I couldn't see that in the comments. I did leave a comment on the PR, but deleted it later as the comment was wrong -- the KIP explicitly says that `ClassCastException` should be excluded and we should always fail, and not allow skipping over it via the handler.
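The rule described in the comment above (a `ClassCastException` always fails and is never offered to the handler, while other serialization errors may be skipped if the handler says so) can be sketched in plain Java. This is a hedged illustration, not the Kafka Streams `ProductionExceptionHandler` API: the class `SerializationErrorSketch`, the `Handler` interface, the `Response` enum, and `serializeOrSkip` are hypothetical names made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Toy model only (assumption): names and signatures are invented for
// illustration and do not mirror the real Kafka Streams interfaces.
final class SerializationErrorSketch {
    enum Response { CONTINUE, FAIL }

    interface Handler {
        Response handle(RuntimeException error);
    }

    // Returns the serialized bytes, or null if the handler chose to skip the record.
    static <T> byte[] serializeOrSkip(final T value,
                                      final Function<T, byte[]> serializer,
                                      final Handler handler) {
        try {
            return serializer.apply(value);
        } catch (final ClassCastException e) {
            throw e; // excluded: always fail, the handler is never consulted
        } catch (final RuntimeException e) {
            if (handler.handle(e) == Response.CONTINUE) {
                return null; // handler decided to skip this record
            }
            throw e;
        }
    }

    public static void main(final String[] args) {
        final Handler alwaysContinue = error -> Response.CONTINUE;
        final Function<Object, byte[]> stringSerializer =
            value -> ((String) value).getBytes(StandardCharsets.UTF_8);
        final Function<Object, byte[]> brokenSerializer = value -> {
            throw new IllegalStateException("simulated serialization failure");
        };

        final byte[] skipped = serializeOrSkip("v", brokenSerializer, alwaysContinue);
        System.out.println("record skipped by handler: " + (skipped == null));

        try {
            serializeOrSkip(42, stringSerializer, alwaysContinue);
        } catch (final ClassCastException e) {
            System.out.println("ClassCastException was not offered to the handler");
        }
    }
}
```

Even with a handler that always answers `CONTINUE`, passing an `Integer` to the string serializer propagates the `ClassCastException`, whereas the simulated `IllegalStateException` is skipped.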
[GitHub] [kafka] cmccabe closed pull request #13546: Cherry-pick KAFKA-14857: Fix some MetadataLoader bugs (#13462)
cmccabe closed pull request #13546: Cherry-pick KAFKA-14857: Fix some MetadataLoader bugs (#13462) URL: https://github.com/apache/kafka/pull/13546
[GitHub] [kafka] vcrfxia opened a new pull request, #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes
vcrfxia opened a new pull request, #13552: URL: https://github.com/apache/kafka/pull/13552 This PR adds a method into GraphNode to assist in tracking whether tables are materialized as versioned or unversioned stores. This is needed in order to allow processors which have different behavior on versioned vs unversioned tables to use the correct semantics. For the full definition of when a table is considered "versioned" and which processors behave differently on versioned tables, see [KIP-914](https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores). Unit tests will be added shortly, and integration tests will be in a follow-up PR. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted
[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711589#comment-17711589 ] Sergei Morozov edited comment on KAFKA-14750 at 4/12/23 10:48 PM: -- {quote}how big of an impact is this to you for your use case actually? {quote} This issue is quite impactful. For the background: we use Connect in a multi-tenant environment where each topic represents a tenant. There is a Streams application that reads a topic containing the messages from about a hundred tenants and routes them to the tenant-scoped topics. To handle the situations when a new tenant is provisioned, in order to reduce the latency, we pre-provision a few standby topics per Streams replica. Topics are managed by Strimzi and are owned by the Streams pod. So if a Streams application is restarted, all its standby topics are deleted, and then new ones are created for the new instance. Restarting the Streams application is a common thing (e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so we get sink connector failures in pretty much common scenarios. {quote}Is the test you listed in the OP a reflection of how the connect is used by you? {quote} Yes. The above should explain it. Note that in our case, there are a couple of dozens of standby topics per Kafka cluster, so not every restart causes a connector failure. The 300 topics in the reproducer are used just to increase the likelihood of the connector failure. {quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry? {quote} I'd say we throw the original exception. was (Author: morozov): {quote}how big of an impact is this to you for your use case actually? {quote} This issue is quite impactful. For the background: we use Connect in a multi-tenant environment where each topic represents a tenant. 
There is a Streams application that reads a topic containing the messages from about a hundred tenants and routes them to the tenant-scoped topics. To handle the situations when a new tenant is provisioned, in order to reduce the latency, we pre-provision a few standby topics per Streams replica. Topics are managed by Strimzi and are owned by the Streams pod. So if a Streams application is restarted, all its standby topics are deleted, and then new ones are created for the new instance. Restarting the Streams application is a common thing (e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so we get sink connector failures in pretty much common scenarios. {quote}Is the test you listed in the OP a reflection of how the connect is used by you? {quote} Yes. The above should explain it. Note that in our case, there are a couple of dozens of standby topics per standby cluster, so not every restart causes a connector failure. The 300 topics in the reproducer are used just to increase the likelihood of the connector failure. {quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry? {quote} I'd say we throw the original exception. 
> Sink connector fails if a topic matching its topics.regex gets deleted > -- > > Key: KAFKA-14750 > URL: https://issues.apache.org/jira/browse/KAFKA-14750 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.1 >Reporter: Sergei Morozov >Assignee: Sagar Rao >Priority: Major > > Steps to reproduce: > # In {{{}config/connect-standalone.properties{}}}, set: > {code:bash} > plugin.path=libs/connect-file-3.3.1.jar > {code} > # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line > and add this one: > {code:bash} > topics.regex=connect-test-.* > {code} > # Start zookeeper: > {code:bash} > bin/zookeeper-server-start.sh config/zookeeper.properties > {code} > # Start the brokers: > {code:bash} > bin/kafka-server-start.sh config/server.properties > {code} > # Start the file sink connector: > {code:bash} > bin/connect-standalone.sh config/connect-standalone.properties > config/connect-file-sink.properties > {code} > # Create topics for the sink connector to subscribe to: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --create \ > --topic connect-test-$j > done & > done > wait > {code} > # Wait until all the created topics are assigned to the connector. Check the > number of partitions to be > 0 in the output of: > {code:bash} > bin/kafka-consumer-groups.sh \ > --bootstrap-server localhost:9092 \ > --group connect-local-file-sink \ > --describe --members > {code} > # Delete the created topics: >
[jira] [Comment Edited] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted
[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711589#comment-17711589 ] Sergei Morozov edited comment on KAFKA-14750 at 4/12/23 10:47 PM: -- {quote}how big of an impact is this to you for your use case actually? {quote} This issue is quite impactful. For the background: we use Connect in a multi-tenant environment where each topic represents a tenant. There is a Streams application that reads a topic containing the messages from about a hundred tenants and routes them to the tenant-scoped topics. To handle the situations when a new tenant is provisioned, in order to reduce the latency, we pre-provision a few standby topics per Streams replica. Topics are managed by Strimzi and are owned by the Streams pod. So if a Streams application is restarted, all its standby topics are deleted, and then new ones are created for the new instance. Restarting the Streams application is a common thing (e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so we get sink connector failures in pretty much common scenarios. {quote}Is the test you listed in the OP a reflection of how the connect is used by you? {quote} Yes. The above should explain it. Note that in our case, there are a couple of dozens of standby topics per standby cluster, so not every restart causes a connector failure. The 300 topics in the reproducer are used just to increase the likelihood of the connector failure. {quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry? {quote} I'd say we throw the original exception. was (Author: morozov): {quote}how big of an impact is this to you for your use case actually? {quote} This issue is quite impactful. For the background: we use Connect in a multi-tenant environment where each topic represents a tenant. 
There is a Streams application that reads a topic containing the messages from about a hundred tenants and routes them to the tenant-scoped topics. To handle the situations when a new tenant is provisioned, in order to reduce the latency, we pre-provision a few standby topics per Streams replica. Topics are managed by Strimzi and are owned by the Streams pod. So if a Streams application is restarted, all its standby topics are deleted, and then new ones are created for the new instance. Restarting the Streams application is a common thing (e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so we get sink connector failures in pretty much common scenarios. {quote}Is the test you listed in the OP a reflection of how the connect is used by you? {quote} Yes. The above should explain it. {quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry? {quote} I'd say we throw the original exception. > Sink connector fails if a topic matching its topics.regex gets deleted > -- > > Key: KAFKA-14750 > URL: https://issues.apache.org/jira/browse/KAFKA-14750 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.1 >Reporter: Sergei Morozov >Assignee: Sagar Rao >Priority: Major > > Steps to reproduce: > # In {{{}config/connect-standalone.properties{}}}, set: > {code:bash} > plugin.path=libs/connect-file-3.3.1.jar > {code} > # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line > and add this one: > {code:bash} > topics.regex=connect-test-.* > {code} > # Start zookeeper: > {code:bash} > bin/zookeeper-server-start.sh config/zookeeper.properties > {code} > # Start the brokers: > {code:bash} > bin/kafka-server-start.sh config/server.properties > {code} > # Start the file sink connector: > {code:bash} > bin/connect-standalone.sh config/connect-standalone.properties > config/connect-file-sink.properties > {code} > # Create topics for the sink connector to subscribe 
to: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --create \ > --topic connect-test-$j > done & > done > wait > {code} > # Wait until all the created topics are assigned to the connector. Check the > number of partitions to be > 0 in the output of: > {code:bash} > bin/kafka-consumer-groups.sh \ > --bootstrap-server localhost:9092 \ > --group connect-local-file-sink \ > --describe --members > {code} > # Delete the created topics: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --delete \ > --topic connect-test-$j > echo
[jira] [Comment Edited] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted
[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711589#comment-17711589 ] Sergei Morozov edited comment on KAFKA-14750 at 4/12/23 10:46 PM: -- {quote}how big of an impact is this to you for your use case actually? {quote} This issue is quite impactful. For the background: we use Connect in a multi-tenant environment where each topic represents a tenant. There is a Streams application that reads a topic containing the messages from about a hundred tenants and routes them to the tenant-scoped topics. To handle the situations when a new tenant is provisioned, in order to reduce the latency, we pre-provision a few standby topics per Streams replica. Topics are managed by Strimzi and are owned by the Streams pod. So if a Streams application is restarted, all its standby topics are deleted, and then new ones are created for the new instance. Restarting the Streams application is a common thing (e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so we get sink connector failures in pretty much common scenarios. {quote}Is the test you listed in the OP a reflection of how the connect is used by you? {quote} Yes. The above should explain it. {quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry? {quote} I'd say we throw the original exception. was (Author: morozov): {quote}how big of an impact is this to you for your use case actually? {quote} This issue is quite impactful. For the background: we use Connect in a multi-tenant environment where each topic represents a tenant. There is a Streams application that reads a topic containing the messages from about a hundred tenants and routes them to the tenant-scoped topics. To handle the situations when a new tenant is provisioned, in order to reduce the latency, we pre-provision a few standby topics per Streams replica. 
Topics are managed by Strimzi and are owned by the Streams pod. So if a Streams application is restarted, all its standby topics are deleted, and then new ones are created for the new instance. Restarting the Streams application is a common thing (e.g. due to a rolling upgrade), so we get sink connector failures in pretty much common scenarios. {quote}Is the test you listed in the OP a reflection of how the connect is used by you? {quote} Yes. The above should explain it. {quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry? {quote} I'd say we throw the original exception. > Sink connector fails if a topic matching its topics.regex gets deleted > -- > > Key: KAFKA-14750 > URL: https://issues.apache.org/jira/browse/KAFKA-14750 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.1 >Reporter: Sergei Morozov >Assignee: Sagar Rao >Priority: Major > > Steps to reproduce: > # In {{{}config/connect-standalone.properties{}}}, set: > {code:bash} > plugin.path=libs/connect-file-3.3.1.jar > {code} > # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line > and add this one: > {code:bash} > topics.regex=connect-test-.* > {code} > # Start zookeeper: > {code:bash} > bin/zookeeper-server-start.sh config/zookeeper.properties > {code} > # Start the brokers: > {code:bash} > bin/kafka-server-start.sh config/server.properties > {code} > # Start the file sink connector: > {code:bash} > bin/connect-standalone.sh config/connect-standalone.properties > config/connect-file-sink.properties > {code} > # Create topics for the sink connector to subscribe to: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --create \ > --topic connect-test-$j > done & > done > wait > {code} > # Wait until all the created topics are assigned to the connector. 
Check the > number of partitions to be > 0 in the output of: > {code:bash} > bin/kafka-consumer-groups.sh \ > --bootstrap-server localhost:9092 \ > --group connect-local-file-sink \ > --describe --members > {code} > # Delete the created topics: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --delete \ > --topic connect-test-$j > echo Deleted topic connect-test-$j. > done & > done > wait > {code} > # Observe the connector fail with the following error: > {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms > expired before the position for partition connect-test-211-0 could be > determined >
[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted
[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711589#comment-17711589 ] Sergei Morozov commented on KAFKA-14750: {quote}how big of an impact is this to you for your use case actually? {quote} This issue is quite impactful. For the background: we use Connect in a multi-tenant environment where each topic represents a tenant. There is a Streams application that reads a topic containing the messages from about a hundred tenants and routes them to the tenant-scoped topics. To handle the situations when a new tenant is provisioned, in order to reduce the latency, we pre-provision a few standby topics per Streams replica. Topics are managed by Strimzi and are owned by the Streams pod. So if a Streams application is restarted, all its standby topics are deleted, and then new ones are created for the new instance. Restarting the Streams application is a common thing (e.g. due to a rolling upgrade), so we get sink connector failures in pretty much common scenarios. {quote}Is the test you listed in the OP a reflection of how the connect is used by you? {quote} Yes. The above should explain it. {quote}If it doesn't exist, then we can skip the offset commit but what happens if it exists? Do we retry? {quote} I'd say we throw the original exception. 
> Sink connector fails if a topic matching its topics.regex gets deleted > -- > > Key: KAFKA-14750 > URL: https://issues.apache.org/jira/browse/KAFKA-14750 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.1 >Reporter: Sergei Morozov >Assignee: Sagar Rao >Priority: Major > > Steps to reproduce: > # In {{{}config/connect-standalone.properties{}}}, set: > {code:bash} > plugin.path=libs/connect-file-3.3.1.jar > {code} > # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line > and add this one: > {code:bash} > topics.regex=connect-test-.* > {code} > # Start zookeeper: > {code:bash} > bin/zookeeper-server-start.sh config/zookeeper.properties > {code} > # Start the brokers: > {code:bash} > bin/kafka-server-start.sh config/server.properties > {code} > # Start the file sink connector: > {code:bash} > bin/connect-standalone.sh config/connect-standalone.properties > config/connect-file-sink.properties > {code} > # Create topics for the sink connector to subscribe to: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --create \ > --topic connect-test-$j > done & > done > wait > {code} > # Wait until all the created topics are assigned to the connector. Check the > number of partitions to be > 0 in the output of: > {code:bash} > bin/kafka-consumer-groups.sh \ > --bootstrap-server localhost:9092 \ > --group connect-local-file-sink \ > --describe --members > {code} > # Delete the created topics: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --delete \ > --topic connect-test-$j > echo Deleted topic connect-test-$j. 
> done & > done > wait > {code} > # Observe the connector fail with the following error: > {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms > expired before the position for partition connect-test-211-0 could be > determined > {quote}
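For readers following the KAFKA-14750 thread, the handling proposed in the comment (skip the offset commit when the topic no longer exists, otherwise throw the original exception) could be sketched roughly as below. This is a toy model in Python, not Connect code: `PositionTimeout`, `offsets_to_commit`, `position_of` and `topic_exists` are hypothetical names made up for illustration, and the real fix may look quite different.

```python
from typing import Callable, Dict, Set, Tuple

# (topic, partition) pair; a stand-in for Kafka's TopicPartition.
TopicPartition = Tuple[str, int]


class PositionTimeout(Exception):
    """Hypothetical stand-in for the TimeoutException quoted in the report."""


def offsets_to_commit(
    partitions: Set[TopicPartition],
    position_of: Callable[[TopicPartition], int],
    topic_exists: Callable[[str], bool],
) -> Dict[TopicPartition, int]:
    """Collect offsets to commit, tolerating partitions of deleted topics."""
    offsets: Dict[TopicPartition, int] = {}
    for tp in sorted(partitions):
        try:
            offsets[tp] = position_of(tp)
        except PositionTimeout:
            if topic_exists(tp[0]):
                # The topic is still there, so the timeout is a real problem:
                # surface the original exception unchanged.
                raise
            # The topic was deleted: skip the commit for this partition.
    return offsets
```

The existence check only runs on the failure path, so the common case costs nothing extra.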
[GitHub] [kafka] cmccabe commented on pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest
cmccabe commented on PR #13549: URL: https://github.com/apache/kafka/pull/13549#issuecomment-1506025212 Thanks @gharris1727.
[GitHub] [kafka] guozhangwang merged pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader
guozhangwang merged PR #13523: URL: https://github.com/apache/kafka/pull/13523
[jira] [Commented] (KAFKA-14900) Flaky test AuthorizerTest failing with NPE
[ https://issues.apache.org/jira/browse/KAFKA-14900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711581#comment-17711581 ] Greg Harris commented on KAFKA-14900: - This will be addressed in https://github.com/apache/kafka/pull/13543 > Flaky test AuthorizerTest failing with NPE > -- > > Key: KAFKA-14900 > URL: https://issues.apache.org/jira/browse/KAFKA-14900 > Project: Kafka > Issue Type: Test > Components: kraft >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > Labels: flaky-test > > The AuthorizerTest has multiple tests that appear to have the same flaky > failure: > {noformat} > org.apache.kafka.server.fault.FaultHandlerException: > quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: > Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value > of "kafka.server.SharedServer.raftManager()" is null > at > app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256) > at > app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229) > at > app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270) > at > app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.NullPointerException: Cannot invoke > "kafka.raft.KafkaRaftManager.client()" because the return value of > "kafka.server.SharedServer.raftManager()" is null > ... 8 more{noformat}
[GitHub] [kafka] gharris1727 closed pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest
gharris1727 closed pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest URL: https://github.com/apache/kafka/pull/13549
[GitHub] [kafka] gharris1727 commented on pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest
gharris1727 commented on PR #13549: URL: https://github.com/apache/kafka/pull/13549#issuecomment-1506006379 This duplicates #13543 so I'll close this in favor of that more comprehensive change.
[jira] [Comment Edited] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711580#comment-17711580 ] Greg Harris edited comment on KAFKA-14901 at 4/12/23 9:49 PM: -- cc [~jolshan] [~dajac] [~hachikuji] [~mimaison] I have not seen this failure mode before, and I'm worried that this might be a recent regression. It also doesn't look like an error that is intended to be surfaced by the API in normal operations (I might expect a disconnect or ProducerFencedException, maybe). Just to be safe I've marked this as a blocker for 3.5.0, but we can downgrade the severity with some more investigation. Since the incidence rate for this is relatively low, I've attached a couple of logs of the test run leading up to the error. The trace-0 log fails on PrepareEpochFence instead of Empty, but it appears both runs fail when the pending state is Ongoing. To reproduce yourself: Run the version of test on trunk 10-50 times (30s each) until it fails. This test is different from the other tests in the same suite as it makes use of active producer fencing to eliminate other tasks/producers when reconfiguring tasks. was (Author: gharris1727): cc [~jolshan] [~dajac] [~hachikuji] [~mimaison] I have not seen this failure mode before, and I'm worried that this might be a recent regression. It also doesn't look like an error that is intended to be surfaced by the API in normal operations (I might expect a disconnect or ProducerFencedException, maybe). Just to be safe I've marked this as a blocker for 3.5.0, but we can downgrade the severity with some more investigation. Since the incidence rate for this is relatively low, I've attached a couple of logs of the test run leading up to the error. The trace-0 log fails on PrepareEpochFence instead of Empty, but it appears both runs fail when the pending state is Ongoing. To reproduce yourself: Run the version of test on trunk 10-50 times until it fails. 
This test is different from the other tests in the same suite as it makes use of active producer fencing to eliminate other tasks/producers when reconfiguring tasks. > Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration > > > Key: KAFKA-14901 > URL: https://issues.apache.org/jira/browse/KAFKA-14901 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Priority: Blocker > Labels: flaky-test > Fix For: 3.5.0 > > Attachments: transaction-flake-trace-0.out, transaction-flake.out > > > The EOS Source test appears to be very rarely failing (<5% chance) with the > following error: > {noformat} > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The server experienced an unexpected error when > processing the request. > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) > at > app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) > at > app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) > at > app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) > at > app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) > at > app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} > which appears to be triggered by the following failure inside the broker: > {noformat} > [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, 
correlationId=5, > headerVersion=2) -- > InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', > transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context > RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', > clientAddress=/127.0.0.1, principal=User:ANONYMOUS, > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.5.0-SNAPSHOT),
[jira] [Commented] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711580#comment-17711580 ] Greg Harris commented on KAFKA-14901: - cc [~jolshan] [~dajac] [~hachikuji] [~mimaison] I have not seen this failure mode before, and I'm worried that this might be a recent regression. It also doesn't look like an error that is intended to be surfaced by the API in normal operations (I might expect a disconnect or ProducerFencedException, maybe). Just to be safe I've marked this as a blocker for 3.5.0, but we can downgrade the severity with some more investigation. Since the incidence rate for this is relatively low, I've attached a couple of logs of the test run leading up to the error. The trace-0 log fails on PrepareEpochFence instead of Empty, but it appears both runs fail when the pending state is Ongoing. To reproduce yourself: Run the version of test on trunk 10-50 times until it fails. This test is different from the other tests in the same suite as it makes use of active producer fencing to eliminate other tasks/producers when reconfiguring tasks. > Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration > > > Key: KAFKA-14901 > URL: https://issues.apache.org/jira/browse/KAFKA-14901 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Priority: Blocker > Labels: flaky-test > Fix For: 3.5.0 > > Attachments: transaction-flake-trace-0.out, transaction-flake.out > > > The EOS Source test appears to be very rarely failing (<5% chance) with the > following error: > {noformat} > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The server experienced an unexpected error when > processing the request. 
> at > app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) > at > app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) > at > app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) > at > app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) > at > app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) > at > app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} > which appears to be triggered by the following failure inside the broker: > {noformat} > [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2) -- > InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', > transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context > RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', > clientAddress=/127.0.0.1, principal=User:ANONYMOUS, > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, > 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd]) > (kafka.server.KafkaApis:76) > java.lang.IllegalStateException: Preparing transaction state transition to > Empty while it already a pending state Ongoing > at > kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380) > at > kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311) > at > kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150) > at
[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14901: Attachment: transaction-flake-trace-0.out > Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration > > > Key: KAFKA-14901 > URL: https://issues.apache.org/jira/browse/KAFKA-14901 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Priority: Blocker > Labels: flaky-test > Fix For: 3.5.0 > > Attachments: transaction-flake-trace-0.out, transaction-flake.out > > > The EOS Source test appears to be very rarely failing (<5% chance) with the > following error: > {noformat} > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The server experienced an unexpected error when > processing the request. > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) > at > app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) > at > app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) > at > app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) > at > app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) > at > app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} > which appears to be triggered by the following failure inside the broker: > {noformat} > [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > 
clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2) -- > InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', > transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context > RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', > clientAddress=/127.0.0.1, principal=User:ANONYMOUS, > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, > principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd]) > (kafka.server.KafkaApis:76) > java.lang.IllegalStateException: Preparing transaction state transition to > Empty while it already a pending state Ongoing > at > kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380) > at > kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311) > at > kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150) > at scala.util.Either.flatMap(Either.scala:352) > at > kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145) > at > kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236) > at 
kafka.server.KafkaApis.handle(KafkaApis.scala:202) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > at java.base/java.lang.Thread.run(Thread.java:829){noformat}
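The broker-side IllegalStateException quoted above reads like a guard against starting a second state transition while one is still pending. A minimal Python sketch of that kind of guard is below. It is modelled only on the exception message and the state names mentioned in this thread (Empty, Ongoing, PrepareEpochFence); it is not the broker's Scala code, and `TxnMetadata`, `prepare_transition_to` and `complete_transition` are illustrative names.

```python
from enum import Enum
from typing import Optional


class TxnState(Enum):
    EMPTY = "Empty"
    ONGOING = "Ongoing"
    PREPARE_EPOCH_FENCE = "PrepareEpochFence"


class TxnMetadata:
    """Toy transaction metadata with at most one pending transition."""

    def __init__(self, state: TxnState) -> None:
        self.state = state
        self.pending_state: Optional[TxnState] = None

    def prepare_transition_to(self, new_state: TxnState) -> None:
        # Refuse to start a transition while another is still in flight;
        # the message mirrors the wording of the log line in the report.
        if self.pending_state is not None:
            raise RuntimeError(
                f"Preparing transaction state transition to {new_state.value} "
                f"while it already a pending state {self.pending_state.value}"
            )
        self.pending_state = new_state

    def complete_transition(self) -> None:
        # Promote the pending state once the transition has been persisted.
        if self.pending_state is None:
            raise RuntimeError("no pending transition to complete")
        self.state = self.pending_state
        self.pending_state = None
```

In this model, a second `prepare_transition_to` call that arrives before `complete_transition` fails in the same way as the trace, which would fit a race between two requests for the same transactional id. That reading is an assumption, not a diagnosis.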
[GitHub] [kafka] philipnee commented on pull request #13477: KAFKA-7499: Handle serialization error in ProductionExceptionHandler
philipnee commented on PR #13477: URL: https://github.com/apache/kafka/pull/13477#issuecomment-1505978765 @bbejeck and @mjsax - thanks for the reviews. I believe I saw something about the class casting handling but I couldn't see that in the comments. I believe that was already implemented some time ago.
[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14901: Fix Version/s: 3.5.0 > Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration > > > Key: KAFKA-14901 > URL: https://issues.apache.org/jira/browse/KAFKA-14901 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Priority: Major > Labels: flaky-test > Fix For: 3.5.0 > > Attachments: transaction-flake.out > > > The EOS Source test appears to be very rarely failing (<5% chance) with the > following error: > {noformat} > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The server experienced an unexpected error when > processing the request. > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) > at > app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) > at > app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) > at > app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) > at > app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) > at > app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} > which appears to be triggered by the following failure inside the broker: > {noformat} > [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > 
headerVersion=2) -- > InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', > transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context > RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', > clientAddress=/127.0.0.1, principal=User:ANONYMOUS, > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, > principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd]) > (kafka.server.KafkaApis:76) > java.lang.IllegalStateException: Preparing transaction state transition to > Empty while it already a pending state Ongoing > at > kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380) > at > kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311) > at > kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150) > at scala.util.Either.flatMap(Either.scala:352) > at > kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145) > at > kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236) > at kafka.server.KafkaApis.handle(KafkaApis.scala:202) > at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > at java.base/java.lang.Thread.run(Thread.java:829){noformat}
[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14901: Priority: Blocker (was: Major) > Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration > > > Key: KAFKA-14901 > URL: https://issues.apache.org/jira/browse/KAFKA-14901 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Priority: Blocker > Labels: flaky-test > Fix For: 3.5.0 > > Attachments: transaction-flake.out > > > The EOS Source test appears to be very rarely failing (<5% chance) with the > following error: > {noformat} > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The server experienced an unexpected error when > processing the request. > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) > at > app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) > at > app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) > at > app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) > at > app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) > at > app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} > which appears to be triggered by the following failure inside the broker: > {noformat} > [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- > InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', > transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context > RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', > clientAddress=/127.0.0.1, principal=User:ANONYMOUS, > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, > principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd]) > (kafka.server.KafkaApis:76) > java.lang.IllegalStateException: Preparing transaction state transition to > Empty while it already a pending state Ongoing > at > kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380) > at > kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311) > at > kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150) > at scala.util.Either.flatMap(Either.scala:352) > at > kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145) > at > kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236) > at kafka.server.KafkaApis.handle(KafkaApis.scala:202) > at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > at java.base/java.lang.Thread.run(Thread.java:829{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14901: Attachment: transaction-flake.out > Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration > > > Key: KAFKA-14901 > URL: https://issues.apache.org/jira/browse/KAFKA-14901 > Project: Kafka > Issue Type: Test > Components: KafkaConnect >Reporter: Greg Harris >Priority: Major > Labels: flaky-test > Attachments: transaction-flake.out > > > The EOS Source test appears to be very rarely failing (<5% chance) with the > following error: > {noformat} > org.apache.kafka.common.KafkaException: Unexpected error in > InitProducerIdResponse; The server experienced an unexpected error when > processing the request. > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) > at > app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) > at > app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) > at > app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) > at > app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) > at > app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) > at > app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} > which appears to be triggered by the following failure inside the broker: > {noformat} > [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > 
headerVersion=2) -- > InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', > transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context > RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, > clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, > headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', > clientAddress=/127.0.0.1, principal=User:ANONYMOUS, > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, > clientInformation=ClientInformation(softwareName=apache-kafka-java, > softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, > principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd]) > (kafka.server.KafkaApis:76) > java.lang.IllegalStateException: Preparing transaction state transition to > Empty while it already a pending state Ongoing > at > kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380) > at > kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311) > at > kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150) > at scala.util.Either.flatMap(Either.scala:352) > at > kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145) > at > kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236) > at kafka.server.KafkaApis.handle(KafkaApis.scala:202) > at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > at java.base/java.lang.Thread.run(Thread.java:829{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
[ https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14901: Description: The EOS Source test appears to be very rarely failing (<5% chance) with the following error: {noformat} org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The server experienced an unexpected error when processing the request. at app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) at app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) at app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) at app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) at app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) at app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) at app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} which appears to be triggered by the following failure inside the broker: {noformat} [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, headerVersion=2) -- InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, headerVersion=2), 
connectionId='127.0.0.1:54213-127.0.0.1:54367-46', clientAddress=/127.0.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd]) (kafka.server.KafkaApis:76) java.lang.IllegalStateException: Preparing transaction state transition to Empty while it already a pending state Ongoing at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380) at kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311) at kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240) at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151) at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242) at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150) at scala.util.Either.flatMap(Either.scala:352) at kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145) at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236) at kafka.server.KafkaApis.handle(KafkaApis.scala:202) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) at java.base/java.lang.Thread.run(Thread.java:829{noformat} was: The EOS Source test appears to be occasionally failing with the following error: {noformat} org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The server experienced an unexpected error when processing the request. 
at app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) at app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) at app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) at app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) at app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) at app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) at app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at
[jira] [Created] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
Greg Harris created KAFKA-14901: --- Summary: Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration Key: KAFKA-14901 URL: https://issues.apache.org/jira/browse/KAFKA-14901 Project: Kafka Issue Type: Test Components: KafkaConnect Reporter: Greg Harris The EOS Source test appears to be occasionally failing with the following error: {noformat} org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The server experienced an unexpected error when processing the request. at app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303) at app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207) at app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) at app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594) at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586) at app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426) at app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) at app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat} which appears to be triggered by the following failure inside the broker: {noformat} [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, headerVersion=2) -- InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1', transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', clientAddress=/127.0.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd]) (kafka.server.KafkaApis:76) java.lang.IllegalStateException: Preparing transaction state transition to Empty while it already a pending state Ongoing at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380) at kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311) at kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240) at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151) at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242) at kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150) at scala.util.Either.flatMap(Either.scala:352) at kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145) at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236) at kafka.server.KafkaApis.handle(KafkaApis.scala:202) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) at java.base/java.lang.Thread.run(Thread.java:829{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #13477: KAFKA-7499: Handle serialization error in ProductionExceptionHandler
philipnee commented on code in PR #13477: URL: https://github.com/apache/kafka/pull/13477#discussion_r1164651827
## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java: ##
@@ -213,6 +212,31 @@ public void send(final String topic,
 keyClass, valueClass), exception);
+} catch (final SerializationException exception) {
+    final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+    final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+    try {
+        response = productionExceptionHandler.onSerializationException(record, exception);
+    } catch (final Exception e) {
+        log.error("Fatal handling serialization exception on record {}", record, e);
+        recordSendError(topic, e, null);
+        return;
+    }
+
+    if (response == ProductionExceptionHandlerResponse.FAIL) {
+        log.error("Fatal handling serialization exception on record {}", record, exception);
+        recordSendError(topic, exception, null);
+        return;
+    }
+
+    log.warn("Unable to serialize {}. Continue processing. " +
+        "ProducerRecord(key=[{}], value=[{}], topic=[{}], partition=[{}], timestamp=[{}])",
+        keyBytes == null ? "key" : "value",
Review Comment: hmm good point, maybe just log the continue processing message then.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request, #13551: MINOR: Allow tagged fields with version subset of flexible version range
hachikuji opened a new pull request, #13551: URL: https://github.com/apache/kafka/pull/13551 The generated message types are missing a range check for the case when the tagged version range is a subset of the flexible version range. This causes the tagged field count, which is computed correctly, to conflict with the number of tags serialized. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
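A minimal sketch of the range check described in the message above, written as a standalone illustration rather than as the actual message generator code. The names `TaggedFieldSketch`, `VersionRange`, `TaggedField`, `countTaggedFields` and `tagsToWrite` are hypothetical and do not come from the PR. It only shows the invariant that seems to be at stake: the number of tagged fields announced for a version and the tags actually serialized for that version should be decided by the same version check.

```java
import java.util.ArrayList;
import java.util.List;

final class TaggedFieldSketch {
    /** Inclusive version range; a hypothetical helper, not a Kafka class. */
    static final class VersionRange {
        final int lowest;
        final int highest;

        VersionRange(int lowest, int highest) {
            this.lowest = lowest;
            this.highest = highest;
        }

        boolean contains(int version) {
            return version >= lowest && version <= highest;
        }
    }

    /** A field that is serialized as a tagged field only within taggedVersions. */
    static final class TaggedField {
        final int tag;
        final VersionRange taggedVersions;

        TaggedField(int tag, VersionRange taggedVersions) {
            this.tag = tag;
            this.taggedVersions = taggedVersions;
        }
    }

    /** Number of tagged fields announced for the given version. */
    static int countTaggedFields(List<TaggedField> fields, int version) {
        int count = 0;
        for (TaggedField field : fields) {
            if (field.taggedVersions.contains(version)) {
                count++;
            }
        }
        return count;
    }

    /** Tags serialized for the given version, using the same range check as the count. */
    static List<Integer> tagsToWrite(List<TaggedField> fields, int version) {
        List<Integer> tags = new ArrayList<>();
        for (TaggedField field : fields) {
            // Without this check, a field tagged in only a subset of the flexible
            // versions would be written even though it was not counted.
            if (field.taggedVersions.contains(version)) {
                tags.add(field.tag);
            }
        }
        return tags;
    }
}
```

With a field tagged in versions 2 to 5 and another tagged only in versions 4 to 5, both methods agree on one field at version 3 and two fields at version 4, so the announced count and the serialized tags cannot diverge.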
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1164638992 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +nodesToTransactions.synchronized { + // Check if we have already have either node or individual transaction. Add the Node if it isn't there. 
+ val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, +new TransactionDataAndCallbacks( + new AddPartitionsToTxnTransactionCollection(1), + mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) + + val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + + // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and + // reconnected so return the retriable network exception. + if (currentTransactionData != null) { +val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) + Errors.INVALID_PRODUCER_EPOCH +else + Errors.NETWORK_EXCEPTION +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error) + } +} +val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) +currentNodeAndTransactionData.transactionData.remove(transactionData) +oldCallback(topicPartitionsToError.toMap) + } + currentNodeAndTransactionData.transactionData.add(transactionData) + currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) + wakeup() +} + } + + private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { +override def onComplete(response: ClientResponse): Unit = { + // Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread. 
+ inflightNodes.remove(node) + if (response.authenticationException() != null) { +error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} with an " + + "authentication exception.", response.authenticationException) +transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) => + callback(buildErrorMap(txnId,
[GitHub] [kafka] philipnee opened a new pull request, #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance
philipnee opened a new pull request, #13550: URL: https://github.com/apache/kafka/pull/13550 This is a really long story, but the incident started in KAFKA-13419, when we observed a member sending out a topic partition it owned in the previous generation after it had missed a rebalance cycle due to REBALANCE_IN_PROGRESS. Ideally, the member should continue holding onto its partition as long as there's no other owner with a younger generation; however, we need to be defensive about this approach because we aren't sure whether the partition has already been assigned to other members. Therefore, it is safest for us to honor only the members with the highest generation and the previous generation during the assignment phase. In this PR, I made 2 major changes: 1. In the assignor: we now honor a partition owner that is on the max - 1 generation only as long as there's no other owner with a younger generation for that partition. (younger = higher generationId) 2. After getting a REBALANCE_IN_PROGRESS sync group error, we immediately reset the member's generation so that all of its owned partitions are treated as lost if the member doesn't re-join in a timely manner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
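The ownership rule in change 1 above can be sketched roughly as follows. This is an illustration under stated assumptions, not the assignor code from the PR: `OwnershipSketch`, `Claim` and `honoredOwner` are hypothetical names, and treating two claims at the same generation as a conflict with no honored owner is an assumption of this sketch, since the message does not say how such ties are handled.

```java
import java.util.List;
import java.util.Optional;

final class OwnershipSketch {
    /** One member's claim on a partition, with the generation it was owned in. */
    static final class Claim {
        final String memberId;
        final int generation;

        Claim(String memberId, int generation) {
            this.memberId = memberId;
            this.generation = generation;
        }
    }

    /**
     * Returns the member whose claim on a single partition is honored.
     * Only claims from maxGeneration or maxGeneration - 1 are considered,
     * and a claim loses to any claim with a higher generationId.
     */
    static Optional<String> honoredOwner(List<Claim> claims, int maxGeneration) {
        Claim best = null;
        boolean tie = false;
        for (Claim claim : claims) {
            if (claim.generation < maxGeneration - 1) {
                continue; // older than the previous generation: never honored
            }
            if (best == null || claim.generation > best.generation) {
                best = claim;
                tie = false;
            } else if (claim.generation == best.generation) {
                tie = true; // assumption of this sketch: same-generation claims conflict
            }
        }
        return (best == null || tie) ? Optional.empty() : Optional.of(best.memberId);
    }
}
```

For example, with a maximum generation of 5, a sole claimant from generation 4 keeps the partition, but it loses the partition as soon as another member claims it from generation 5, and a claimant from generation 3 is ignored.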
[jira] [Resolved] (KAFKA-14898) [ MirrorMaker ] sync.topic.configs.enabled not working as expected
[ https://issues.apache.org/jira/browse/KAFKA-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-14898. - Resolution: Fixed > [ MirrorMaker ] sync.topic.configs.enabled not working as expected > -- > > Key: KAFKA-14898 > URL: https://issues.apache.org/jira/browse/KAFKA-14898 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.0 >Reporter: Srinivas Boga >Priority: Major > Labels: mirrormaker > > Hello, > In my replication set up , i do not want to sync the topic configs, the use > case is to have different retention time for the topic on the target cluster, > I am passing the config > {code:java} > sync.topic.configs.enabled = false{code} > but this is not working as expected the topic retention time is being set to > whatever is being set in the source cluster, looking at the mirrormaker logs > i can see that MirrorSourceConnector is still setting the above config as true > {code:java} > [2023-04-12 17:04:55,184] INFO [MirrorSourceConnector|task-8] ConsumerConfig > values: > allow.auto.create.topics = true > auto.commit.interval.ms = 5000 > auto.include.jmx.reporter = true > auto.offset.reset = earliest > bootstrap.servers = [sourcecluster.com:9092] > check.crcs = true > client.dns.lookup = use_all_dns_ips > client.id = consumer-null-2 > client.rack = > connections.max.idle.ms = 54 > default.api.timeout.ms = 6 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = null > group.instance.id = null > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > internal.throw.on.fetch.stable.offset.unsupported = false > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 
> metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor, class > org.apache.kafka.clients.consumer.CooperativeStickyAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.connect.timeout.ms = null > sasl.login.read.timeout.ms = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.login.retry.backoff.max.ms = 1 > sasl.login.retry.backoff.ms = 100 > sasl.mechanism = GSSAPI > sasl.oauthbearer.clock.skew.seconds = 30 > sasl.oauthbearer.expected.audience = null > sasl.oauthbearer.expected.issuer = null > sasl.oauthbearer.jwks.endpoint.refresh.ms = 360 > sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 1 > sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 > sasl.oauthbearer.jwks.endpoint.url = null > sasl.oauthbearer.scope.claim.name = scope > sasl.oauthbearer.sub.claim.name = sub > sasl.oauthbearer.token.endpoint.url = null > security.protocol = PLAINTEXT > security.providers = null > send.buffer.bytes = 131072 > session.timeout.ms = 45000 > socket.connection.setup.timeout.max.ms = 3 > socket.connection.setup.timeout.ms = 1 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2] > ssl.endpoint.identification.algorithm = https > ssl.engine.factory.class = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > 
ssl.keystore.certificate.chain = null > ssl.keystore.key = null >
[GitHub] [kafka] junrao commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
junrao commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1164633122 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +nodesToTransactions.synchronized { + // Check if we have already have either node or individual transaction. Add the Node if it isn't there. 
+ val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, +new TransactionDataAndCallbacks( + new AddPartitionsToTxnTransactionCollection(1), + mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) + + val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + + // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and + // reconnected so return the retriable network exception. + if (currentTransactionData != null) { +val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) + Errors.INVALID_PRODUCER_EPOCH +else + Errors.NETWORK_EXCEPTION +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error) + } +} +val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) +currentNodeAndTransactionData.transactionData.remove(transactionData) +oldCallback(topicPartitionsToError.toMap) + } + currentNodeAndTransactionData.transactionData.add(transactionData) + currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) + wakeup() +} + } + + private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { +override def onComplete(response: ClientResponse): Unit = { + // Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread. 
+ inflightNodes.remove(node) + if (response.authenticationException() != null) { +error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} with an " + + "authentication exception.", response.authenticationException) +transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) => + callback(buildErrorMap(txnId,
[jira] [Commented] (KAFKA-14898) [ MirrorMaker ] sync.topic.configs.enabled not working as expected
[ https://issues.apache.org/jira/browse/KAFKA-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711551#comment-17711551 ] Srinivas Boga commented on KAFKA-14898: --- [~gharris1727] Thanks for your help on this. Yes, I could see the log message you pointed out. I was running MirrorMaker in distributed mode with 3 nodes and 10 tasks on each. I have verified that it works as expected when running on only one node. Thanks, -srini > [ MirrorMaker ] sync.topic.configs.enabled not working as expected > -- > > Key: KAFKA-14898 > URL: https://issues.apache.org/jira/browse/KAFKA-14898 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.0 >Reporter: Srinivas Boga >Priority: Major > Labels: mirrormaker > > Hello, > In my replication set up , i do not want to sync the topic configs, the use > case is to have different retention time for the topic on the target cluster, > I am passing the config > {code:java} > sync.topic.configs.enabled = false{code} > but this is not working as expected the topic retention time is being set to > whatever is being set in the source cluster, looking at the mirrormaker logs > i can see that MirrorSourceConnector is still setting the above config as true > {code:java} > [2023-04-12 17:04:55,184] INFO [MirrorSourceConnector|task-8] ConsumerConfig > values: > allow.auto.create.topics = true > auto.commit.interval.ms = 5000 > auto.include.jmx.reporter = true > auto.offset.reset = earliest > bootstrap.servers = [sourcecluster.com:9092] > check.crcs = true > client.dns.lookup = use_all_dns_ips > client.id = consumer-null-2 > client.rack = > connections.max.idle.ms = 54 > default.api.timeout.ms = 6 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = null > group.instance.id = null > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > 
internal.throw.on.fetch.stable.offset.unsupported = false > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor, class > org.apache.kafka.clients.consumer.CooperativeStickyAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.connect.timeout.ms = null > sasl.login.read.timeout.ms = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.login.retry.backoff.max.ms = 1 > sasl.login.retry.backoff.ms = 100 > sasl.mechanism = GSSAPI > sasl.oauthbearer.clock.skew.seconds = 30 > sasl.oauthbearer.expected.audience = null > sasl.oauthbearer.expected.issuer = null > sasl.oauthbearer.jwks.endpoint.refresh.ms = 360 > sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 1 > sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100 > sasl.oauthbearer.jwks.endpoint.url = null > sasl.oauthbearer.scope.claim.name = scope > sasl.oauthbearer.sub.claim.name = sub > sasl.oauthbearer.token.endpoint.url = null > security.protocol = PLAINTEXT > security.providers = null > send.buffer.bytes = 131072 > session.timeout.ms = 
45000 > socket.connection.setup.timeout.max.ms = 3 > socket.connection.setup.timeout.ms = 1 > ssl.cipher.suites = null >
[GitHub] [kafka] gharris1727 commented on pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest
gharris1727 commented on PR #13549: URL: https://github.com/apache/kafka/pull/13549#issuecomment-1505884041 @cmccabe Could you review this fix? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 opened a new pull request, #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest
gharris1727 opened a new pull request, #13549: URL: https://github.com/apache/kafka/pull/13549 It appears that raftManager can become null via ControllerServer.shutdown() -> ensureNotRaftLeader() and various call sites of stop(). Rather than throw an NPE, evaluate raftManager.client while it is known to be non-null, and pass the evaluated client into setHighWaterMarkAccessor. I unfortunately don't have much context here, and I'm not sure whether it is still reasonable to evaluate highWatermark() once raftManager has become null, or whether the HighWaterMarkAccessor should return empty instead. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
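The idea in this PR (read a reference once while it is known to be non-null, then hand the captured value to the accessor) can be illustrated with a minimal Java sketch. The real change is in Scala (`SharedServer.scala`); every name below (`RaftClientLike`, `ManagerHolder`, `lateBound`, `earlyBound`) is hypothetical and not part of the Kafka code base.

```java
import java.util.OptionalLong;
import java.util.function.Supplier;

public class HighWatermarkCapture {
    // Hypothetical stand-in for a Raft client that exposes a high watermark.
    public interface RaftClientLike {
        OptionalLong highWatermark();
    }

    // Hypothetical stand-in for an owner whose client reference may be cleared on shutdown.
    public static class ManagerHolder {
        public volatile RaftClientLike client;

        public ManagerHolder(RaftClientLike client) {
            this.client = client;
        }
    }

    // Fragile variant: dereferences holder.client every time the accessor runs,
    // so it throws a NullPointerException once the field has been set to null.
    public static Supplier<OptionalLong> lateBound(ManagerHolder holder) {
        return () -> holder.client.highWatermark();
    }

    // Safer variant: reads the field once, up front, and closes over the captured value.
    public static Supplier<OptionalLong> earlyBound(ManagerHolder holder) {
        final RaftClientLike captured = holder.client;
        return () -> captured.highWatermark();
    }
}
```

Whether the captured client should still be queried after shutdown, or whether the accessor should return empty at that point, is the open question raised in the PR description; the sketch only shows the capture itself.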
[jira] [Updated] (KAFKA-14900) Flaky test AuthorizerTest failing with NPE
[ https://issues.apache.org/jira/browse/KAFKA-14900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14900: Description: The AuthorizerTest has multiple tests that appear to have the same flaky failure: {noformat} org.apache.kafka.server.fault.FaultHandlerException: quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value of "kafka.server.SharedServer.raftManager()" is null at app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256) at app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229) at app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270) at app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833) Caused by: java.lang.NullPointerException: Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value of "kafka.server.SharedServer.raftManager()" is null ... 
8 more{noformat} was: The AuthorizerTest has multiple tests that appear to have the same flaky failure: {noformat} org.apache.kafka.server.fault.FaultHandlerException: quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value of "kafka.server.SharedServer.raftManager()" is null at app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256) at app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229) at app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270) at app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833)Caused by: java.lang.NullPointerException: Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value of "kafka.server.SharedServer.raftManager()" is null... 
8 more{noformat} > Flaky test AuthorizerTest failing with NPE > -- > > Key: KAFKA-14900 > URL: https://issues.apache.org/jira/browse/KAFKA-14900 > Project: Kafka > Issue Type: Test > Components: kraft >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > Labels: flaky-test > > The AuthorizerTest has multiple tests that appear to have the same flaky > failure: > {noformat} > org.apache.kafka.server.fault.FaultHandlerException: > quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: > Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value > of "kafka.server.SharedServer.raftManager()" is null > at > app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256) > at > app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229) > at > app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270) > at > app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) > at > app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) > at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833) > Caused by: java.lang.NullPointerException: Cannot invoke > "kafka.raft.KafkaRaftManager.client()" because the return value of > "kafka.server.SharedServer.raftManager()" is null > ... 8 more{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14900) Flaky test AuthorizerTest failing with NPE
Greg Harris created KAFKA-14900: --- Summary: Flaky test AuthorizerTest failing with NPE Key: KAFKA-14900 URL: https://issues.apache.org/jira/browse/KAFKA-14900 Project: Kafka Issue Type: Test Components: kraft Reporter: Greg Harris Assignee: Greg Harris The AuthorizerTest has multiple tests that appear to have the same flaky failure: {noformat} org.apache.kafka.server.fault.FaultHandlerException: quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value of "kafka.server.SharedServer.raftManager()" is null at app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256) at app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229) at app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270) at app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258) at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210) at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181) at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833)Caused by: java.lang.NullPointerException: Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value of "kafka.server.SharedServer.raftManager()" is null... 8 more{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 opened a new pull request, #13548: MINOR: Fix regression in MM2 task forwarding introduced by KAFKA-14783
gharris1727 opened a new pull request, #13548: URL: https://github.com/apache/kafka/pull/13548 The DistributedHerder was computing the forwarded URL for publishing task configs incorrectly, leading to 404s in MM2 distributed mode. This regression was introduced in #13424 and presently exists only on trunk. It manifests as a return to pre-KIP-710 behavior and causes the DedicatedMirrorIntegrationTest to fail whenever forwarding happens, making the test flake in more than 50% of runs. It appears to be just a typo rather than an intended change, and it was hidden by the GitHub diff when this function was split into two parts. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rittikaadhikari commented on pull request #13503: MINOR: Refactor TierStateMachine related tests into a separate test file
rittikaadhikari commented on PR #13503: URL: https://github.com/apache/kafka/pull/13503#issuecomment-1505826616 @junrao when you get a chance, would you be free to take a quick pass?
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
jeffkbkim commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1160974966 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are :- + * + * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is + *less than or equal to the number of partitions for that topic. (Range) + * 2) Partitions should be assigned to consumers in a way that facilitates join operations where required. 
(Range) + *This can only be done if the topics are co-partitioned in the first place + *Co-partitioned:- + *Two streams are co-partitioned if the following conditions are met:- + * ->The keys must have the same schemas + * ->The topics involved must have the same number of partitions + * 3) Consumers should retain as much as their previous assignment as possible. (Sticky) + * + * + * + * The algorithm works mainly in 5 steps described below + * + * 1) Get a map of the consumersPerTopic created using the member subscriptions. + * 2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * 3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition + * 4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions + * 5) Iterate through unfilled consumers and assign partitions from available partitions + * + * + * + */ +public class ServerSideStickyRangeAssignor implements PartitionAssignor { + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +private static void putList(Map> map, K key, V value) { +List list = map.computeIfAbsent(key, k -> new ArrayList<>()); +list.add(value); +} + +private static void putSet(Map> map, K key, V value) { +Set set = map.computeIfAbsent(key, k -> new HashSet<>()); +set.add(value); +} + +static class Pair { +private final T first; +private final U second; + +public Pair(T first, U second) { +this.first = first; +this.second = second; +} + +public T getFirst() { +return first; +} + +public U getSecond() { +return second; +} + +@Override +public String toString() { +return "(" + first + ", " + second + ")"; +} +} + +// Returns a map of the list of consumers per Topic (keyed by 
topicId) +private Map> consumersPerTopic(AssignmentSpec assignmentSpec) { +Map> mapTopicsToConsumers = new HashMap<>(); +Map membersData = assignmentSpec.members; + +for (Map.Entry memberEntry : membersData.entrySet()) { +String memberId = memberEntry.getKey(); +AssignmentMemberSpec memberMetadata = memberEntry.getValue(); +Collection topics = memberMetadata.subscribedTopics; +for (Uuid topicId: topics) { +putList(mapTopicsToConsumers, topicId, memberId); +} +} +return mapTopicsToConsumers; +} + +private Map> getAvailablePartitionsPerTopic(AssignmentSpec assignmentSpec, Map> assignedStickyPartitionsPerTopic) { +Map> availablePartitionsPerTopic = new HashMap<>(); +Map topicsMetadata =
[GitHub] [kafka] dajac closed pull request #13484: KAFKA-14016: Revoke more partitions than expected in Cooperative rebalance
dajac closed pull request #13484: KAFKA-14016: Revoke more partitions than expected in Cooperative rebalance URL: https://github.com/apache/kafka/pull/13484
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)
jeffkbkim commented on code in PR #13443: URL: https://github.com/apache/kafka/pull/13443#discussion_r1160941760 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.lang.Math.min; + +/** + * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor. + * Properties are :- + * + * 1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is + *less than or equal to the number of partitions for that topic. (Range) + * 2) Partitions should be assigned to consumers in a way that facilitates join operations where required. 
(Range) + *This can only be done if the topics are co-partitioned in the first place + *Co-partitioned:- + *Two streams are co-partitioned if the following conditions are met:- + * ->The keys must have the same schemas + * ->The topics involved must have the same number of partitions + * 3) Consumers should retain as much as their previous assignment as possible. (Sticky) + * + * + * + * The algorithm works mainly in 5 steps described below + * + * 1) Get a map of the consumersPerTopic created using the member subscriptions. + * 2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND + * get a list of sticky partitions that we want to retain in the new assignment. + * 3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition + * 4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions + * 5) Iterate through unfilled consumers and assign partitions from available partitions + * + * + * + */ +public class ServerSideStickyRangeAssignor implements PartitionAssignor { + +public static final String RANGE_ASSIGNOR_NAME = "range"; + +@Override +public String name() { +return RANGE_ASSIGNOR_NAME; +} + +private static void putList(Map> map, K key, V value) { +List list = map.computeIfAbsent(key, k -> new ArrayList<>()); +list.add(value); +} + +private static void putSet(Map> map, K key, V value) { +Set set = map.computeIfAbsent(key, k -> new HashSet<>()); +set.add(value); +} + +static class Pair { +private final T first; +private final U second; + +public Pair(T first, U second) { +this.first = first; +this.second = second; +} + +public T getFirst() { +return first; +} + +public U getSecond() { +return second; +} + +@Override +public String toString() { +return "(" + first + ", " + second + ")"; +} +} + +// Returns a map of the list of consumers per Topic (keyed by 
topicId) +private Map> consumersPerTopic(AssignmentSpec assignmentSpec) { +Map> mapTopicsToConsumers = new HashMap<>(); +Map membersData = assignmentSpec.members; + +for (Map.Entry memberEntry : membersData.entrySet()) { Review Comment: i think ``` membersData.forEach( (memberId, assignmentMemberSpec) -> { }) ``` looks simpler. wdyt? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
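The `forEach` form suggested in the review comment above can be sketched as follows. This is a simplified, self-contained illustration: it uses plain `String` topic ids and a `Map<String, Collection<String>>` of subscriptions instead of the PR's `Uuid`, `AssignmentSpec`, and `AssignmentMemberSpec` types, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumersPerTopicSketch {
    // Inverts "member -> subscribed topics" into "topic -> members".
    // String ids and the plain subscriptions map are simplifying assumptions.
    public static Map<String, List<String>> consumersPerTopic(
            Map<String, Collection<String>> subscriptions) {
        Map<String, List<String>> topicToConsumers = new HashMap<>();
        // Map.forEach hands over key and value directly, so no Map.Entry unpacking is needed.
        subscriptions.forEach((memberId, topics) -> {
            for (String topicId : topics) {
                topicToConsumers
                    .computeIfAbsent(topicId, k -> new ArrayList<>())
                    .add(memberId);
            }
        });
        return topicToConsumers;
    }
}
```

The behavior matches an explicit `entrySet()` loop; the difference is purely readability.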
[GitHub] [kafka] tinaselenge opened a new pull request, #13547: Add note for KIP-894 and DelegationTokenCommand
tinaselenge opened a new pull request, #13547: URL: https://github.com/apache/kafka/pull/13547 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface
rreddy-22 commented on code in PR #13524: URL: https://github.com/apache/kafka/pull/13524#discussion_r1164542031 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/TopicIdToPartition.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.common; + +import org.apache.kafka.common.Uuid; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +public class TopicIdToPartition { +private final Uuid topicId; +private final Integer partition; +private final Optional> rackIds; + +public TopicIdToPartition(Uuid topicId, Integer topicPartition, Optional> rackIds) { +this.topicId = Objects.requireNonNull(topicId, "topicId cannot be null"); +this.partition = Objects.requireNonNull(topicPartition, "topicPartition cannot be null"); +this.rackIds = Objects.requireNonNull(rackIds, "rackId cannot be null"); +} + +/** + * @return Universally unique id representing this topic partition. + */ +public Uuid topicId() { +return topicId; +} + +/** + * @return the partition number. 
+ */ +public int partition() { +return partition; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +TopicIdToPartition that = (TopicIdToPartition) o; +return topicId.equals(that.topicId) && +partition.equals(that.partition); +} + +@Override +public int hashCode() { +final int prime = 31; Review Comment: Hey, yeah we wanted a data structure that only had a topicId to partition number mapping for each partition. The existing topicIdPartition class has topicNames as well. I didn't know how else to name it uniquely xD -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
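In the quoted class, `equals` compares only `topicId` and `partition`, so `hashCode` should be derived from the same two fields. A minimal sketch of such a key type follows; it is illustrative only, the class name `TopicIdPartitionKey` is hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Objects;
import java.util.UUID;

public final class TopicIdPartitionKey {
    private final UUID topicId;   // java.util.UUID is a stand-in for Kafka's Uuid (assumption)
    private final int partition;

    public TopicIdPartitionKey(UUID topicId, int partition) {
        this.topicId = Objects.requireNonNull(topicId, "topicId cannot be null");
        this.partition = partition;
    }

    public UUID topicId() {
        return topicId;
    }

    public int partition() {
        return partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TopicIdPartitionKey that = (TopicIdPartitionKey) o;
        return partition == that.partition && topicId.equals(that.topicId);
    }

    @Override
    public int hashCode() {
        // Combines exactly the fields that equals() compares.
        return Objects.hash(topicId, partition);
    }
}
```

Keeping the two methods aligned on the same fields is what makes instances usable as `HashMap` keys or `HashSet` members.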
[GitHub] [kafka] rreddy-22 commented on pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface
rreddy-22 commented on PR #13524: URL: https://github.com/apache/kafka/pull/13524#issuecomment-1505791579 > > I like the idea of augmenting AssignmentTopicMetadata with rackIds since this comes under metadata and it'll be passed to all the assignors
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface
rreddy-22 commented on code in PR #13524: URL: https://github.com/apache/kafka/pull/13524#discussion_r1164538097 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/TopicIdToPartition.java: ## @@ -0,0 +1,76 @@ +/* Review Comment: We can remove it here if it's not the right place for it, and then couple it with the assignor PRs where it's used.
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface
rreddy-22 commented on code in PR #13524: URL: https://github.com/apache/kafka/pull/13524#discussion_r1164536324 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java: ## @@ -25,12 +25,12 @@ public class AssignmentTopicMetadata { /** * The topic name. */ -final String topicName; +private final String topicName; Review Comment: I think there's no harm in keeping it, just in case we want to know which topic name is associated with which topicId. wdyt?
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface
rreddy-22 commented on code in PR #13524: URL: https://github.com/apache/kafka/pull/13524#discussion_r1164535218 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java: ## @@ -28,12 +28,12 @@ public class AssignmentSpec { /** * The members keyed by member id. */ -final Map members; +private final Map members; /** * The topics' metadata keyed by topic id */ -final Map topics; +private final Map topics; Review Comment: I didn't think about the full implementation yet and where the rackIds will be passed, figured we could edit it when it came to implementing the algorithm. The topicIdToPartition data structure could be populated using this so maybe we can add a field for a map of partition to its rackIds -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface
rreddy-22 commented on code in PR #13524: URL: https://github.com/apache/kafka/pull/13524#discussion_r1164530365 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberAssignment.java: ## @@ -16,25 +16,28 @@ */ package org.apache.kafka.coordinator.group.assignor; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; -import java.util.Collection; +import java.util.Map; import java.util.Objects; +import java.util.Set; /** * The partition assignment for a consumer group member. */ public class MemberAssignment { /** - * The target partitions assigned to this member. + * The target partitions assigned to this member keyed by topicId. */ -final Collection targetPartitions; +private final Map> assignedTopicIdPartitions; Review Comment: sounds good -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #13521: MINOR: Add tests verifying leadership during and after reassignments in KRaft mode
cmccabe merged PR #13521: URL: https://github.com/apache/kafka/pull/13521
[GitHub] [kafka] dajac commented on a diff in pull request #13502: KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests
dajac commented on code in PR #13502: URL: https://github.com/apache/kafka/pull/13502#discussion_r1164521028 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { +val topic = "topic" + +val transactionalId = "txnId1" +val producerId = 15L +val epoch = 0.toShort + +val tp = new TopicPartition(topic, 0) + +val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( +transactionalId, +producerId, +epoch, +Collections.singletonList(tp)).build(version) +else + AddPartitionsToTxnRequest.Builder.forBroker( +new AddPartitionsToTxnTransactionCollection( Review Comment: correct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13521: MINOR: Add tests verifying leadership during and after reassignments in KRaft mode
cmccabe commented on PR #13521: URL: https://github.com/apache/kafka/pull/13521#issuecomment-1505765473 Sure. The test is valid since it's testing the `PartitionChangeBuilder` and we do want it to have the behavior you're describing. Thanks for the explanation. LGTM
[GitHub] [kafka] cmccabe opened a new pull request, #13546: Cherry-pick KAFKA-14857: Fix some MetadataLoader bugs (#13462)
cmccabe opened a new pull request, #13546: URL: https://github.com/apache/kafka/pull/13546 The MetadataLoader is not supposed to publish metadata updates until we have loaded up to the high water mark. Previously, this logic was broken, and we published updates immediately. This PR fixes that and adds a JUnit test. Another issue is that the MetadataLoader previously assumed that we would periodically get callbacks from the Raft layer even if nothing had happened. We relied on this to install new publishers in a timely fashion, for example. However, in older MetadataVersions that don't include NoOpRecord, this is not a safe assumption. Aside from the above changes, this PR also fixes a deadlock in SnapshotGeneratorTest, fixes the log prefix for BrokerLifecycleManager, and removes metadata publishers on BrokerServer shutdown (like we do for controllers). Reviewers: David Arthur, dengziming Conflicts: This patch was cut down to make it easier to cherry-pick to this older branch. Specifically, I removed the BrokerLifecycleManager.scala logging change and the BrokerServer installPublishers and removeAndClosePublisher changes. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
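The gating behaviour described in the PR above (hold back metadata updates until the loader has read up to the high water mark, then publish) can be sketched roughly as follows. This is a minimal illustration, not the actual MetadataLoader code: the class `GatedPublisherSketch`, its method names, and the use of plain strings as records are all hypothetical, and it assumes the high water mark is exclusive (the offset just past the last committed record).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of "do not publish until caught up"; not Kafka's MetadataLoader. */
class GatedPublisherSketch {
    private final List<String> pending = new ArrayList<>();   // loaded but not yet published
    private final List<String> published = new ArrayList<>(); // visible to publishers
    private boolean caughtUp = false;

    /**
     * Handle one committed batch whose last record sits at lastOffset.
     * Assumption: highWaterMark is exclusive, so the loader has caught up
     * once lastOffset + 1 >= highWaterMark.
     */
    void handleCommit(long lastOffset, long highWaterMark, List<String> records) {
        pending.addAll(records);
        if (!caughtUp && lastOffset + 1 >= highWaterMark) {
            caughtUp = true;
        }
        if (caughtUp) {
            // Flush everything buffered so far, in order, then keep publishing as batches arrive.
            published.addAll(pending);
            pending.clear();
        }
    }

    List<String> published() {
        return List.copyOf(published);
    }

    public static void main(String[] args) {
        GatedPublisherSketch loader = new GatedPublisherSketch();
        loader.handleCommit(4, 10, List.of("a"));  // still behind the high water mark
        System.out.println(loader.published());
        loader.handleCommit(9, 10, List.of("b"));  // caught up, buffered records are flushed
        System.out.println(loader.published());
    }
}
```

Buffering rather than dropping the early batches keeps the published sequence identical to the log order once the gate opens; whether the real loader buffers deltas this way is not stated in the PR description.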
[GitHub] [kafka] jolshan commented on a diff in pull request #13502: KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests
jolshan commented on code in PR #13502: URL: https://github.com/apache/kafka/pull/13502#discussion_r1164515573 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { +val topic = "topic" + +val transactionalId = "txnId1" +val producerId = 15L +val epoch = 0.toShort + +val tp = new TopicPartition(topic, 0) + +val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( +transactionalId, +producerId, +epoch, +Collections.singletonList(tp)).build(version) +else + AddPartitionsToTxnRequest.Builder.forBroker( +new AddPartitionsToTxnTransactionCollection( Review Comment: hmm are you referring to where List is?
[GitHub] [kafka] dajac commented on a diff in pull request #13502: KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests
dajac commented on code in PR #13502: URL: https://github.com/apache/kafka/pull/13502#discussion_r1164508565 ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { +val topic = "topic" + +val transactionalId = "txnId1" +val producerId = 15L +val epoch = 0.toShort + +val tp = new TopicPartition(topic, 0) + +val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( +transactionalId, +producerId, +epoch, +Collections.singletonList(tp)).build(version) +else + AddPartitionsToTxnRequest.Builder.forBroker( +new AddPartitionsToTxnTransactionCollection( Review Comment: nit: indentation seems to be off. ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2113,6 +2113,115 @@ class KafkaApisTest { assertEquals(expectedErrors, response.errors()) } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = { +val topic = "topic" + +val transactionalId = "txnId1" +val producerId = 15L +val epoch = 0.toShort + +val tp = new TopicPartition(topic, 0) + +val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( +transactionalId, +producerId, +epoch, +Collections.singletonList(tp)).build(version) +else + AddPartitionsToTxnRequest.Builder.forBroker( +new AddPartitionsToTxnTransactionCollection( +List(new AddPartitionsToTxnTransaction() + .setTransactionalId(transactionalId) + .setProducerId(producerId) + .setProducerEpoch(epoch) + .setVerifyOnly(true) + .setTopics(new AddPartitionsToTxnTopicCollection( +Collections.singletonList(new AddPartitionsToTxnTopic() + .setName(tp.topic) + 
.setPartitions(Collections.singletonList(tp.partition)) +).iterator())) +).asJava.iterator())).build(version) + +val requestChannelRequest = buildRequest(addPartitionsToTxnRequest) + +val authorizer: Authorizer = mock(classOf[Authorizer]) +when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) + .thenReturn(Seq(AuthorizationResult.DENIED).asJava) + + createKafkaApis(authorizer = Some(authorizer)).handle( +requestChannelRequest, +RequestLocal.NoCaching + ) + +val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) +val error = if (version < 4) + response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp) +else + Errors.forCode(response.data().errorCode()) + +val expectedError = if (version < 4) Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED else Errors.CLUSTER_AUTHORIZATION_FAILED +assertEquals(expectedError, error) + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN) + def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = { +val topic = "topic" +addTopicToMetadataCache(topic, numPartitions = 1) + +val transactionalId = "txnId1" +val producerId = 15L +val epoch = 0.toShort + +val tp0 = new TopicPartition(topic, 0) +val tp1 = new TopicPartition(topic, 1) + +val addPartitionsToTxnRequest = if (version < 4) + AddPartitionsToTxnRequest.Builder.forClient( +transactionalId, +producerId, +epoch, +List(tp0, tp1).asJava).build(version) +else + AddPartitionsToTxnRequest.Builder.forBroker( +new AddPartitionsToTxnTransactionCollection( + List(new AddPartitionsToTxnTransaction() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(epoch) +.setVerifyOnly(true) +.setTopics(new AddPartitionsToTxnTopicCollection( + Collections.singletonList(new AddPartitionsToTxnTopic() +.setName(tp0.topic) +.setPartitions(List[Integer](tp0.partition, tp1.partition()).asJava) + ).iterator())) + ).asJava.iterator())).build(version) + +val 
requestChannelRequest = buildRequest(addPartitionsToTxnRequest) + +createKafkaApis().handleAddPartitionsToTxnRequest( + requestChannelRequest, + RequestLocal.NoCaching +) + +val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest) + +def checkErrorForTp(tp: TopicPartition): Unit = { + val error = if (version < 4) +
[GitHub] [kafka] mjsax merged pull request #13444: KAFKA-14491: [21/N] Docs updates for versioned state stores
mjsax merged PR #13444: URL: https://github.com/apache/kafka/pull/13444
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface
rreddy-22 commented on code in PR #13524: URL: https://github.com/apache/kafka/pull/13524#discussion_r1164499488 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java: ## @@ -29,37 +31,53 @@ public class AssignmentMemberSpec { /** * The instance ID if provided. */ -final Optional instanceId; +private final Optional instanceId; /** * The rack ID if provided. */ -final Optional rackId; +private final Optional rackId; /** - * The topics that the member is subscribed to. + * The topicIds of topics that the member is subscribed to. */ -final Collection subscribedTopics; +private final Collection subscribedTopics; /** - * The current target partitions of the member. + * Partitions assigned for this member keyed by topicId */ -final Collection targetPartitions; +private final Map<Uuid, Set<Integer>> assignedTopicIdPartitions; Review Comment: I thought the name would be clearer with topicIdPartitions in it, since we're keying by topicId. Should we change it?
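To make the "keyed by topicId" shape discussed in this review thread concrete, here is a small sketch that groups a flat list of (topic ID, partition) pairs into a map from topic ID to partition set. It is illustrative only: `TopicIdAssignmentSketch` and `keyByTopicId` are hypothetical names, `java.util.UUID` stands in for `org.apache.kafka.common.Uuid` so the example has no Kafka dependency, and storing the partitions as a `Set<Integer>` is an assumption about the value type.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

/** Hypothetical sketch of an assignment keyed by topic ID; not the PR's MemberAssignment class. */
class TopicIdAssignmentSketch {

    /** Groups flat (topicId, partition) pairs into topicId -> set of partitions. */
    static Map<UUID, Set<Integer>> keyByTopicId(List<Map.Entry<UUID, Integer>> flat) {
        Map<UUID, Set<Integer>> byTopic = new HashMap<>();
        for (Map.Entry<UUID, Integer> pair : flat) {
            // A set drops duplicate partitions; TreeSet keeps them sorted for readable output.
            byTopic.computeIfAbsent(pair.getKey(), id -> new TreeSet<>()).add(pair.getValue());
        }
        return byTopic;
    }

    public static void main(String[] args) {
        UUID topicA = UUID.randomUUID();
        UUID topicB = UUID.randomUUID();
        Map<UUID, Set<Integer>> assignment = keyByTopicId(List.of(
            Map.entry(topicA, 1),
            Map.entry(topicA, 0),
            Map.entry(topicB, 3)));
        System.out.println(assignment.get(topicA)); // partitions of topicA
        System.out.println(assignment.get(topicB)); // partitions of topicB
    }
}
```

Compared with a flat collection of topic-partitions, the map form makes per-topic lookups direct and avoids repeating the topic identifier for every partition.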
[jira] [Comment Edited] (KAFKA-14898) [ MirrorMaker ] sync.topic.configs.enabled not working as expected
[ https://issues.apache.org/jira/browse/KAFKA-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711511#comment-17711511 ] Greg Harris edited comment on KAFKA-14898 at 4/12/23 6:25 PM: -- [~bseenu] Are you running MM2 with the MirrorMaker dedicated launcher? There's a known issue where a multi-node cluster is unable to persist configuration changes: https://issues.apache.org/jira/browse/KAFKA-10586 which has a fix to be released in 3.5.0. You can verify that this issue is affecting you if you see these log messages: [https://github.com/apache/kafka/blob/9c0caca6600cd18c161f7be48a7745a98ff091dd/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L236-L241] They indicate that the existing connector configuration is being used, meaning that your new connector configuration with sync.topic.configs.enabled=false is being dropped. In the meantime, you can cold-start your cluster (shut down all nodes, then bring them all back) and that will ensure that at least one of the nodes takes your on-disk configuration and applies it to the connectors. was (Author: gharris1727): [~bseenu] Are you running MM2 with the MirrorMaker dedicated launcher? There's a known issue where a multi-node cluster is unable to persist configuration changes: https://issues.apache.org/jira/browse/KAFKA-10586 which has a fix to be released in 3.5.0. In the meantime, you can cold-start your cluster (shut down all nodes, then bring them all back) and that will ensure that at least one of the nodes takes your on-disk configuration and applies it to the connectors. 
> [ MirrorMaker ] sync.topic.configs.enabled not working as expected > -- > > Key: KAFKA-14898 > URL: https://issues.apache.org/jira/browse/KAFKA-14898 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.0 >Reporter: Srinivas Boga >Priority: Major > Labels: mirrormaker > > Hello, > In my replication set up , i do not want to sync the topic configs, the use > case is to have different retention time for the topic on the target cluster, > I am passing the config > {code:java} > sync.topic.configs.enabled = false{code} > but this is not working as expected the topic retention time is being set to > whatever is being set in the source cluster, looking at the mirrormaker logs > i can see that MirrorSourceConnector is still setting the above config as true > {code:java} > [2023-04-12 17:04:55,184] INFO [MirrorSourceConnector|task-8] ConsumerConfig > values: > allow.auto.create.topics = true > auto.commit.interval.ms = 5000 > auto.include.jmx.reporter = true > auto.offset.reset = earliest > bootstrap.servers = [sourcecluster.com:9092] > check.crcs = true > client.dns.lookup = use_all_dns_ips > client.id = consumer-null-2 > client.rack = > connections.max.idle.ms = 54 > default.api.timeout.ms = 6 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = null > group.instance.id = null > heartbeat.interval.ms = 3000 > interceptor.classes = [] > internal.leave.group.on.close = true > internal.throw.on.fetch.stable.offset.unsupported = false > isolation.level = read_uncommitted > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > 
org.apache.kafka.clients.consumer.RangeAssignor, class > org.apache.kafka.clients.consumer.CooperativeStickyAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.connect.timeout.ms = null > sasl.login.read.timeout.ms = null > sasl.login.refresh.buffer.seconds = 300 >