[GitHub] [kafka] vcrfxia opened a new pull request, #13554: KAFKA-14834: [7/N] Update VersionedKeyValueStore#put() to return boolean

2023-04-12 Thread via GitHub


vcrfxia opened a new pull request, #13554:
URL: https://github.com/apache/kafka/pull/13554

   This PR updates the return type of `VersionedKeyValueStore#put(...)` from
`void` to `boolean`, where the boolean indicates whether the record that was put
is the latest record for its key. As part of this change, `VersionedBytesStore`
introduces its own `put(key, value, timestamp)` method to avoid a method
signature conflict with the existing `put(key, value)` method from
`KeyValueStore`, which has a `void` return type. As a result, the previously
added `NullableValueAndTimestampSerde` class is no longer needed, so it has
also been removed in this PR as cleanup.
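To illustrate what the new `boolean` return value means, here is a minimal in-memory sketch. `InMemoryVersionedStoreSketch` and its `getLatest`/`getAsOf` methods are hypothetical and not part of Kafka Streams; the sketch assumes "latest" means the version with the highest timestamp, that a put with the same timestamp as the current latest version replaces it, and it ignores history retention entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch, not the Kafka Streams implementation: each key maps to
// its versions ordered by timestamp. History retention is not modeled.
class InMemoryVersionedStoreSketch<K, V> {
    private final Map<K, NavigableMap<Long, V>> versionsByKey = new HashMap<>();

    // Stores value (null acts as a tombstone) for key at timestamp. Returns true
    // if this record is now the latest version of the key, false if it is an
    // out-of-order record older than the current latest version.
    boolean put(final K key, final V value, final long timestamp) {
        final NavigableMap<Long, V> versions =
            versionsByKey.computeIfAbsent(key, k -> new TreeMap<>());
        versions.put(timestamp, value); // same timestamp: replaces that version
        return versions.lastKey() == timestamp;
    }

    // Returns the value of the latest version, or null if the key is unknown.
    V getLatest(final K key) {
        final NavigableMap<Long, V> versions = versionsByKey.get(key);
        return versions == null ? null : versions.lastEntry().getValue();
    }

    // Returns the value that was valid at asOfTimestamp, or null if none was.
    V getAsOf(final K key, final long asOfTimestamp) {
        final NavigableMap<Long, V> versions = versionsByKey.get(key);
        if (versions == null) {
            return null;
        }
        final Map.Entry<Long, V> entry = versions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

A caller that only needs to forward updates downstream when the latest value changes can branch on the return value instead of issuing a separate read.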
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14883) Broker state should be "observer" in KRaft quorum

2023-04-12 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14883.
---
Fix Version/s: 3.5.0
   Resolution: Fixed

> Broker state should be "observer" in KRaft quorum
> -
>
> Key: KAFKA-14883
> URL: https://issues.apache.org/jira/browse/KAFKA-14883
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, metrics
>Affects Versions: 3.4.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
> Fix For: 3.5.0
>
>
> Currently, the KRaft-related `current-state` metric reports the `follower`
> state for a broker, while technically it should report `observer`, as the
> `kafka-metadata-quorum` tool does.
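A minimal sketch of the reporting rule described above, assuming the metric only needs to know whether the local node is in the voter set. `RaftRoleSketch` and `CurrentStateSketch` are hypothetical names, not Kafka's `QuorumState` or `KafkaRaftMetrics` API; the expected values follow the updated `KafkaRaftMetricsTest` assertions in PR #13525, where a non-voter reports `observer` both while unattached and after becoming a follower.

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical local Raft roles, named after states mentioned in this thread.
enum RaftRoleSketch { UNATTACHED, VOTED, CANDIDATE, FOLLOWER, LEADER, RESIGNED }

final class CurrentStateSketch {
    private CurrentStateSketch() {
    }

    // Value of a "current-state"-style metric: any node outside the voter set
    // (e.g. a broker in KRaft mode) is reported as "observer", regardless of
    // whether it is unattached or already following a leader.
    static String currentState(final RaftRoleSketch role, final int localId, final Set<Integer> voters) {
        if (!voters.contains(localId)) {
            return "observer";
        }
        return role.name().toLowerCase(Locale.ROOT);
    }
}
```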



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-9550) RemoteLogManager - copying eligible log segments to remote storage implementation

2023-04-12 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-9550.
--
Resolution: Fixed

> RemoteLogManager - copying eligible log segments to remote storage 
> implementation 
> --
>
> Key: KAFKA-9550
> URL: https://issues.apache.org/jira/browse/KAFKA-9550
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
> Fix For: 3.5.0
>
>
> This JIRA covers copying segments to remote storage, as part of implementing
> the RemoteLogManager (RLM) described in the high-level design (HLD) section of KIP-405.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]
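The segment-selection step can be sketched as follows, assuming, as a simplification of the KIP-405 design, that a segment becomes eligible once it has been rolled (it is no longer the active segment), all of its offsets are below the partition's last stable offset (LSO), and it has not been copied yet. `LocalSegmentSketch` and `CopyEligibilitySketch` are hypothetical names; this is not the `RemoteLogManager` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a local log segment covering offsets [baseOffset, lastOffset].
final class LocalSegmentSketch {
    final long baseOffset;
    final long lastOffset;

    LocalSegmentSketch(final long baseOffset, final long lastOffset) {
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
    }
}

final class CopyEligibilitySketch {
    private CopyEligibilitySketch() {
    }

    // segments must be ordered by baseOffset, with the active segment last.
    // highestCopiedOffset is the last offset already in remote storage (-1 if none).
    static List<LocalSegmentSketch> eligibleForCopy(final List<LocalSegmentSketch> segments,
                                                    final long lastStableOffset,
                                                    final long highestCopiedOffset) {
        final List<LocalSegmentSketch> eligible = new ArrayList<>();
        // Stop before the last element: the active segment is still being written to.
        for (int i = 0; i < segments.size() - 1; i++) {
            final LocalSegmentSketch segment = segments.get(i);
            if (segment.lastOffset <= highestCopiedOffset) {
                continue; // already copied
            }
            if (segment.lastOffset >= lastStableOffset) {
                break; // not fully below the LSO, and later segments are not either
            }
            eligible.add(segment);
        }
        return eligible;
    }
}
```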





[GitHub] [kafka] showuon merged pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics

2023-04-12 Thread via GitHub


showuon merged PR #13525:
URL: https://github.com/apache/kafka/pull/13525





[GitHub] [kafka] showuon commented on pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics

2023-04-12 Thread via GitHub


showuon commented on PR #13525:
URL: https://github.com/apache/kafka/pull/13525#issuecomment-1506342972

   Failed tests are unrelated
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.security.authorizer.AuthorizerTest.testAllowAllAccess(String).quorum=kraft
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.controller.QuorumControllerTest.testDelayedConfigurationOperations()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.AddPartitionsTest.testWrongReplicaCount(String).quorum=zk
   Build / JDK 17 and Scala 2.13 / 
kafka.security.authorizer.AuthorizerTest.testAuthorizeWithPrefixedResource(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
kafka.security.authorizer.AuthorizerTest.testNoAclFound(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
kafka.security.authorizer.AuthorizerTest.testDeleteAclOnPrefixedResource(String).quorum=kraft
   ```





[GitHub] [kafka] showuon commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics

2023-04-12 Thread via GitHub


showuon commented on code in PR #13525:
URL: https://github.com/apache/kafka/pull/13525#discussion_r1164996982


##
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws IOException {
         state.initialize(new OffsetAndEpoch(0L, 0));
         raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
 
-        assertEquals("unattached", getMetric(metrics, "current-state").metricValue());
+        assertEquals("observer", getMetric(metrics, "current-state").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue());
         assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue());
 
         state.transitionToFollower(2, 1);
-        assertEquals("follower", getMetric(metrics, "current-state").metricValue());
+        assertEquals("observer", getMetric(metrics, "current-state").metricValue());

Review Comment:
   Thanks, @dengziming ! :)






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


vcrfxia commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164980568


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##
@@ -705,12 +711,304 @@ public void shouldNotOptimizeJoinWhenNotInConfig() {
         assertEquals(count.get(), 2);
     }
 
+    @Test
+    public void shouldSetUseVersionedSemanticsOnTableFilter() {
+        // Given:
+        final MaterializedInternal> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("store", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, materializedInternal);
+        table1.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNode() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithMaterializedIntermediateNode() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal> unversionedMaterialize =
+            new MaterializedInternal<>(Materialized.as("unversioned"), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null, unversionedMaterialize);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateNodeMaterializedAsVersioned() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final MaterializedInternal> versionedMaterialize2 =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned2", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.mapValues(v -> v != null ? v + v : null, versionedMaterialize2);
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, true);
+    }
+
+    @Test
+    public void shouldNotSetUseVersionedSemanticsWithIntermediateAggregation() {
+        // Given:
+        final MaterializedInternal> versionedMaterialize =
+            new MaterializedInternal<>(Materialized.as(Stores.persistentVersionedKeyValueStore("versioned", Duration.ofMinutes(5))), builder, storePrefix);
+        final KTable table1 = builder.table("t1", consumed, versionedMaterialize);
+        final KTable table2 = table1.groupBy(KeyValue::new).count();
+        table2.filter((k, v) -> v != null);
+
+        // When:
+        builder.buildAndOptimizeTopology();
+
+        // Then:
+        final GraphNode filter = getNodeByType(builder.root, TableFilterNode.class, new HashSet<>());
+        assertNotNull(filter);
+        verifyVersionedSemantics((TableFilterNode) filter, false);
+    }
+
+    @Test
+    public void shouldSetUseVersionedSemanticsWithIntermediateAggregationMaterializedAsVersioned() {
+  

[jira] [Assigned] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread li xiangyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

li xiangyuan reassigned KAFKA-14901:


Assignee: (was: li xiangyuan)

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS Source test appears to be very rarely failing (<5% chance) with the 
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
>   at 
> app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling 
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- 
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
>  transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
>  (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to 
> Empty while it already a pending state Ongoing
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
>     at scala.util.Either.flatMap(Either.scala:352)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
>     at 
> kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
>     at java.base/java.lang.Thread.run(Thread.java:829{noformat}
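The broker-side error comes from a guard that refuses to prepare a new transaction state transition while another one is still pending. A minimal sketch of that kind of guard follows, with hypothetical names (`TxnStateSketch`, `TxnMetadataSketch`); it is not Kafka's `TransactionMetadata` and does not claim to reproduce the root cause of this flaky test.

```java
// Hypothetical transaction states, loosely named after the ones in the log above.
enum TxnStateSketch { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }

// Hypothetical metadata object that allows at most one pending transition at a time.
final class TxnMetadataSketch {
    private TxnStateSketch state = TxnStateSketch.EMPTY;
    private TxnStateSketch pendingState = null;

    synchronized void prepareTransitionTo(final TxnStateSketch target) {
        if (pendingState != null) {
            // A second request arriving before the first transition completes fails here.
            throw new IllegalStateException("Preparing transaction state transition to " + target
                + " while it already has pending state " + pendingState);
        }
        pendingState = target;
    }

    // Applies the pending transition, e.g. once the state change has been persisted.
    synchronized void completeTransition() {
        if (pendingState == null) {
            throw new IllegalStateException("No pending transition to complete");
        }
        state = pendingState;
        pendingState = null;
    }

    synchronized TxnStateSketch state() {
        return state;
    }
}
```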





[jira] [Assigned] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread li xiangyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

li xiangyuan reassigned KAFKA-14901:


Assignee: li xiangyuan

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Assignee: li xiangyuan
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS Source test appears to be very rarely failing (<5% chance) with the 
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
>   at 
> app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling 
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- 
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
>  transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
>  (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to 
> Empty while it already a pending state Ongoing
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
>     at scala.util.Either.flatMap(Either.scala:352)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
>     at 
> kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
>     at java.base/java.lang.Thread.run(Thread.java:829{noformat}





[GitHub] [kafka] dengziming commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics

2023-04-12 Thread via GitHub


dengziming commented on code in PR #13525:
URL: https://github.com/apache/kafka/pull/13525#discussion_r1164956809


##
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws IOException {
         state.initialize(new OffsetAndEpoch(0L, 0));
         raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
 
-        assertEquals("unattached", getMetric(metrics, "current-state").metricValue());
+        assertEquals("observer", getMetric(metrics, "current-state").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue());
         assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue());
         assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue());
 
         state.transitionToFollower(2, 1);
-        assertEquals("follower", getMetric(metrics, "current-state").metricValue());
+        assertEquals("observer", getMetric(metrics, "current-state").metricValue());

Review Comment:
   Yes, `resigned` is a separate issue. In this PR we can try to distinguish
`observer` from `follower` to match the terminology we use in the public API.






[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164930944


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java:
##

[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164922362


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1294,4 +1307,11 @@ private  KTable doJoinOnForeignKey(final KTable forei
 builder
 );
 }
+
+private static void maybeSetOutputVersioned(final GraphNode tableNode,

Review Comment:
   To avoid this helper, would it be worth rewriting this class a little bit
to ensure that `materializedInternal` is never `null`?






[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164920870


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##
@@ -1281,6 +1293,7 @@ private  KTable doJoinOnForeignKey(final KTable forei
 ),
 resultStore
 );
+resultNode.setOutputVersioned(materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier);

Review Comment:
   Should we make `setOutputVersioned` part of `TableProcessorNode` constructor?
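One way to read the constructor suggestion is to derive the flag once, when the node is built, instead of calling a setter afterwards. A minimal sketch with hypothetical stand-in types (`StoreSupplierSketch`, `VersionedStoreSupplierSketch`, `MaterializedSketch`, `TableNodeSketch`), not the Kafka Streams classes. Because `instanceof` evaluates to `false` for `null`, only the `materialized` reference itself needs a null check.

```java
// Hypothetical stand-ins for a store supplier and its versioned variant.
interface StoreSupplierSketch {
}

interface VersionedStoreSupplierSketch extends StoreSupplierSketch {
}

// Hypothetical stand-in for materialization options; storeSupplier may be null.
final class MaterializedSketch {
    private final StoreSupplierSketch storeSupplier;

    MaterializedSketch(final StoreSupplierSketch storeSupplier) {
        this.storeSupplier = storeSupplier;
    }

    StoreSupplierSketch storeSupplier() {
        return storeSupplier;
    }
}

// Hypothetical table node that computes "is output versioned" once, in its constructor.
final class TableNodeSketch {
    private final String nodeName;
    private final boolean isOutputVersioned;

    TableNodeSketch(final String nodeName, final MaterializedSketch materialized) {
        this.nodeName = nodeName;
        // instanceof is false for null, so a null storeSupplier yields false.
        this.isOutputVersioned = materialized != null
            && materialized.storeSupplier() instanceof VersionedStoreSupplierSketch;
    }

    String nodeName() {
        return nodeName;
    }

    boolean isOutputVersioned() {
        return isOutputVersioned;
    }
}
```

Making the field `final` means the flag cannot be forgotten or changed later, at the cost of every caller having to supply the materialization info at construction time.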






[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164915423


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java:
##
@@ -73,6 +74,7 @@  KTable build(final Map, Aggregator

[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164913480


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##
@@ -236,13 +237,16 @@ public SessionWindowedKStream windowedBy(final SessionWindows windows) {
 private  KTable doAggregate(final KStreamAggProcessorSupplier aggregateSupplier,
  final String functionName,
  final MaterializedInternal> materializedInternal) {
+final boolean isOutputVersioned = materializedInternal != null

Review Comment:
   as above






[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164911085


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Its properties are:
+ *
+ *  1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of
+ *     consumers is less than or equal to the number of partitions for that topic. (Range)
+ *  2) Partitions should be assigned to consumers in a way that facilitates join operations where required. (Range)
+ *     This can only be done if the topics are co-partitioned in the first place.
+ *     Two streams are co-partitioned if the following conditions are met:
+ *       -> The keys must have the same schemas.
+ *       -> The topics involved must have the same number of partitions.
+ *  3) Consumers should retain as much of their previous assignment as possible. (Sticky)
+ *
+ * The algorithm works mainly in 5 steps, described below:
+ *
+ *  1) Get a map of consumersPerTopic, created from the member subscriptions.
+ *  2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment, AND
+ *     get a list of sticky partitions that we want to retain in the new assignment.
+ *  3) Add consumers from potentiallyUnfilled to unfilled if they haven't met the total required quota
+ *     (minQuota + one extra partition, if necessary).
+ *  4) Get a list of available partitions by calculating the difference between the total partitions and the
+ *     assigned sticky partitions.
+ *  5) Iterate through the unfilled consumers and assign partitions from the available partitions.
+ */
+public class ServerSideStickyRangeAssignor implements PartitionAssignor {
+
+    public static final String RANGE_ASSIGNOR_NAME = "range";
+
+    @Override
+    public String name() {
+        return RANGE_ASSIGNOR_NAME;
+    }
+
+    private static <K, V> void putList(Map<K, List<V>> map, K key, V value) {
+        List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
+        list.add(value);
+    }
+
+    private static <K, V> void putSet(Map<K, Set<V>> map, K key, V value) {
+        Set<V> set = map.computeIfAbsent(key, k -> new HashSet<>());
+        set.add(value);
+    }
+
+    static class Pair<T, U> {
+        private final T first;
+        private final U second;
+
+        public Pair(T first, U second) {
+            this.first = first;
+            this.second = second;
+        }
+
+        public T getFirst() {
+            return first;
+        }
+
+        public U getSecond() {
+            return second;
+        }
+
+        @Override
+        public String toString() {
+            return "(" + first + ", " + second + ")";
+        }
+    }
+
+    // Returns a map of the list of consumers per topic (keyed by topicId).
+    private Map<Uuid, List<String>> consumersPerTopic(AssignmentSpec assignmentSpec) {
+        Map<Uuid, List<String>> mapTopicsToConsumers = new HashMap<>();
+        Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members;
+
+        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : membersData.entrySet()) {
+            String memberId = memberEntry.getKey();
+            AssignmentMemberSpec memberMetadata = memberEntry.getValue();
+            Collection<Uuid> topics = memberMetadata.subscribedTopics;
+            for (Uuid topicId : topics) {
+                putList(mapTopicsToConsumers, topicId, memberId);
+            }
+        }
+        return mapTopicsToConsumers;
+    }
+
+private Map> 
getAvailablePartitionsPerTopic(AssignmentSpec assignmentSpec, Map> assignedStickyPartitionsPerTopic) {
+Map> availablePartitionsPerTopic = new HashMap<>();
+Map topicsMetadata = 

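Property 1 and step 3 of the class Javadoc both rest on a per-topic quota: every consumer gets minQuota partitions, and some consumers get one extra partition when the partition count does not divide evenly. A minimal sketch of that arithmetic follows, assuming the usual range convention that the first `numPartitions % numConsumers` consumers (in sorted order) take the extra partition; `RangeQuotaSketch` and `quotas` are hypothetical names, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: per-consumer partition targets for a single topic.
public class RangeQuotaSketch {

    // minQuota = partitions / consumers; the first (partitions % consumers)
    // consumers get one extra partition. Assumes at least one consumer.
    static List<Integer> quotas(int numPartitions, int numConsumers) {
        if (numConsumers <= 0) {
            throw new IllegalArgumentException("numConsumers must be positive");
        }
        int minQuota = numPartitions / numConsumers;
        int consumersWithExtra = numPartitions % numConsumers;
        List<Integer> result = new ArrayList<>(numConsumers);
        for (int i = 0; i < numConsumers; i++) {
            result.add(minQuota + (i < consumersWithExtra ? 1 : 0));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(quotas(7, 3)); // [3, 2, 2]
        System.out.println(quotas(2, 3)); // [1, 1, 0]
    }
}
```

Whenever the number of consumers is at most the number of partitions, minQuota is at least 1, which is exactly the guarantee in property 1; with more consumers than partitions (the second call), some consumers legitimately get nothing.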
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164908509



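Step 1 of the quoted Javadoc, implemented by `consumersPerTopic`, inverts member subscriptions into a topic-to-consumers map. The sketch below shows the same inversion in isolation, assuming plain `String` IDs in place of Kafka's `Uuid` and a hypothetical `Map<String, Collection<String>>` of subscriptions in place of `AssignmentSpec`. Sorting each consumer list is an addition here (for deterministic output) and does not appear in the quoted code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of step 1 using simplified String IDs.
public class ConsumersPerTopicSketch {

    // Generic multimap helper with the same shape as putList in the diff.
    static <K, V> void putList(Map<K, List<V>> map, K key, V value) {
        map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Inverts memberId -> subscribed topicIds into topicId -> subscribed memberIds.
    static Map<String, List<String>> consumersPerTopic(Map<String, Collection<String>> subscriptions) {
        Map<String, List<String>> consumersByTopic = new HashMap<>();
        for (Map.Entry<String, Collection<String>> entry : subscriptions.entrySet()) {
            for (String topicId : entry.getValue()) {
                putList(consumersByTopic, topicId, entry.getKey());
            }
        }
        // Sort so that a later range-style pass sees consumers in a stable order.
        consumersByTopic.values().forEach(Collections::sort);
        return consumersByTopic;
    }

    public static void main(String[] args) {
        Map<String, Collection<String>> subscriptions = new HashMap<>();
        subscriptions.put("member-b", List.of("topic-1"));
        subscriptions.put("member-a", List.of("topic-1", "topic-2"));
        Map<String, List<String>> result = consumersPerTopic(subscriptions);
        System.out.println(result.get("topic-1")); // [member-a, member-b]
        System.out.println(result.get("topic-2")); // [member-a]
    }
}
```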
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164908245



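Steps 2 and 4 of the quoted Javadoc keep as much of each consumer's previous assignment as the quota allows and then treat everything not retained as available. A minimal single-topic sketch under stated assumptions: consumers are pre-sorted, there is at least one consumer, previous assignments do not overlap, each consumer keeps up to minQuota of its old partitions, and a consumer that previously owned more than minQuota may keep one extra while extra slots remain. All names (`StickyRetentionSketch`, `Result`, `retainAndComputeAvailable`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of steps 2 and 4 for a single topic.
public class StickyRetentionSketch {

    static final class Result {
        final Map<String, List<Integer>> retained = new HashMap<>();
        final List<Integer> available = new ArrayList<>();
        int extraSlotsLeft;
    }

    static Result retainAndComputeAvailable(List<String> sortedConsumers,
                                            int numPartitions,
                                            Map<String, Set<Integer>> previous) {
        int minQuota = numPartitions / sortedConsumers.size();
        Result result = new Result();
        result.extraSlotsLeft = numPartitions % sortedConsumers.size();
        Set<Integer> taken = new HashSet<>();

        for (String consumer : sortedConsumers) {
            // Only partitions that still exist can be retained; TreeSet orders them ascending.
            TreeSet<Integer> owned = new TreeSet<>();
            for (int p : previous.getOrDefault(consumer, Set.of())) {
                if (p < numPartitions) owned.add(p);
            }
            int keep = Math.min(owned.size(), minQuota);
            if (owned.size() > minQuota && result.extraSlotsLeft > 0) {
                keep++; // this consumer keeps one of the extra partitions
                result.extraSlotsLeft--;
            }
            List<Integer> kept = new ArrayList<>();
            for (int p : owned) {
                if (kept.size() == keep) break;
                kept.add(p);
            }
            result.retained.put(consumer, kept);
            taken.addAll(kept);
        }

        // Step 4: available = all partitions minus the retained (sticky) ones.
        for (int p = 0; p < numPartitions; p++) {
            if (!taken.contains(p)) result.available.add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> previous = Map.of("a", Set.of(0, 1, 2, 3), "b", Set.of(4));
        Result r = retainAndComputeAvailable(List.of("a", "b", "c"), 7, previous);
        System.out.println(r.retained.get("a")); // [0, 1, 2]
        System.out.println(r.available);         // [3, 5, 6]
    }
}
```

Consumers whose retained list is shorter than their quota (here "b" and the new consumer "c") are the ones steps 2 and 3 would mark as unfilled.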
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164906996



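Steps 3 and 5 of the quoted Javadoc then top up the unfilled consumers from the available partitions. The sketch below takes the output of a retention step as plain inputs (a retained map, an available list, minQuota and the number of still-unclaimed extra slots); these inputs and the names `FillUnfilledSketch` and `fill` are hypothetical. It first fills every consumer below minQuota, then hands at most one extra partition to consumers sitting exactly at minQuota while extra slots remain. It is not the PR's implementation, and it makes no attempt to keep newly assigned partitions contiguous.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of steps 3 and 5 for a single topic.
public class FillUnfilledSketch {

    static Map<String, List<Integer>> fill(List<String> sortedConsumers,
                                          Map<String, List<Integer>> retained,
                                          List<Integer> available,
                                          int minQuota,
                                          int extraSlotsLeft) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : sortedConsumers) {
            assignment.put(consumer, new ArrayList<>(retained.getOrDefault(consumer, List.of())));
        }
        Deque<Integer> pool = new ArrayDeque<>(available);

        // First pass: consumers below minQuota are unfilled and are topped up first.
        for (String consumer : sortedConsumers) {
            List<Integer> owned = assignment.get(consumer);
            while (owned.size() < minQuota && !pool.isEmpty()) {
                owned.add(pool.poll());
            }
        }
        // Second pass: consumers exactly at minQuota take one extra while extra slots remain.
        for (String consumer : sortedConsumers) {
            if (extraSlotsLeft == 0 || pool.isEmpty()) break;
            List<Integer> owned = assignment.get(consumer);
            if (owned.size() == minQuota) {
                owned.add(pool.poll());
                extraSlotsLeft--;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> retained = Map.of("a", List.of(0, 1, 2), "b", List.of(4));
        System.out.println(fill(List.of("a", "b", "c"), retained, List.of(3, 5, 6), 2, 0));
        // {a=[0, 1, 2], b=[4, 3], c=[5, 6]}
    }
}
```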
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164906229



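Property 2 in the quoted Javadoc says range assignment should support joins when topics are co-partitioned. The reason is that a deterministic range split over the same sorted consumers and the same partition count yields identical partition indices per consumer, so partition p of each topic lands on the same consumer. A minimal non-sticky sketch, with hypothetical names (`CoPartitionSketch`, `rangeAssign`) and hypothetical topics. It only aligns partition indices: routing equal keys to equal partitions still requires the same key schema and partitioner. Once the sticky steps are layered on top, alignment additionally depends on the previous assignments having been aligned, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plain range assignment of one topic in contiguous blocks.
public class CoPartitionSketch {

    // Extras go to the first (numPartitions % consumers) consumers. Assumes at least one consumer.
    static Map<String, List<Integer>> rangeAssign(List<String> sortedConsumers, int numPartitions) {
        int n = sortedConsumers.size();
        int minQuota = numPartitions / n;
        int extra = numPartitions % n;
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < n; i++) {
            int count = minQuota + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            assignment.put(sortedConsumers.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("member-a", "member-b");
        // Two co-partitioned topics: same subscribers, same partition count.
        Map<String, List<Integer>> orders = rangeAssign(consumers, 3);
        Map<String, List<Integer>> payments = rangeAssign(consumers, 3);
        System.out.println(orders);                  // {member-a=[0, 1], member-b=[2]}
        System.out.println(orders.equals(payments)); // true
    }
}
```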
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164905279



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164902619



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164901673


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Its properties are:
+ * 
+ *  1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is
+ *less than or equal to the number of partitions for that topic. (Range)
+ *  2) Partitions should be assigned to consumers in a way that facilitates join operations where required. (Range)
+ *This can only be done if the topics are co-partitioned in the first place.
+ *Co-partitioned:
+ *Two streams are co-partitioned if the following conditions are met:
+ * ->The keys must have the same schemas.
+ * ->The topics involved must have the same number of partitions.
+ *  3) Consumers should retain as much of their previous assignment as possible. (Sticky)
+ * 
+ * 
+ *
+ * The algorithm works in five main steps, described below:
+ * 
+ *  1) Get a map of the consumersPerTopic created using the member subscriptions.
+ *  2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new assignment.
+ *  3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition.
+ *  4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions.
+ *  5) Iterate through unfilled consumers and assign partitions from available partitions.
+ * 
+ * 
+ *
+ */
+public class ServerSideStickyRangeAssignor implements PartitionAssignor {
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+private static <K, V> void putList(Map<K, List<V>> map, K key, V value) {
+List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
+list.add(value);
+}
+

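The quota arithmetic behind properties 1 and 2 in the Javadoc above can be sketched in a few lines of Java. This is a minimal, non-sticky illustration only. It assumes a single topic whose partitions are numbered 0..n-1 and members identified by plain strings, and `RangeQuotaSketch` and `rangeAssign` are hypothetical names, not part of the PR. Each member gets `minQuota = n / m` partitions and the first `n % m` members (in sorted order) get one extra. Because members are sorted the same way for every topic, two co-partitioned topics with the same member list map the same partition indices to the same member.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeQuotaSketch {

    // Splits partitions 0..numPartitions-1 of one topic into contiguous ranges.
    // Members are sorted so every topic sees them in the same order.
    static Map<String, List<Integer>> rangeAssign(int numPartitions, List<String> members) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        if (members.isEmpty()) {
            return assignment;
        }
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);

        int minQuota = numPartitions / sorted.size(); // every member gets at least this many
        int extra = numPartitions % sorted.size();    // this many members get one more

        int start = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int quota = minQuota + (i < extra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int p = start; p < start + quota; p++) {
                range.add(p);
            }
            assignment.put(sorted.get(i), range);
            start += quota;
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("c", "a", "b");
        Map<String, List<Integer>> orders = rangeAssign(7, members);
        Map<String, List<Integer>> payments = rangeAssign(7, members);
        System.out.println(orders);                  // {a=[0, 1, 2], b=[3, 4], c=[5, 6]}
        System.out.println(orders.equals(payments)); // true: co-partitioned topics line up
    }
}
```

When there are at least as many partitions as members, `minQuota` is at least 1, which is exactly property 1.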
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164899524


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+private Map> 
getAvailablePartitionsPerTopic(AssignmentSpec assignmentSpec, Map> assignedStickyPartitionsPerTopic) {

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To 

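Steps 2 to 5 of the Javadoc can be illustrated for a single topic: retain sticky partitions up to the quota, compute the available partitions, then fill the unfilled members. The sketch below is a simplified, hypothetical version and not the PR's implementation. Partitions are plain integers 0..n-1 and members are strings. The previous assignment is passed in directly instead of coming from `AssignmentSpec`, and members are visited in sorted order. A member keeps one partition above `minQuota` from its previous assignment only while extra slots remain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class StickyRangeSketch {

    static Map<String, List<Integer>> assign(int numPartitions,
                                             List<String> members,
                                             Map<String, List<Integer>> previous) {
        Map<String, List<Integer>> assignment = new TreeMap<>();
        if (members.isEmpty()) {
            return assignment;
        }
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);

        int minQuota = numPartitions / sorted.size();
        int extraRemaining = numPartitions % sorted.size();
        Set<Integer> retained = new HashSet<>();
        Set<String> keptExtra = new HashSet<>();

        // Steps 2-3: keep previously owned partitions, up to minQuota (+1 while extra slots remain).
        for (String member : sorted) {
            List<Integer> kept = new ArrayList<>();
            for (int p : previous.getOrDefault(member, Collections.emptyList())) {
                if (p < 0 || p >= numPartitions || retained.contains(p)) {
                    continue; // partition no longer exists or another member already kept it
                }
                boolean underQuota = kept.size() < minQuota;
                boolean takesExtra = kept.size() == minQuota && extraRemaining > 0;
                if (underQuota || takesExtra) {
                    kept.add(p);
                    retained.add(p);
                    if (takesExtra) {
                        keptExtra.add(member);
                        extraRemaining--;
                    }
                }
            }
            assignment.put(member, kept);
        }

        // Step 4: available partitions = all partitions minus the retained sticky ones.
        List<Integer> available = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!retained.contains(p)) {
                available.add(p);
            }
        }

        // Step 5: top up each member to its target; leftover extra slots go to the first members.
        int next = 0;
        for (String member : sorted) {
            List<Integer> owned = assignment.get(member);
            int target = minQuota;
            if (keptExtra.contains(member)) {
                target = minQuota + 1;
            } else if (extraRemaining > 0) {
                target = minQuota + 1;
                extraRemaining--;
            }
            while (owned.size() < target) {
                owned.add(available.get(next++));
            }
            Collections.sort(owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> previous = new TreeMap<>();
        previous.put("a", Arrays.asList(0, 1, 2, 3));
        previous.put("b", Arrays.asList(4));
        // Member "c" is new and has no previous assignment.
        System.out.println(assign(6, Arrays.asList("a", "b", "c"), previous));
        // {a=[0, 1], b=[2, 4], c=[3, 5]}
    }
}
```

The targets always sum to the partition count, so step 5 consumes exactly the available list. With an empty previous assignment, the sketch reduces to the contiguous range split.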
[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164894366


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+private static <K, V> void putList(Map<K, List<V>> map, K key, V value) {

Review Comment:
   removed



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@

[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164893597


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+// Returns a map of the list of consumers per Topic (keyed by topicId)
+private Map<Uuid, List<String>> consumersPerTopic(AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> mapTopicsToConsumers = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members;
+
+for (Map.Entry<String, AssignmentMemberSpec> memberEntry : membersData.entrySet()) {

Review Comment:
   changed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164893058


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -142,6 +145,7 @@ public  KTable table(final String topic,
 .withMaterializedInternal(materialized)
 .withProcessorParameters(processorParameters)
 .build();
+tableSourceNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier);

Review Comment:
   Should we make `setOutputVersioned` a `withOutputVersioned` builder method like all the others?





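The suggestion follows the usual fluent-builder pattern. Instead of mutating the node after `build()`, the flag is set on the builder alongside the other `with...` calls and passed into the node's constructor, which lets the field stay `final`. The sketch below is hypothetical: `TableSourceNodeSketch`, its `Builder`, and `withNodeName` are illustrative stand-ins, not Kafka Streams classes.

```java
public class TableSourceNodeSketch {

    private final String nodeName;
    private final boolean outputVersioned;

    private TableSourceNodeSketch(String nodeName, boolean outputVersioned) {
        this.nodeName = nodeName;
        this.outputVersioned = outputVersioned;
    }

    String nodeName() {
        return nodeName;
    }

    boolean isOutputVersioned() {
        return outputVersioned;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String nodeName;
        private boolean outputVersioned; // defaults to false

        Builder withNodeName(String nodeName) {
            this.nodeName = nodeName;
            return this;
        }

        // Builder-style counterpart of calling setOutputVersioned(...) after build().
        Builder withOutputVersioned(boolean outputVersioned) {
            this.outputVersioned = outputVersioned;
            return this;
        }

        TableSourceNodeSketch build() {
            return new TableSourceNodeSketch(nodeName, outputVersioned);
        }
    }

    public static void main(String[] args) {
        TableSourceNodeSketch node = TableSourceNodeSketch.builder()
            .withNodeName("source-node")
            .withOutputVersioned(true)
            .build();
        System.out.println(node.nodeName() + " versioned=" + node.isOutputVersioned());
    }
}
```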

[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164891330


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java:
##
@@ -136,13 +137,16 @@ public SessionWindowedCogroupedKStream windowedBy(final SessionWindows
 private KTable doAggregate(final Initializer initializer,
 final NamedInternal named,
 final MaterializedInternal> materializedInternal) {
+final boolean isOutputVersioned = materializedInternal != null
+&& materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier;
 return aggregateBuilder.build(
 groupPatterns,
 initializer,
 named,
 new KeyValueStoreMaterializer<>(materializedInternal).materialize(),
 materializedInternal.keySerde(),
 materializedInternal.valueSerde(),
-materializedInternal.queryableStoreName());
+materializedInternal.queryableStoreName(),
+isOutputVersioned);

Review Comment:
   Think we can just pass in `materializedInternal.storeSupplier() instanceof VersionedBytesStoreSupplier` directly.






[GitHub] [kafka] mjsax commented on a diff in pull request #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13552:
URL: https://github.com/apache/kafka/pull/13552#discussion_r1164890769


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedKStreamImpl.java:
##
@@ -136,13 +137,16 @@ public SessionWindowedCogroupedKStream windowedBy(final SessionWindows
 private KTable doAggregate(final Initializer initializer,
 final NamedInternal named,
 final MaterializedInternal> materializedInternal) {
+final boolean isOutputVersioned = materializedInternal != null

Review Comment:
   `materializedInternal` cannot be `null` here.





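The two comments on `doAggregate` rest on a Java language rule: `x instanceof T` evaluates to `false` when `x` is `null`, without throwing. So the check can be passed straight into the call, and the only reference that must not be `null` is `materializedInternal` itself, which the reviewer notes is guaranteed here. A minimal sketch with hypothetical stand-in types (`StoreSupplier`, `VersionedStoreSupplier`, `MaterializedSketch`, `build`), not the Kafka Streams classes:

```java
public class InstanceofSketch {

    // Hypothetical stand-ins for the store supplier types.
    interface StoreSupplier { }
    interface VersionedStoreSupplier extends StoreSupplier { }

    // Hypothetical stand-in for MaterializedInternal; storeSupplier() may return null.
    static final class MaterializedSketch {
        private final StoreSupplier storeSupplier;

        MaterializedSketch(StoreSupplier storeSupplier) {
            this.storeSupplier = storeSupplier;
        }

        StoreSupplier storeSupplier() {
            return storeSupplier;
        }
    }

    // Hypothetical consumer of the flag, standing in for aggregateBuilder.build(...).
    static String build(String name, boolean isOutputVersioned) {
        return name + (isOutputVersioned ? " (versioned)" : "");
    }

    static String doAggregate(MaterializedSketch materialized) {
        // `materialized` is assumed non-null by the caller; a null storeSupplier()
        // needs no guard because `null instanceof X` is simply false.
        return build("agg", materialized.storeSupplier() instanceof VersionedStoreSupplier);
    }

    public static void main(String[] args) {
        System.out.println(doAggregate(new MaterializedSketch(null)));                            // agg
        System.out.println(doAggregate(new MaterializedSketch(new StoreSupplier() { })));          // agg
        System.out.println(doAggregate(new MaterializedSketch(new VersionedStoreSupplier() { }))); // agg (versioned)
    }
}
```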

[GitHub] [kafka] mjsax merged pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-12 Thread via GitHub


mjsax merged PR #13522:
URL: https://github.com/apache/kafka/pull/13522





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164884866


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+// Returns a map of the list of consumers per Topic (keyed by topicId)

Review Comment:
   Removed the whole comment because it was unnecessary.






[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1164884200


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+// Returns a map of the list of consumers per Topic (keyed by topicId)
+private Map<Uuid, List<String>> consumersPerTopic(AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> mapTopicsToConsumers = new HashMap<>();

Review Comment:
   changed to membersPerTopic





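For reference, the topic-to-members grouping that `consumersPerTopic` (renamed `membersPerTopic` per the comment above) performs can be written with `Map.computeIfAbsent` directly, which is what the `putList` helper in the diff wraps. This sketch assumes simplified inputs. Topic IDs are plain strings rather than Kafka's `Uuid`, and subscriptions are a `Map` from member ID to subscribed topic IDs rather than an `AssignmentSpec`. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MembersPerTopicSketch {

    // Inverts member ID -> subscribed topic IDs into topic ID -> subscribed member IDs.
    static Map<String, List<String>> membersPerTopic(Map<String, List<String>> subscriptions) {
        Map<String, List<String>> result = new TreeMap<>();
        subscriptions.forEach((memberId, topicIds) -> {
            for (String topicId : topicIds) {
                result.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> subscriptions = new TreeMap<>();
        subscriptions.put("member-1", Arrays.asList("orders", "payments"));
        subscriptions.put("member-2", Arrays.asList("payments"));
        System.out.println(membersPerTopic(subscriptions));
        // {orders=[member-1], payments=[member-1, member-2]}
    }
}
```

Member order within each list follows the iteration order of the input map, which is why the example uses a `TreeMap`.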

[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13522:
URL: https://github.com/apache/kafka/pull/13522#discussion_r1164874948


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java:
##
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+@Category(IntegrationTest.class)
+public class KTableKTableForeignKeyVersionedJoinIntegrationTest extends KTableKTableForeignKeyJoinIntegrationTest {
+
+public KTableKTableForeignKeyVersionedJoinIntegrationTest(final boolean leftJoin,
+  final boolean materialized,
+  final boolean leftVersioned,
+  final boolean rightVersioned) {
+// optimizations and rejoin are disabled for these tests, as these tests focus on versioning.
+// see KTableKTableForeignKeyJoinIntegrationTest for test coverage for optimizations and rejoin
+super(leftJoin, StreamsConfig.NO_OPTIMIZATION, materialized, false, leftVersioned, rightVersioned);
+}
+
+@Parameterized.Parameters(name = "leftJoin={0}, materialized={1}, leftVersioned={2}, rightVersioned={3}")
+public static Collection<Object[]> data() {
+final List<Boolean> booleans = Arrays.asList(true, false);
+return buildParameters(booleans, booleans, booleans, booleans);
+}
+
+@Test
+public void shouldIgnoreOutOfOrderRecordsIffVersioned() {

Review Comment:
   `Iff` -> `If` (or is it `if and only if`)?
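The `data()` method above passes four `[true, false]` lists to `buildParameters`, whose body is not shown here. Assuming it builds the Cartesian product (one `Object[]` per combination, as JUnit 4's `Parameterized` runner expects), the test would run 2^4 = 16 times. A hypothetical stand-in for such a helper:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a buildParameters-style helper: returns every
// combination of the given value lists, one Object[] per parameter set.
class ParameterProductSketch {

    static Collection<Object[]> cartesian(final List<?>... lists) {
        List<Object[]> result = new ArrayList<>();
        result.add(new Object[0]); // start with a single empty combination
        for (final List<?> values : lists) {
            final List<Object[]> next = new ArrayList<>();
            for (final Object[] prefix : result) {
                for (final Object v : values) {
                    // extend each existing prefix by one more value
                    final Object[] combo = Arrays.copyOf(prefix, prefix.length + 1);
                    combo[prefix.length] = v;
                    next.add(combo);
                }
            }
            result = next;
        }
        return result;
    }
}
```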






[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13522:
URL: https://github.com/apache/kafka/pull/13522#discussion_r1164874948


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyVersionedJoinIntegrationTest.java:
##
@@ -0,0 +1,301 @@
+public void shouldIgnoreOutOfOrderRecordsIffVersioned() {

Review Comment:
   `Iff` -> `If`
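Read as "if and only if", the test name says out-of-order records are ignored when a table is versioned, and processed otherwise. A minimal sketch of that rule, assuming "out-of-order" means a timestamp strictly older than the latest one already seen for the same key (the handling of equal timestamps is an assumption here); the class is hypothetical and is not the PR's processor code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "drop out-of-order records iff versioned".
class OutOfOrderFilterSketch {
    private final boolean versioned;
    private final Map<String, Long> latestTimestampPerKey = new HashMap<>();

    OutOfOrderFilterSketch(final boolean versioned) {
        this.versioned = versioned;
    }

    // Returns true if the record should be forwarded downstream.
    boolean accept(final String key, final long timestamp) {
        if (!versioned) {
            return true; // unversioned: every update is processed in arrival order
        }
        final Long latest = latestTimestampPerKey.get(key);
        if (latest != null && timestamp < latest) {
            return false; // older than the latest version for this key: drop it
        }
        latestTimestampPerKey.put(key, timestamp);
        return true;
    }
}
```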






[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13522:
URL: https://github.com/apache/kafka/pull/13522#discussion_r1164864257


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -140,9 +163,9 @@ public void doJoinFromLeftThenDeleteLeftEntity() {
 final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
 
 // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records
-right.pipeInput("rhs1", "rhsValue1");
-right.pipeInput("rhs2", "rhsValue2");
-right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results
+right.pipeInput("rhs1", "rhsValue1", baseTimestamp);

Review Comment:
   Should we use "auto advance" instead of passing in timestamps explicitly? 
Or could it be an advantage to keep them explicit for readability?
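For context on the auto-advance question: `TopologyTestDriver` has a `createInputTopic(topic, keySerializer, valueSerializer, startTimestamp, autoAdvance)` overload, and `pipeInput(key, value)` on such a topic stamps each record with the topic's current time and then advances it. The dependency-free sketch below only models that behaviour with a hypothetical class, to contrast it with explicit `pipeInput(key, value, timestamp)` calls:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of auto-advancing record timestamps; it mimics the behaviour
// discussed in the review and is not the TestInputTopic class.
class AutoAdvanceClockSketch {
    private Instant current;
    private final Duration autoAdvance;
    final List<Long> pipedTimestamps = new ArrayList<>();

    AutoAdvanceClockSketch(final Instant start, final Duration autoAdvance) {
        this.current = start;
        this.autoAdvance = autoAdvance;
    }

    // Like pipeInput(key, value): use the current time, then advance the clock.
    void pipe() {
        pipedTimestamps.add(current.toEpochMilli());
        current = current.plus(autoAdvance);
    }

    // Like pipeInput(key, value, timestamp): explicit timestamp, clock untouched in this sketch.
    void pipe(final long timestamp) {
        pipedTimestamps.add(timestamp);
    }
}
```

Auto-advance keeps the calls short, while explicit timestamps make the relative order of records visible at each call site, which matters in tests that deliberately pipe out-of-order data.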






[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13522:
URL: https://github.com/apache/kafka/pull/13522#discussion_r1164867385


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -249,62 +272,62 @@ public void doJoinFromLeftThenDeleteLeftEntity() {
 
 @Test
 public void doJoinFromRightThenDeleteRightEntity() {
-final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin);
+final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
 try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
 final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
 final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
 final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
 final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
 
 // Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records
-left.pipeInput("lhs1", "lhsValue1|rhs1");
-left.pipeInput("lhs2", "lhsValue2|rhs2");
-left.pipeInput("lhs3", "lhsValue3|rhs1");
+left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
+left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 1);
+left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 2);
 
 assertThat(
 outputTopic.readKeyValuesToMap(),
 is(leftJoin
-   ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
-   mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
-   mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
-   : emptyMap()
+? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"),
+mkEntry("lhs2", "(lhsValue2|rhs2,null)"),
+mkEntry("lhs3", "(lhsValue3|rhs1,null)"))
+: emptyMap()

Review Comment:
   nit: avoid unnecessary reformatting (seems to be an IDE setting that 
triggered?)






[GitHub] [kafka] mjsax commented on a diff in pull request #13522: KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables

2023-04-12 Thread via GitHub


mjsax commented on code in PR #13522:
URL: https://github.com/apache/kafka/pull/13522#discussion_r1164864257


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java:
##
@@ -140,9 +163,9 @@ public void doJoinFromLeftThenDeleteLeftEntity() {
+right.pipeInput("rhs1", "rhsValue1", baseTimestamp);

Review Comment:
   Should we use "auto advance" instead of passing in timestamps explicitly?






[GitHub] [kafka] showuon commented on a diff in pull request #13547: Add note for KIP-894 and DelegationTokenCommand

2023-04-12 Thread via GitHub


showuon commented on code in PR #13547:
URL: https://github.com/apache/kafka/pull/13547#discussion_r1164858999


##
docs/upgrade.html:
##
@@ -27,6 +27,15 @@ Notable changes in 3
 See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores">KIP-889</a>
 for more details.
 
+MirrorMaker now uses incrementalAlterConfigs API to synchronize topic configurations instead of the deprecated alterConfigs API.

Review Comment:
   MirrorMaker now uses incrementalAlterConfigs API [by default] to synchronize 
topic configurations instead of the deprecated alterConfigs API.
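The difference behind this upgrade note: the deprecated `Admin#alterConfigs` replaces the whole configuration of a resource, so any key not sent is reverted to its default, whereas `Admin#incrementalAlterConfigs` applies per-key `AlterConfigOp` operations (for example `SET` and `DELETE`) and leaves other keys alone. The sketch models both styles on plain maps; it does not call the Admin client, and its names are hypothetical:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two update styles; it does not call the Kafka Admin client.
class ConfigUpdateSketch {

    enum OpType { SET, DELETE }

    static final class Op {
        final String name;
        final String value; // ignored for DELETE
        final OpType type;

        Op(final String name, final String value, final OpType type) {
            this.name = name;
            this.value = value;
            this.type = type;
        }
    }

    // Whole-resource replacement: keys missing from 'desired' are dropped, which in
    // this model stands for "reverted to the default".
    static Map<String, String> replaceAll(final Map<String, String> desired) {
        return new HashMap<>(desired);
    }

    // Incremental update: only the keys named by the ops change; the rest are kept.
    static Map<String, String> applyIncremental(final Map<String, String> current, final List<Op> ops) {
        final Map<String, String> updated = new HashMap<>(current);
        for (final Op op : ops) {
            if (op.type == OpType.SET) {
                updated.put(op.name, op.value);
            } else {
                updated.remove(op.name);
            }
        }
        return updated;
    }
}
```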






[GitHub] [kafka] showuon merged pull request #13500: MINOR: Install missing iputils-ping for system tests

2023-04-12 Thread via GitHub


showuon merged PR #13500:
URL: https://github.com/apache/kafka/pull/13500





[GitHub] [kafka] jsancio opened a new pull request, #13553: MINOR; Improve logging to help debug kraft

2023-04-12 Thread via GitHub


jsancio opened a new pull request, #13553:
URL: https://github.com/apache/kafka/pull/13553

   Minor changes to the log message and log levels to help us debug KRaft 
issues.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-14561) Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-12 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14561.
-
Fix Version/s: 3.5.0
   Resolution: Fixed

merged the PR to trunk.

> Improve transactions experience for older clients by ensuring ongoing 
> transaction
> -
>
> Key: KAFKA-14561
> URL: https://issues.apache.org/jira/browse/KAFKA-14561
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.5.0
>
>
> This is part 3 of KIP-890:
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
> See KIP-890 for more details: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense
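Part 3 of KIP-890, as quoted above, can be pictured as "verify, then cache". A minimal sketch, assuming the coordinator lookup is modelled as a `BiPredicate` over (transactional ID, partition) and that a successful verification stays cached until the transaction ends; all names are hypothetical and this is not the broker implementation:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiPredicate;

// Hypothetical "verify then cache" sketch; names are illustrative, not broker classes.
class TxnVerificationCacheSketch {
    private final BiPredicate<String, String> coordinatorSaysOngoing; // (transactionalId, partition)
    private final Set<String> verified = new HashSet<>();
    int coordinatorCalls = 0;

    TxnVerificationCacheSketch(final BiPredicate<String, String> coordinatorSaysOngoing) {
        this.coordinatorSaysOngoing = coordinatorSaysOngoing;
    }

    // Returns true if a transactional write to the partition may be appended.
    boolean mayAppend(final String transactionalId, final String partition) {
        final String key = transactionalId + "/" + partition;
        if (verified.contains(key)) {
            return true; // cached result: no coordinator round trip
        }
        coordinatorCalls++;
        if (coordinatorSaysOngoing.test(transactionalId, partition)) {
            verified.add(key);
            return true;
        }
        return false; // no ongoing transaction covers this partition: reject the write
    }

    // Hypothetical: forget the cached entry once the transaction completes.
    void onTransactionEnd(final String transactionalId, final String partition) {
        verified.remove(transactionalId + "/" + partition);
    }
}
```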



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-12 Thread via GitHub


jolshan commented on PR #13391:
URL: https://github.com/apache/kafka/pull/13391#issuecomment-1506123475

   Here are errors on the latest build on trunk I could find:
   
https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/1754/#showFailuresLink
   
   Seems to roughly correlate with the failures I see. 
   
   I think the only suspicious one is [Build / JDK 8 and Scala 2.12 / 
kafka.api.TransactionsBounceTest.testWithGroupId()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13391/26/testReport/junit/kafka.api/TransactionsBounceTest/Build___JDK_8_and_Scala_2_12___testWithGroupId__/)
 which was failing before without my fix. This error is slightly different and 
related to endTxn. I did see this flake one time when I was repeatedly testing 
on my branch. I can look on trunk as well. 
   





[GitHub] [kafka] mjsax merged pull request #13510: KAFKA-14834: [4/N] Drop out-of-order records from table-table join with versioned tables

2023-04-12 Thread via GitHub


mjsax merged PR #13510:
URL: https://github.com/apache/kafka/pull/13510





[GitHub] [kafka] junrao merged pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-12 Thread via GitHub


junrao merged PR #13391:
URL: https://github.com/apache/kafka/pull/13391





[GitHub] [kafka] mjsax commented on pull request #13509: KAFKA-14834: [3/N] Timestamped lookups for stream-table joins

2023-04-12 Thread via GitHub


mjsax commented on PR #13509:
URL: https://github.com/apache/kafka/pull/13509#issuecomment-1506112408

   Merged this PR, but as discussed in person, it seems questionable to allow 
versioned stores for global KTables. The change to the join processor shared 
by stream-table and stream-globalTable joins is fine, but once we disallow 
versioned stores for global tables, we will need to revert the corresponding 
test cases in a follow-up PR.





[GitHub] [kafka] mjsax merged pull request #13509: KAFKA-14834: [3/N] Timestamped lookups for stream-table joins

2023-04-12 Thread via GitHub


mjsax merged PR #13509:
URL: https://github.com/apache/kafka/pull/13509





[jira] [Commented] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711608#comment-17711608
 ] 

Justine Olshan commented on KAFKA-14901:


So many transaction/init producer ID issues lately. I had to check this wasn't 
the same as https://issues.apache.org/jira/browse/KAFKA-14830 

I've seen some init producer ID failures a few times as well (seeming more 
frequently) when testing.

I was also wondering if this change could be related, but it seems like this 
one caused deadlock and not the same error: 
[https://github.com/apache/kafka/pull/13267] 

Maybe this is yet another issue to investigate. Thanks for bringing it to our 
attention.

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS Source test appears to be very rarely failing (<5% chance) with the 
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
>   at 
> app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling 
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- 
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
>  transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
>  (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to 
> Empty while it already a pending state Ongoing
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
>     at scala.util.Either.flatMap(Either.scala:352)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
>     at 
> kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
>     at 
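The broker error above reports a request to prepare a transition to `Empty` while a transition to `Ongoing` was still pending. A much simplified, hypothetical model of such a pending-state guard (not Kafka's `TransactionMetadata`) shows why a second `prepareTransitionTo` before the first transition completes fails with `IllegalStateException`:

```java
// Hypothetical, simplified pending-state guard; not Kafka's TransactionMetadata.
class PendingTransitionSketch {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, COMPLETE_COMMIT }

    private State state = State.EMPTY;
    private State pendingState = null;

    // Only one transition may be in flight at a time.
    synchronized void prepareTransitionTo(final State target) {
        if (pendingState != null) {
            throw new IllegalStateException("Cannot prepare transition to " + target
                + ": transition to " + pendingState + " is still pending");
        }
        pendingState = target;
    }

    // Applies the pending transition (e.g. after the log write succeeds).
    synchronized void completeTransition() {
        if (pendingState == null) {
            throw new IllegalStateException("No pending transition to complete");
        }
        state = pendingState;
        pendingState = null;
    }

    synchronized State state() {
        return state;
    }
}
```

This only illustrates the guard; it does not identify the race that leaves the pending state set in the flaky test.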

[GitHub] [kafka] mjsax commented on pull request #13477: KAFKA-7499: Handle serialization error in ProductionExceptionHandler

2023-04-12 Thread via GitHub


mjsax commented on PR #13477:
URL: https://github.com/apache/kafka/pull/13477#issuecomment-1506098834

   > I believe I saw something about the class casting handling but I couldn't 
see that in the comments.
   
   I did leave a comment on the PR, but deleted it later because the comment was 
wrong -- the KIP explicitly says that `ClassCastException` should be excluded 
and we should always fail, rather than allow skipping over it via the handler.
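The rule described here, that a `ClassCastException` always fails while other serialization errors may be skipped by the handler, could look like the following. This is a hypothetical sketch: the `Response` enum only mirrors the CONTINUE/FAIL idea and is not the `ProductionExceptionHandler` interface itself.

```java
import java.util.function.Function;

// Hypothetical sketch of the handler rule; not the Kafka Streams API.
class SerializationErrorPolicySketch {
    enum Response { CONTINUE, FAIL }

    // Returns true if the record was skipped, false if it serialized fine;
    // throws if processing must stop.
    static <T> boolean serializeOrSkip(final T value,
                                       final Function<T, byte[]> serializer,
                                       final Function<Exception, Response> handler) {
        try {
            serializer.apply(value);
            return false;
        } catch (final ClassCastException e) {
            throw e; // wrong key/value types indicate a bug: never offered to the handler
        } catch (final RuntimeException e) {
            if (handler.apply(e) == Response.CONTINUE) {
                return true; // handler chose to skip this record
            }
            throw e;
        }
    }
}
```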





[GitHub] [kafka] cmccabe closed pull request #13546: Cherry-pick KAFKA-14857: Fix some MetadataLoader bugs (#13462)

2023-04-12 Thread via GitHub


cmccabe closed pull request #13546: Cherry-pick KAFKA-14857: Fix some 
MetadataLoader bugs (#13462)
URL: https://github.com/apache/kafka/pull/13546





[GitHub] [kafka] vcrfxia opened a new pull request, #13552: KAFKA-14834: [6/N] Add tracking of versioned tables into graph nodes

2023-04-12 Thread via GitHub


vcrfxia opened a new pull request, #13552:
URL: https://github.com/apache/kafka/pull/13552

   This PR adds a method into GraphNode to assist in tracking whether tables 
are materialized as versioned or unversioned stores. This is needed in order to 
allow processors which have different behavior on versioned vs unversioned 
tables to use the correct semantics. For the full definition of when a table is 
considered "versioned" and which processors behave differently on versioned 
tables, see 
[KIP-914](https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores).
 
   
   Unit tests will be added shortly, and integration tests will be in a 
follow-up PR.
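A simplified, hypothetical picture of tracking versioning on graph nodes: a node with its own store reports that store's type, and a node without one inherits the answer from its upstream node. This inheritance rule is an assumption made for illustration; KIP-914 defines which operations actually preserve versioned semantics.

```java
// Hypothetical, simplified model; the real rules are defined in KIP-914.
class VersionedGraphSketch {

    static final class Node {
        final Node parent;                      // null for a source table
        final Boolean materializedAsVersioned;  // null when the node has no store of its own

        Node(final Node parent, final Boolean materializedAsVersioned) {
            this.parent = parent;
            this.materializedAsVersioned = materializedAsVersioned;
        }

        boolean isOutputVersioned() {
            if (materializedAsVersioned != null) {
                return materializedAsVersioned; // the node's own store decides
            }
            return parent != null && parent.isOutputVersioned(); // inherit from upstream
        }
    }
}
```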
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Comment Edited] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

2023-04-12 Thread Sergei Morozov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711589#comment-17711589
 ] 

Sergei Morozov edited comment on KAFKA-14750 at 4/12/23 10:48 PM:
--

{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a 
multi-tenant environment where each topic represents a tenant. A Streams 
application reads a topic containing messages from about a hundred tenants and 
routes them to tenant-scoped topics. To reduce latency when a new tenant is 
provisioned, we pre-provision a few standby topics per Streams replica. Topics 
are managed by Strimzi and owned by the Streams pod, so when a Streams 
application restarts, all its standby topics are deleted and new ones are 
created for the new instance. Restarting the Streams application is routine 
(e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so 
we get sink connector failures in very common scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used 
by you?
{quote}
Yes. The above should explain it. Note that in our case, there are a couple 
dozen standby topics per Kafka cluster, so not every restart causes a 
connector failure. The 300 topics in the reproducer are used just to increase 
the likelihood of a connector failure.
{quote}If it doesn't exist, then we can skip the offset commit but what happens 
if it exists? Do we retry?
{quote}
I'd say we throw the original exception.
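The behaviour proposed here (skip the offset commit if the topic no longer exists, otherwise rethrow the original exception) as a minimal sketch. The `topicExists` check is a hypothetical stand-in for a metadata lookup, and the class is not part of the Connect runtime:

```java
import java.util.function.Predicate;

// Hypothetical sketch of the proposed commit-failure policy; not WorkerSinkTask code.
class CommitFailurePolicySketch {

    // Returns normally if the failure can be ignored; otherwise rethrows the original exception.
    static void handleCommitFailure(final String topic,
                                    final RuntimeException original,
                                    final Predicate<String> topicExists) {
        if (!topicExists.test(topic)) {
            return; // topic was deleted: there is nothing left to commit offsets for
        }
        throw original; // topic still exists: surface the original failure
    }
}
```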


was (Author: morozov):
{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a 
multi-tenant environment where each topic represents a tenant. A Streams 
application reads a topic containing messages from about a hundred tenants and 
routes them to tenant-scoped topics. To reduce latency when a new tenant is 
provisioned, we pre-provision a few standby topics per Streams replica. Topics 
are managed by Strimzi and owned by the Streams pod, so when a Streams 
application restarts, all its standby topics are deleted and new ones are 
created for the new instance. Restarting the Streams application is routine 
(e.g. due to a rolling upgrade, internal housekeeping in Kubernetes, etc.), so 
we get sink connector failures in very common scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used 
by you?
{quote}
Yes. The above should explain it. Note that in our case, there are a couple 
dozen standby topics per standby cluster, so not every restart causes a 
connector failure. The 300 topics in the reproducer are used just to increase 
the likelihood of a connector failure.
{quote}If it doesn't exist, then we can skip the offset commit but what happens 
if it exists? Do we retry?
{quote}
I'd say we throw the original exception.

> Sink connector fails if a topic matching its topics.regex gets deleted
> --
>
> Key: KAFKA-14750
> URL: https://issues.apache.org/jira/browse/KAFKA-14750
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.1
>Reporter: Sergei Morozov
>Assignee: Sagar Rao
>Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start zookeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties 
> config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>       --bootstrap-server localhost:9092 \
>       --create \
>       --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check the 
> number of partitions to be > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> 

[jira] [Comment Edited] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

2023-04-12 Thread Sergei Morozov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711589#comment-17711589
 ] 

Sergei Morozov edited comment on KAFKA-14750 at 4/12/23 10:47 PM:
--

{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a 
multi-tenant environment where each topic represents a tenant. A Streams 
application reads a topic containing messages from about a hundred tenants 
and routes them to the tenant-scoped topics. To reduce latency when a new 
tenant is provisioned, we pre-provision a few standby topics per Streams 
replica. Topics are managed by Strimzi and are owned by the Streams pod, so 
when a Streams application is restarted, all its standby topics are deleted 
and new ones are created for the new instance. Restarting the Streams 
application is a routine event (e.g. due to a rolling upgrade, internal 
housekeeping in Kubernetes, etc.), so we hit sink connector failures in 
quite common scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used 
by you?
{quote}
Yes. The above should explain it. Note that in our case, there are a couple of 
dozen standby topics per standby cluster, so not every restart causes a 
connector failure. The 300 topics in the reproducer are used only to increase 
the likelihood of the connector failure.
{quote}If it doesn't exist, then we can skip the offset commit but what happens 
if it exists? Do we retry?
{quote}
I'd say we throw the original exception.


was (Author: morozov):
{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a 
multi-tenant environment where each topic represents a tenant. A Streams 
application reads a topic containing messages from about a hundred tenants 
and routes them to the tenant-scoped topics. To reduce latency when a new 
tenant is provisioned, we pre-provision a few standby topics per Streams 
replica. Topics are managed by Strimzi and are owned by the Streams pod, so 
when a Streams application is restarted, all its standby topics are deleted 
and new ones are created for the new instance. Restarting the Streams 
application is a routine event (e.g. due to a rolling upgrade, internal 
housekeeping in Kubernetes, etc.), so we hit sink connector failures in 
quite common scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used 
by you?
{quote}
Yes. The above should explain it.
{quote}If it doesn't exist, then we can skip the offset commit but what happens 
if it exists? Do we retry?
{quote}
I'd say we throw the original exception.

> Sink connector fails if a topic matching its topics.regex gets deleted
> --
>
> Key: KAFKA-14750
> URL: https://issues.apache.org/jira/browse/KAFKA-14750
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.1
>Reporter: Sergei Morozov
>Assignee: Sagar Rao
>Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start ZooKeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties 
> config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check that the 
> number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo 

[jira] [Comment Edited] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

2023-04-12 Thread Sergei Morozov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711589#comment-17711589
 ] 

Sergei Morozov edited comment on KAFKA-14750 at 4/12/23 10:46 PM:
--

{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a 
multi-tenant environment where each topic represents a tenant. A Streams 
application reads a topic containing messages from about a hundred tenants 
and routes them to the tenant-scoped topics. To reduce latency when a new 
tenant is provisioned, we pre-provision a few standby topics per Streams 
replica. Topics are managed by Strimzi and are owned by the Streams pod, so 
when a Streams application is restarted, all its standby topics are deleted 
and new ones are created for the new instance. Restarting the Streams 
application is a routine event (e.g. due to a rolling upgrade, internal 
housekeeping in Kubernetes, etc.), so we hit sink connector failures in 
quite common scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used 
by you?
{quote}
Yes. The above should explain it.
{quote}If it doesn't exist, then we can skip the offset commit but what happens 
if it exists? Do we retry?
{quote}
I'd say we throw the original exception.


was (Author: morozov):
{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a 
multi-tenant environment where each topic represents a tenant. A Streams 
application reads a topic containing messages from about a hundred tenants 
and routes them to the tenant-scoped topics. To reduce latency when a new 
tenant is provisioned, we pre-provision a few standby topics per Streams 
replica. Topics are managed by Strimzi and are owned by the Streams pod, so 
when a Streams application is restarted, all its standby topics are deleted 
and new ones are created for the new instance. Restarting the Streams 
application is a routine event (e.g. due to a rolling upgrade), so we hit 
sink connector failures in quite common scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used 
by you?
{quote}
Yes. The above should explain it.
{quote}If it doesn't exist, then we can skip the offset commit but what happens 
if it exists? Do we retry?
{quote}
I'd say we throw the original exception.

> Sink connector fails if a topic matching its topics.regex gets deleted
> --
>
> Key: KAFKA-14750
> URL: https://issues.apache.org/jira/browse/KAFKA-14750
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.1
>Reporter: Sergei Morozov
>Assignee: Sagar Rao
>Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start ZooKeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties 
> config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check that the 
> number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before the position for partition connect-test-211-0 could be 
> determined
> 

[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

2023-04-12 Thread Sergei Morozov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711589#comment-17711589
 ] 

Sergei Morozov commented on KAFKA-14750:


{quote}how big of an impact is this to you for your use case actually?
{quote}
This issue is quite impactful. For background: we use Connect in a 
multi-tenant environment where each topic represents a tenant. A Streams 
application reads a topic containing messages from about a hundred tenants 
and routes them to the tenant-scoped topics. To reduce latency when a new 
tenant is provisioned, we pre-provision a few standby topics per Streams 
replica. Topics are managed by Strimzi and are owned by the Streams pod, so 
when a Streams application is restarted, all its standby topics are deleted 
and new ones are created for the new instance. Restarting the Streams 
application is a routine event (e.g. due to a rolling upgrade), so we hit 
sink connector failures in quite common scenarios.
{quote}Is the test you listed in the OP a reflection of how the connect is used 
by you?
{quote}
Yes. The above should explain it.
{quote}If it doesn't exist, then we can skip the offset commit but what happens 
if it exists? Do we retry?
{quote}
I'd say we throw the original exception.

> Sink connector fails if a topic matching its topics.regex gets deleted
> --
>
> Key: KAFKA-14750
> URL: https://issues.apache.org/jira/browse/KAFKA-14750
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.1
>Reporter: Sergei Morozov
>Assignee: Sagar Rao
>Priority: Major
>
> Steps to reproduce:
> # In {{config/connect-standalone.properties}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{config/connect-file-sink.properties}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start ZooKeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties 
> config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check that the 
> number of partitions is > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before the position for partition connect-test-211-0 could be 
> determined
> {quote}
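For reference, the reproducer's loops start three background jobs; job i handles topic indexes i*100 through (i+1)*100-1, giving 300 topics in total, each matching `topics.regex=connect-test-.*`. The partition named in the error, `connect-test-211-0`, is partition 0 of topic `connect-test-211`, which comes from the third batch. The sketch below recomputes this in Java. `ReproducerTopics` is a hypothetical helper, and it assumes a regex subscription matches the whole topic name, as `Matcher.matches()` does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper mirroring the bash loops of the reproducer.
final class ReproducerTopics {
    // Same value as topics.regex in connect-file-sink.properties.
    static final Pattern TOPICS_REGEX = Pattern.compile("connect-test-.*");

    private ReproducerTopics() {}

    /** Topic names created (and later deleted) by background job i. */
    static List<String> batch(int i) {
        List<String> names = new ArrayList<>();
        for (int j = i * 100; j <= (i + 1) * 100 - 1; j++) {
            names.add("connect-test-" + j);
        }
        return names;
    }

    /** Which background job (0..2) handles a given topic index, e.g. 211 -> 2. */
    static int batchOf(int topicIndex) {
        return topicIndex / 100;
    }

    /** Whole-name match against topics.regex (assumed subscription semantics). */
    static boolean subscribed(String topic) {
        return TOPICS_REGEX.matcher(topic).matches();
    }
}
```

Because the same three index ranges are used for creation and deletion, every topic the connector was assigned is eventually deleted while the connector may still hold its partitions.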



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest

2023-04-12 Thread via GitHub


cmccabe commented on PR #13549:
URL: https://github.com/apache/kafka/pull/13549#issuecomment-1506025212

   Thanks @gharris1727 .




[GitHub] [kafka] guozhangwang merged pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

2023-04-12 Thread via GitHub


guozhangwang merged PR #13523:
URL: https://github.com/apache/kafka/pull/13523




[jira] [Commented] (KAFKA-14900) Flaky test AuthorizerTest failing with NPE

2023-04-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711581#comment-17711581
 ] 

Greg Harris commented on KAFKA-14900:
-

This will be addressed in https://github.com/apache/kafka/pull/13543

> Flaky test AuthorizerTest failing with NPE
> --
>
> Key: KAFKA-14900
> URL: https://issues.apache.org/jira/browse/KAFKA-14900
> Project: Kafka
>  Issue Type: Test
>  Components: kraft
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>  Labels: flaky-test
>
> The AuthorizerTest has multiple tests that appear to have the same flaky 
> failure:
> {noformat}
> org.apache.kafka.server.fault.FaultHandlerException: 
> quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: 
> Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value 
> of "kafka.server.SharedServer.raftManager()" is null
>   at 
> app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "kafka.raft.KafkaRaftManager.client()" because the return value of 
> "kafka.server.SharedServer.raftManager()" is null
>   ... 8 more{noformat}
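The trace shows a callback on the metadata loader's event-queue thread calling `raftManager().client()` after `raftManager()` had returned null. A generic way to make such a callback tolerate a reference that may be unset or concurrently cleared is sketched below: read the field once into a local and treat null as "not available". This is not necessarily how #13543 addresses the problem; `RaftHandle`, `SharedState` and `highWatermarkOrEmpty` are invented purely for illustration.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for the object whose accessor returned null in the trace.
interface RaftHandle {
    long highWatermark();
}

// Hypothetical holder whose reference is set on start and cleared on stop,
// while another thread may read it at any time.
final class SharedState {
    // volatile: a write from the start/stop thread is visible to the reader thread.
    private volatile RaftHandle raft;

    void start(RaftHandle handle) {
        raft = handle;
    }

    void stop() {
        raft = null;
    }

    // Reads the field once into a local, so a concurrent stop() cannot clear it
    // between the null check and the call. Empty means "not available (yet)".
    OptionalLong highWatermarkOrEmpty() {
        RaftHandle local = raft;
        return local == null ? OptionalLong.empty() : OptionalLong.of(local.highWatermark());
    }
}
```

The caller then decides what "empty" means (for example, "still catching up"), instead of failing with a NullPointerException.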




[GitHub] [kafka] gharris1727 closed pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest

2023-04-12 Thread via GitHub


gharris1727 closed pull request #13549: KAFKA-14900: Fix NPE in SharedServer 
causing flaky failure in AuthorizerTest
URL: https://github.com/apache/kafka/pull/13549




[GitHub] [kafka] gharris1727 commented on pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest

2023-04-12 Thread via GitHub


gharris1727 commented on PR #13549:
URL: https://github.com/apache/kafka/pull/13549#issuecomment-1506006379

   This duplicates #13543, so I'll close this in favor of that more 
comprehensive change.




[jira] [Comment Edited] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711580#comment-17711580
 ] 

Greg Harris edited comment on KAFKA-14901 at 4/12/23 9:49 PM:
--

cc [~jolshan] [~dajac] [~hachikuji] [~mimaison]

I have not seen this failure mode before, and I'm worried that this might be a 
recent regression. It also doesn't look like an error that is intended to be 
surfaced by the API in normal operations (I would expect a disconnect or 
perhaps a ProducerFencedException).
Just to be safe, I've marked this as a blocker for 3.5.0, but we can downgrade 
the severity after some more investigation.

Since the incidence rate for this is relatively low, I've attached a couple of 
logs of the test runs leading up to the error. The trace-0 log fails on 
PrepareEpochFence instead of Empty, but it appears both runs fail when the 
pending state is Ongoing.
To reproduce it yourself: run the test on trunk 10-50 times (30s each) 
until it fails. This test differs from the other tests in the same suite 
in that it uses active producer fencing to eliminate other tasks/producers 
when reconfiguring tasks.
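For a sense of why 10-50 runs are suggested: if each run fails independently with probability p, the chance of seeing at least one failure in n runs is 1 - (1 - p)^n. Taking p = 5% (the upper bound quoted in the issue description; the real rate may be lower, which would require more runs), 10 runs give about a 40% chance and 50 runs about 92%, while about 59 runs, roughly half an hour at 30s each, are needed for 95%. `FlakyRuns` is a hypothetical helper, and run independence is an assumption.

```java
// Hypothetical helper for estimating how many runs a flaky test needs.
// Assumes each run fails independently with the same probability p.
final class FlakyRuns {
    private FlakyRuns() {}

    /** Probability of at least one failure in n runs: 1 - (1 - p)^n. */
    static double atLeastOneFailure(double p, int n) {
        return 1.0 - Math.pow(1.0 - p, n);
    }

    /** Smallest n with atLeastOneFailure(p, n) >= target, for 0 < p < 1 and 0 < target < 1. */
    static int runsNeeded(double p, double target) {
        return (int) Math.ceil(Math.log(1.0 - target) / Math.log(1.0 - p));
    }
}
```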


was (Author: gharris1727):
cc [~jolshan] [~dajac] [~hachikuji] [~mimaison] 

I have not seen this failure mode before, and I'm worried that this might be a 
recent regression. It also doesn't look like an error that is intended to be 
surfaced by the API in normal operations (I would expect a disconnect or 
perhaps a ProducerFencedException).
Just to be safe, I've marked this as a blocker for 3.5.0, but we can downgrade 
the severity after some more investigation.

Since the incidence rate for this is relatively low, I've attached a couple of 
logs of the test runs leading up to the error. The trace-0 log fails on 
PrepareEpochFence instead of Empty, but it appears both runs fail when the 
pending state is Ongoing.
To reproduce it yourself: run the test on trunk 10-50 times until it fails. 
This test differs from the other tests in the same suite in that it uses 
active producer fencing to eliminate other tasks/producers when reconfiguring 
tasks.

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS source test appears to fail very rarely (<5% chance) with the 
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
>   at 
> app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling 
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- 
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
>  transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.5.0-SNAPSHOT), 

[jira] [Commented] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711580#comment-17711580
 ] 

Greg Harris commented on KAFKA-14901:
-

cc [~jolshan] [~dajac] [~hachikuji] [~mimaison] 

I have not seen this failure mode before, and I'm worried that this might be a 
recent regression. It also doesn't look like an error that is intended to be 
surfaced by the API in normal operations (I would expect a disconnect or 
perhaps a ProducerFencedException).
Just to be safe, I've marked this as a blocker for 3.5.0, but we can downgrade 
the severity after some more investigation.

Since the incidence rate for this is relatively low, I've attached a couple of 
logs of the test runs leading up to the error. The trace-0 log fails on 
PrepareEpochFence instead of Empty, but it appears both runs fail when the 
pending state is Ongoing.
To reproduce it yourself: run the test on trunk 10-50 times until it fails. 
This test differs from the other tests in the same suite in that it uses 
active producer fencing to eliminate other tasks/producers when reconfiguring 
tasks.

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS source test appears to fail very rarely (<5% chance) with the 
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
>   at 
> app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling 
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- 
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
>  transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
>  (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to 
> Empty while it already a pending state Ongoing
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
>     at 

[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14901:

Attachment: transaction-flake-trace-0.out

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake-trace-0.out, transaction-flake.out
>
>
> The EOS source test appears to fail very rarely (<5% chance) with the 
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
>   at 
> app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling 
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- 
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
>  transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
>  (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to 
> Empty while it already a pending state Ongoing
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
>     at scala.util.Either.flatMap(Either.scala:352)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
>     at 
> kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
>     at java.base/java.lang.Thread.run(Thread.java:829{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on pull request #13477: KAFKA-7499: Handle serialization error in ProductionExceptionHandler

2023-04-12 Thread via GitHub


philipnee commented on PR #13477:
URL: https://github.com/apache/kafka/pull/13477#issuecomment-1505978765

   @bbejeck and @mjsax - thanks for the reviews. I believe I saw something
about the class casting handling, but I couldn't find it in the comments. I
believe that was already implemented some time ago.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14901:

Fix Version/s: 3.5.0

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake.out
>
>
> The EOS Source test appears to be very rarely failing (<5% chance) with the 
> following error:
> {noformat}
> org.apache.kafka.common.KafkaException: Unexpected error in 
> InitProducerIdResponse; The server experienced an unexpected error when 
> processing the request.
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
>   at 
> app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
>   at 
> app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
>   at 
> app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
>   at 
> app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>   at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
> which appears to be triggered by the following failure inside the broker:
> {noformat}
> [2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling 
> request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2) -- 
> InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
>  transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
> RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
> clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
> headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
> clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=apache-kafka-java, 
> softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
>  (kafka.server.KafkaApis:76)
> java.lang.IllegalStateException: Preparing transaction state transition to 
> Empty while it already a pending state Ongoing
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
>     at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
>     at scala.util.Either.flatMap(Either.scala:352)
>     at 
> kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
>     at 
> kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
>     at java.base/java.lang.Thread.run(Thread.java:829{noformat}





[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14901:

Priority: Blocker  (was: Major)

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.5.0
>
> Attachments: transaction-flake.out
>
>





[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14901:

Attachment: transaction-flake.out

> Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
> 
>
> Key: KAFKA-14901
> URL: https://issues.apache.org/jira/browse/KAFKA-14901
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
> Attachments: transaction-flake.out
>
>





[jira] [Updated] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14901:

Description: 
The EOS Source test appears to be very rarely failing (<5% chance) with the 
following error:
{noformat}
org.apache.kafka.common.KafkaException: Unexpected error in 
InitProducerIdResponse; The server experienced an unexpected error when 
processing the request.
at 
app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
at 
app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
at 
app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
at 
app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
at 
app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
at 
app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
at 
app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
which appears to be triggered by the following failure inside the broker:
{noformat}
[2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling request 
RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
headerVersion=2) -- 
InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
 transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
clientInformation=ClientInformation(softwareName=apache-kafka-java, 
softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
 (kafka.server.KafkaApis:76)
java.lang.IllegalStateException: Preparing transaction state transition to 
Empty while it already a pending state Ongoing
    at 
kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
    at 
kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
    at 
kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
    at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
    at 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
    at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
    at scala.util.Either.flatMap(Either.scala:352)
    at 
kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
    at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
    at java.base/java.lang.Thread.run(Thread.java:829{noformat}

  was:
The EOS Source test appears to be occasionally failing with the following error:
{noformat}
org.apache.kafka.common.KafkaException: Unexpected error in 
InitProducerIdResponse; The server experienced an unexpected error when 
processing the request.
at 
app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
at 
app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
at 
app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
at 
app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
at 
app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
at 
app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
at 
app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at 

[jira] [Created] (KAFKA-14901) Flaky test ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration

2023-04-12 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14901:
---

 Summary: Flaky test 
ExactlyOnceSourceIntegrationTest.testConnectorReconfiguration
 Key: KAFKA-14901
 URL: https://issues.apache.org/jira/browse/KAFKA-14901
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Greg Harris


The EOS Source test appears to be occasionally failing with the following error:
{noformat}
org.apache.kafka.common.KafkaException: Unexpected error in 
InitProducerIdResponse; The server experienced an unexpected error when 
processing the request.
at 
app//org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1303)
at 
app//org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1207)
at 
app//org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
app//org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:594)
at 
app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:586)
at 
app//org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:426)
at 
app//org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
at 
app//org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.base@11.0.16.1/java.lang.Thread.run(Thread.java:829){noformat}
which appears to be triggered by the following failure inside the broker:
{noformat}
[2023-04-12 14:01:38,931] ERROR [KafkaApi-0] Unexpected error handling request 
RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
headerVersion=2) -- 
InitProducerIdRequestData(transactionalId='exactly-once-source-integration-test-exactlyOnceQuestionMark-1',
 transactionTimeoutMs=6, producerId=-1, producerEpoch=-1) with context 
RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4, 
clientId=simulated-task-producer-exactlyOnceQuestionMark-1, correlationId=5, 
headerVersion=2), connectionId='127.0.0.1:54213-127.0.0.1:54367-46', 
clientAddress=/127.0.0.1, principal=User:ANONYMOUS, 
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
clientInformation=ClientInformation(softwareName=apache-kafka-java, 
softwareVersion=3.5.0-SNAPSHOT), fromPrivilegedListener=true, 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@615924cd])
 (kafka.server.KafkaApis:76)
java.lang.IllegalStateException: Preparing transaction state transition to 
Empty while it already a pending state Ongoing
    at 
kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:380)
    at 
kafka.coordinator.transaction.TransactionMetadata.prepareIncrementProducerEpoch(TransactionMetadata.scala:311)
    at 
kafka.coordinator.transaction.TransactionCoordinator.prepareInitProducerIdTransit(TransactionCoordinator.scala:240)
    at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$3(TransactionCoordinator.scala:151)
    at 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:242)
    at 
kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleInitProducerId$2(TransactionCoordinator.scala:150)
    at scala.util.Either.flatMap(Either.scala:352)
    at 
kafka.coordinator.transaction.TransactionCoordinator.handleInitProducerId(TransactionCoordinator.scala:145)
    at kafka.server.KafkaApis.handleInitProducerIdRequest(KafkaApis.scala:2236)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:202)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
    at java.base/java.lang.Thread.run(Thread.java:829{noformat}
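The broker-side IllegalStateException above is thrown by a guard that refuses to prepare a new state transition while an earlier one is still pending. The following is a minimal, self-contained Java sketch of that "prepare, then complete" pattern, assuming a single pending slot per transaction. `TxnStateSketch`, its states, and its error message are hypothetical simplifications, not Kafka's `TransactionMetadata`, and the sketch does not explain why the transition to Ongoing was still pending when InitProducerId arrived.

```java
import java.util.Optional;

// Hypothetical, simplified model of a two-phase transaction state transition:
// prepare a target state first, complete it later. Not Kafka's TransactionMetadata.
final class TxnStateSketch {

    enum State { EMPTY, ONGOING, PREPARE_COMMIT }

    static final class Metadata {
        private State state = State.EMPTY;
        private Optional<State> pending = Optional.empty();

        // Phase 1: reserve the transition. Only one transition may be pending at a time.
        synchronized void prepareTransitionTo(State target) {
            if (pending.isPresent()) {
                throw new IllegalStateException("cannot prepare transition to " + target
                        + ": transition to " + pending.get() + " is still pending");
            }
            pending = Optional.of(target);
        }

        // Phase 2: apply the reserved transition (for example, once it has been persisted).
        synchronized void completeTransition() {
            state = pending.orElseThrow(() -> new IllegalStateException("no pending transition"));
            pending = Optional.empty();
        }

        synchronized State state() {
            return state;
        }
    }

    public static void main(String[] args) {
        Metadata txn = new Metadata();
        txn.prepareTransitionTo(State.ONGOING); // hypothetical: an earlier request is mid-flight
        try {
            // A second request tries to reset the transaction before the first one completes.
            txn.prepareTransitionTo(State.EMPTY);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        txn.completeTransition();
        System.out.println("state = " + txn.state());
    }
}
```

The check and the reservation happen under the same lock, so two callers cannot both observe "no pending transition" and proceed.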





[GitHub] [kafka] philipnee commented on a diff in pull request #13477: KAFKA-7499: Handle serialization error in ProductionExceptionHandler

2023-04-12 Thread via GitHub


philipnee commented on code in PR #13477:
URL: https://github.com/apache/kafka/pull/13477#discussion_r1164651827


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##
@@ -213,6 +212,31 @@ public  void send(final String topic,
 keyClass,
 valueClass),
 exception);
+} catch (final SerializationException exception) {
+final ProducerRecord record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+try {
+response = productionExceptionHandler.onSerializationException(record, exception);
+} catch (final Exception e) {
+log.error("Fatal handling serialization exception on record {}", record, e);
+recordSendError(topic, e, null);
+return;
+}
+
+if (response == ProductionExceptionHandlerResponse.FAIL) {
+log.error("Fatal handling serialization exception on record {}", record, exception);
+recordSendError(topic, exception, null);
+return;
+}
+
+log.warn("Unable to serialize {}. Continue processing. " +
+"ProducerRecord(key=[{}], value=[{}], topic=[{}], partition=[{}], timestamp=[{}])",
+keyBytes == null ? "key" : "value",

Review Comment:
   hmm good point, maybe just log the continue processing message then.
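The hunk under review adds a branch for serialization failures: the handler's response decides whether the record is skipped with a log message or the failure is recorded as a send error, and a handler that itself throws is treated as fatal. The sketch below models only that control flow in plain Java so it runs without Kafka on the classpath. `Handler`, `Response`, and `Collector` are hypothetical stand-ins for the Streams classes, and the method name `onSerializationException` is taken from this in-review diff, so the merged API may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for the Kafka Streams types in the hunk; models control flow only.
final class SerializationHandlingSketch {

    enum Response { CONTINUE, FAIL }

    interface Handler {
        Response onSerializationException(String record, Exception exception);
    }

    static final class Collector {
        private final Handler handler;
        private final Function<String, byte[]> serializer;
        final List<String> sent = new ArrayList<>();
        Exception sendError; // first fatal error; plays the role of recordSendError(...)

        Collector(Handler handler, Function<String, byte[]> serializer) {
            this.handler = handler;
            this.serializer = serializer;
        }

        void send(String record) {
            try {
                serializer.apply(record);
            } catch (RuntimeException serializationError) {
                final Response response;
                try {
                    response = handler.onSerializationException(record, serializationError);
                } catch (Exception handlerError) {
                    recordSendError(handlerError); // a throwing handler is fatal
                    return;
                }
                if (response == Response.FAIL) {
                    recordSendError(serializationError);
                    return;
                }
                // CONTINUE: log and skip the record.
                System.out.println("Unable to serialize record. Continue processing.");
                return;
            }
            sent.add(record);
        }

        private void recordSendError(Exception e) {
            if (sendError == null) {
                sendError = e;
            }
        }
    }

    // Toy serializer for the example: refuses empty strings.
    static byte[] serialize(String s) {
        if (s.isEmpty()) {
            throw new IllegalArgumentException("cannot serialize an empty record");
        }
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Collector lenient = new Collector((r, e) -> Response.CONTINUE, SerializationHandlingSketch::serialize);
        lenient.send("a");
        lenient.send("");
        lenient.send("b");
        System.out.println(lenient.sent + " error=" + lenient.sendError);
    }
}
```

With a CONTINUE handler only the bad record is dropped; with FAIL, or with a handler that throws, the first error is kept for the caller to surface.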






[GitHub] [kafka] hachikuji opened a new pull request, #13551: MINOR: Allow tagged fields with version subset of flexible version range

2023-04-12 Thread via GitHub


hachikuji opened a new pull request, #13551:
URL: https://github.com/apache/kafka/pull/13551

   The generated message types are missing a range check for the case when the 
tagged version range is a subset of the flexible version range. This causes the 
tagged field count, which is computed correctly, to conflict with the number of 
tags serialized.
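The mismatch can be pictured with a toy encoder: the count written up front filters fields by their own tagged version range, but if the write loop only checks that the version is flexible, a field whose tagged range starts later is still emitted. This is a hypothetical sketch, not the message generator's output. It assumes flexible versions 3 and above, one field tagged in 3+ and another tagged only in 5+, and it writes single bytes instead of unsigned varints.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy encoding of tagged fields (one byte each for count, tag and value).
final class TaggedFieldSketch {

    static final class VersionRange {
        final int lowest;
        final int highest;

        VersionRange(int lowest, int highest) {
            this.lowest = lowest;
            this.highest = highest;
        }

        boolean contains(int version) {
            return version >= lowest && version <= highest;
        }
    }

    static final class TaggedField {
        final int tag;
        final VersionRange taggedVersions;
        final int value;

        TaggedField(int tag, VersionRange taggedVersions, int value) {
            this.tag = tag;
            this.taggedVersions = taggedVersions;
            this.value = value;
        }
    }

    // Assumed flexible range for this example: version 3 and above.
    static final VersionRange FLEXIBLE = new VersionRange(3, Short.MAX_VALUE);

    // The count only includes fields whose tagged range contains the version.
    static int countTaggedFields(List<TaggedField> fields, int version) {
        int count = 0;
        for (TaggedField f : fields) {
            if (f.taggedVersions.contains(version)) {
                count++;
            }
        }
        return count;
    }

    // withRangeCheck=false reproduces the described bug; true applies the same filter as the count.
    static byte[] write(List<TaggedField> fields, int version, boolean withRangeCheck) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (!FLEXIBLE.contains(version)) {
            return out.toByteArray(); // non-flexible versions carry no tagged fields
        }
        out.write(countTaggedFields(fields, version));
        for (TaggedField f : fields) {
            if (withRangeCheck && !f.taggedVersions.contains(version)) {
                continue;
            }
            out.write(f.tag);
            out.write(f.value);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<TaggedField> fields = Arrays.asList(
                new TaggedField(0, new VersionRange(3, Short.MAX_VALUE), 42),
                new TaggedField(1, new VersionRange(5, Short.MAX_VALUE), 7));
        // Version 4 is flexible, but tag 1 does not exist yet.
        System.out.println(Arrays.toString(write(fields, 4, false))); // [1, 0, 42, 1, 7]: count says 1, two fields follow
        System.out.println(Arrays.toString(write(fields, 4, true)));  // [1, 0, 42]
    }
}
```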
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-12 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1164638992


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time)
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) {
+
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+nodesToTransactions.synchronized {
+  // Check if we already have either the node or the individual transaction. Add the node if it isn't there.
+  val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+new TransactionDataAndCallbacks(
+  new AddPartitionsToTxnTransactionCollection(1),
+  mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+  val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+  // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and
+  // reconnected so return the retriable network exception.
+  if (currentTransactionData != null) {
+val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
+  Errors.INVALID_PRODUCER_EPOCH
+else
+  Errors.NETWORK_EXCEPTION
+val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+currentTransactionData.topics().forEach { topic =>
+  topic.partitions().forEach { partition =>
+topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
+  }
+}
+val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+currentNodeAndTransactionData.transactionData.remove(transactionData)
+oldCallback(topicPartitionsToError.toMap)
+  }
+  currentNodeAndTransactionData.transactionData.add(transactionData)
+  currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+  wakeup()
+}
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler {
+override def onComplete(response: ClientResponse): Unit = {
+  // Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread.
+  inflightNodes.remove(node)
+  if (response.authenticationException() != null) {
+error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} with an " +
+  "authentication exception.", response.authenticationException)
+transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) =>
+  callback(buildErrorMap(txnId, 

[GitHub] [kafka] philipnee opened a new pull request, #13550: KAFKA-14639 - A single partition may be revoked and assign during a single round of rebalance

2023-04-12 Thread via GitHub


philipnee opened a new pull request, #13550:
URL: https://github.com/apache/kafka/pull/13550

   This is a long story, but the incident started in KAFKA-13419, when we observed a member that had missed a rebalance cycle (due to REBALANCE_IN_PROGRESS) sending out a topic partition it had owned in the previous generation.
   
   Ideally, the member should keep holding onto its partition as long as there's no other owner with a younger generation; however, we need to be defensive about this approach because we aren't sure whether the partition has already been assigned to other members. Therefore, it is safest for us to honor only the members with the highest generation and the previous generation during the assignment phase.
   
   In this PR, I made two major changes:
   1. In the assignor, we now honor a partition owner that is on generation max - 1 as long as no other owner of that partition has a younger generation (younger = higher generationId).
   2. After receiving a REBALANCE_IN_PROGRESS SyncGroup error, the member immediately resets its generation, so that all of its owned partitions are claimed as lost if it doesn't rejoin in a timely manner.
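Here is a minimal Java sketch of the ownership rule in change 1. It assumes that each member reports a (memberId, generation) claim for a partition. The `Claim` record and the `honoredOwner` method are hypothetical. The choice to treat a tie at the highest generation as a conflict, which leaves the partition unowned, is also an assumption made for illustration. None of this is the assignor's actual code.

```java
import java.util.List;
import java.util.Optional;

final class OwnershipSketch {
    // Hypothetical: one member's claim that it owns a partition, tagged with its generation.
    record Claim(String memberId, int generation) {}

    /**
     * Returns the member whose claim should be honored, or empty if the partition
     * should be treated as unowned. A claim is honored only if it has the highest
     * (youngest) generation among all claims and that generation is at least
     * maxGeneration - 1. A tie at the highest generation is treated as a conflict.
     */
    static Optional<String> honoredOwner(List<Claim> claims, int maxGeneration) {
        Claim best = null;
        boolean tie = false;
        for (Claim c : claims) {
            if (best == null || c.generation() > best.generation()) {
                best = c;
                tie = false;
            } else if (c.generation() == best.generation()) {
                tie = true;
            }
        }
        if (best == null || tie || best.generation() < maxGeneration - 1) {
            return Optional.empty();
        }
        return Optional.of(best.memberId());
    }

    public static void main(String[] args) {
        // Owner on generation max - 1 with no younger competitor: honored.
        System.out.println(honoredOwner(List.of(new Claim("a", 4)), 5)); // prints Optional[a]
        // A younger (higher-generation) owner wins over the max - 1 owner.
        System.out.println(honoredOwner(List.of(new Claim("a", 4), new Claim("b", 5)), 5)); // prints Optional[b]
        // An owner older than max - 1 is ignored.
        System.out.println(honoredOwner(List.of(new Claim("a", 3)), 5)); // prints Optional.empty
    }
}
```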


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14898) [ MirrorMaker ] sync.topic.configs.enabled not working as expected

2023-04-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-14898.
-
Resolution: Fixed

> [ MirrorMaker ] sync.topic.configs.enabled not working as expected
> --
>
> Key: KAFKA-14898
> URL: https://issues.apache.org/jira/browse/KAFKA-14898
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.0
>Reporter: Srinivas Boga
>Priority: Major
>  Labels: mirrormaker
>
> Hello,
> In my replication setup, I do not want to sync the topic configs; the use 
> case is to have a different retention time for the topic on the target cluster. 
> I am passing the config
> {code:java}
>  sync.topic.configs.enabled = false{code}
> but this is not working as expected: the topic retention time is being set to 
> whatever is set in the source cluster. Looking at the MirrorMaker logs, 
> I can see that MirrorSourceConnector is still setting the above config to true
> {code:java}
> [2023-04-12 17:04:55,184] INFO [MirrorSourceConnector|task-8] ConsumerConfig 
> values:
>         allow.auto.create.topics = true
>         auto.commit.interval.ms = 5000
>         auto.include.jmx.reporter = true
>         auto.offset.reset = earliest
>         bootstrap.servers = [sourcecluster.com:9092]
>         check.crcs = true
>         client.dns.lookup = use_all_dns_ips
>         client.id = consumer-null-2
>         client.rack =
>         connections.max.idle.ms = 540000
>         default.api.timeout.ms = 60000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = null
>         group.instance.id = null
>         heartbeat.interval.ms = 3000
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         internal.throw.on.fetch.stable.offset.unsupported = false
>         isolation.level = read_uncommitted
>         key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 300000
>         max.poll.records = 500
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor, class 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 30000
>         retry.backoff.ms = 100
>         sasl.client.callback.handler.class = null
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.login.callback.handler.class = null
>         sasl.login.class = null
>         sasl.login.connect.timeout.ms = null
>         sasl.login.read.timeout.ms = null
>         sasl.login.refresh.buffer.seconds = 300
>         sasl.login.refresh.min.period.seconds = 60
>         sasl.login.refresh.window.factor = 0.8
>         sasl.login.refresh.window.jitter = 0.05
>         sasl.login.retry.backoff.max.ms = 10000
>         sasl.login.retry.backoff.ms = 100
>         sasl.mechanism = GSSAPI
>         sasl.oauthbearer.clock.skew.seconds = 30
>         sasl.oauthbearer.expected.audience = null
>         sasl.oauthbearer.expected.issuer = null
>         sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
>         sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>         sasl.oauthbearer.jwks.endpoint.url = null
>         sasl.oauthbearer.scope.claim.name = scope
>         sasl.oauthbearer.sub.claim.name = sub
>         sasl.oauthbearer.token.endpoint.url = null
>         security.protocol = PLAINTEXT
>         security.providers = null
>         send.buffer.bytes = 131072
>         session.timeout.ms = 45000
>         socket.connection.setup.timeout.max.ms = 30000
>         socket.connection.setup.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2]
>         ssl.endpoint.identification.algorithm = https
>         ssl.engine.factory.class = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.certificate.chain = null
>         ssl.keystore.key = null
>         

[GitHub] [kafka] junrao commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-12 Thread via GitHub


junrao commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1164633122


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time)
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+nodesToTransactions.synchronized {
+  // Check if we already have either the node or the individual transaction. Add the node if it isn't there.
+  val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+new TransactionDataAndCallbacks(
+  new AddPartitionsToTxnTransactionCollection(1),
+  mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+  val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+  // Check if we already have the txn ID -- if the epoch is bumped, return invalid producer epoch; otherwise, the client likely disconnected and
+  // reconnected, so return the retriable network exception.
+  if (currentTransactionData != null) {
+val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
+  Errors.INVALID_PRODUCER_EPOCH
+else
+  Errors.NETWORK_EXCEPTION
+val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+currentTransactionData.topics().forEach { topic =>
+  topic.partitions().forEach { partition =>
+topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
+  }
+}
+val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+currentNodeAndTransactionData.transactionData.remove(transactionData)
+oldCallback(topicPartitionsToError.toMap)
+  }
+  currentNodeAndTransactionData.transactionData.add(transactionData)
+  currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+  wakeup()
+}
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler {
+override def onComplete(response: ClientResponse): Unit = {
+  // Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread.
+  inflightNodes.remove(node)
+  if (response.authenticationException() != null) {
+error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} with an " +
+  "authentication exception.", response.authenticationException)
+transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) =>
+  callback(buildErrorMap(txnId, 
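The replace-and-fail logic in the `addTxnData` method of this diff can be modeled in plain Java. This is a simplified, hypothetical sketch. `PendingTxnSketch`, `TxnError`, `Pending`, and the string partition names stand in for the real `AddPartitionsToTxnTransactionCollection`, `Errors`, and `TopicPartition` types. It also tracks a single broker node instead of a map of nodes. As in the Scala code, the old callback is completed while the lock is held.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class PendingTxnSketch {
    // Hypothetical subset of the error codes used in the diff.
    enum TxnError { INVALID_PRODUCER_EPOCH, NETWORK_EXCEPTION }

    // Hypothetical pending entry: producer epoch, partitions, and the callback to complete.
    record Pending(short epoch, List<String> partitions, Consumer<Map<String, TxnError>> callback) {}

    private final Map<String, Pending> pendingByTxnId = new HashMap<>();

    synchronized void add(String txnId, short epoch, List<String> partitions,
                          Consumer<Map<String, TxnError>> callback) {
        Pending old = pendingByTxnId.remove(txnId);
        if (old != null) {
            // A bumped epoch fences the old request; otherwise the client most likely
            // disconnected and retried, so the old request gets a retriable error.
            TxnError error = old.epoch() < epoch
                    ? TxnError.INVALID_PRODUCER_EPOCH
                    : TxnError.NETWORK_EXCEPTION;
            Map<String, TxnError> errors = new HashMap<>();
            for (String partition : old.partitions()) {
                errors.put(partition, error);
            }
            old.callback().accept(errors);
        }
        pendingByTxnId.put(txnId, new Pending(epoch, partitions, callback));
    }

    synchronized int pendingCount() {
        return pendingByTxnId.size();
    }

    public static void main(String[] args) {
        PendingTxnSketch sketch = new PendingTxnSketch();
        sketch.add("txn-1", (short) 1, List.of("topic-0"), errors -> System.out.println("first: " + errors));
        // Same transactional ID with a bumped epoch: the first callback is failed.
        sketch.add("txn-1", (short) 2, List.of("topic-0", "topic-1"), errors -> System.out.println("second: " + errors));
        // prints: first: {topic-0=INVALID_PRODUCER_EPOCH}
    }
}
```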

[jira] [Commented] (KAFKA-14898) [ MirrorMaker ] sync.topic.configs.enabled not working as expected

2023-04-12 Thread Srinivas Boga (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711551#comment-17711551
 ] 

Srinivas Boga commented on KAFKA-14898:
---

[~gharris1727] Thanks for your help on this

 

Yes, I could see the log message you pointed out. I was running MirrorMaker in distributed mode with 3 nodes and 10 tasks on each.

I have verified by running on only one node, and it works as expected.

 

Thanks,

-srini


[GitHub] [kafka] gharris1727 commented on pull request #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest

2023-04-12 Thread via GitHub


gharris1727 commented on PR #13549:
URL: https://github.com/apache/kafka/pull/13549#issuecomment-1505884041

   @cmccabe Could you review this fix?





[GitHub] [kafka] gharris1727 opened a new pull request, #13549: KAFKA-14900: Fix NPE in SharedServer causing flaky failure in AuthorizerTest

2023-04-12 Thread via GitHub


gharris1727 opened a new pull request, #13549:
URL: https://github.com/apache/kafka/pull/13549

   It appears that raftManager can become null via ControllerServer.shutdown() -> ensureNotRaftLeader() and various call sites of stop(). Rather than throwing an NPE, evaluate raftManager.client while it is known to be non-null, and pass the evaluated client into setHighWaterMarkAccessor.
   
   Unfortunately I don't have much context here, and I'm not sure whether, once raftManager becomes null, it is still reasonable to evaluate highWatermark(), or whether the HighWaterMarkAccessor should return empty.
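The fix described here amounts to capturing a reference eagerly instead of dereferencing a mutable field lazily. Below is a minimal Java sketch of that pattern. `Manager`, `Client`, and the two accessor methods are hypothetical stand-ins for `SharedServer.raftManager`, `KafkaRaftManager.client`, and the high-watermark accessor. The sketch does not settle the open question of whether the accessor should return empty after shutdown.

```java
import java.util.function.LongSupplier;

final class EagerCaptureSketch {
    // Hypothetical stand-ins for the raft client and the manager that owns it.
    static final class Client {
        long highWatermark() { return 42L; }
    }

    static final class Manager {
        final Client client = new Client();
    }

    // Mutable field that shutdown paths may set to null.
    volatile Manager manager = new Manager();

    // Lazy: reads the field on every call, so it throws an NPE once manager is null.
    LongSupplier lazyAccessor() {
        return () -> manager.client.highWatermark();
    }

    // Eager: reads the client while manager is known to be non-null and captures
    // that reference, so later nulling of the field does not affect the accessor.
    LongSupplier eagerAccessor() {
        Client client = manager.client;
        return client::highWatermark;
    }

    public static void main(String[] args) {
        EagerCaptureSketch sketch = new EagerCaptureSketch();
        LongSupplier lazy = sketch.lazyAccessor();
        LongSupplier eager = sketch.eagerAccessor();
        sketch.manager = null; // simulate shutdown
        System.out.println(eager.getAsLong()); // prints 42
        try {
            lazy.getAsLong();
        } catch (NullPointerException e) {
            System.out.println("lazy accessor threw NPE");
        }
    }
}
```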
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-14900) Flaky test AuthorizerTest failing with NPE

2023-04-12 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14900:

Description: 
The AuthorizerTest has multiple tests that appear to have the same flaky 
failure:
{noformat}
org.apache.kafka.server.fault.FaultHandlerException: 
quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: 
Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value 
of "kafka.server.SharedServer.raftManager()" is null
at 
app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256)
at 
app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229)
at 
app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270)
at 
app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258)
at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke 
"kafka.raft.KafkaRaftManager.client()" because the return value of 
"kafka.server.SharedServer.raftManager()" is null
... 8 more{noformat}

  was:
The AuthorizerTest has multiple tests that appear to have the same flaky 
failure:
{noformat}
org.apache.kafka.server.fault.FaultHandlerException: 
quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: 
Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value 
of "kafka.server.SharedServer.raftManager()" is null at 
app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256)  at 
app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229)
at 
app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270)
   at 
app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
   at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
  at 
app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
   at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833)Caused by: 
java.lang.NullPointerException: Cannot invoke 
"kafka.raft.KafkaRaftManager.client()" because the return value of 
"kafka.server.SharedServer.raftManager()" is null... 8 more{noformat}


> Flaky test AuthorizerTest failing with NPE
> --
>
> Key: KAFKA-14900
> URL: https://issues.apache.org/jira/browse/KAFKA-14900
> Project: Kafka
>  Issue Type: Test
>  Components: kraft
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>  Labels: flaky-test
>
> The AuthorizerTest has multiple tests that appear to have the same flaky 
> failure:
> {noformat}
> org.apache.kafka.server.fault.FaultHandlerException: 
> quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: 
> Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value 
> of "kafka.server.SharedServer.raftManager()" is null
>   at 
> app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270)
>   at 
> app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833)
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "kafka.raft.KafkaRaftManager.client()" because the return value of 
> "kafka.server.SharedServer.raftManager()" is null
>   ... 8 more{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14900) Flaky test AuthorizerTest failing with NPE

2023-04-12 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14900:
---

 Summary: Flaky test AuthorizerTest failing with NPE
 Key: KAFKA-14900
 URL: https://issues.apache.org/jira/browse/KAFKA-14900
 Project: Kafka
  Issue Type: Test
  Components: kraft
Reporter: Greg Harris
Assignee: Greg Harris


The AuthorizerTest has multiple tests that appear to have the same flaky 
failure:
{noformat}
org.apache.kafka.server.fault.FaultHandlerException: 
quorumTestHarnessFaultHandler: Unhandled error initializing new publishers: 
Cannot invoke "kafka.raft.KafkaRaftManager.client()" because the return value 
of "kafka.server.SharedServer.raftManager()" is null
    at app//kafka.server.SharedServer.$anonfun$start$3(SharedServer.scala:256)
    at app//org.apache.kafka.image.loader.MetadataLoader.stillNeedToCatchUp(MetadataLoader.java:229)
    at app//org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:270)
    at app//org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:258)
    at app//org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
    at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
    at app//org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
    at java.base@17.0.4.1/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NullPointerException: Cannot invoke 
"kafka.raft.KafkaRaftManager.client()" because the return value of 
"kafka.server.SharedServer.raftManager()" is null
    ... 8 more{noformat}





[GitHub] [kafka] gharris1727 opened a new pull request, #13548: MINOR: Fix regression in MM2 task forwarding introduced by KAFKA-14783

2023-04-12 Thread via GitHub


gharris1727 opened a new pull request, #13548:
URL: https://github.com/apache/kafka/pull/13548

   The DistributedHerder was computing the forwarded URL for publishing task configs incorrectly, leading to 404s in MM2 distributed mode.
   
   This regression was introduced in #13424 and currently exists only on trunk. It manifests as a return to pre-KIP-710 behavior, and causes the DedicatedMirrorIntegrationTest to fail whenever forwarding happens, making the test flaky in >50% of runs.
   
   It appears to be just a typo rather than an intended change, and it was hidden in the GitHub diff when this function was split into two parts.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] rittikaadhikari commented on pull request #13503: MINOR: Refactor TierStateMachine related tests into a separate test file

2023-04-12 Thread via GitHub


rittikaadhikari commented on PR #13503:
URL: https://github.com/apache/kafka/pull/13503#issuecomment-1505826616

   @junrao when you get a chance, would you be free to take a quick pass?





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


jeffkbkim commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1160974966


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Its properties are:
+ * 
+ *  1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is
+ *     less than or equal to the number of partitions for that topic. (Range)
+ *  2) Partitions should be assigned to consumers in a way that facilitates join operations where required. (Range)
+ *     This can only be done if the topics are co-partitioned in the first place.
+ *     Co-partitioned:
+ *     Two streams are co-partitioned if the following conditions are met:
+ *      -> The keys must have the same schemas.
+ *      -> The topics involved must have the same number of partitions.
+ *  3) Consumers should retain as much of their previous assignment as possible. (Sticky)
+ * 
+ * 
+ *
+ * The algorithm works in the 5 steps described below:
+ * 
+ *  1) Get a map of the consumersPerTopic created using the member subscriptions.
+ *  2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND
+ *     get a list of sticky partitions that we want to retain in the new assignment.
+ *  3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota (minQuota + extraPartition, if necessary).
+ *  4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions.
+ *  5) Iterate through unfilled consumers and assign partitions from available partitions.
+ * 
+ * 
+ *
+ */
+public class ServerSideStickyRangeAssignor implements PartitionAssignor {
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+private static <K, V> void putList(Map<K, List<V>> map, K key, V value) {
+List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
+list.add(value);
+}
+
+private static <K, V> void putSet(Map<K, Set<V>> map, K key, V value) {
+Set<V> set = map.computeIfAbsent(key, k -> new HashSet<>());
+set.add(value);
+}
+
+static class Pair<T, U> {
+private final T first;
+private final U second;
+
+public Pair(T first, U second) {
+this.first = first;
+this.second = second;
+}
+
+public T getFirst() {
+return first;
+}
+
+public U getSecond() {
+return second;
+}
+
+@Override
+public String toString() {
+return "(" + first + ", " + second + ")";
+}
+}
+
+// Returns a map of the list of consumers per Topic (keyed by topicId)
+private Map<Uuid, List<String>> consumersPerTopic(AssignmentSpec assignmentSpec) {
+Map<Uuid, List<String>> mapTopicsToConsumers = new HashMap<>();
+Map<String, AssignmentMemberSpec> membersData = assignmentSpec.members;
+
+for (Map.Entry<String, AssignmentMemberSpec> memberEntry : membersData.entrySet()) {
+String memberId = memberEntry.getKey();
+AssignmentMemberSpec memberMetadata = memberEntry.getValue();
+Collection<Uuid> topics = memberMetadata.subscribedTopics;
+for (Uuid topicId: topics) {
+putList(mapTopicsToConsumers, topicId, memberId);
+}
+}
+return mapTopicsToConsumers;
+}
+
+private Map> 
getAvailablePartitionsPerTopic(AssignmentSpec assignmentSpec, Map> assignedStickyPartitionsPerTopic) {
+Map> availablePartitionsPerTopic = new HashMap<>();
+Map topicsMetadata = 
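Here is a simplified single-topic Java sketch of the quota-based steps in the Javadoc of this diff. It first retains sticky partitions up to `minQuota`, plus one more while extra slots remain, and then fills unfilled consumers from the remaining available partitions. `StickyRangeSketch` and `assign` are hypothetical names. The sketch ignores multi-topic subscriptions and co-partitioning, so it is an illustration of the quota arithmetic, not the `ServerSideStickyRangeAssignor` implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class StickyRangeSketch {
    /**
     * Assigns partitions 0..numPartitions-1 of one topic to the subscribed members,
     * keeping previously owned partitions where the quota allows.
     */
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions,
                                             Map<String, List<Integer>> previous) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result;
        }
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        int minQuota = numPartitions / sorted.size();
        int remainingExtra = numPartitions % sorted.size(); // members that may get minQuota + 1
        Set<Integer> assigned = new HashSet<>();

        // Retain previously owned (sticky) partitions, up to minQuota per member.
        for (String member : sorted) {
            List<Integer> kept = new ArrayList<>();
            for (int p : previous.getOrDefault(member, List.of())) {
                if (kept.size() == minQuota) break;
                if (p < numPartitions && assigned.add(p)) kept.add(p);
            }
            result.put(member, kept);
        }
        // Members at exactly minQuota may keep one more sticky partition while extra slots remain.
        for (String member : sorted) {
            if (remainingExtra == 0) break;
            List<Integer> kept = result.get(member);
            if (kept.size() != minQuota) continue;
            for (int p : previous.getOrDefault(member, List.of())) {
                if (p < numPartitions && assigned.add(p)) {
                    kept.add(p);
                    remainingExtra--;
                    break;
                }
            }
        }
        // Available partitions = all partitions minus the retained sticky ones.
        Deque<Integer> available = new ArrayDeque<>();
        for (int p = 0; p < numPartitions; p++) {
            if (!assigned.contains(p)) available.add(p);
        }
        // Fill unfilled members up to minQuota, plus one extra while extra slots remain.
        for (String member : sorted) {
            List<Integer> kept = result.get(member);
            while (kept.size() < minQuota) kept.add(available.poll());
            if (remainingExtra > 0 && kept.size() == minQuota) {
                kept.add(available.poll());
                remainingExtra--;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // a and b keep their previous partitions; the new member c receives the rest.
        System.out.println(assign(List.of("a", "b", "c"), 7,
                Map.of("a", List.of(0, 1, 2), "b", List.of(3, 4))));
        // prints {a=[0, 1, 2], b=[3, 4], c=[5, 6]}
    }
}
```

When no member has a previous assignment, the fill loop hands out contiguous ranges in member order. This matches plain range assignment for a single topic.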

[GitHub] [kafka] dajac closed pull request #13484: KAFKA-14016: Revoke more partitions than expected in Cooperative rebalance

2023-04-12 Thread via GitHub


dajac closed pull request #13484: KAFKA-14016: Revoke more partitions than 
expected in Cooperative rebalance
URL: https://github.com/apache/kafka/pull/13484





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-12 Thread via GitHub


jeffkbkim commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1160941760


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static java.lang.Math.min;
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the range assignor and the sticky assignor.
+ * Its properties are:
+ * 
+ *  1) Each consumer must get at least one partition per topic that it is subscribed to whenever the number of consumers is
+ * less than or equal to the number of partitions for that topic. (Range)
+ *  2) Partitions should be assigned to consumers in a way that facilitates join operations where required. (Range)
+ * This can only be done if the topics are co-partitioned in the first place.
+ * Co-partitioned:
+ * Two streams are co-partitioned if the following conditions are met:
+ * ->The keys must have the same schemas
+ * ->The topics involved must have the same number of partitions
+ *  3) Consumers should retain as much of their previous assignment as possible. (Sticky)
+ * 
+ * 
+ *
+ * The algorithm works mainly in 5 steps, described below:
+ * 
+ *  1) Get a map of consumersPerTopic, created from the member subscriptions.
+ *  2) Get a list of consumers (potentiallyUnfilled) that have not met the minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new assignment.
+ *  3) Add consumers from potentiallyUnfilled to Unfilled if they haven't met the total required quota = minQuota + (if necessary) extraPartition.
+ *  4) Get a list of available partitions by calculating the difference between total partitions and assigned sticky partitions.
+ *  5) Iterate through the unfilled consumers and assign partitions from the available partitions.
+ * 
+ * 
+ *
+ */
+public class ServerSideStickyRangeAssignor implements PartitionAssignor {
+
+public static final String RANGE_ASSIGNOR_NAME = "range";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+private static <K, V> void putList(Map<K, List<V>> map, K key, V value) {
+List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
+list.add(value);
+}
+
+private static <K, V> void putSet(Map<K, Set<V>> map, K key, V value) {
+Set<V> set = map.computeIfAbsent(key, k -> new HashSet<>());
+set.add(value);
+}
+
+static class Pair<T, U> {
+private final T first;
+private final U second;
+
+public Pair(T first, U second) {
+this.first = first;
+this.second = second;
+}
+
+public T getFirst() {
+return first;
+}
+
+public U getSecond() {
+return second;
+}
+
+@Override
+public String toString() {
+return "(" + first + ", " + second + ")";
+}
+}
+
+// Returns a map of the list of consumers per Topic (keyed by topicId)
+private Map> consumersPerTopic(AssignmentSpec assignmentSpec) {
+Map> mapTopicsToConsumers = new HashMap<>();
+Map membersData = assignmentSpec.members;
+
+for (Map.Entry memberEntry : membersData.entrySet()) {

Review Comment:
   I think
   ```
   membersData.forEach( (memberId, assignmentMemberSpec) -> {
   
   })
   ```
   looks simpler. wdyt?
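For illustration, a minimal sketch of two pieces of the algorithm described in the Javadoc above: step 1 written with `Map.forEach`, as suggested in this comment, and the per-consumer quota (`minQuota` plus, where needed, one extra partition) referred to in steps 2 and 3. It is not the PR's code. `MemberSpec` is a hypothetical stand-in for `AssignmentMemberSpec`, `String` replaces `Uuid` so the sketch compiles without Kafka on the classpath, and handing the extra partitions to the first consumers in order is a simplification; which consumers actually receive them is left to the real algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RangeQuotaSketch {

    // Hypothetical stand-in for AssignmentMemberSpec: only the subscribed topic ids.
    static final class MemberSpec {
        final List<String> subscribedTopicIds;

        MemberSpec(List<String> subscribedTopicIds) {
            this.subscribedTopicIds = subscribedTopicIds;
        }
    }

    // Step 1: topicId -> ids of the members subscribed to it.
    // Map.forEach replaces the explicit entrySet() loop.
    static Map<String, List<String>> consumersPerTopic(Map<String, MemberSpec> members) {
        Map<String, List<String>> result = new HashMap<>();
        members.forEach((memberId, memberSpec) ->
            memberSpec.subscribedTopicIds.forEach(topicId ->
                result.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId)));
        return result;
    }

    // Quota per consumer for one topic (assumes numConsumers > 0): everyone gets
    // minQuota, and the first (numPartitions % numConsumers) consumers get one extra.
    static int[] quotas(int numPartitions, int numConsumers) {
        int minQuota = numPartitions / numConsumers;
        int consumersWithExtraPartition = numPartitions % numConsumers;
        int[] quotas = new int[numConsumers];
        for (int i = 0; i < numConsumers; i++) {
            quotas[i] = minQuota + (i < consumersWithExtraPartition ? 1 : 0);
        }
        return quotas;
    }

    public static void main(String[] args) {
        Map<String, MemberSpec> members = new HashMap<>();
        members.put("member-a", new MemberSpec(List.of("topic-1", "topic-2")));
        members.put("member-b", new MemberSpec(List.of("topic-1")));

        Map<String, List<String>> perTopic = consumersPerTopic(members);
        System.out.println(perTopic.get("topic-1").size()); // 2
        System.out.println(perTopic.get("topic-2"));        // [member-a]
        System.out.println(Arrays.toString(quotas(7, 3)));  // [3, 2, 2]
    }
}
```

The lambda version reads each (memberId, spec) pair directly, which avoids the `Map.Entry` boilerplate the reviewer is pointing at.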






[GitHub] [kafka] tinaselenge opened a new pull request, #13547: Add note for KIP-894 and DelegationTokenCommand

2023-04-12 Thread via GitHub


tinaselenge opened a new pull request, #13547:
URL: https://github.com/apache/kafka/pull/13547

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13524:
URL: https://github.com/apache/kafka/pull/13524#discussion_r1164542031


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/TopicIdToPartition.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.common;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+public class TopicIdToPartition {
+private final Uuid topicId;
+private final Integer partition;
+private final Optional> rackIds;
+
+public TopicIdToPartition(Uuid topicId, Integer topicPartition, Optional> rackIds) {
+this.topicId = Objects.requireNonNull(topicId, "topicId cannot be null");
+this.partition = Objects.requireNonNull(topicPartition, "topicPartition cannot be null");
+this.rackIds = Objects.requireNonNull(rackIds, "rackId cannot be null");
+}
+
+/**
+ * @return Universally unique id representing this topic partition.
+ */
+public Uuid topicId() {
+return topicId;
+}
+
+/**
+ * @return the partition number.
+ */
+public int partition() {
+return partition;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+TopicIdToPartition that = (TopicIdToPartition) o;
+return topicId.equals(that.topicId) &&
+partition.equals(that.partition);
+}
+
+@Override
+public int hashCode() {
+final int prime = 31;

Review Comment:
   Hey, yeah, we wanted a data structure that only had a topicId-to-partition-number mapping for each partition. The existing TopicIdPartition class has topic names as well. I didn't know how else to name it uniquely xD
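The quoted `hashCode()` is cut off in this excerpt. As a hedged sketch of what a value class keyed only by topic id and partition number usually needs, here `equals` and `hashCode` use the same two fields, mirroring the quoted `equals`, so instances behave correctly as `HashMap`/`HashSet` keys. `TopicIdPartitionKey` is a hypothetical name, and `java.util.UUID` stands in for Kafka's `Uuid` to keep the example self-contained.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Hypothetical value class: topic id + partition number, no topic name.
final class TopicIdPartitionKey {
    private final UUID topicId;   // stand-in for org.apache.kafka.common.Uuid
    private final int partition;

    TopicIdPartitionKey(UUID topicId, int partition) {
        this.topicId = Objects.requireNonNull(topicId, "topicId cannot be null");
        this.partition = partition;
    }

    UUID topicId() {
        return topicId;
    }

    int partition() {
        return partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TopicIdPartitionKey that = (TopicIdPartitionKey) o;
        return partition == that.partition && topicId.equals(that.topicId);
    }

    @Override
    public int hashCode() {
        // Must be derived from exactly the fields compared in equals().
        return Objects.hash(topicId, partition);
    }

    @Override
    public String toString() {
        return topicId + ":" + partition;
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        Set<TopicIdPartitionKey> set = new HashSet<>();
        set.add(new TopicIdPartitionKey(id, 0));
        set.add(new TopicIdPartitionKey(id, 0)); // equal to the first, so not added again
        set.add(new TopicIdPartitionKey(id, 1));
        System.out.println(set.size()); // 2
    }
}
```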






[GitHub] [kafka] rreddy-22 commented on pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface

2023-04-12 Thread via GitHub


rreddy-22 commented on PR #13524:
URL: https://github.com/apache/kafka/pull/13524#issuecomment-1505791579

   I like the idea of augmenting AssignmentTopicMetadata with rackIds, since this comes under metadata and it'll be passed to all the assignors.





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13524:
URL: https://github.com/apache/kafka/pull/13524#discussion_r1164538097


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/TopicIdToPartition.java:
##
@@ -0,0 +1,76 @@
+/*

Review Comment:
   We can remove it here if it's not the right place for it, and then couple it with the assignor PRs where it's used.






[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13524:
URL: https://github.com/apache/kafka/pull/13524#discussion_r1164536324


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentTopicMetadata.java:
##
@@ -25,12 +25,12 @@ public class AssignmentTopicMetadata {
 /**
  * The topic name.
  */
-final String topicName;
+private final String topicName;

Review Comment:
   I think there's no harm in keeping it, just in case we want to know which topic name is associated with which topicId. wdyt?






[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13524:
URL: https://github.com/apache/kafka/pull/13524#discussion_r1164535218


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentSpec.java:
##
@@ -28,12 +28,12 @@ public class AssignmentSpec {
 /**
  * The members keyed by member id.
  */
-final Map members;
+private final Map members;
 
 /**
  * The topics' metadata keyed by topic id
  */
-final Map topics;
+private final Map topics;

Review Comment:
   I haven't thought about the full implementation yet, or where the rackIds will be passed; I figured we could edit it when it came to implementing the algorithm. The topicIdToPartition data structure could be populated using this, so maybe we can add a field for a map of partition to its rackIds.
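A minimal sketch of the idea floated here, assuming rack ids are plain strings: topic metadata that carries a map from partition number to the racks hosting its replicas, plus a lookup that matches it against a member's optional rack id (as in `AssignmentMemberSpec`). `TopicMetadataWithRacks`, `racksFor`, and `isRackLocal` are hypothetical names, not part of this PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical topic metadata with a partition -> rack ids map.
final class TopicMetadataWithRacks {
    private final String topicName;
    private final Map<Integer, Set<String>> partitionRacks;

    TopicMetadataWithRacks(String topicName, Map<Integer, Set<String>> partitionRacks) {
        this.topicName = topicName;
        // Defensive, read-only copy so assignors cannot mutate shared metadata.
        this.partitionRacks = Collections.unmodifiableMap(new HashMap<>(partitionRacks));
    }

    String topicName() {
        return topicName;
    }

    // Racks hosting replicas of the partition; empty if unknown.
    Set<String> racksFor(int partition) {
        return partitionRacks.getOrDefault(partition, Collections.emptySet());
    }

    // True only if the member has a rack id and that rack hosts a replica of the partition.
    boolean isRackLocal(int partition, Optional<String> memberRackId) {
        return memberRackId.map(rack -> racksFor(partition).contains(rack)).orElse(false);
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> racks = new HashMap<>();
        racks.put(0, Set.of("rack-a", "rack-b"));
        racks.put(1, Set.of("rack-c"));
        TopicMetadataWithRacks metadata = new TopicMetadataWithRacks("orders", racks);

        System.out.println(metadata.isRackLocal(0, Optional.of("rack-a"))); // true
        System.out.println(metadata.isRackLocal(1, Optional.of("rack-a"))); // false
        System.out.println(metadata.isRackLocal(1, Optional.empty()));      // false
    }
}
```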



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13524:
URL: https://github.com/apache/kafka/pull/13524#discussion_r1164530365


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/MemberAssignment.java:
##
@@ -16,25 +16,28 @@
  */
 package org.apache.kafka.coordinator.group.assignor;
 
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 
-import java.util.Collection;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * The partition assignment for a consumer group member.
  */
 public class MemberAssignment {
 /**
- * The target partitions assigned to this member.
+ * The target partitions assigned to this member keyed by topicId.
  */
-final Collection targetPartitions;
+private final Map> assignedTopicIdPartitions;

Review Comment:
   sounds good
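The diff above replaces a flat collection of target partitions with partition numbers keyed by topic id. A minimal sketch of converting the former shape into the latter, assuming a hypothetical `TopicIdAndPartition` pair and using `String` in place of `Uuid`; the `TreeSet` keeps each topic's partitions sorted for readable output, which is a choice made here rather than something the PR specifies.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class AssignmentGrouping {

    // Hypothetical flat (topicId, partition) pair; String stands in for Uuid.
    static final class TopicIdAndPartition {
        final String topicId;
        final int partition;

        TopicIdAndPartition(String topicId, int partition) {
            this.topicId = topicId;
            this.partition = partition;
        }
    }

    // Groups a flat collection of partitions into topicId -> partition numbers.
    static Map<String, Set<Integer>> groupByTopicId(List<TopicIdAndPartition> partitions) {
        Map<String, Set<Integer>> assigned = new HashMap<>();
        for (TopicIdAndPartition tp : partitions) {
            assigned.computeIfAbsent(tp.topicId, k -> new TreeSet<>()).add(tp.partition);
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<TopicIdAndPartition> flat = List.of(
            new TopicIdAndPartition("topic-1", 2),
            new TopicIdAndPartition("topic-2", 0),
            new TopicIdAndPartition("topic-1", 0));
        Map<String, Set<Integer>> grouped = groupByTopicId(flat);
        System.out.println(grouped.get("topic-1")); // [0, 2]
        System.out.println(grouped.get("topic-2")); // [0]
    }
}
```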






[GitHub] [kafka] cmccabe merged pull request #13521: MINOR: Add tests verifying leadership during and after reassignments in KRaft mode

2023-04-12 Thread via GitHub


cmccabe merged PR #13521:
URL: https://github.com/apache/kafka/pull/13521





[GitHub] [kafka] dajac commented on a diff in pull request #13502: KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests

2023-04-12 Thread via GitHub


dajac commented on code in PR #13502:
URL: https://github.com/apache/kafka/pull/13502#discussion_r1164521028


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2113,6 +2113,115 @@ class KafkaApisTest {
 assertEquals(expectedErrors, response.errors())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+  def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = {
+val topic = "topic"
+
+val transactionalId = "txnId1"
+val producerId = 15L
+val epoch = 0.toShort
+
+val tp = new TopicPartition(topic, 0)
+
+val addPartitionsToTxnRequest = if (version < 4) 
+  AddPartitionsToTxnRequest.Builder.forClient(
+transactionalId,
+producerId,
+epoch,
+Collections.singletonList(tp)).build(version)
+else
+  AddPartitionsToTxnRequest.Builder.forBroker(
+new AddPartitionsToTxnTransactionCollection(

Review Comment:
   correct






[GitHub] [kafka] cmccabe commented on pull request #13521: MINOR: Add tests verifying leadership during and after reassignments in KRaft mode

2023-04-12 Thread via GitHub


cmccabe commented on PR #13521:
URL: https://github.com/apache/kafka/pull/13521#issuecomment-1505765473

   Sure. The test is valid since it's testing the `PartitionChangeBuilder` and 
we do want it to have the behavior you're describing. Thanks for the 
explanation.
   
   LGTM





[GitHub] [kafka] cmccabe opened a new pull request, #13546: Cherry-pick KAFKA-14857: Fix some MetadataLoader bugs (#13462)

2023-04-12 Thread via GitHub


cmccabe opened a new pull request, #13546:
URL: https://github.com/apache/kafka/pull/13546

   The MetadataLoader is not supposed to publish metadata updates until we have 
loaded up to the high water mark. Previously, this logic was broken, and we 
published updates immediately. This PR fixes that and adds a junit test.
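To make the first fix concrete, a minimal sketch of gating publication until the loader has caught up to the high watermark. It is not Kafka's `MetadataLoader` API: `CatchUpGate`, `onHighWatermark`, and `onBatchApplied` are hypothetical names, publishers are modeled as plain `LongConsumer` callbacks receiving the last applied offset, and the high watermark is assumed to be the offset one past the last committed record, so the loader counts as caught up once `lastAppliedOffset + 1 >= highWatermark`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.LongConsumer;

// Hypothetical sketch: hold back publishing until the high watermark is reached.
class CatchUpGate {
    private final List<LongConsumer> publishers = new ArrayList<>();
    private OptionalLong highWatermark = OptionalLong.empty();
    private long lastAppliedOffset = -1L;
    private boolean caughtUp = false;

    void addPublisher(LongConsumer publisher) {
        publishers.add(publisher);
    }

    // Called when the Raft layer reports a new high watermark.
    void onHighWatermark(long hwm) {
        highWatermark = OptionalLong.of(hwm);
        if (!caughtUp) {
            maybeMarkCaughtUp();
        }
    }

    // Called after a batch of metadata records has been applied locally.
    void onBatchApplied(long lastOffsetInBatch) {
        lastAppliedOffset = lastOffsetInBatch;
        if (caughtUp) {
            publishAll();
        } else {
            maybeMarkCaughtUp();
        }
    }

    // Nothing is published while the HWM is unknown or not yet reached.
    private void maybeMarkCaughtUp() {
        if (highWatermark.isPresent() && lastAppliedOffset + 1 >= highWatermark.getAsLong()) {
            caughtUp = true;
            publishAll();
        }
    }

    private void publishAll() {
        for (LongConsumer publisher : publishers) {
            publisher.accept(lastAppliedOffset);
        }
    }

    public static void main(String[] args) {
        CatchUpGate gate = new CatchUpGate();
        List<Long> published = new ArrayList<>();
        gate.addPublisher(offset -> published.add(offset));

        gate.onHighWatermark(10);
        gate.onBatchApplied(4);   // behind the HWM: nothing published
        gate.onBatchApplied(9);   // caught up: publishes 9
        gate.onBatchApplied(12);  // already caught up: publishes 12
        System.out.println(published); // [9, 12]
    }
}
```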
   
   Another issue is that the MetadataLoader previously assumed that we would 
periodically get callbacks from the Raft layer even if nothing had happened. We 
relied on this to install new publishers in a timely fashion, for example. 
However, in older MetadataVersions that don't include NoOpRecord, this is not a 
safe assumption.
   
   Aside from the above changes, also fix a deadlock in SnapshotGeneratorTest, 
fix the log prefix for BrokerLifecycleManager, and remove metadata publishers 
on brokerserver shutdown (like we do for controllers).
   
   Reviewers: David Arthur , dengziming 

   
   Conflicts: This patch was cut down to make it easier to cherry-pick to this 
older branch. Specifically, I removed the BrokerLifecycleManager.scala logging 
change and the BrokerServer installPublishers and removeAndClosePublisher 
changes.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan commented on a diff in pull request #13502: KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests

2023-04-12 Thread via GitHub


jolshan commented on code in PR #13502:
URL: https://github.com/apache/kafka/pull/13502#discussion_r1164515573


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2113,6 +2113,115 @@ class KafkaApisTest {
 assertEquals(expectedErrors, response.errors())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+  def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = {
+val topic = "topic"
+
+val transactionalId = "txnId1"
+val producerId = 15L
+val epoch = 0.toShort
+
+val tp = new TopicPartition(topic, 0)
+
+val addPartitionsToTxnRequest = if (version < 4) 
+  AddPartitionsToTxnRequest.Builder.forClient(
+transactionalId,
+producerId,
+epoch,
+Collections.singletonList(tp)).build(version)
+else
+  AddPartitionsToTxnRequest.Builder.forBroker(
+new AddPartitionsToTxnTransactionCollection(

Review Comment:
   Hmm, are you referring to where `List` is?






[GitHub] [kafka] dajac commented on a diff in pull request #13502: KAFKA-14790: Add more AddPartitionsToTxn tests in KafkaApis and Authorizer tests

2023-04-12 Thread via GitHub


dajac commented on code in PR #13502:
URL: https://github.com/apache/kafka/pull/13502#discussion_r1164508565


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2113,6 +2113,115 @@ class KafkaApisTest {
 assertEquals(expectedErrors, response.errors())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+  def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = {
+val topic = "topic"
+
+val transactionalId = "txnId1"
+val producerId = 15L
+val epoch = 0.toShort
+
+val tp = new TopicPartition(topic, 0)
+
+val addPartitionsToTxnRequest = if (version < 4) 
+  AddPartitionsToTxnRequest.Builder.forClient(
+transactionalId,
+producerId,
+epoch,
+Collections.singletonList(tp)).build(version)
+else
+  AddPartitionsToTxnRequest.Builder.forBroker(
+new AddPartitionsToTxnTransactionCollection(

Review Comment:
   nit: indentation seems to be off.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2113,6 +2113,115 @@ class KafkaApisTest {
 assertEquals(expectedErrors, response.errors())
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+  def testHandleAddPartitionsToTxnAuthorizationFailed(version: Short): Unit = {
+val topic = "topic"
+
+val transactionalId = "txnId1"
+val producerId = 15L
+val epoch = 0.toShort
+
+val tp = new TopicPartition(topic, 0)
+
+val addPartitionsToTxnRequest = if (version < 4) 
+  AddPartitionsToTxnRequest.Builder.forClient(
+transactionalId,
+producerId,
+epoch,
+Collections.singletonList(tp)).build(version)
+else
+  AddPartitionsToTxnRequest.Builder.forBroker(
+new AddPartitionsToTxnTransactionCollection(
+List(new AddPartitionsToTxnTransaction()
+  .setTransactionalId(transactionalId)
+  .setProducerId(producerId)
+  .setProducerEpoch(epoch)
+  .setVerifyOnly(true)
+  .setTopics(new AddPartitionsToTxnTopicCollection(
+Collections.singletonList(new AddPartitionsToTxnTopic()
+  .setName(tp.topic)
+  .setPartitions(Collections.singletonList(tp.partition))
+).iterator()))
+).asJava.iterator())).build(version)
+
+val requestChannelRequest = buildRequest(addPartitionsToTxnRequest)
+
+val authorizer: Authorizer = mock(classOf[Authorizer])
+when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+  .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
+
+  createKafkaApis(authorizer = Some(authorizer)).handle(
+requestChannelRequest,
+RequestLocal.NoCaching
+  )
+
+val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest)
+val error = if (version < 4)
+  response.errors().get(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID).get(tp)
+else
+  Errors.forCode(response.data().errorCode())
+
+val expectedError = if (version < 4) Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED else Errors.CLUSTER_AUTHORIZATION_FAILED
+assertEquals(expectedError, error)
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.ADD_PARTITIONS_TO_TXN)
+  def testAddPartitionsToTxnOperationNotAttempted(version: Short): Unit = {
+val topic = "topic"
+addTopicToMetadataCache(topic, numPartitions = 1)
+
+val transactionalId = "txnId1"
+val producerId = 15L
+val epoch = 0.toShort
+
+val tp0 = new TopicPartition(topic, 0)
+val tp1 = new TopicPartition(topic, 1)
+
+val addPartitionsToTxnRequest = if (version < 4)
+  AddPartitionsToTxnRequest.Builder.forClient(
+transactionalId,
+producerId,
+epoch,
+List(tp0, tp1).asJava).build(version)
+else
+  AddPartitionsToTxnRequest.Builder.forBroker(
+new AddPartitionsToTxnTransactionCollection(
+  List(new AddPartitionsToTxnTransaction()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(epoch)
+.setVerifyOnly(true)
+.setTopics(new AddPartitionsToTxnTopicCollection(
+  Collections.singletonList(new AddPartitionsToTxnTopic()
+.setName(tp0.topic)
+.setPartitions(List[Integer](tp0.partition, tp1.partition()).asJava)
+  ).iterator()))
+  ).asJava.iterator())).build(version)
+
+val requestChannelRequest = buildRequest(addPartitionsToTxnRequest)
+
+createKafkaApis().handleAddPartitionsToTxnRequest(
+  requestChannelRequest,
+  RequestLocal.NoCaching
+)
+
+val response = verifyNoThrottling[AddPartitionsToTxnResponse](requestChannelRequest)
+
+def checkErrorForTp(tp: TopicPartition): Unit = {
+  val error = if (version < 4)
+

[GitHub] [kafka] mjsax merged pull request #13444: KAFKA-14491: [21/N] Docs updates for versioned state stores

2023-04-12 Thread via GitHub


mjsax merged PR #13444:
URL: https://github.com/apache/kafka/pull/13444





[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13524: MINOR: Refine `PartitionAssignor` server-side interface

2023-04-12 Thread via GitHub


rreddy-22 commented on code in PR #13524:
URL: https://github.com/apache/kafka/pull/13524#discussion_r1164499488


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java:
##
@@ -29,37 +31,53 @@ public class AssignmentMemberSpec {
 /**
  * The instance ID if provided.
  */
-final Optional instanceId;
+private final Optional instanceId;
 
 /**
  * The rack ID if provided.
  */
-final Optional rackId;
+private final Optional rackId;
 
 /**
- * The topics that the member is subscribed to.
+ * The topicIds of topics that the member is subscribed to.
  */
-final Collection subscribedTopics;
+private final Collection subscribedTopics;
 
 /**
- * The current target partitions of the member.
+ * Partitions assigned for this member keyed by topicId
  */
-final Collection targetPartitions;
+private final Map> assignedTopicIdPartitions;

Review Comment:
   I thought it would be clearer as topicIdPartitions, since we're keying by topicId. Should we change it?
   






[jira] [Comment Edited] (KAFKA-14898) [ MirrorMaker ] sync.topic.configs.enabled not working as expected

2023-04-12 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711511#comment-17711511
 ] 

Greg Harris edited comment on KAFKA-14898 at 4/12/23 6:25 PM:
--

[~bseenu] Are you running MM2 with the MirrorMaker dedicated launcher? There's 
a known issue where a multi-node cluster is unable to persist configuration 
changes: https://issues.apache.org/jira/browse/KAFKA-10586 which has a fix to 
be released in 3.5.0.
You can verify that the above is affecting you if you see these log messages: 
[https://github.com/apache/kafka/blob/9c0caca6600cd18c161f7be48a7745a98ff091dd/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L236-L241]
 indicating that the existing connector configuration is being used (meaning that your new connector configuration with enabled=false is being dropped).
In the meantime, you can cold-start your cluster (shut down all nodes, then 
bring them all back) and that will ensure that at least one of the nodes takes 
your on-disk configuration and applies it to the connectors. 


was (Author: gharris1727):
[~bseenu] Are you running MM2 with the MirrorMaker dedicated launcher? There's 
a known issue where a multi-node cluster is unable to persist configuration 
changes: https://issues.apache.org/jira/browse/KAFKA-10586 which has a fix to 
be released in 3.5.0.
In the meantime, you can cold-start your cluster (shut down all nodes, then 
bring them all back) and that will ensure that at least one of the nodes takes 
your on-disk configuration and applies it to the connectors. 

> [ MirrorMaker ] sync.topic.configs.enabled not working as expected
> --
>
> Key: KAFKA-14898
> URL: https://issues.apache.org/jira/browse/KAFKA-14898
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.4.0
>Reporter: Srinivas Boga
>Priority: Major
>  Labels: mirrormaker
>
> Hello,
> In my replication setup, I do not want to sync the topic configs; the use 
> case is to have a different retention time for the topic on the target cluster. 
> I am passing the config
> {code:java}
>  sync.topic.configs.enabled = false{code}
> but this is not working as expected: the topic retention time is being set to 
> whatever is set in the source cluster. Looking at the MirrorMaker logs, 
> I can see that MirrorSourceConnector is still setting the above config as true:
> {code:java}
> [2023-04-12 17:04:55,184] INFO [MirrorSourceConnector|task-8] ConsumerConfig 
> values:
>         allow.auto.create.topics = true
>         auto.commit.interval.ms = 5000
>         auto.include.jmx.reporter = true
>         auto.offset.reset = earliest
>         bootstrap.servers = [sourcecluster.com:9092]
>         check.crcs = true
>         client.dns.lookup = use_all_dns_ips
>         client.id = consumer-null-2
>         client.rack =
>         connections.max.idle.ms = 54
>         default.api.timeout.ms = 6
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = null
>         group.instance.id = null
>         heartbeat.interval.ms = 3000
>         interceptor.classes = []
>         internal.leave.group.on.close = true
>         internal.throw.on.fetch.stable.offset.unsupported = false
>         isolation.level = read_uncommitted
>         key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>         max.partition.fetch.bytes = 1048576
>         max.poll.interval.ms = 30
>         max.poll.records = 500
>         metadata.max.age.ms = 30
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.recording.level = INFO
>         metrics.sample.window.ms = 3
>         partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor, class 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.max.ms = 1000
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 3
>         retry.backoff.ms = 100
>         sasl.client.callback.handler.class = null
>         sasl.jaas.config = null
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 6
>         sasl.kerberos.service.name = null
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.login.callback.handler.class = null
>         sasl.login.class = null
>         sasl.login.connect.timeout.ms = null
>         sasl.login.read.timeout.ms = null
>         sasl.login.refresh.buffer.seconds = 300
>         
