[jira] [Resolved] (KAFKA-16903) Task should consider producer error previously occurred for different task

2024-06-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16903.
-
Resolution: Fixed

> Task should consider producer error previously occurred for different task
> --
>
> Key: KAFKA-16903
> URL: https://issues.apache.org/jira/browse/KAFKA-16903
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.8.0
>
>
> A task does not consider a producer error that occurred for a different task.
> The following log messages show the issue.
> Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records 
> with an {{InvalidTxnStateException}}:
> {code:java}
> [2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | 
> i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
> [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered 
> sending record to topic stream-soak-test-node-name-repartition for task 0_2 
> due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be 
> sent. (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.
> [2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] 
> stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream 
> task 0_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>   at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
>   at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
>   at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
>   at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
>   at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
>   at java.util.ArrayList.forEach(ArrayList.java:1259)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
>   at java.lang.Iterable.forEach(Iterable.java:75)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
>   at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The 
> producer attempted a transactional operation in an invalid state.
> {code} 
> Just before the exception of task 0_2, task 0_0 also encountered an 
> exception while producing:
> {code:java}
> [2024-05-

[jira] [Updated] (KAFKA-16903) Task should consider producer error previously occurred for different task

2024-06-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16903:

Fix Version/s: 3.8.0



[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track

2024-06-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852531#comment-17852531
 ] 

Matthias J. Sax commented on KAFKA-16811:
-

Let's move the discussion to the PR.

> Punctuate Ratio metric almost impossible to track
> -
>
> Key: KAFKA-16811
> URL: https://issues.apache.org/jira/browse/KAFKA-16811
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sebastien Viale
>Assignee: Ganesh Sadanala
>Priority: Minor
>  Time Spent: 48h
>  Remaining Estimate: 0h
>
> The Punctuate ratio metric is returned after the last record of the poll 
> loop. It is recomputed in every poll loop.
> After a punctuate, the value is close to 1, but there is little chance that 
> metric is sampled at this time. 
> So its value is almost always 0.   
> A solution could be to apply a kind of "sliding window" to it and report the 
> value for the last x seconds.
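
For illustration, a sliding-window variant could keep the punctuate and total loop times for the last N poll loops and report their ratio, instead of only the latest loop. A rough sketch (not the existing metric code; class and field names are made up):

{code:java}
// Sketch only: a fixed-size window of per-poll-loop timings; the reported ratio
// covers the retained loops rather than just the most recent one.
public class SlidingPunctuateRatio {
    private final long[] punctuateTimeMs; // time spent in punctuate per poll loop
    private final long[] totalTimeMs;     // total time per poll loop
    private int next = 0;
    private int count = 0;

    public SlidingPunctuateRatio(final int windowSize) {
        punctuateTimeMs = new long[windowSize];
        totalTimeMs = new long[windowSize];
    }

    public synchronized void record(final long punctuateMs, final long loopMs) {
        punctuateTimeMs[next] = punctuateMs;
        totalTimeMs[next] = loopMs;
        next = (next + 1) % punctuateTimeMs.length;
        count = Math.min(count + 1, punctuateTimeMs.length);
    }

    public synchronized double ratio() {
        long punctuate = 0;
        long total = 0;
        for (int i = 0; i < count; i++) {
            punctuate += punctuateTimeMs[i];
            total += totalTimeMs[i];
        }
        return total == 0 ? 0.0 : (double) punctuate / total; // 0.0 until something was recorded
    }
}
{code}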



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track

2024-06-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852231#comment-17852231
 ] 

Matthias J. Sax commented on KAFKA-16811:
-

[~ganesh_6] – WordCountProcessorDemo does not contain any punctuation calls, 
right? Or did you modify the code and add punctuations?

> Punctuate Ratio metric almost impossible to track
> -
>
> Key: KAFKA-16811
> URL: https://issues.apache.org/jira/browse/KAFKA-16811
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sebastien Viale
>Assignee: Ganesh Sadanala
>Priority: Minor
>  Time Spent: 48h
>  Remaining Estimate: 0h
>
> The Punctuate ratio metric is returned after the last record of the poll 
> loop. It is recomputed in every poll loop.
> After a punctuate, the value is close to 1, but there is little chance that 
> metric is sampled at this time. 
> So its value is almost always 0.   
> A solution could be to apply a kind of "sliding window" to it and report the 
> value for the last x seconds.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16874) Remove old TaskAssignor interface

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16874:

Component/s: streams

> Remove old TaskAssignor interface
> -
>
> Key: KAFKA-16874
> URL: https://issues.apache.org/jira/browse/KAFKA-16874
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> Once we have the new HAAssignor that implements the new TaskAssignor 
> interface, we can remove the old TaskAssignor interface. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15905:

Fix Version/s: 3.8.0
   (was: 3.8)

> Restarts of MirrorCheckpointTask should not permanently interrupt offset 
> translation
> 
>
> Key: KAFKA-15905
> URL: https://issues.apache.org/jira/browse/KAFKA-15905
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Edoardo Comar
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> Executive summary: When the MirrorCheckpointTask restarts, it loses the state 
> of checkpointsPerConsumerGroup, which limits offset translation to records 
> mirrored after the latest restart.
> For example, if 1000 records are mirrored and the OffsetSyncs are read by 
> MirrorCheckpointTask, the emitted checkpoints are cached, and translation can 
> happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more 
> records are mirrored, translation can happen at the ~1500th record, but no 
> longer at the ~500th record.
> Context:
> Before KAFKA-13659, MM2 made translation decisions based on the 
> incompletely-initialized OffsetSyncStore, and the checkpoint could appear to 
> go backwards temporarily during restarts. To fix this, we forced the 
> OffsetSyncStore to initialize completely before translation could take place, 
> ensuring that the latest OffsetSync had been read, and thus providing the 
> most accurate translation.
> Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. 
> Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to 
> allow for translation of earlier offsets. This came with the caveat that the 
> cache's sparseness allowed translations to go backwards permanently. To 
> prevent this behavior, a cache of the latest Checkpoints was kept in the 
> MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset 
> translation remained restricted to the fully-initialized OffsetSyncStore.
> Effectively, the MirrorCheckpointTask ensures that it translates based on an 
> OffsetSync emitted during its lifetime, to ensure that no previous 
> MirrorCheckpointTask emitted a later sync. If we can read the checkpoints 
> emitted by previous generations of MirrorCheckpointTask, we can still ensure 
> that checkpoints are monotonic, while allowing translation further back in 
> history.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16622) Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16622:

Fix Version/s: 3.8.0
   (was: 3.8)

> Mirromaker2 first Checkpoint not emitted until consumer group fully catches 
> up once
> ---
>
> Key: KAFKA-16622
> URL: https://issues.apache.org/jira/browse/KAFKA-16622
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.0, 3.6.2, 3.8.0
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
> Attachments: connect.log.2024-04-26-10.zip, 
> edo-connect-mirror-maker-sourcetarget.properties
>
>
> We observed an excessively delayed emission of the MM2 Checkpoint record.
> It only gets created when the source consumer reaches the end of a topic. 
> This does not seem reasonable.
> In a very simple setup :
> Tested with a standalone single process MirrorMaker2 mirroring between two 
> single-node kafka clusters (mirrormaker config attached) with quick refresh 
> intervals (eg 5 sec) and a small offset.lag.max (eg 10)
> create a single topic in the source cluster
> produce data to it (e.g. 1 records)
> start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec 
> between polls which commits after each poll
> watch the Checkpoint topic in the target cluster
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
>   --topic source.checkpoints.internal \
>   --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
>--from-beginning
> -> no record appears in the checkpoint topic until the consumer reaches the 
> end of the topic (ie its consumer group lag gets down to 0).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16757) Fix broker re-registration issues around MV 3.7-IV2

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16757:

Fix Version/s: 3.8.0
   (was: 3.8)

> Fix broker re-registration issues around MV 3.7-IV2
> ---
>
> Key: KAFKA-16757
> URL: https://issues.apache.org/jira/browse/KAFKA-16757
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend 
> the broker registration, so that the controller can record the storage 
> directories. The current code for doing this has several problems, however. 
> One is that it tends to trigger even in cases where we don't actually need 
> it. Another is that when re-registering the broker, the broker is marked as 
> fenced.
> This PR moves the handling of the re-registration case out of 
> BrokerMetadataPublisher and into BrokerRegistrationTracker. The 
> re-registration code there will only trigger in the case where the broker 
> sees an existing registration for itself with no directories set. This is 
> much more targeted than the original code.
> Additionally, in ClusterControlManager, when re-registering the same broker, 
> we now preserve its fencing and shutdown state, rather than clearing those. 
> (There isn't any good reason re-registering the same broker should clear 
> these things... this was purely an oversight.) Note that we can tell the 
> broker is "the same" because it has the same IncarnationId.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16692:

Fix Version/s: 3.8.0
   (was: 3.8)

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.7.0, 3.6.1, 3.8
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.8.0, 3.7.1, 3.6.3
>
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> While investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNe

[jira] [Updated] (KAFKA-15045) Move Streams task assignor to public configs

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15045:

Fix Version/s: 3.8.0

> Move Streams task assignor to public configs
> 
>
> Key: KAFKA-15045
> URL: https://issues.apache.org/jira/browse/KAFKA-15045
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
> Fix For: 3.8.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16875) Replace ClientState with TaskAssignment when creating individual consumer Assignments

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16875:

Component/s: streams

> Replace ClientState with TaskAssignment when creating individual consumer 
> Assignments
> -
>
> Key: KAFKA-16875
> URL: https://issues.apache.org/jira/browse/KAFKA-16875
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> In the initial implementation of KIP-924 in version 3.8, we converted from 
> the new TaskAssignor's output type (TaskAssignment) into the old 
> ClientState-based assignment representation. This allowed us to plug in a 
> custom assignor without converting all the internal mechanisms that occur 
> after the KafkaStreams client level assignment and process it into a consumer 
> level assignment.
> However we ultimately want to get rid of ClientState altogether, so we need 
> to invert this logic so that we instead convert the ClientState into a 
> TaskAssignment and then use the TaskAssignment to process the assigned tasks 
> into consumer Assignments



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16873) Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16873:

Component/s: streams

> Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS
> -
>
> Key: KAFKA-16873
> URL: https://issues.apache.org/jira/browse/KAFKA-16873
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> Once we have all the out-of-the-box assignors implementing the new 
> TaskAssignor interface that corresponds to the new public task assignor 
> config, we can remove the old internal task assignor config altogether. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16872) Remove ClientState class

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16872:

Component/s: streams

> Remove ClientState class
> 
>
> Key: KAFKA-16872
> URL: https://issues.apache.org/jira/browse/KAFKA-16872
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> One of the end-state goals of KIP-924 is to remove the ClientState class 
> altogether. There are some blockers to this such as the removal of the old 
> internal task assignor config and the old HAAssignor, so this ticket will 
> probably be one of the very last KAFKA-16868 subtasks to be tackled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16871) Clean up internal AssignmentConfigs class in Streams

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16871:

Component/s: streams

> Clean up internal AssignmentConfigs class in Streams
> 
>
> Key: KAFKA-16871
> URL: https://issues.apache.org/jira/browse/KAFKA-16871
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, newbie++
>
> In KIP-924 we added a new public AssignmentConfigs class to hold all of the, 
> you guessed it, assignment related configs.
> However, there is an existing config class of the same name and largely the 
> same contents but that's in an internal package, specifically inside the 
> AssignorConfiguration class.
> We should remove the old AssignmentConfigs class that's in 
> AssignorConfiguration and replace any usages of it with the new public 
> AssignmentConfigs that we added in KIP-924



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16868) Post KIP-924 StreamsPartitionAssignor code cleanup

2024-06-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16868:

Component/s: streams

> Post KIP-924 StreamsPartitionAssignor code cleanup
> --
>
> Key: KAFKA-16868
> URL: https://issues.apache.org/jira/browse/KAFKA-16868
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> Making an umbrella task for all of the tech debt and code consolidation 
> cleanup work that can/should be done following the implementation of 
> [KIP-924: customizable task assignment for 
> Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams]
> Most of this revolves around deduplicating code once it's no longer needed, 
> including classes like the ClientState, StandbyTaskAssignor and related 
> elements, and the old TaskAssignor interface along with its implementations.
> Note that in 3.8, the first version in which KIP-924 was released, we just 
> added the new public config and new TaskAssignor interface but did not get 
> rid of the old internal config or old TaskAssignor interface. If neither 
> config is set in 3.8 we still default to the old HAAssignor, as a kind of 
> opt-in feature flag, and internally will convert the output of the new 
> TaskAssignor into the old style of ClientState-based assignment tracking. We 
> intend to clean up all of the old code and eventually support only the new 
> TaskAssignor interface as well as converting everything internally from the 
> ClientState to the TaskAssignment/KafkaStreamsAssignment style output



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1049: Add config log.summary.interval.ms to Kafka Streams

2024-06-04 Thread Matthias J. Sax

Jiang,

Thanks for the KIP. I think it makes sense. I agree with Sophie that the 
KIP writeup should be improved a little bit, with regard to the public API 
(i.e., config) that is changed.


The only other idea I had to avoid this issue would be some internal 
change: we would introduce a new logger class allowing you to disable 
logging for this specific logger. However, not sure how we could make 
this a public contract you could rely on? -- Also, given Sophie's 
comment about potential other logs we might want to configure, it seems 
like a good idea to add this config.
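
For illustration, usage might look like the following once the KIP lands (a 
sketch only; the config name is taken from the KIP title, and where it ends 
up living -- presumably StreamsConfig -- is exactly what the KIP writeup 
should spell out):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// proposed by KIP-1049, e.g. summarize every 10 minutes; how to disable it
// entirely (0, a negative value, ...) would also need to be defined by the KIP
props.put("log.summary.interval.ms", 10 * 60 * 1000);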


@Sophie: what other logs did you have in mind?


One more nit: rejected alternatives lists `log.summary.interval.ms` as 
rejected -- seems this needs to be removed?



-Matthias

On 5/29/24 12:56 AM, Sophie Blee-Goldman wrote:

Sure, as I said I'm supportive of this KIP. Just wanted to mention how the
issue could be mitigated in the meantime since the description made it
sound like you were suffering from excessive logs right now. Apologies if I
misinterpreted that.

I do think it would be nice to have a general setting for log intervals in
Streams. There are some other places where a regular summary log might be
nice. The config name you proposed is generic enough that we could reuse it
for other areas where we'd like to log summaries, so this seems like a good
config to introduce

My only question/request is that the KIP doesn't mention where this config
is being added. I assume from the context and Motivation section that
you're proposing to add this to StreamsConfig, which makes sense to me. But
please update the KIP to say this somewhere.

Otherwise the KIP LGTM. Anyone else have thoughts on this?

On Thu, May 23, 2024 at 12:19 AM jiang dou  wrote:


Thank you for your reply,
I do not recommend setting the log level to WARN, because INFO-level logs
should be useful


Sophie Blee-Goldman wrote on Thursday, May 23, 2024 at 04:30:


Thanks for the KIP!

I'm not against adding this as a config per se, but if this is causing you
trouble right now you should be able to disable it via log4j configuration
so you don't need to wait for a fix in Kafka Streams itself.

Putting something like this in your log4j will shut off the offending log:

log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN


On Wed, May 22, 2024 at 6:46 AM jiang dou  wrote:


Hi


I would like to propose a change to the Kafka Streams summary log.

Currently the stream-thread summary is recorded every two minutes, and there
is no way to disable it or update the interval.

When Kafka is running, this is absolutely unnecessary and even harmful, since
it fills the logs and thus storage space with unwanted and useless data.

I propose adding a configuration to control the output interval or disable it

KIP:





https://cwiki.apache.org/confluence/display/KAFKA/KIP-1049%3A+Add+config+log.summary.interval.ms+to+Kafka+Streams










Re: [DISCUSS] KIP-655: Add deduplication processor in kafka-streams

2024-06-04 Thread Matthias J. Sax

Ayoub,

thanks for resurrecting this KIP. I think a built-in de-duplication 
operator will be very useful.



Couple of questions:



100: `deduplicationKeySelector`

Is this the best name? It might indicate that we select a "key", which is 
an overloaded term... Maybe we could use `Field` or `Id` or `Attribute` 
instead of `Key` in the name? Just brainstorming. If we think `Key` is 
the best word, I am also ok with it.




101: `Deduplicated` class

You propose to add static methods `keySerde()` and `valueSerde()` -- in 
other config classes, we use only `with(keySerde, valueSerde)` as we try 
to use the "builder" pattern, and avoid too many overloads. I would 
prefer to omit both methods you suggest and just use a single `with` for 
both serdes.


Similarly, I think we don't want to add `with(...)` which takes all 
parameters at once (which should only be 3 parameters, not 4 as it's 
currently in the KIP)?
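
For illustration, the builder-style shape suggested here would roughly be the 
following (hypothetical, of course, since the KIP is still under discussion):

import org.apache.kafka.common.serialization.Serde;

public class Deduplicated<K, V> {
    protected Serde<K> keySerde;
    protected Serde<V> valueSerde;

    protected Deduplicated(final Serde<K> keySerde, final Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // single entry point instead of separate static keySerde()/valueSerde() methods
    public static <K, V> Deduplicated<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new Deduplicated<>(keySerde, valueSerde);
    }

    // builder-style overrides, following the pattern of other config classes
    public Deduplicated<K, V> withKeySerde(final Serde<K> keySerde) {
        this.keySerde = keySerde;
        return this;
    }

    public Deduplicated<K, V> withValueSerde(final Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }
}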




102: Usage of `WindowedStore`:

Would this be efficient? The physical byte layout is "<key, timestamp>" for 
the store key, so it would be difficult to do an efficient lookup for a 
given "de-duplication key" to discard duplicates, as we don't know the 
timestamp of the first record with the same "de-duplication key".


This boils down to the actual de-duplication logic (some more comments 
below), but what you propose seems to require expensive range-scans, which 
could be cost prohibitive in practice. I think we need to find a way to 
use efficient key-point-lookups to make this work.
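
To make the point-lookup idea concrete, a minimal sketch (not the KIP's 
design; for simplicity the record key doubles as the de-duplication key, and 
purging entries after the de-duplication period, e.g. via a punctuator, is 
omitted):

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class DeduplicationProcessor<K, V> implements Processor<K, V, K, V> {

    private final String storeName;
    private ProcessorContext<K, V> context;
    private KeyValueStore<K, Long> seen; // de-duplication key -> first-seen timestamp

    public DeduplicationProcessor(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(final ProcessorContext<K, V> context) {
        this.context = context;
        this.seen = context.getStateStore(storeName);
    }

    @Override
    public void process(final Record<K, V> record) {
        final Long firstSeen = seen.get(record.key()); // single point lookup, no range scan
        if (firstSeen == null) {
            // first record with this de-duplication key: forward, then remember it
            context.forward(record);
            seen.put(record.key(), record.timestamp());
        }
        // otherwise: a duplicate within the retained period -> drop it
    }
}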




103: "Processing logic":

Might need some updates (Cf 102 comment). I am not sure if I fully 
understand the logic: cf 105 below.




104:

If no entries found → forward the record + save the record in the store 


This part is critical, and we should discuss in detail. In the end, 
de-duplication only makes sense when EOS is used, and we might want 
to call this out (eg, on the JavaDocs)? But if used with ALOS, it's very 
difficult to ensure that we never lose data... Your proposal to 
forward first goes in the right direction, but does not really solve 
the problem entirely:


Even if we forward the message first, all downstream processing happens, 
`context.forward()` returns, and we update the state store, we could now 
crash w/o committing offsets. For this case, we have no guarantee that 
the result records were published (as we did not flush the producer 
yet), but when re-reading from the input topic, we would find the record 
in the store and incorrectly drop it as a duplicate...


I think the only solution to make this work would be to use TX-state 
stores in combination with ALOS as proposed via KIP-892?


Using an in-memory store won't help much either? The producer could have 
sent the write to the changelog topic, but not to the result topic, 
and thus we could still not guarantee ALOS...?


How do we want to go about this? We could also say this new operator 
only works with EOS. Would this be too restrictive? -- At least for now, 
until KIP-892 lands, after which we could relax it?




105: "How to detect late records"

In the end, it seems to boil down to determine which of the records to 
forward and which record to drop, for (1) the regular case and (2) the 
out-of-order data case.


Regular case (no out-of-order data): For this case, offset and ts order 
is the same, and we can forward the first record we get. All later 
record within "de-duplication period" with the same "de-duplication key" 
would be dropped. If a record with the same "de-duplication key" arrives 
after "de-duplication period" passed, we cannot drop it any longer, but 
would still forward it, as by the contract of the operator / 
de-duplication period.


For the out-of-order case: The first question we need to answer is, do 
we want to forward the record with the smallest offset or the record 
with the smallest ts? Logically, forwarding with the smallest ts might 
be "more correct", however, it implies we could only forward it after 
"de-duplication period" passed, what might be undesired latency? Would 
this be desired/acceptable?


In contrast, if we forward the record with the smallest offset (this is what 
you seem to propose) we don't have a latency issue, but of course the 
question of which records to drop is trickier to answer: it seems you 
propose to compare the time difference of the stored record to the 
current record, but I am wondering why? Would it not be desired to drop 
all duplicates independent of their ts, as long as we find a record in 
the store? Would be good to get some more motivation and tradeoffs 
discussed about the different strategies we could use.


You also propose to drop _any_ late record: I am also not sure if that's 
desired? Could this not lead to data loss? Assume we get a late record, 
but in fact there was never a duplicate? Why would we want to drop it? 
If there is a late record which is indeed a duplicate, but we purged the 
original record from the store already, it seems to be the same case as 
for the "

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-06-04 Thread Matthias J. Sax
rds,
Nick

On Wed, 29 May 2024 at 09:28, Nick Telford  wrote:


Hi everyone,

Sorry I haven't got around to updating the KIP yet. Now that I've wrapped
up KIP-989, I'm going to be working on 1035 starting today.

I'll update the KIP first, and then call a vote.

Regards,
Nick

On Wed, 29 May 2024 at 07:25, Bruno Cadonna  wrote:


Totally agree on moving forward and starting the VOTE!

However, the KIP should be updated with the new info before starting the
VOTE.

Best,
Bruno

On 5/29/24 2:36 AM, Matthias J. Sax wrote:

Sounds like a good plan. -- I think we are still wrapping up the 3.8
release, but would also like to move forward with this one.

Should we start a VOTE?

For merging PRs we need to wait until after code freeze, and the 3.8 branch
was cut. But we could start reviewing PRs before this already.


-Matthias

On 5/17/24 3:05 AM, Nick Telford wrote:

Hi everyone,

As discussed on the Zoom call, we're going to handle rebalance meta-data by:

- On start-up, Streams will open each store and read its changelog offsets
into an in-memory cache. This cache will be shared among all StreamThreads.
- On rebalance, the cache will be consulted for Task offsets for any Task
that is not active on any instance-local StreamThreads. If the Task is
active on *any* instance-local StreamThread, we will report the Task lag as
"up to date" (i.e. -1), because we know that the local state is currently
up-to-date.

We will avoid caching offsets across restarts in the legacy ".checkpoint"
file, so that we can eliminate the logic for handling this class. If
performance of opening/closing many state stores is poor, we can parallelise
it by forking off a thread for each Task directory when reading the offsets.

I'll update the KIP later today to reflect this design, but I will try to
keep it high-level, so that the exact implementation can vary.

Regards,

Nick
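
A rough illustration of the start-up step described above (sketch only;
readOffsetsFromStore() is a placeholder for whatever the store ends up
exposing, not an existing API):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

// task directory name -> changelog offsets read from the store;
// the resulting cache is shared among all StreamThreads
static Map<String, Map<String, Long>> readAllTaskOffsets(final Path stateDir) throws IOException {
    final Map<String, Map<String, Long>> cache = new ConcurrentHashMap<>();
    try (final Stream<Path> taskDirs = Files.list(stateDir)) {
        taskDirs.filter(Files::isDirectory)
                .forEach(dir -> cache.put(dir.getFileName().toString(), readOffsetsFromStore(dir)));
    }
    return cache;
}

static Map<String, Long> readOffsetsFromStore(final Path taskDir) {
    // placeholder: per the design above, the store itself would expose its changelog offsets
    return Map.of();
}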

On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman <

sop...@responsive.dev>

wrote:


103: I like the idea of immediately deprecating #managesOffsets and
aiming
to make offset management mandatory in the long run. I assume we
would also
log a warning for any custom stores that return "false" from this
method to
encourage custom store implementations to start doing so? My only
question/concern is that if we want folks to start managing their own
offsets then we should make this transition easy for them, perhaps by
exposing some public utility APIs for things that are currently
handled by
Kafka Streams such as reading/writing checkpoint files. Maybe it
would be
useful to include a small example in the KIP of what it would
actually mean
to "manage your own offsets" -- I know (all too well) that plugging

in

custom storage implementations is not easy and most people who do
this are
probably fairly advanced users, but offset management will be a
totally new
ballgame to most people people and this kind of feels like throwing

them

off the deep end. We should at least provide a lifejacket via some
kind of
utility API and/or example

200. There's been a lot of back and forth on the rebalance

metadata/task

lag computation question, so forgive me if I missed any part of

this,

but I
think we've landed at the right idea here. To summarize: the "tl;dr"
explanation is that we'll write the checkpoint file only on close

and

will
account for hard-crash scenarios by opening up the stores on startup

and

writing a checkpoint file for any missing tasks. Does that sound

about

right?

A few clarifications:
I think we're all more or less on the same page here but just to be
absolutely clear, the task lags for each task directory found on

disk

will
be reported by only one of the StreamThreads, and each StreamThread

will

report lags only for tasks that it already owns or are not assigned
to any
other StreamThread in the client. In other words, we only need to

get

the
task lag for completely unassigned/unlocked tasks, which means if
there is
a checkpoint file at all then it must be up-to-date, because there

is no

other StreamThread actively writing to that state store (if so then

only

that StreamThread would report lag for that particular task).

This still leaves the "no checkpoint at all" case which as previously
mentioned can occur after a  hard-crash. Luckily we only have to

worry

about this once, after starting up again following said hard crash.
We can
simply open up each of the state stores before ever joining the
group, get
the offsets from rocksdb, and write them to a new checkpoint file.

After

that, we can depend on the checkpoints written at close and won't
have to
open up any stores that aren't already assigned for the reasons laid
out in
the paragraph above.

As for the specific mechanism and which thread-does-what, since

there

were
some questions, this is how I'm imagining the process:

 1.   The general idea is that 

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-06-04 Thread Matthias J. Sax
 favor of option #2 (pull/15959
<https://github.com/apache/kafka/pull/15959>) as I believe including
general task metadata may be useful and this API would be easy to

evolve if

we wanted to add anything else in a future KIP. The current KIP was

updated

using this option, although nothing related to the rack ids has been

merged

yet. We're happy to defer to anyone with a strong preference for

either of

these options, or a new suggestion of their own.

As always, let us know if you have any questions or concerns or

feedback of

any kind.

Thanks!


On Mon, May 6, 2024 at 1:33 PM Sophie Blee-Goldman <

sop...@responsive.dev>

wrote:


Thanks guys. Updated the error codes in both the code and the explanation
under "Public Changes". To sum up, here are the error codes listed in the KIP:

enum AssignmentError {
  NONE,
  ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
  ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
  INVALID_STANDBY_TASK,
  UNKNOWN_PROCESS_ID,
  UNKNOWN_TASK_ID
}

Anything missing?

(also updated all the code block headings, thanks for noticing that, Bruno)


On Fri, May 3, 2024 at 9:33 AM Matthias J. Sax 

wrote:



117f: Good point by Bruno. We should check for this, and could have

an

additional `INVALID_STANDBY_TASK` error code?


-Matthias

On 5/3/24 5:52 AM, Guozhang Wang wrote:

Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna 

wrote:


Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough

to

avoid the error Guozhang mentioned. The StreamsPartitionAssignor

passes

the information whether a task is stateless or stateful into the

task

assignor. However, the task assignor can return a standby task

for a

stateless task which is inconsistent.

Echoing Matthias' statement about the missing UNKNOWN_TASK_ID

error.


nit:
The titles of some code blocks in the KIP are not consistent with

their

content, e.g., KafkaStreamsState <-> NodeState


Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:

Thanks Sophie. My bad. You are of course right about

`TaskAssignment`

and the StreamsPartitionAssignor's responsibitliy to map tasks

of a

instance to consumers. When I wrote my reply, I forgot about this

detail.


Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by

Guozhang?


Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while

thinking

about

how users would actually produce an assignment, I realized that

it

seems

silly to make it their responsibility to distinguish between a

stateless

and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful

vs

stateless, so there's no need to add this extra step for users

to

figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent

task

types",

I'm proposing to just flatten the AssignedTask.Type enum to only

"ACTIVE"

and "STANDBY", and remove the "STATEFUL" and "STATELESS" types
altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and
"Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I

don't

personally feel too strongly about this, but the reason for the

return

type
being a Map rather than a
TaskAssignment
is because they are meant to be used iteratively/to create a

part of

the

full assignment, and not necessarily a full assignment for each.

Notice

that they all have an input parameter of the same type:

Map
KafkaStreamsAssignment>. The idea is you can take the output of

any

of

these and pass it in to another to generate or optimize another

piece of

the overall assignment. For example, if you want to perform the
rack-aware
optimization on both active and standby tasks, you would need

to call

#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If

we

return a
TaskAssignment, it will usually need to be unwrapped right away.

Perhaps

more importantly, I worry that returning a TaskAssignment will

make

it

seem
like each of these utility methods return a "full" and final

assignment

that can just be returned as-is from the TaskAssignor's #assign

method.

Whereas they are each just a single step in the full assignment

process,

and not the final product. Does that make sense?
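
A rough sketch of that chaining (only the Map-in/Map-out shape is from this
discussion; the exact signatures are assumed and computeMyInitialAssignment
is a placeholder):

// inside a custom TaskAssignor#assign() implementation
Map<ProcessId, KafkaStreamsAssignment> assignment = computeMyInitialAssignment(applicationState);
assignment = TaskAssignmentUtils.optimizeRackAwareActiveTasks(applicationState, assignment);
assignment = TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState, assignment);
return new TaskAssignment(assignment.values()); // wrap into the final return type only at the end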

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman

wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't

think of

anything
better.  I like your proposals though, will update the KIP

names.

I

Re: Hello world - please subscribe me!

2024-06-03 Thread Matthias J. Sax

Chris,

Subscription is self-service. Please follow the instruction from the web 
page: https://kafka.apache.org/contact



-Matthias

On 6/3/24 1:59 AM, Chris wrote:





[jira] [Assigned] (KAFKA-16248) Kafka consumer should cache leader offset ranges

2024-05-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16248:
---

Assignee: Alieh Saeedi

> Kafka consumer should cache leader offset ranges
> 
>
> Key: KAFKA-16248
> URL: https://issues.apache.org/jira/browse/KAFKA-16248
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Brutschy
>Assignee: Alieh Saeedi
>Priority: Critical
>
> We noticed a streams application received an OFFSET_OUT_OF_RANGE error 
> following a network partition and streams task rebalance and subsequently 
> reset its offsets to the beginning.
> Inspecting the logs, we saw multiple consumer log messages like: 
> {code:java}
> Setting offset for partition tp to the committed offset 
> FetchPosition{offset=1234, offsetEpoch=Optional.empty...)
> {code}
> Inspecting the streams code, it looks like kafka streams calls `commitSync` 
> passing through an explicit OffsetAndMetadata object but does not populate 
> the offset leader epoch.
> The offset leader epoch is required in the offset commit to ensure that all 
> consumers in the consumer group have coherent metadata before fetching. 
> Otherwise after a consumer group rebalance, a consumer may fetch with a stale 
> leader epoch with respect to the committed offset and get an offset out of 
> range error from a zombie partition leader.
> An example of where this can cause issues:
> 1. We have a consumer group with consumer 1 and consumer 2. Partition P is 
> assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has 
> stale metadata for P.
> 2. Consumer 1 fetches partition P with offset 50, epoch 8. commits the offset 
> 50 without an epoch.
> 3. The consumer group rebalances and P is now assigned to consumer 2. 
> Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). 
> Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a 
> zombie leader due to a network partition, the zombie leader may accept 
> consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer 
> 2.
> If in step 1, consumer 1 committed the leader epoch for the message, then 
> when consumer 2 receives assignment P it would force a metadata refresh to 
> discover a sufficiently new leader epoch for the committed offset.
> Kafka Streams cannot fully determine the leader epoch of the offsets it wants 
> to commit - in EOS mode, streams commits the offset after the last control 
> records (to avoid always having a lag of >0), but the leader epoch of the 
> control record is not known to streams (since only non-control records are 
> returned from Consumer.poll).
> A fix discussed with [~hachikuji] is to have the consumer cache leader epoch 
> ranges, similar to how the broker maintains a leader epoch cache.
> This ticket was split from the original ticket 
> https://issues.apache.org/jira/browse/KAFKA-15344 which was described as a 
> streams fix, but the problem cannot be fully fixed in streams.
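
For illustration, this is the piece a plain consumer can provide but Streams cannot 
always: attaching the leader epoch from the fetched record to the committed offset 
(sketch only, assuming an existing KafkaConsumer named consumer):

{code:java}
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

for (final ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
    final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    final OffsetAndMetadata committed = new OffsetAndMetadata(
        record.offset() + 1,   // next offset to consume
        record.leaderEpoch(),  // Optional<Integer>; leaving it empty enables the stale-fetch scenario above
        "");
    consumer.commitSync(Collections.singletonMap(tp, committed));
}
{code}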



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16863) Consider removing `default.` prefix for exception handler config

2024-05-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850869#comment-17850869
 ] 

Matthias J. Sax commented on KAFKA-16863:
-

I think `traffic_cost` was on purpose... But I really don't feel strongly about 
it at all.

In general, I am always in favor of cleaning up stuff; we also just did KIP-1020.

Not sure if we should do a single KIP though. It can become very convoluted 
quickly. I would rather do multiple smaller KIPs?

Not sure what other issues there might be? Do you have a list?

> Consider removing `default.` prefix for exception handler config
> 
>
> Key: KAFKA-16863
> URL: https://issues.apache.org/jira/browse/KAFKA-16863
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Matthias J. Sax
>Priority: Trivial
>  Labels: need-kip
>
> Kafka Streams has a set of configs with a `default.` prefix. The intent for the 
> default-prefix is to make a distinction between, well, the default, and 
> in-place overwrites in the code. E.g., users can specify ts-extractors on a 
> per-topic basis.
> However, for the deserialization- and production-exception handlers, no such 
> overwrites are possible, and thus, `default.` does not really make sense, 
> because there is just one handler overall. Via KIP-1033 we added a new 
> processing-exception handler w/o a default-prefix, too.
> Thus, we should consider deprecating the two existing config names and adding 
> them back w/o the `default.` prefix.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16863) Consider removing `default.` prefix for exception handler config

2024-05-29 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16863:
---

 Summary: Consider removing `default.` prefix for exception handler 
config
 Key: KAFKA-16863
 URL: https://issues.apache.org/jira/browse/KAFKA-16863
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams has a set of configs with a `default.` prefix. The intent for the 
default-prefix is to make a distinction between, well, the default, and in-place 
overwrites in the code. E.g., users can specify ts-extractors on a per-topic 
basis.
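
A minimal sketch (not part of this ticket) of the default-vs-overwrite 
distinction, using the timestamp extractor as an example; the topic name is made up:
{code:java}
// Default, applies to all input topics unless overwritten:
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

// In-place overwrite for a single input topic:
StreamsBuilder builder = new StreamsBuilder();
builder.stream("some-topic", Consumed.with(new WallclockTimestampExtractor()));
{code}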

However, for the deserialization- and production-exception handlers, no such 
overwrites are possible, and thus, `default.` does not really make sense, 
because there is just one handler overall. Via KIP-1033 we added a new 
processing-exception handler w/o a default-prefix, too.

Thus, we should consider deprecating the two existing config names and adding 
them back w/o the `default.` prefix.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-28 Thread Matthias J. Sax
ries on disk and attempts to lock
them
   one by one. If they obtain the lock, check whether there is state
but no
   checkpoint, and write the checkpoint if needed. If it can't grab
the lock,
   then we know one of the other StreamThreads must be handling the
checkpoint
   file for that task directory, and we can move on.

Don't really feel too strongly about which approach is best; doing it in
KafkaStreams#start is certainly the simplest, while doing it in the
StreamThread's startup is more efficient. If we're worried about adding too
much weight to KafkaStreams#start then the 2nd option is probably best,
though slightly more complicated.
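
A rough sketch (not from the KIP; all helper names below are made up for
illustration) of the per-StreamThread startup variant:

    for (TaskId id : taskDirectoriesOnDisk()) {
        if (!lockTaskDirectory(id)) {
            continue; // another StreamThread is handling this task directory
        }
        try {
            if (hasState(id) && !hasCheckpoint(id)) {
                writeCheckpoint(id); // persist offsets so task lag can be computed
            }
        } finally {
            unlockTaskDirectory(id);
        }
    }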

Thoughts?

On Tue, May 14, 2024 at 10:02 AM Nick Telford 
wrote:


Hi everyone,

Sorry for the delay in replying. I've finally now got some time to work on
this.

Addressing Matthias's comments:

100.
Good point. As Bruno mentioned, there's already AbstractReadWriteDecorator
which we could leverage to provide that protection. I'll add details on
this to the KIP.

101,102.
It looks like these points have already been addressed by Bruno. Let me
know if anything here is still unclear or you feel needs to be detailed
more in the KIP.

103.
I'm in favour of anything that gets the old code removed sooner, but
wouldn't deprecating an API that we expect (some) users to implement cause
problems?
I'm thinking about implementers of custom StateStores, as they may be
confused by managesOffsets() being deprecated, especially since they would
have to mark their implementation as @Deprecated in order to avoid compile
warnings.
If deprecating an API *while it's still expected to be implemented* is
something that's generally done in the project, then I'm happy to do so
here.

104.
I think this is technically possible, but at the cost of considerable
additional code to maintain. Would we ever have a pathway to remove this
downgrade code in the future?


Regarding rebalance metadata:
Opening all stores on start-up to read and cache their offsets is an
interesting idea, especially if we can avoid re-opening the stores once the
Tasks have been assigned. Scalability shouldn't be too much of a problem,
because typically users have a fairly short state.cleanup.delay, so the
number of on-disk Task directories should rarely exceed the number of Tasks
previously assigned to that instance.
An advantage of this approach is that it would also simplify StateStore
implementations, as they would only need to guarantee that committed
offsets are available when the store is open.

I'll investigate this approach this week for feasibility and report back.

I think that covers all the outstanding feedback, unless I missed anything?


Regards,
Nick

On Mon, 6 May 2024 at 14:06, Bruno Cadonna  wrote:


Hi Matthias,

I see what you mean.

To sum up:

With this KIP the .checkpoint file is written when the store closes.
That is when:
1. a task moves away from Kafka Streams client
2. Kafka Streams client shuts down

A Kafka Streams client needs the information in the .checkpoint file
1. on startup because it does not have any open stores yet.
2. during rebalances for non-empty state directories of tasks that are
not assigned to the Kafka Streams client.

With hard crashes, i.e., when the Streams client is not able to close
its state stores and write the .checkpoint file, the .checkpoint file
might be quite stale. That influences the next rebalance after failover
negatively.


My conclusion is that Kafka Streams either needs to open the state
stores at start up or we write the checkpoint file more often.

Writing the .checkpoint file during processing more often without
controlling the flush to disk would work. However, Kafka Streams would
checkpoint offsets that are not yet persisted on disk by the state
store. That is, with a hard crash the offsets in the .checkpoint file
might be larger than the offsets checkpointed in the state store. That
might not be a problem if Kafka Streams uses the .checkpoint file only
to compute the task lag. The downside is that it makes the managing of
checkpoints more complex because now we have to maintain two
checkpoints: one for restoration and one for computing the task lag.
I think we should explore the option where Kafka Streams opens the state
stores at start up to get the offsets.

I also checked when Kafka Streams needs the checkpointed offsets to
compute the task lag during a rebalance. Turns out Kafka Streams needs
them before sending the join request. Now, I am wondering whether opening
the state stores of unassigned tasks whose state directory exists locally
is actually such a big issue, given the expected higher latency, since it
happens before the Kafka Streams client joins the rebalance.

Best,
Bruno







On 5/4/24 12:05 AM, Matthias J. Sax wrote:

Those are good questions... I could think of a few approaches, but I admit
it might all be a littl

Re: kindly add me in community

2024-05-27 Thread Matthias J. Sax
Mailing list subscription is self-service. Please follow the instructions 
from the web page: https://kafka.apache.org/contact


-Matthias

On 5/21/24 2:00 AM, Prashant Lohia wrote:

thanks
prashant lohia
prash...@gsl.in



Re: outerjoin not joining after window

2024-05-22 Thread Matthias J. Sax

Can someone confirm that each partition has its own stream time and that
the stream time for a partition only advances when a record is written to
the partition after the window closes?


That's correct.



On 5/21/24 10:11 AM, Chad Preisler wrote:

After reviewing the logs, I think I understand what happens with the
repartition topics. Looks like they will be assigned to one or more
instances. In my example I ran three instances of the application (A, B,
C). Looks like the two repartition topics got assigned to A and B. The six
partitions from the input topics got split evenly across all three running
instances A, B, and C. Since the repartitioned streams are what I'm joining
on, I guess the join will run on two instances, and any input topic
processing will run across all three. Is that correct?

Still would like clarification regarding some records appearing to not get
processed: I think the issue is related to certain partitions not getting
records to advance stream time (because of low volume). Can someone confirm
that each partition has its own stream time and that the stream time for a
partition only advances when a record is written to the partition after the
window closes?

On Tue, May 21, 2024 at 10:27 AM Chad Preisler 
wrote:


See one small edit below...

On Tue, May 21, 2024 at 10:25 AM Chad Preisler 
wrote:


Hello,

I think the issue is related to certain partitions not getting records to
advance stream time (because of low volume). Can someone confirm that each
partition has its own stream time and that the stream time for a partition
only advances when a record is written to the partition after the window
closes?

If I use the repartition method on each input topic to reduce the number
of partitions for those streams, how many instances of the application will
process records? For example, if the input topics each have 6 partitions,
and I use the repartition method to set the number of partitions for the
streams to 2, how many instances of the application will process records?

Thanks,
Chad


On Wed, May 1, 2024 at 6:47 PM Matthias J. Sax  wrote:


How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.


Well, for left/right join results, the ValueJoiner would only be called
when the window is closed... And for invalid input (or late records, i.e.,
records which arrive out-of-order and whose window was already closed),
records would be dropped right away. So you cannot really infer that a
record did make it into the join or not, or what happens if it did make it
into the `Processor`.
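
A minimal sketch (not from this thread) of a windowed outer join with an
explicit grace period; topic names and serdes are made up, and the usual
Kafka Streams imports and a `builder` (StreamsBuilder) are assumed:

    KStream<String, String> left = builder.stream("left-topic",
        Consumed.with(Serdes.String(), Serdes.String()));
    KStream<String, String> right = builder.stream("right-topic",
        Consumed.with(Serdes.String(), Serdes.String()));

    left.outerJoin(
            right,
            // one side is null for left/right-only results; called only once the window closes
            (l, r) -> l + "/" + r,
            JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(25), Duration.ofMinutes(5)),
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
        .to("joined-topic", Produced.with(Serdes.String(), Serdes.String()));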

-> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

`dropped-records-total` is the name of the metric.



-Matthias



On 5/1/24 11:35 AM, Chad Preisler wrote:

Hello,

We did some testing in our test environment today. We are seeing some
records processed where only one side of the join has a record. So that's
good. However, we are still seeing some records get skipped. They never hit
the value joiner (we write a log message first thing in the value joiner).

During the test we were putting some load on the system, so stream time was
advancing. We did notice that the join windows were taking much longer than
30 minutes to close and process records. Thirty minutes is the window plus
grace.


How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.

I will try pushing the same records locally. However, we don't see any
errors in our logs and the stream does process one-sided joins after the
skipped record. Do you have any docs on the "dropped records" metric? I did
a Google search and didn't find many good results for that.

Thanks,

Chad

On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax 

wrote:



Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.


That gives some hope :)




However, they never get into the join.


How do you know this?


Did you check the metric for dropped records? Maybe records are
considered malformed and dropped? Are you using the same records in
production and in your local test?



Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as
long as data is flowing, it should not impact the result you see.



Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any
impact.


If I increase the log level for the streams API would that shed some light
on what is happening?


I don't think it would help much. The code in question is
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it
does not do any log

[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-05-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848476#comment-17848476
 ] 

Matthias J. Sax commented on KAFKA-16586:
-

I did suspect a bug, too, and did include the seed :) – Just did not know that 
we also need the strategy? Given that we don't have a lot of strategies, it 
seems we could rerun each strategy with the given seed and one should just fail?

> Test TaskAssignorConvergenceTest failing
> 
>
> Key: KAFKA-16586
> URL: https://issues.apache.org/jira/browse/KAFKA-16586
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>    Reporter: Matthias J. Sax
>Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce 
> with: `runRandomizedScenario(-538095696758490522)`.  at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
>   at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
> This might expose an actual bug (or incorrect test setup) and should be 
> reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16586) Test TaskAssignorConvergenceTest failing

2024-05-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16586:

Description: 
{code:java}
java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: 
`runRandomizedScenario(-538095696758490522)`.at 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
  at 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
This might expose an actual bug (or incorrect test setup) and should be 
reproducible (did not try it myself yet).

  was:
{code:java}
java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: 
`runRandomizedScenario(-538095696758490522)`.at 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
  at 
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
This might expose an actual bug (or incorrect test setup) and should be 
reproducible (die not try it myself yet).


> Test TaskAssignorConvergenceTest failing
> 
>
> Key: KAFKA-16586
> URL: https://issues.apache.org/jira/browse/KAFKA-16586
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>            Reporter: Matthias J. Sax
>Priority: Major
>
> {code:java}
> java.lang.AssertionError: Assertion failed in randomized test. Reproduce 
> with: `runRandomizedScenario(-538095696758490522)`.  at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545)
>   at 
> org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code}
> This might expose an actual bug (or incorrect test setup) and should be 
> reproducible (did not try it myself yet).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-22 Thread Matthias J. Sax
I was not aware of `InternalFixedKeyRecordFactory`. As the name 
indicates, it's considered an internal class, so not sure if we should 
recommend using it in tests...


I understand why this class is required, and why it was put into a 
public package; the way Java works enforces this. Not sure if we could 
find a better solution.


Might be good to hear from others.


-Matthias

On 5/21/24 3:57 PM, Shashwat Pandey wrote:

Looking at the ticket and the sample code, I think it would be possible to
continue using `InternalFixedKeyRecordFactory` as the avenue to create
`FixedKeyRecord`s in tests. As long as there was a
MockFixedKeyProcessorContext, I think we would be able to test
FixedKeyProcessors without a Topology.

I created a sample repo with the `MockFixedKeyProcessorContext` here is
what I think the tests would look like:
https://github.com/s7pandey/kafka-processor-tests/blob/main/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java



On Mon, May 20, 2024 at 9:05 PM Matthias J. Sax  wrote:


Had a discussion on https://issues.apache.org/jira/browse/KAFKA-15242
and it was pointed out that we also need to do something about
`FixedKeyRecord`. It does not have a public constructor (which is
correct; it should not have one). However, this makes testing
`FixedKeyProcessor` impossible w/o extending `FixedKeyRecord` manually,
which does not seem to be right (too clumsy).

It seems we either need some helper builder method (but it is not clear to me
where to add it in an elegant way) which would provide us with a
`FixedKeyRecord`, or add some sub-class to the test-utils module which
would extend `FixedKeyRecord`? -- Or maybe an even better solution? I
could not think of something else so far.


Thoughts?


On 5/3/24 9:46 AM, Matthias J. Sax wrote:

Please also update the KIP.

To get a wiki account created, please request it via a commet on this
ticket: https://issues.apache.org/jira/browse/INFRA-25451

After you have the account, please share your wiki id, and we can give
you write permission on the wiki.



-Matthias

On 5/3/24 6:30 AM, Shashwat Pandey wrote:

Hi Matthias,

Sorry this fell out of my radar for a bit.

Revisiting the topic, I think you’re right and we accept the duplicated
nesting as an appropriate solution to not affect the larger public API.

I can update my PR with the change.

Regards,
Shashwat Pandey


On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax 

wrote:



Any updates on this KIP?

On 3/28/24 4:11 AM, Matthias J. Sax wrote:

It seems that `MockRecordMetadata` is a private class, and thus not
part
of the public API. If there are any changes required, we don't need to
discuss on the KIP.


For `CapturedPunctuator` and `CapturedForward` it's a little bit more
tricky. My gut feeling is, that the classes might not need to be
changed, but if we use them within `MockProcessorContext` and
`MockFixedKeyProcessorContext` it might be weird to keep the current
nesting... The problem I see is, that it's not straightforward how to
move the classes w/o breaking compatibility, nor if we duplicate
them as
standalone classes w/o a larger "splash radius". (We would need to add
new overloads for MockProcessorContext#scheduledPunctuators() and
MockProcessorContext#forwarded()).

Might be good to hear from others if we think it's worth this larger
changes to get rid of the nesting, or just accept the somewhat not
ideal
nesting as it technically is not a real issue?


-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more
to do
with the internals of the class (MockRecordMetadata,
CapturedPunctuator and
CapturedForward).

However, I do see your point, I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext, some of the
internal
classes should also be extracted i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax 
wrote:


Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few times already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are set up, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent

ProcessINGContext

(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext
  implements FixedKeyProcessorContext,
 RecordCollector.Supplier


Of course, if there is code we can share between both mock-context we
should do this, but it should not leak into the public API?

Re: Permission to contribute to Apache Kafka

2024-05-21 Thread Matthias J. Sax

You should be all set.

On 5/21/24 8:30 AM, Harry Fallows wrote:

Hello,

I am following the[Getting Started guide for writing 
KIPs](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
 Could someone give me the permissions to write a KIP please? My Wiki ID is 
harryfallows, my Jira ID is hfallows, and my email address is 
harryfall...@protonmail.com.

Thanks,Harry


Re: Request for Authorization to Create KIP

2024-05-21 Thread Matthias J. Sax

You should be all set.

On 5/21/24 6:58 PM, 黃竣陽 wrote:

I want to create a KIP, and my
wiki id : m1a2st and Jira id : m1a2st, Thanks for your help.


jiang dou wrote on May 22, 2024 at 9:01 AM:

You should send your jira ID and wiki ID,
Please refer to this address :
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

黃竣陽 wrote on Tuesday, May 21, 2024 at 22:42:


I am writing to request authorization to create a KIP.

Currently, I do not have the necessary permissions to access the 'Create
KIP' function. My account email is s7133...@gmail.com.

Could you please grant me the required permissions to create a KIP? Thanks
for your help.






Re: Request to be added to kafka contributors list

2024-05-21 Thread Matthias J. Sax

Ok. Hopefully it's working now. Sorry for the hiccup.

-Matthias

On 5/21/24 1:14 AM, Fan Yang wrote:

Hi Matthias,

I tried signing out and signing in, but I still can't find the "Assign" button. My JIRA ID 
is fanyan, could you help me set it again?

Best,
Fan

________
From: Matthias J. Sax 
Sent: Saturday, May 18, 2024 4:06
To: users@kafka.apache.org 
Subject: Re: Re: Request to be added to kafka contributors list

Did you sign out and sign in again?

On 5/17/24 9:49 AM, Yang Fan wrote:

Thanks Matthias,

I still can't find "Assign to me" button beside Assignee and Reporter. Could 
you help me set it again?

Best regards,
Fan
____________
From: Matthias J. Sax 
Sent: May 17, 2024 2:24
To: users@kafka.apache.org 
Subject: Re: Request to be added to kafka contributors list

Thanks for reaching out Yang. You should be all set.

-Matthias

On 5/16/24 7:40 AM, Yang Fan wrote:

Dear Apache Kafka Team,

I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I 
kindly request to be added to the contributors list for Apache Kafka. Being 
part of this list would allow me to be assigned to JIRA tickets and work on 
them.
Thank you for considering my request.
Best regards,
Fan


[jira] [Resolved] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15242.
-
  Assignee: (was: Alexander Aghili)
Resolution: Duplicate

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is not a well documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Fwd: Request to be added to kafka contributors list

2024-05-20 Thread Matthias J. Sax

Done. You should be all set :)


-Matthias

On 5/20/24 10:10 AM, bou...@ulukai.net wrote:


Dear Apache Kafka Team,

     I hope to post in the right place: my name is Franck LEDAY, under 
Apache-Jira ID "handfreezer".


     I opened an issue as Improvement KAFKA-16707, but I failed to 
assign it to myself.


     May I ask to be added to the contributors list for Apache Kafka? As 
I have already done the improvement work, I would like to be assigned to 
it so I can finish my contribution.


Thank you for considering my request.
Best regards, Franck.


Re: Request for contributor list

2024-05-20 Thread Matthias J. Sax

What is your Jira ID?

-Matthias


On 5/20/24 9:55 AM, Brenden Deluna wrote:

Hello, I am requesting to be added to the contributor list to take care of
some tickets. Thank you.



Re: Release plan required

2024-05-20 Thread Matthias J. Sax
Zookeeper is already deprecated (since 3.5): 
https://kafka.apache.org/documentation/#zk_depr


It's planned to be fully removed in 4.0 release.

It's not confirmed yet, but there is a high probability that there won't 
be a 3.9 release, and that 4.0 will follow 3.8.



-Matthias


On 5/20/24 2:11 AM, Sahil Sharma D wrote:

Hello,

When is ZooKeeper planned to be deprecated from Kafka, and in which release is 
this deprecation planned?

Regards,
Sahil

-Original Message-
From: Sanskar Jhajharia 
Sent: Monday, May 20, 2024 1:38 PM
To: users@kafka.apache.org
Subject: Re: Release plan required

Hey Sahil,

You can find the complete details of the releases and bug fix releases
here: https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan

The next release in the pipeline currently is 3.8.0 ( 
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0).
There is also a bugfix release 3.7.1 scheduled here (
https://cwiki.apache.org/confluence/display/KAFKA/Release+plan+3.7.1)

Hope that answers your question!
Cheers.

Sanskar Jhajharia
Software Engineer I
E-mail: Personal  | Official 


On Mon, May 20, 2024 at 1:31 PM Sahil Sharma D 
 wrote:


Hi team,

We need the Kafka release plan for our Kafka upgrade planning. Kindly
share the latest release plan, or let us know when the next release is planned
and which version it will be.

Regards,
Sahil



[jira] [Commented] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests

2024-05-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848022#comment-17848022
 ] 

Matthias J. Sax commented on KAFKA-16801:
-

These packages contain code for system tests. We put the code under 
`src/test/java/...`; there is no `src/main/java/...` and the code is not unit 
test code either.

What would be the right way to address this?

 

> Streams upgrade :test target doesn't find any junit tests
> -
>
> Key: KAFKA-16801
> URL: https://issues.apache.org/jira/browse/KAFKA-16801
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Greg Harris
>Priority: Major
>  Labels: newbie
>
> No test executed. This behavior has been deprecated.    
> This will fail with an error in Gradle 9.0.    
> There are test sources present but no test was executed. Please check your 
> test configuration.    
> [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed]
>     
> 23 usages
>  
> Task::streams:upgrade-system-tests-0100:test    
> Task::streams:upgrade-system-tests-0101:test    
> Task::streams:upgrade-system-tests-0102:test    
> Task::streams:upgrade-system-tests-0110:test    
> Task::streams:upgrade-system-tests-10:test    
> Task::streams:upgrade-system-tests-11:test    
> Task::streams:upgrade-system-tests-20:test    
> Task::streams:upgrade-system-tests-21:test    
> Task::streams:upgrade-system-tests-22:test    
> Task::streams:upgrade-system-tests-23:test    
> Task::streams:upgrade-system-tests-24:test    
> Task::streams:upgrade-system-tests-25:test    
> Task::streams:upgrade-system-tests-26:test    
> Task::streams:upgrade-system-tests-27:test    
> Task::streams:upgrade-system-tests-28:test    
> Task::streams:upgrade-system-tests-30:test    
> Task::streams:upgrade-system-tests-31:test    
> Task::streams:upgrade-system-tests-32:test    
> Task::streams:upgrade-system-tests-33:test    
> Task::streams:upgrade-system-tests-34:test    
> Task::streams:upgrade-system-tests-35:test    
> Task::streams:upgrade-system-tests-36:test    
> Task::streams:upgrade-system-tests-37:test



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16794) Can't open videos in streams documentation

2024-05-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16794:

Component/s: streams

> Can't open videos in streams documentation
> --
>
> Key: KAFKA-16794
> URL: https://issues.apache.org/jira/browse/KAFKA-16794
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, streams
>Reporter: Kuan Po Tseng
>Priority: Minor
> Attachments: IMG_4445.png, image.png
>
>
> Can't open videos in page [https://kafka.apache.org/documentation/streams/]
> Open console in chrome browser and it shows error message:
> {{Refused to frame 'https://www.youtube.com/' because it violates the 
> following Content Security Policy directive: "frame-src 'self'".}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-20 Thread Matthias J. Sax
Had a discussion on https://issues.apache.org/jira/browse/KAFKA-15242 
and it was pointed out that we also need to do something about 
`FixedKeyRecord`. It does not have a public constructor (which is 
correct; it should not have one). However, this makes testing 
`FixedKeyProcessor` impossible w/o extending `FixedKeyRecord` manually, 
which does not seem to be right (too clumsy).


It seems we either need some helper builder method (but it is not clear to me 
where to add it in an elegant way) which would provide us with a 
`FixedKeyRecord`, or add some sub-class to the test-utils module which 
would extend `FixedKeyRecord`? -- Or maybe an even better solution? I 
could not think of something else so far.



Thoughts?


On 5/3/24 9:46 AM, Matthias J. Sax wrote:

Please also update the KIP.

To get a wiki account created, please request it via a commet on this 
ticket: https://issues.apache.org/jira/browse/INFRA-25451


After you have the account, please share your wiki id, and we can give 
you write permission on the wiki.




-Matthias

On 5/3/24 6:30 AM, Shashwat Pandey wrote:

Hi Matthias,

Sorry this fell out of my radar for a bit.

Revisiting the topic, I think you’re right and we accept the duplicated
nesting as an appropriate solution to not affect the larger public API.

I can update my PR with the change.

Regards,
Shashwat Pandey


On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax  wrote:


Any updates on this KIP?

On 3/28/24 4:11 AM, Matthias J. Sax wrote:
It seems that `MockRecordMetadata` is a private class, and thus not 
part

of the public API. If there are any changes required, we don't need to
discuss on the KIP.


For `CapturedPunctuator` and `CapturedForward` it's a little bit more
tricky. My gut feeling is, that the classes might not need to be
changed, but if we use them within `MockProcessorContext` and
`MockFixedKeyProcessorContext` it might be weird to keep the current
nesting... The problem I see is, that it's not straightforward how to
move the classes w/o breaking compatibility, nor if we duplicate 
them as

standalone classes w/o a larger "splash radius". (We would need to add
new overloads for MockProcessorContext#scheduledPunctuators() and
MockProcessorContext#forwarded()).

Might be good to hear from others if we think it's worth this larger
changes to get rid of the nesting, or just accept the somewhat not 
ideal

nesting as it technically is not a real issue?


-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more
to do
with the internals of the class (MockRecordMetadata,
CapturedPunctuator and
CapturedForward).

However, I do see your point, I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext, some of the
internal
classes should also be extracted i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax 
wrote:

Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few times already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are set up, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent ProcessINGContext
(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext
 implements FixedKeyProcessorContext,
    RecordCollector.Supplier


Of course, if there is code we can share between both mock-context we
should do this, but it should not leak into the public API?


-Matthias



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on




https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext


This adds MockFixedKeyProcessorContext to the Kafka Streams Test 
Utils

library.

Regards,
Shashwat Pandey











[jira] [Commented] (KAFKA-15143) MockFixedKeyProcessorContext is missing from test-utils

2024-05-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848021#comment-17848021
 ] 

Matthias J. Sax commented on KAFKA-15143:
-

As pointed out in the comments on KAFKA-15242, we also need to do something about 
`FixedKeyRecord`, because it does not have (and should not have) a public 
constructor. I'll point this out on the DISCUSS thread of the KIP, too.

> MockFixedKeyProcessorContext is missing from test-utils
> ---
>
> Key: KAFKA-15143
> URL: https://issues.apache.org/jira/browse/KAFKA-15143
> Project: Kafka
>  Issue Type: Bug
>  Components: streams-test-utils
>Affects Versions: 3.5.0
>Reporter: Tomasz Kaszuba
>Assignee: Shashwat Pandey
>Priority: Major
>  Labels: needs-kip
>
> I am trying to test a ContextualFixedKeyProcessor but it is not possible to 
> call the init method from within a unit test since the MockProcessorContext 
> doesn't implement  
> {code:java}
> FixedKeyProcessorContext {code}
> but only
> {code:java}
> ProcessorContext
> {code}
> Shouldn't there also be a *MockFixedKeyProcessorContext* in the test utils?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848020#comment-17848020
 ] 

Matthias J. Sax commented on KAFKA-15242:
-

I don't think that `TestRecord` has anything to do with it, because 
`TestRecord` is not used in combination with `MockProcessorContext`, but only 
in combination with the `TopologyTestDriver` (and corresponding 
`TestInputTopic` and `TestOutputTopic`).

I agree though, that we need some more helper class, because `FixedKeyRecord` 
objects cannot be instantiated directly (no public constructor). Thanks for the 
call out – the KIP needs to be extended accordingly – we would have missed 
this...

This ticket did not have this dependency in its description either though. I 
think we can still close it as duplicate, and add anything missing to the other 
ticket?

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is not a well documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Re: Request to be added to kafka contributors list

2024-05-17 Thread Matthias J. Sax

Did you sign out and sign in again?

On 5/17/24 9:49 AM, Yang Fan wrote:

Thanks Matthias,

I still can't find "Assign to me" button beside Assignee and Reporter. Could 
you help me set it again?

Best regards,
Fan
____
From: Matthias J. Sax 
Sent: May 17, 2024 2:24
To: users@kafka.apache.org 
Subject: Re: Request to be added to kafka contributors list

Thanks for reaching out Yang. You should be all set.

-Matthias

On 5/16/24 7:40 AM, Yang Fan wrote:

Dear Apache Kafka Team,

I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I 
kindly request to be added to the contributors list for Apache Kafka. Being 
part of this list would allow me to be assigned to JIRA tickets and work on 
them.
Thank you for considering my request.
Best regards,
Fan


[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847064#comment-17847064
 ] 

Matthias J. Sax commented on KAFKA-16774:
-

Ah. Nice. Glad it's not a bug. Thanks for the PR Bruno. Approved.

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847063#comment-17847063
 ] 

Matthias J. Sax commented on KAFKA-15242:
-

The example 
([https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java])
 tries to use `MockProcessorContext` to test `FixedKeyProcessor` – this does 
not work.

Adding `MockFixedKeyProcessorContext` should allow testing `FixedKeyProcessor` 
using this newly added class.

What other issue does this ticket include that's not covered? Can you elaborate?

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is not a well documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics

2024-05-16 Thread Matthias J. Sax

SGTM to use nano-seconds across the board.

On 5/16/24 7:12 AM, Nick Telford wrote:

Actually, one other point: our existing state store operation metrics are
measured in nanoseconds[1].

Should iterator-duration-(avg|max) also be measured in nanoseconds, for
consistency, or should we keep them in milliseconds, as the KIP currently
states?

1:
https://docs.confluent.io/platform/current/streams/monitoring.html#state-store-metrics

On Thu, 16 May 2024 at 12:15, Nick Telford  wrote:


Good point! I've updated it to "Improved StateStore Iterator metrics for
detecting leaks" - let me know if you have a better suggestion.

This should affect the voting imo, as nothing of substance has changed.

Regards,
Nick

On Thu, 16 May 2024 at 01:39, Sophie Blee-Goldman 
wrote:


One quick thing -- can you update the title of this KIP to reflect the
decision to implement these metrics for all state store implementations
rather than just RocksDB?


On Tue, May 14, 2024 at 1:36 PM Nick Telford 
wrote:


Woops! Thanks for the catch Lucas. Given this was just a typo, I don't
think this affects the voting.

Cheers,
Nick

On Tue, 14 May 2024 at 18:06, Lucas Brutschy 
wrote:


Hi Nick,

you are still referring to oldest-open-iterator-age-ms in the
`Proposed Changes` section.

Cheers,
Lucas

On Thu, May 2, 2024 at 4:00 PM Lucas Brutschy 


wrote:


Hi Nick!

I agree, the age variant is a bit nicer since the semantics are very
clear from the name. If you'd rather go for the simple implementation,
how about calling it `oldest-iterator-open-since-ms`? I believe this
could be understood without docs. Either way, I think we should be
able to open the vote for this KIP because nobody raised any major /
blocking concerns.

Looking forward to getting this voted on soon!

Cheers
Lucas

On Sun, Mar 31, 2024 at 5:23 PM Nick Telford <

nick.telf...@gmail.com>

wrote:


Hi Matthias,


For the oldest iterator metric, I would propose something simple like
`iterator-opened-ms` and it would just be the actual timestamp when the
iterator was opened. I don't think we need to compute the actual age,
but users can do this computation themselves?


That works for me; it's easier to implement like that :-D I'm a little
concerned that the name "iterator-opened-ms" may not be obvious enough
without reading the docs.


If we think reporting the age instead of just the timestamp is better, I
would propose `iterator-max-age-ms`. It should be sufficient to call out
(as it's kinda "obvious" anyway) that the metric applies to open
iterators only.


While I think it's preferable to record the timestamp, rather than the age,
this does have the benefit of a more obvious metric name.


Nit: the KIP says it's a store-level metric, but I think it would be
good to say explicitly that it's recorded with DEBUG level only?


Yes, I've already updated the KIP with this information in the table.


Regards,

Nick

On Sun, 31 Mar 2024 at 10:53, Matthias J. Sax 

wrote:



The time window thing was just an idea. Happy to drop it.

For the oldest iterator metric, I would propose something simple like
`iterator-opened-ms` and it would just be the actual timestamp when the
iterator was opened. I don't think we need to compute the actual age,
but users can do this computation themselves?
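
(Just as a trivial sketch of that computation, assuming the metric keeps the
proposed name `iterator-opened-ms` and its gauge value has been read into a
local variable `openedMs`:)

    long ageMs = System.currentTimeMillis() - openedMs;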

If we think reporting the age instead of just the timestamp is better, I
would propose `iterator-max-age-ms`. It should be sufficient to call out
(as it's kinda "obvious" anyway) that the metric applies to open
iterators only.

And yes, I was hoping that the code inside MeteredXxxStore might already
be set up in a way that custom stores would inherit the iterator metrics
automatically -- I am just not sure, and left it as an exercise for
somebody to confirm :)


Nit: the KIP says it's a store-level metric, but I think it would be
good to say explicitly that it's recorded with DEBUG level only?



-Matthias


On 3/28/24 2:52 PM, Nick Telford wrote:

Quick addendum:

My suggested metric "oldest-open-iterator-age-seconds" should be
"oldest-open-iterator-age-ms". Milliseconds is obviously a better
granularity for such a metric.

Still accepting suggestions for a better name.

On Thu, 28 Mar 2024 at 13:41, Nick Telford <

nick.telf...@gmail.com



wrote:



Hi everyone,

Sorry for leaving this for so long. So much for "3 weeks until KIP
freeze"!


On Sophie's comments:
1. Would Matthias's suggestion of a separate metric tracking the age of
the oldest open iterator (within the tag set) satisfy this? That way we
can keep iterator-duration-(avg|max) for closed iterators, which can be
useful for performance debugging for iterators that don't leak. I'm not
sure what we'd call this metric, maybe:
"oldest-open-iterator-age-seconds"? Seems like a mouthful.

2. Y

Re: Kafka streams stores key in multiple state store instances

2024-05-16 Thread Matthias J. Sax

Hello Kay,

What you describe is "by design" -- unfortunately.

The problem is that when we build the `Topology` we don't know the 
partition count of the input topics, and thus StreamsBuilder cannot 
insert a repartition topic for this case (we always assume that the 
partition count is the same for all input topics).


To work around this, you would need to rewrite the program to use either 
`groupBy((k,v) -> k)` instead of `groupByKey()`, or do a 
`.repartition().groupByKey()`.
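
A minimal sketch (not from this thread) of the two workarounds; topic names
and serdes are made up, and a `builder` (StreamsBuilder) is assumed:

    KStream<String, String> merged = builder
        .stream("topic-a", Consumed.with(Serdes.String(), Serdes.String()))
        .merge(builder.stream("topic-b", Consumed.with(Serdes.String(), Serdes.String())));

    // Option 1: groupBy with an identity selector marks the stream for repartitioning
    KTable<String, Long> option1 = merged
        .groupBy((k, v) -> k, Grouped.with(Serdes.String(), Serdes.String()))
        .count();

    // Option 2: explicit repartition before grouping
    KTable<String, Long> option2 = merged
        .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .count();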


Does this make sense?


-Matthias


On 5/16/24 2:11 AM, Kay Hannay wrote:

Hi,

we have a Kafka Streams application which merges (merge, groupByKey, 
aggregate) a few topics into one topic. The application is stateful, of 
course. There are currently six instances of the application running in 
parallel.

We had an issue where one new topic for aggregation had a different partition 
count than all other topics. This caused data corruption in our application. We 
expected that a re-partitioning topic would be created automatically by Kafka 
Streams or that we would get an error. But this did not happen. Instead, some 
of the keys (all merged topics share the same key schema) found their way into 
at least two different instances of the application. One key is available in 
more than one local state store. Can you explain why this happened? As already 
said, we would have expected to get an error or a re-partitioning topic in this 
case.

Cheers
Kay



Re: Request to be added to kafka contributors list

2024-05-16 Thread Matthias J. Sax

Thanks for reaching out Yang. You should be all set.

-Matthias

On 5/16/24 7:40 AM, Yang Fan wrote:

Dear Apache Kafka Team,

I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I 
kindly request to be added to the contributors list for Apache Kafka. Being 
part of this list would allow me to be assigned to JIRA tickets and work on 
them.
Thank you for considering my request.
Best regards,
Fan


[jira] [Assigned] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)

2024-05-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-16448:
---

Assignee: Loïc Greffier

> Add Kafka Streams exception handler for exceptions occuring during processing 
> (KIP-1033)
> 
>
> Key: KAFKA-16448
> URL: https://issues.apache.org/jira/browse/KAFKA-16448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>Assignee: Loïc Greffier
>Priority: Minor
>
> Jira to follow work on KIP:  [KIP-1033: Add Kafka Streams exception handler 
> for exceptions occuring during 
> processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16333) Removed Deprecated methods KTable#join

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846821#comment-17846821
 ] 

Matthias J. Sax commented on KAFKA-16333:
-

In general yes. But we should wait until a final decision is made on whether the 
release after 3.8 will be 3.9 or 4.0 – so we should hold off a few more weeks.

> Removed Deprecated methods KTable#join
> --
>
> Key: KAFKA-16333
> URL: https://issues.apache.org/jira/browse/KAFKA-16333
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>        Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> KTable#join() methods taking a `Named` parameter got deprecated in 3.1 
> release via https://issues.apache.org/jira/browse/KAFKA-13813 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16329) Remove Deprecated Task/ThreadMetadata classes and related methods

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846820#comment-17846820
 ] 

Matthias J. Sax commented on KAFKA-16329:
-

In general yes. But we should wait until a final decision is made on whether the 
release after 3.8 will be 3.9 or 4.0 – so we should hold off a few more weeks.

> Remove Deprecated Task/ThreadMetadata classes and related methods
> -
>
> Key: KAFKA-16329
> URL: https://issues.apache.org/jira/browse/KAFKA-16329
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>        Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Deprecated in AK 3.0 via 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-744%3A+Migrate+TaskMetadata+and+ThreadMetadata+to+an+interface+with+internal+implementation]
>  
>  * 
> org.apache.kafka.streams.processor.TaskMetadata
>  * org.apache.kafka.streams.processo.ThreadMetadata
>  * org.apache.kafka.streams.KafkaStreams#localThredMetadata
>  * org.apache.kafka.streams.state.StreamsMetadata
>  * org.apache.kafka.streams.KafkaStreams#allMetadata
>  * org.apache.kafka.streams.KafkaStreams#allMetadataForStore
> This is related https://issues.apache.org/jira/browse/KAFKA-16330 and both 
> ticket should be worked on together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16330) Remove Deprecated methods/variables from TaskId

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846819#comment-17846819
 ] 

Matthias J. Sax commented on KAFKA-16330:
-

In general yes. But we should wait until a final decision is made on whether the 
release after 3.8 will be 3.9 or 4.0 – so we should hold off a few more weeks.

> Remove Deprecated methods/variables from TaskId
> ---
>
> Key: KAFKA-16330
> URL: https://issues.apache.org/jira/browse/KAFKA-16330
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>        Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Cf 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557]
>  
> This ticket relates to https://issues.apache.org/jira/browse/KAFKA-16329 and 
> both should be worked on together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846818#comment-17846818
 ] 

Matthias J. Sax commented on KAFKA-16448:
-

[~muralibasani] – Thanks for your interest. This ticket is already WIP, the 
corresponding KIP-1033 was accepted, and there is already a PR... [~Dabz], 
should we assign this ticket to you or Loïc (not sure if he has a Jira account 
– if not, we should create one so we can assign the ticket to him)?

> Add Kafka Streams exception handler for exceptions occuring during processing 
> (KIP-1033)
> 
>
> Key: KAFKA-16448
> URL: https://issues.apache.org/jira/browse/KAFKA-16448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Damien Gasparina
>Priority: Minor
>
> Jira to follow work on KIP:  [KIP-1033: Add Kafka Streams exception handler 
> for exceptions occurring during 
> processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846815#comment-17846815
 ] 

Matthias J. Sax commented on KAFKA-16774:
-

Wondering if this is actually a flaky test, or if it exposes a real bug? 

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16774:

Labels: flaky-test  (was: )

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16774:

Component/s: streams
 unit tests

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846814#comment-17846814
 ] 

Matthias J. Sax edited comment on KAFKA-15242 at 5/16/24 5:32 AM:
--

There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP 
– ie, there is KIP-1027.

I believe it would cover this ticket? Wondering if we can close this ticket as a 
duplicate?


was (Author: mjsax):
There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP 
– ie, there is KIP-1027.

I believe it would cover this ticket?

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is no well-documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable

2024-05-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846814#comment-17846814
 ] 

Matthias J. Sax commented on KAFKA-15242:
-

There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP 
– ie, there is KIP-1027.

I believe it would cover this ticket?

> FixedKeyProcessor testing is unusable
> -
>
> Key: KAFKA-15242
> URL: https://issues.apache.org/jira/browse/KAFKA-15242
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Zlstibor Veljkovic
>Assignee: Alexander Aghili
>Priority: Major
>
> Using mock processor context to get the forwarded message doesn't work.
> Also there is no well-documented way for testing FixedKeyProcessors.
> Please see the repo at [https://github.com/zveljkovic/kafka-repro]
> but most important piece is test file with runtime and compile time errors:
> [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Query regarding groupbykey in streams

2024-05-15 Thread Matthias J. Sax
If I read this correctly, your upstream producer which writes into the 
input topic of your KS app is using a custom partitioner?


If you do a `groupByKey()` and change the key upstream, it would result 
in a repartition step, which would fall back to the default partitioner.


If you want to use a custom partitioner in KS, you should implement 
`StreamPartitioner` instead of the producer partitioner interface, and 
pass it into the relevant methods.


`groupByKey()` does not allow you to set a partitioner (seems like this is a gap 
we should close...) -- as a workaround you could add a repartition() 
before the grouping to pass in your custom partitioner.
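
Something like this (a rough sketch; the topic name, the "custom-repartition" 
name, and MyStreamPartitioner are made up, and the partitioner should mirror 
your producer-side partitioning logic):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class CustomRepartitionExample {

    static class MyStreamPartitioner implements StreamPartitioner<String, String> {
        @Override
        public Integer partition(final String topic, final String key, final String value, final int numPartitions) {
            // same partitioning logic as the upstream producer's custom partitioner
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream = builder.stream("input-topic");

        stream
            // explicit repartition step so the custom partitioner is used
            // instead of the default partitioner
            .repartition(Repartitioned.<String, String>as("custom-repartition")
                .withStreamPartitioner(new MyStreamPartitioner()))
            .groupByKey()
            .count();
    }
}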


For IQ, you would also need to pass in your `StreamPartitioner` to allow 
KS to find the correct partition to query.


HTH.

-Matthias


On 5/13/24 4:35 PM, Dev Lover wrote:

Hi All,

I have a custom partitioner to distribute the data across partitions in my
cluster.

My setup looks like below
Version - 3.7.0
Kafka - 3 broker setup
Partition count - 10
Stream server pods - 2
Stream threads in each pod - 10
Deployed in Kubernetes
Custom partitioner on producer end.

I am doing a groupByKey(). Is it correct to use it when I have a custom
partitioner on the producer end?
I recently migrated to 3.7 from 3.5.1. I am observing that partitions are
not evenly distributed across my 2 stream pods. Also my remote query is
failing with the host being unavailable. But if I restart streams it works fine
for a certain time and then starts erroring out again. Am I doing something
wrong?

Regards



Re: [VOTE] KIP-989: RocksDB Iterator Metrics

2024-05-14 Thread Matthias J. Sax

+1 (binding)

On 5/14/24 9:19 AM, Lucas Brutschy wrote:

Hi Nick!

Thanks for the KIP.

+1 (binding)

On Tue, May 14, 2024 at 5:16 PM Nick Telford  wrote:


Hi everyone,

I'd like to call a vote on the Kafka Streams KIP-989: RocksDB Iterator
Metrics:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics

All of the points in the discussion thread have now been addressed.

Regards,

Nick


Re: [VOTE] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-13 Thread Matthias J. Sax

+1 (binding)

On 5/13/24 5:54 PM, Sophie Blee-Goldman wrote:

Thanks for the KIP guys!

+1 (binding)

On Mon, May 13, 2024 at 6:02 AM Bill Bejeck  wrote:


Thanks for the KIP, this will be a great addition!

+1(binding)

-Bill

On Fri, May 3, 2024 at 4:48 AM Bruno Cadonna  wrote:


Hi Damien, Sébastien, and Loïc,

Thanks for the KIP!

+1 (binding)

Best,
Bruno


On 4/26/24 4:00 PM, Damien Gasparina wrote:

Hi all,

We would like to start a vote for KIP-1033: Add Kafka Streams
exception handler for exceptions occurring during processing

The KIP is available on




https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing


If you have any suggestions or feedback, feel free to participate in
the discussion thread:
https://lists.apache.org/thread/1nhhsrogmmv15o7mk9nj4kvkb5k2bx9s

Best regards,
Damien Sebastien and Loic








[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug

2024-05-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845111#comment-17845111
 ] 

Matthias J. Sax commented on KAFKA-16584:
-

I am on the PMC and can help you. :) 

After your wiki account is created, please share your wiki id and we can give 
you write access to the Kafka wiki space, so you can prepare a KIP.

The goal of this ticket is to add a new config for the logging interval, so it 
should not be controversial. An example of another already approved KIP that 
also added a new config is 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+repartition.purge.interval.ms+to+Kafka+Streams]
 – This should help you to write your KIP for this ticket.
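
For illustration, the general pattern for setting such a Streams config (the 
name "log.summary.interval.ms" below is only a placeholder until the KIP 
defines the real config; "repartition.purge.interval.ms" is the existing 
config added by KIP-811):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class LoggingIntervalConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // existing config added via KIP-811, shown as the pattern to follow:
        props.put("repartition.purge.interval.ms", 30_000L);
        // placeholder name for the new logging-interval config this ticket proposes:
        props.put("log.summary.interval.ms", 600_000L);
    }
}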

> Make log processing summary configurable or debug
> -
>
> Key: KAFKA-16584
> URL: https://issues.apache.org/jira/browse/KAFKA-16584
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Andras Hatvani
>Assignee: dujian
>Priority: Major
>  Labels: needs-kip, newbie
>
> Currently *every two minutes for every stream thread* statistics will be 
> logged on INFO log level. 
> {code}
> 2024-04-18T09:18:23.790+02:00  INFO 33178 --- [service] [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread         : stream-thread 
> [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 
> total records, ran 0 punctuators, and committed 0 total tasks since the last 
> update {code}
> This is absolutely unnecessary and even harmful since it fills the logs and 
> thus storage space with unwanted and useless data. Otherwise the INFO logs 
> are useful and helpful, therefore it's not an option to raise the log level 
> to WARN.
> Please make the logProcessingSummary 
> * either to a DEBUG level log or
> * make it configurable so that it can be disabled.
> This is the relevant code: 
> https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Creating kafka wiki id

2024-05-09 Thread Matthias J. Sax
Self-service to create an account is currently not working. Please reply 
on https://issues.apache.org/jira/browse/INFRA-25451 to request a wiki 
account.


I'll update the wiki page for now until the issue is resolved.

-Matthias

On 5/7/24 8:25 AM, 黃竣陽 wrote:

Hello, I want to create a KIP, but I don't have a Kafka wiki id. I went to the
page (https://cwiki.apache.org/confluence/signup.action) but it doesn't
have a button to register an account.
Please help me create an account, thank you



Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-09 Thread Matthias J. Sax

Thanks Sophie! Makes it much clearer where you are coming from.

About the type unsafety: isn't this also an issue for the 
`handleSerializationException` case, because the handler is used for all 
sink topics, and thus key/value types are not really known w/o taking the 
sink topic into account? -- So I am not sure if having two handler 
methods really helps much with regard to type safety?


Just want to make this small comment for completeness. Let's hear what 
others think. Given that we both don't have a strong opinion but just a 
personal preference, we should be able to come to a conclusion quickly 
and get this KIP approved for 3.8 :)



-Matthias

On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote:

Well I definitely don't feel super strongly about it, and more importantly,
I'm not a user. So I will happily defer to the preference of anyone who
will actually be using this feature. But  I'll explain my reasoning:

There *is* a relevant distinction between these two callbacks -- because
the passed-in record will have a different type depending on whether it was
a serialization exception or something else. Even if we combined them into
a single #handle method, users will still end up implementing two distinct
branches depending on whether it was a serialization exception or not,
since that determines the type of the ProducerRecord passed in.

Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]>
when we could have just passed it in as this type via a dedicated callback.
And note that because of the generics, they can't do an instanceof check to
make sure that the record type is ProducerRecord<byte[], byte[]> and will
have to suppress the "unchecked cast" warning.

So if we combined the two callbacks, their handler will look something like
this:

@SuppressWarnings("unchecked")
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ProducerRecord record,
                                                 final Exception exception) {
    if (exception instanceof SerializationException) {
        if (exception.origin().equals(KEY)) {
            log.error("Failed to serialize key", exception);
        } else {
            log.error("Failed to serialize value", exception);
        }
    } else {
        final ProducerRecord<byte[], byte[]> serializedRecord = (ProducerRecord<byte[], byte[]>) record;
        log.error("Failed to produce record with serialized key={} and serialized value={}",
                serializedRecord.key(), serializedRecord.value());
    }
    return ProductionExceptionHandlerResponse.FAIL;
}

That seems like the most basic case, and it still has distinct logic
even if they ultimately handle exceptions the same way. And looking forward
to KIP-1034: Dead-letter queues, it seems all the more likely that the
actual handling response might be different depending on whether it's a
serialization exception or not: a serialized record can probably be
retried/sent to a DLQ, whereas a record that can't be serialized should not
(can't, really) be forwarded to a DLQ. So if they're going to have
completely different implementations depending on whether it's a
serialization exception, why not just give them two separate callbacks?

And that's all assuming the user is perfectly aware of the different
exception types and their implications for the type of the ProducerRecord.
Many people might just miss the existence of the
RecordSerializationException altogether --
there are already so many different exception types, ESPECIALLY when it
comes to the Producer. Not to mention they'll need to understand the
nuances of how the ProducerRecord type changes depending on the type of
exception that's passed in. And on top of all that, they'll need to know
that there is metadata stored in the RecordSerializationException regarding
the origin of the error. Whereas if we just passed in the
SerializationExceptionOrigin to a #handleSerializationException callback, well,
that's pretty impossible to miss.

That all just seems like a lot for most people to have to understand to
implement a ProductionExceptionHandler, which imo is not at all an advanced
feature and should be as straightforward and easy to use as possible.

Lastly -- I don't think it's quite fair to compare this to the
RecordDeserializationException. We have a dedicated handler that's just for
deserialization exceptions specifically, hence there's no worry about users
having to be aware of the different exception types they might have to deal
with in the DeserializationExceptionHandler. Whereas serialization
exceptions are just a subset of what might get passed in to the
ProductionExceptionHandler...

Just explaining my reasoning -- in the end I leave it up to the KIP authors
and anyone who will actually be using this feature in their applications :)
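
For reference, a rough sketch of a handler implementing the two existing 
callbacks (based on the ProductionExceptionHandler API as it exists today; 
the KIP-1033 variants discussed above may look different):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class LoggingProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // non-serialization errors: key() and value() are already serialized byte arrays here
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record,
                                                                           final Exception exception) {
        // serialization errors: key() and value() still have their original (unserialized) types
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // nothing to configure in this sketch
    }
}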



On Tue, May 7, 2024 at 8:35 PM Matthias J. Sax  wrote:


@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?


First of all, I agree that it makes se

Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-07 Thread Matthias J. Sax

Thank you all for the feedback!

Addressing the main concern: The KIP is about giving the user the ability
to handle producer exceptions, but to be more conservative and avoid
future issues, we decided to limit it to a short list of exceptions. I
included *RecordTooLargeException* and *UnknownTopicOrPartitionException*.
Open to suggestions for adding some more ;-)

KIP Updates:
- clarified the way that the user should configure the Producer to use
the custom handler. I think adding a producer config property is the
cleanest one.
- changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to
be closer to what we are changing.
- added the ProducerRecord as the input parameter of the handle() method
as well.
- increased the response types to 3 to have fail and two types of
continue.
- The default behaviour is having no custom handler, having the
corresponding config parameter set to null. Therefore, the KIP provides
no default implementation of the interface.
- We follow the interface solution as described in the Rejected
Alternatives section.

Cheers,
Alieh


On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org> wrote:



Thanks for the KIP Alieh! It addresses an important case for error
handling.

I agree that using this handler would be an expert API, as mentioned by a
few people. But I don't think it would be a reason to not add it. It's
always a tricky tradeoff what to expose to users and to avoid foot guns,
but we added similar handlers to Kafka Streams, and have good experience
with it. Hence, I understand, but don't share the concern raised.

I also agree that there is some responsibility by the user to understand
how such a handler should be implemented to not drop data by accident.
But it seems unavoidable and acceptable.

While I understand that a "simpler / reduced" API (eg via configs) might
also work, I personally prefer a full handler. Configs have the same
issue that they could be misused, potentially leading to incorrectly
dropped data, but at the same time are less flexible (and thus maybe even
harder to use correctly...?). Based on my experience, there are also
often weird corner cases for which it makes sense to also drop records
for other exceptions, and a full handler has the advantage of full
flexibility and "absolute power!".

To be fair: I don't know the exact code paths of the producer in detail,
so please keep me honest. But my understanding is that the KIP aims to
allow users to react to internal exceptions, and decide to keep retrying
internally, swallow the error and drop the record, or raise the error?

Maybe the KIP would need to be a little bit more precise about what
errors we want to cover -- I don't think this list must be exhaustive, as
we can always do a follow-up KIP to also apply the handler to other
errors to expand the scope of the handler. The KIP does mention examples,
but it might be good to explicitly state for what cases the handler gets
applied?

I am also not sure if CONTINUE and FAIL are enough options? Don't we need
three options? Or would `CONTINUE` have a different meaning depending on
the type of error? Ie, for a retryable error `CONTINUE` would mean keep
retrying internally, but for a non-retryable error `CONTINUE` means
swallow the error and drop the record? This semantic overload seems
tricky for users to reason about, so it might be better to split
`CONTINUE` into two cases -> `RETRY` and `SWALLOW` (or some better
names).

Additionally, should we just ship a `DefaultClientExceptionHandler` which
would return `FAIL`, for backward compatibility? Or don't have any
default handler to begin with and allow it to be `null`? I don't see the
need for a specific `TransactionExceptionHandler`. To me, the goal should
be to not modify the default behavior at all, but to just allow users to
change the default behavior if there is a need.

What is missing in the KIP though is how the handler is passed into the
producer. Would we need a new config which allows setting a custom
handler? And/or would we allow passing in an instance via the
constructor, or add a new method to set a handler?

-Matthias
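
A minimal sketch of the three-outcome split suggested above (the names
RETRY, SWALLOW, and FAIL are taken from this thread, not from the final KIP):

// hypothetical response enum for the producer exception handler discussed in KIP-1038
public enum ProducerExceptionHandlerResponse {
    RETRY,   // keep retrying internally (for retriable errors)
    SWALLOW, // drop the record and continue
    FAIL     // raise the error to the application
}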

On 4/18/24 10:02 AM, Andrew Schofield wrote:

Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do
with additional motivation. This would be an expert-level interface given
how complicated the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its
behalf. The ProduceResponse can actually return an array of records which
failed per batch. If

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-07 Thread Matthias J. Sax

@Loic, yes, what you describe is exactly what I had in mind.



@Sophie, can you elaborate a little bit?


First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else.


What makes a serialization exception special compared to other errors 
that it's valuable to treat it differently? Why can we put "everything 
else" into a single bucket? By your train of thought, should we not split 
out the "everything else" bucket into a different callback method for 
every different error? If not, why not, and why only for serialization errors?


From what I believe to remember, historically, we added the 
ProductionExceptionHandler, and kinda just missed the serialization 
error case. And later, when we extended the handler we just could not 
re-use the existing callback as it was typed with `<byte[], byte[]>` and 
it would have been an incompatible change; so it was rather a workaround 
to add the second method to the handler, but not really intended design?



It's of course only my personal opinion that I believe a single callback 
method is simpler/cleaner compared to sticking with two, and adding the 
new exception type to make it backward compatible seems worth it. It 
also kinda introduces the same pattern we use elsewhere (cf KIP-1036), 
which I actually think is an argument for introducing 
`RecordSerializationException`, to unify user experience across the board.


Would be great to hear from others about this point. It's not that I 
strongly object to having two methods, and I would not block this KIP on 
this question.




-Matthias


On 5/7/24 3:40 PM, Sophie Blee-Goldman wrote:

First of all, I agree that it makes sense to maintain the two separate
callbacks for the ProductionExceptionHandler, since one of them is
specifically for serialization exceptions while the other is used for
everything/anything else. I also think we can take advantage of this fact
to simplify things a bit and cut down on the amount of new stuff added to
the API by just adding a parameter to the #handleSerializationException
callback and use that to pass in the SerializationExceptionOrigin enum to
distinguish between key vs value. This way we wouldn't need to introduce
yet another exception type (the RecordSerializationException) just to pass
in this information.

Thoughts?

On Tue, May 7, 2024 at 8:33 AM Loic Greffier 
wrote:


Hi Matthias,

To sum up the proposed changes to the ProductionExceptionHandler callback
methods (106):

A new method ProductionExceptionHandler#handle is added with the following
signature:


ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord record, final Exception exception);

The ProducerRecord parameter has changed to accept a serialized or
non-serialized record.
Thus, the new ProductionExceptionHandler#handle method can handle both
production exceptions and serialization exceptions.

Both old ProductionExceptionHandler#handle and
ProductionExceptionHandler#handleSerializationException methods are now
deprecated.
The old ProductionExceptionHandler#handle method gets a default
implementation, so users do not have to implement a deprecated method.

To handle backward compatibility, the new
ProductionExceptionHandler#handle method gets a default implementation.


default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ProducerRecord record,
                                                  final Exception exception) {
    if (exception instanceof RecordSerializationException) {
        return this.handleSerializationException(record, exception.getCause());
    }

    return handle((ProducerRecord<byte[], byte[]>) record, exception);
}


The default implementation either invokes #handleSerializationException or
#handle depending on the type of the exception, thus users still relying on
deprecated ProductionExceptionHandler#handle
or ProductionExceptionHandler#handleSerializationException custom
implementations won't break.

The new ProductionExceptionHandler#handle method is now invoked in case of
serialization exception:


public <K, V> void send(final String topic, final K key, final V value, ...) {
    try {
        keyBytes = keySerializer.serialize(topic, headers, key);
        ...
    } catch (final ClassCastException exception) {
        ...
    } catch (final Exception exception) {
        try {
            response = productionExceptionHandler.handle(context, record,
                new RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
        } catch (final Exception e) {
            ...
        }
    }
}


To wrap the original serialization exception and determine whether it comes
from the key or the value, a new exception class is created:


public class RecordSerializationException extends SerializationException

{

 public enum SerializationExceptionOrigin {
 KEY,
 VALUE
 }

 public RecordSerializatio

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
Those are good questions... I could think of a few approaches, but I admit 
it might all be a little bit tricky to code up...


However, if we don't solve this problem, I think this KIP does not really 
solve the core issue we are facing? In the end, if we rely on the 
`.checkpoint` file to compute a task assignment, but the `.checkpoint` 
file can be arbitrarily stale after a crash because we only write it on a 
clean close, there would still be a huge gap that this KIP does not close?


For the case in which we keep the checkpoint file, this KIP would still 
help for "soft errors" in which KS can recover, and roll back the store. 
A significant win for sure. -- But hard crashes would still be a 
problem? We might assign tasks to the "wrong" instance, ie, one which is not 
the most up to date, as the checkpoint information could be very outdated? 
Would we end up with a half-baked solution? Would this be good enough to 
justify the introduced complexity? In the end, for soft failures it's still 
a win. Just want to make sure we understand the limitations and make an 
educated decision.


Or do I miss something?


-Matthias

On 5/3/24 10:20 AM, Bruno Cadonna wrote:

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. 
Might be something to be worth calling out explicitly in the KIP 
writeup. -- Now that I realize that the position is tracked inside the 
store (not outside as the changelog offsets) it makes much more sense 
to pull position into RocksDB itself. In the end, it's actually a 
"store implementation" detail how it tracks the position (and kinda 
leaky abstraction currently, that we re-use the checkpoint file 
mechanism to track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not 
too bad? When KS starts up, we could open all stores we find on local 
disk pro-actively, and keep them all open until the first rebalance 
finishes: For tasks we get assigned, we hand in the already opened 
store (this would amortize the cost to open the store before the 
rebalance) and for non-assigned tasks, we know the offset information 
won't change and we could just cache it in-memory for later reuse (ie, 
next rebalance) and close the store to free up resources? -- Assuming 
that we would get a large percentage of opened stores assigned as 
tasks anyway, this could work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data 
structure. If Kafka Streams writes its position to the .position file 
during a commit and a crash happens before RocksDB persist the 
memtable then the position in the .position file is ahead of the 
persisted offset. If IQ is done between the crash and the state store 
fully restored the changelog, the position might tell IQ that the 
state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same 
as persisting offset, the position should always be consistent with 
the offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map 
passed via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the 
position into the implementation of the StateStore interface since 
the position is updated within the implementation of the StateStore 
interface (e.g. RocksDBStore [1]). My statement describes the 
behavior now, not the change proposed in this KIP, so it does not 
contradi

Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-05-03 Thread Matthias J. Sax

Can you file a ticket for it: https://issues.apache.org/jira/browse/KAFKA



On 5/3/24 3:34 AM, Penumarthi Durga Prasad Chowdary wrote:

With Kafka versions 3.5.1 and 3.7.0, we're still encountering persistent issues.
The Kafka Streams library is aligned with these Kafka versions. Upon
analysis of the logs, it seems that the problem may occur when a Kafka node
disconnects from Kafka Streams processes. This suspicion is supported by
the abundance of network messages indicating disconnections, such as


  org.apache.kafka.clients.NetworkClient
ThreadName: 
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
   Message: [Consumer
clientId=kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9-consumer,
groupId=kafka-streams-exec-0-test-store ] Node 102 disconnected.




On Mon, Apr 22, 2024 at 7:16 AM Matthias J. Sax  wrote:


Not sure either, but it sounds like a bug to me. Can you reproduce this
reliably? What version are you using?

It would be best if you could file a Jira ticket and we can take it from
there.


-Matthias

On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:

Hi,
I have an issue in kafka-streams while constructing kafka-streams state
store windows (TimeWindow and SessionWindow). While kafka-streams is
processing data, sometimes the kafka-streams process intermittently throws the
below error
ThreadName:


kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9

TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
   Message: stream-client [ kafka-streams-exec-0-test-store
-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams
uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-AGGREGATE-01
at


org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)

at


org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)

at


org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)

at


org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)

at


org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)

at


org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)

at


org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)

at


org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)

   Caused by: java.lang.NullPointerException
at


org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)

at


org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)

at


org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)

... 7 more
My understanding here is that the state store is null and at that time
stateStore.flush() gets invoked to send the data to the state store; this
leads to the above error. This error can be caught inside the kafka-streams
setUncaughtExceptionHandler.
streams.setUncaughtExceptionHandler(throwable -> {
    LOGGER.error("Exception in streams", throwable);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
I'm uncertain about the exact reason for this issue. Everything seems to

be

in order, including the Kafka cluster, and there are no errors in the

Kafka

Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen?
I would like to express my gratitude in advance for any assistance

provided.






Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-03 Thread Matthias J. Sax

Please also update the KIP.

To get a wiki account created, please request it via a comment on this 
ticket: https://issues.apache.org/jira/browse/INFRA-25451


After you have the account, please share your wiki id, and we can give 
you write permission on the wiki.




-Matthias

On 5/3/24 6:30 AM, Shashwat Pandey wrote:

Hi Matthias,

Sorry this fell out of my radar for a bit.

Revisiting the topic, I think you’re right and we accept the duplicated
nesting as an appropriate solution to not affect the larger public API.

I can update my PR with the change.

Regards,
Shashwat Pandey


On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax  wrote:


Any updates on this KIP?

On 3/28/24 4:11 AM, Matthias J. Sax wrote:

It seems that `MockRecordMetadata` is a private class, and thus not part
of the public API. If there are any changes required, we don't need to
discuss it in the KIP.


For `CapturedPunctuator` and `CapturedForward` it's a little bit more
tricky. My gut feeling is, that the classes might not need to be
changed, but if we use them within `MockProcessorContext` and
`MockFixedKeyProcessorContext` it might be weird to keep the current
nesting... The problem I see is, that it's not straightforward how to
move the classes w/o breaking compatibility, nor if we duplicate them as
standalone classes w/o a larger "splash radius". (We would need to add
new overloads for MockProcessorContext#scheduledPunctuators() and
MockProcessorContext#forwarded()).

Might be good to hear from others if we think it's worth these larger
changes to get rid of the nesting, or just accept the somewhat non-ideal
nesting as it technically is not a real issue?


-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more
to do
with the internals of the class (MockRecordMetadata,
CapturedPunctuator and
CapturedForward).

However, I do see your point, I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext, some of the
internal
classes should also be extracted i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax 
wrote:


Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few times already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are setup, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent ProcessINGContext
(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext<KForward, VForward>
    implements FixedKeyProcessorContext<KForward, VForward>,
               RecordCollector.Supplier


Of course, if there is code we can share between both mock-contexts we 
should do this, but it should not leak into the public API?


-Matthias



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on




https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext


This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
library.

Regards,
Shashwat Pandey











Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Matthias J. Sax
117f: Good point by Bruno. We should check for this, and could have an 
additional `INVALID_STANDBY_TASK` error code?



-Matthias

On 5/3/24 5:52 AM, Guozhang Wang wrote:

Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna  wrote:


Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough to
avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
the information whether a task is stateless or stateful into the task
assignor. However, the task assignor can return a standby task for a
stateless task which is inconsistent.

Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.

nit:
The titles of some code blocks in the KIP are not consistent with their
content, e.g., KafkaStreamsState <-> NodeState


Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:

Thanks Sophie. My bad. You are of course right about `TaskAssignment`
and the StreamsPartitionAssignor's responsibility to map tasks of an
instance to consumers. When I wrote my reply, I forgot about this detail.

Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to
figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types
altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and
"Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return
type
being a Map rather than a
TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map. The idea is you can take the output of any of
these and pass it in to another to generate or optimize another piece of
the overall assignment. For example, if you want to perform the
rack-aware
optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we
return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it
seem
like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman

wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't think of
anything
better.  I like your proposals though, will update the KIP names.
I'll also
add a "NONE" option as well -- much better than just passing in null
for no
error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task


   Would also be an error if assigned to two consumers of the same
client...

Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients,
it's not
responsible for the assignment of tasks to consumers within a
KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task
assigned to a
single KAfkaStreams client, and then somehow ended up assigning that
task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this
case in
the OVERLAPPING_CLIENT error (or as it's now called, ACTIVE_TASK_
ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that
caused
the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating
unfixable
issues due to the TaskAssignor itself returning an invalid
assignment. The
phrasing is inte

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. Might 
be something to be worth calling out explicitly in the KIP writeup. -- 
Now that I realize that the position is tracked inside the store (not 
outside as the changelog offsets) it makes much more sense to pull 
position into RocksDB itself. In the end, it's actually a "store 
implementation" detail how it tracks the position (and kinda leaky 
abstraction currently, that we re-use the checkpoint file mechanism to 
track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not too 
bad? When KS starts up, we could open all stores we find on local disk 
pro-actively, and keep them all open until the first rebalance finishes: 
For tasks we get assigned, we hand in the already opened store (this 
would amortize the cost to open the store before the rebalance) and for 
non-assigned tasks, we know the offset information won't change and we 
could just cache it in-memory for later reuse (ie, next rebalance) and 
close the store to free up resources? -- Assuming that we would get a 
large percentage of opened stores assigned as tasks anyway, this could work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data structure. 
If Kafka Streams writes its position to the .position file during a 
commit and a crash happens before RocksDB persist the memtable then the 
position in the .position file is ahead of the persisted offset. If IQ 
is done between the crash and the state store fully restored the 
changelog, the position might tell IQ that the state store is more 
up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map passed 
via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated in 
the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the offsets 
managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure 
I do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted 
in the .position file when the state store closes. 


This contradicts the KIP:

 these position offsets will be stored in RocksDB, in the same column 
family as the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening 
RocksDB stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a 
store is c

Re: [VOTE] KIP-1036: Extend RecordDeserializationException exception

2024-05-03 Thread Matthias J. Sax

+1 (binding)

On 5/3/24 8:52 AM, Federico Valeri wrote:

Hi Fred, this is a useful addition.

+1 non binding

Thanks

On Fri, May 3, 2024 at 4:11 PM Andrew Schofield
 wrote:


Hi Fred,
Thanks for the KIP. It’s turned out nice and elegant I think. Definitely a 
worthwhile improvement.

+1 (non-binding)

Thanks,
Andrew


On 30 Apr 2024, at 14:02, Frédérik Rouleau  
wrote:

Hi all,

As there has been no activity for a while on the discuss thread, I think we
can start a vote.
The KIP is available on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception


If you have some feedback or suggestions, please participate in the
discussion thread:
https://lists.apache.org/thread/or85okygtfywvnsfd37kwykkq5jq7fy5

Best regards,
Fred




Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-03 Thread Matthias J. Sax
What about (106) to unify both existing callback methods of 
`ProductionExceptionHandler` into a single one, instead of adding two 
new ones?


Damien's last reply about it was:


I will think about unifying, I do agree it would be cleaner.


There was no follow-up on this question, and the KIP right now still 
proposes to add two new methods, which I believe we could (should?) 
unify to:


default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                  final ProducerRecord record,
                                                  final Exception exception) {


Ie, we drop the generics `<byte[], byte[]>` on `ProducerRecord`, which 
allows you to also pass in a non-serialized ProducerRecord of any type 
for the serialization error case.


Btw: wondering if we also want to pass in a flag/enum about key vs value 
serialization errors, similar to what was proposed in KIP-1036? The only 
"oddity" would be that we call the handler for other error cases, too, not 
just for serialization exceptions. But we could tackle this by 
introducing a new class `RecordSerializationException` which would 
include the flag and would ensure that KS hands this exception into the 
handler. This would keep the handler interface/method itself clean.
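
A rough sketch of such a wrapper exception (the shape is an assumption from
this thread, not the final KIP-1033 API):

import org.apache.kafka.common.errors.SerializationException;

public class RecordSerializationException extends SerializationException {

    public enum SerializationExceptionOrigin {
        KEY,
        VALUE
    }

    private final SerializationExceptionOrigin origin;

    public RecordSerializationException(final SerializationExceptionOrigin origin, final Throwable cause) {
        super(cause);
        this.origin = origin;
    }

    // flag telling the handler whether the key or the value failed to serialize
    public SerializationExceptionOrigin origin() {
        return origin;
    }
}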



-Matthias




On 5/3/24 2:15 AM, Loic Greffier wrote:

Hi Bruno,

Good catch, KIP has been updated accordingly.

Regards,
Loïc


Re: [VOTE] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Matthias J. Sax

I left one more nit on the discuss thread. But overall LGTM.

+1 (binding)

Thanks Rohan and Sophie for driving this KIP.


-Matthias

On 4/29/24 2:07 PM, Sophie Blee-Goldman wrote:

+1 (binding)

thanks for driving this KIP!

On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai  wrote:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

As this KIP has been open for a while, and gone through a couple rounds of
review/revision, I'm calling a vote to get it approved.





Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-02 Thread Matthias J. Sax
 folks would come up with :)


If any of these errors are detected, the StreamsPartitionAssignor will

immediately "fail" the rebalance and retry it by scheduling an immediate
followup rebalance.

I'm also a bit concerned here, as such endless retry loops have
happened in the past, in my memory. Given that we would likely see most
of the user implementations be deterministic, I'm also leaning towards
failing the app immediately and letting the crowd educate us if there are
some very interesting scenarios out there that are not on our radar to
re-consider this, rather than getting hard-to-debug cases in the dark.

-

And here are just some nits about the KIP writing itself:

* I think some bullet points under `User APIs` and `Read-only APIs`
should have a lower-level indentation? It caught me for a sec until I
realized there are just two categories.

* In TaskAssignmentUtils, why not let those util functions return
`TaskAssignment` (to me it feels more consistent with the user APIs)
instead of returning a Map?


Guozhang

On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax  wrote:


I like the idea of error codes. Not sure if the names are ideal?
UNKNOWN_PROCESS_ID makes sense, but the other two seem a little bit
difficult to understand?

Should we be very descriptive (and also try to avoid coupling it to the
threading model -- important for the first error code):
   - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
   - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)

I think we also need to add NONE as an option, or make the error parameter
an `Optional`?
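
For illustration, such an enum could look something like the sketch below (names taken from the suggestions above; this is not the final KIP-924 API):

public enum AssignmentError {
    NONE,                                         // no problem detected with the custom assignment
    UNKNOWN_PROCESS_ID,                           // assignment references a process that is not part of the group
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,          // the same active task was handed to more than one client
    ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT    // active and standby for the same task ended up on one client
}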



OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the

same active task


Would also be an error if assigned to two consumers of the same
client... Needs to be rephrased.




If any of these errors are detected, the StreamsPartitionAssignor

will immediately "fail" the rebalance and retry it by scheduling an
immediate followup rebalance.


Does this make sense? If we assume that the task-assignment is
deterministic, we would end up with an infinite retry loop? Also,
assuming that a client leaves the group, we cannot assign some tasks any
longer... I would rather throw a StreamsException and let the client
crash.




-Matthias

On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:

One last thing: I added an error code enum to be returned from the
#onAssignmentComputed method in case of an invalid assignment. I created
one code for each of the invalid cases we described above. The downside is
that this means we'll have to go through a deprecation cycle if we want to
loosen up the restrictions on any of the enforced cases. The upside is that
we can very clearly mark what is an invalid assignment and this will
(hopefully) assist users who are new to customizing assignments by clearly
denoting the requirements, and returning a clear error if they are not
followed.

Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
this case by returning the same assignment to the consumers and scheduling
a followup rebalance. I've added all of this to the TaskAssignor and
#onAssignmentComputed javadocs, and added a section under "Public Changes"
as well.

Please let me know if there are any concerns, or if you have suggestions
for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman <sop...@responsive.dev>
wrote:


Thanks guys! I agree with what Lucas said about 117c, we can always loosen
a restriction later and I don't want to do anything now that might get in
the way of the new threading models.

With that I think we're all in agreement on 117. I'll update the KIP to
include what we've discussed

(and will fix the remaining #finalAssignment mention as well, thanks
Bruno. Glad to have such good proof readers! :P)

On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna  wrote:

Hi again,

I forgot to ask whether you could add the agreement about handling
invalid assignment to the KIP.

Best,
Bruno

On 4/30/24 2:00 PM, Bruno Cadonna wrote:

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in some
situations
c) fail: For the reasons Lucas pointed out. I am missing a good use case
here.
d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment() instead of the
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named finalAssignment which is
called with the final computed GroupAssignment ..."


Best,
Bruno


On 4/30/24 1:04 PM, Lucas Brutschy wrote:

Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open point 117. I'm
against any fallbacks like "use the default assignor if the custom
assignment is invalid", as it's just going to hide bugs. For the 4
cases m

[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Component/s: clients

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and {{window.inner.serde.class}} are not a true 
> KafkaStreams config, and are ignored when set from a KStreams application. 
> Both belong on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}} and {{window.inner.serde.class}} are not a true KafkaStreams 
config, and are ignored when set from a KStreams application. Both belong on 
the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and {{window.inner.serde.class}} are not a true 
> KafkaStreams config, and are ignored when set from a KStreams application. 
> Both belong on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in 
> an error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.

KIP-1020: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:
{{window.size.ms}}  is not a true KafkaStreams config, and results in an error 
when set from a KStreams application. It belongs on the client.

[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 


> Deprecate window.size.ms and inner.serde.class in StreamsConfig
> ---
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Summary: Deprecate window.size.ms and window.inner.serde.class in 
StreamsConfig  (was: Deprecate window.size.ms and inner.serde.class in 
StreamsConfig)

> Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
> --
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}} and `is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> KIP-1020: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Summary: Deprecate window.size.ms and inner.serde.class in StreamsConfig  
(was: Deprecate window.size.ms in StreamsConfig)

> Deprecate window.size.ms and inner.serde.class in StreamsConfig
> ---
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Description: 
{{window.size.ms}}  is not a true KafkaStreams config, and results in an error 
when set from a KStreams application. It belongs on the client.

[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]

 

  was:{{window.size.ms}}  is not a true KafkaStreams config, and results in an 
error when set from a KStreams application. It belongs on the client.


> Deprecate window.size.ms in StreamsConfig
> -
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig

2024-05-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16260:

Labels: KIP  (was: needs-kip)

> Deprecate window.size.ms in StreamsConfig
> -
>
> Key: KAFKA-16260
> URL: https://issues.apache.org/jira/browse/KAFKA-16260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Lucia Cerchie
>Assignee: Lucia Cerchie
>Priority: Major
>  Labels: KIP
>
> {{window.size.ms}}  is not a true KafkaStreams config, and results in an 
> error when set from a KStreams application. It belongs on the client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-01 Thread Matthias J. Sax

Any updates on this KIP?

On 3/28/24 4:11 AM, Matthias J. Sax wrote:
It seems that `MockRecordMetadata` is a private class, and thus not part 
of the public API. If there are any changes required, we don't need to 
discuss them on the KIP.



For `CapturedPunctuator` and `CapturedForward` it's a little bit more 
tricky. My gut feeling is that the classes might not need to be 
changed, but if we use them within `MockProcessorContext` and 
`MockFixedKeyProcessorContext` it might be weird to keep the current 
nesting... The problem I see is that it's not straightforward how to 
move the classes w/o breaking compatibility, nor how to duplicate them as 
standalone classes w/o a larger "splash radius". (We would need to add 
new overloads for MockProcessorContext#scheduledPunctuators() and 
MockProcessorContext#forwarded()).


Might be good to hear from others if we think it's worth these larger 
changes to get rid of the nesting, or just accept the somewhat non-ideal 
nesting as it technically is not a real issue?



-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more 
to do
with the internals of the class (MockRecordMetadata, 
CapturedPunctuator and

CapturedForward).

However, I do see your point, I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext, some of the 
internal

classes should also be extracted i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax  
wrote:



Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few time already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are set up, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent ProcessINGContext
(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext<KForward, VForward>
    implements FixedKeyProcessorContext<KForward, VForward>,
               RecordCollector.Supplier


Of course, if there is code we can share between both mock-context we
should so this, but it should not leak into the public API?


-Matthias



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext


This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
library.

Regards,
Shashwat Pandey







Re: outerjoin not joining after window

2024-05-01 Thread Matthias J. Sax

How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.


Well, for left/right join results, the ValueJoiner would only be called 
when the window is closed... And for invalid input (or late records, ie, 
records which arrive out-of-order and whose window was already closed), records 
would be dropped right away. So you cannot really infer whether a record 
made it into the join or not, or what happened if it did make it into 
the `Processor`.


-> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

`dropped-records-total` is the name of the metric.
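
If it helps, a quick way to inspect it programmatically is via the client-side metrics (a small sketch; assumes a running KafkaStreams instance called `streams`):

streams.metrics().entrySet().stream()
    .filter(e -> e.getKey().name().equals("dropped-records-total"))
    .forEach(e -> System.out.println(e.getKey().tags() + " -> " + e.getValue().metricValue()));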



-Matthias



On 5/1/24 11:35 AM, Chad Preisler wrote:

Hello,

We did some testing in our test environment today. We are seeing some
records processed where only one side of the join has a record. So that's
good. However, we are still seeing some records get skipped. They never hit
the value joiner (we write a log message first thing in the value joiner).
During the test we were putting some load on the system, so stream time was
advancing. We did notice that the join windows were taking much longer than
30 minutes to close and process records. Thirty minutes is the window plus
grace.


How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.

I will try pushing the same records locally. However, we don't see any
errors in our logs and the stream does process one sided joins after the
skipped record. Do you have any docs on the "dropped records" metric? I did
a Google search and didn't find many good results for that.

Thanks,

Chad

On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax  wrote:


Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.


That gives some hope :)




However, they never get into the join.


How do you know this?


Did you check the metric for dropped records? Maybe records are
considered malformed and dropped? Are you using the same records in
production and in your local test?



Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as
long as data is flowing, it should not impact the result you see.



Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any
impact.


If I increase the log level for the streams API would that

shed some light on what is happening?


I don't think it would help much. The code in question is
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it
does not do any logging except WARN for the already mentioned "dropping
malformed" records that is also recorded via JMX.


WARN: "Skipping record due to null key or value. "



If you can identify a specific record from the input which would produce
an output, but does not, maybe you can try to feed it into your local
test env and try to re-produce the issue?


-Matthias

On 4/30/24 11:38 AM, Chad Preisler wrote:

Matthias,

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.

I'm not sure why the join is not working as expected when running against
our actual brokers. We are peeking at the records for the streams and we
are seeing the records get pulled. However, they never get into the join.
It's been over 24 hours since the expected records were created, and there
has been plenty of traffic to advance the stream time. Only records that
have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join?

Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it
locally there is a lot more data going through the app when running on our
actual servers. If I increase the log level for the streams API would that
shed some light on what is happening? Does anyone know if there are
specific packages that I should increase the log level for? Any specific
log message I can hone in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad


On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax 

wrote:



I expect the join to
execute after the 25 with one side of the join containing a record and
the other being null


Given that you also have a grace period of 5 minutes, the result will
only be emitted after the grace-period passed and the window is closed
(not when window end time is reached).


One has a

naming convention of

[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-05-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842805#comment-17842805
 ] 

Matthias J. Sax commented on KAFKA-16514:
-

Thanks for the background! Makes sense.

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shutdown a streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shutdown and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamConfig to 
> enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16644.
-
Resolution: Duplicate

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>        Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-30 Thread Matthias J. Sax

Thanks for the update.

I am wondering if we should use `ReadOnlyHeaders` instead of 
`ImmutableHeaders` as interface name?


Also, the returned `Header` interface is technically not immutable 
either, because `Header#value()` returns a mutable byte-array... Would we 
need a `ReadOnlyHeader` interface?


If yes, it seems that `ReadOnlyHeaders` should not be a super-interface 
of `Headers` but it would rather be a standalone interface, and a 
wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some 
immutable type instead of `byte[]` for the value()?


An alternative would be to deep-copy the value byte-array, which would not 
be free, but given that we are talking about exception handling, it 
would not be on the hot code path, and thus might be acceptable?
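
Just to illustrate the wrapper idea, a minimal sketch (names and shape are hypothetical, not a proposal for the final API):

import org.apache.kafka.common.header.Header;

public final class ReadOnlyHeader {
    private final String key;
    private final byte[] value;

    public ReadOnlyHeader(final Header header) {
        this.key = header.key();
        // deep-copy so the wrapped bytes cannot be mutated through this object
        this.value = header.value() == null ? null : header.value().clone();
    }

    public String key() {
        return key;
    }

    public byte[] value() {
        // return a defensive copy on every access
        return value == null ? null : value.clone();
    }
}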



The above seems to increase the complexity significantly though. Hence, 
I have second thoughts on the immutability question:


Do we really need to worry about mutability after all, because in the 
end, KS runtime won't read the Headers instance after the handler was 
called, and if a user modifies the passed in headers, there won't be any 
actual damage (ie, no side effects)? For this case, it might even be ok 
to also not add `ImmutableHeaders` to begin with?




Sorry for the forth and back (yes, forth and back, because back and 
forth does not make sense -- it's not logical -- just trying to fix 
English :D) as I did bring up the immutability question in the first 
place...




-Matthias

On 4/25/24 5:56 AM, Loic Greffier wrote:

Hi Matthias,

I have updated the KIP regarding points 103 and 108.

103.
I have suggested a new `ImmutableHeaders` interface to deal with the
immutability concern of the headers, which is basically the `Headers`
interface without the write accesses.

public interface ImmutableHeaders {
    Header lastHeader(String key);
    Iterable<Header> headers(String key);
    Header[] toArray();
}

The `Headers` interface can be updated accordingly:

public interface Headers extends ImmutableHeaders, Iterable<Header> {
    //…
}

Loïc


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Matthias J. Sax

Thanks Bruno.



101: I think I understand this better now. But just want to make sure I 
do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted in the .position file when the state store closes. 


This contradicts the KIP:


 these position offsets will be stored in RocksDB, in the same column family as 
the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening RocksDB 
stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a store is close()d. 


It seems, having the offset inside RocksDB does not help us at all? In 
the end, when we crash, we don't want to lose the state, but when we 
update the .checkpoint only on a clean close, the .checkpoint might be 
stale (ie, still contains the checkpoint when we opened the store when 
we got a task assigned).




-Matthias

On 4/30/24 2:40 AM, Bruno Cadonna wrote:

Hi all,

100
I think we already have such a wrapper. It is called 
AbstractReadWriteDecorator.



101
Currently, the position is checkpointed when an offset checkpoint is 
written. If we let the state store manage the committed offsets, we need 
to also let the state store manage the position, otherwise they 
might diverge. State store managed offsets can get flushed (i.e. 
checkpointed) to the disk when the state store decides to flush its 
in-memory data structures, but the position is only checkpointed at 
commit time. Recovering after a failure might load inconsistent offsets 
and positions.



102
The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. The only public 
interface that uses the position is IQv2 in a read-only mode. So the 
position is only updated within the state store and read from IQv2. No 
need to add anything to the public StateStore interface.



103
Deprecating managesOffsets() right away might be a good idea.


104
I agree that we should try to support downgrades without wipes. At least 
Nick should state in the KIP why we do not support it.



Best,
Bruno




On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows that it is a 
complex beast by itself, so worth discussing on its own.



Couple of question / comment:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation 
(at least for our own stores, by wrapping them -- we cannot enforce it 
for custom stores I assume), and document this contract explicitly.
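
A minimal sketch of the guard idea (the interface below is just a stand-in for the KIP's proposed commit() method, not actual Kafka code):

import java.util.Map;

interface CommittableStore {
    void commit(Map<String, Long> changelogOffsets);
}

final class UserFacingStoreGuard implements CommittableStore {
    private final CommittableStore inner;

    UserFacingStoreGuard(final CommittableStore inner) {
        this.inner = inner;
    }

    @Override
    public void commit(final Map<String, Long> changelogOffsets) {
        // the runtime would call inner.commit(...) directly; user code hitting
        // the wrapped store gets a hard failure instead of silently committing
        throw new UnsupportedOperationException("commit() must not be called by users");
    }
}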



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I 
think

it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me how `.position` information is added. The KIP 
only says: "position offsets will be stored in RocksDB, in the same 
column family as the changelog offsets". Do you intend to add this 
information to the map passed via `commit(final Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe this in more detail. 
Also, if my assumption is correct, we might want to rename the 
parameter and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores 
(including custom stores) manage their offsets internally? Maintaining 
both options and thus both code paths puts a burden on everyone and 
makes the code messy. I would strongly prefer if we could have a mid-term 
path to get rid of supporting both.  -- For this case, we should 
deprecate the newly added `managesOffsets()` method right away, to 
point

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Matthias J. Sax
I like the idea of error codes. Not sure if the names are ideal? 
UNKNOWN_PROCESS_ID makes sense, but the other two seem a little bit 
difficult to understand?


Should we be very descriptive (and also try to avoid coupling it to the 
threading model -- important for the first error code):

 - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
 - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)

I think we also need to add NONE as an option, or make the error parameter 
an `Optional`?




OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the same 
active task


Would also be an error if assigned to two consumers of the same 
client... Needs to be rephrased.





If any of these errors are detected, the StreamsPartitionAssignor will immediately 
"fail" the rebalance and retry it by scheduling an immediate followup rebalance.


Does this make sense? If we assume that the task-assignment is 
deterministic, we would end up with an infinite retry loop? Also, 
assuming that a client leaves the group, we cannot assign some tasks any 
longer... I would rather throw a StreamsException and let the client crash.




-Matthias

On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:

One last thing: I added an error code enum to be returned from the
#onAssignmentComputed method in case of an invalid assignment. I created
one code for each of the invalid cases we described above. The downside is
that this means we'll have to go through a deprecation cycle if we want to
loosen up the restrictions on any of the enforced cases. The upside is that
we can very clearly mark what is an invalid assignment and this will
(hopefully) assist users who are new to customizing assignments by clearly
denoting the requirements, and returning a clear error if they are not
followed.

Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
this case by returning the same assignment to the consumers and scheduling
a followup rebalance. I've added all of this to the TaskAssignor  and
#onAssignmentComputed javadocs, and added a section under "Public Changes"
as well.

Please let me know if there are any concerns, or if you have suggestions
for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman 
wrote:


Thanks guys! I agree with what Lucas said about 117c, we can always loosen
a restriction later and I don't want to do anything now that might get in
the way of the new threading models.

With that I think we're all in agreement on 117. I'll update the KIP to
include what we've discussed

(and will fix the remaining #finalAssignment mention as well, thanks
Bruno. Glad to have such good proof readers! :P)

On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna  wrote:


Hi again,

I forgot to ask whether you could add the agreement about handling
invalid assignment to the KIP.

Best,
Bruno

On 4/30/24 2:00 PM, Bruno Cadonna wrote:

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in some
situations
c) fail: For the reasons Lucas pointed out. I am missing a good use

case

here.
d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment() instead of the
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named finalAssignment which is
called with the final computed GroupAssignment ..."


Best,
Bruno


On 4/30/24 1:04 PM, Lucas Brutschy wrote:

Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open point 117. I'm
against any fallbacks like "use the default assignor if the custom
assignment is invalid", as it's just going to hide bugs. For the 4
cases mentioned by Sophie:

117a) I'd fail immediately here, as it's an implementation bug, and
should not lead to a valid consumer group assignment.
117b) Agreed. This is a useful assignment and should be allowed.
117c) This is the tricky case. However, I'm leaning towards not
allowing this, unless we have a concrete use case. This will block us
from potentially using a single consumer for active and standby tasks
in the future. It's easier to drop the restriction later if we have a
concrete use case.
117d) Definitely fail immediately, as you said.

Cheers,
Lucas



On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
 wrote:


Yeah I think that sums it up well. Either you computed a *possible*
assignment,
or you returned something that makes it literally impossible for the
StreamsPartitionAssignor to decipher/translate into an actual group
assignment, in which case it should just fail

That's more or less it for the open questions that have been raised so far,
so I just want to remind folks that there's already a voting thread for
this. I cast my vote a few minutes ago so it should resurface in everyone's
inbox :)

On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai 


wrote:


117: as Sophie laid out, there are two cases here right:
1. cases that are considered 

[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842514#comment-17842514
 ] 

Matthias J. Sax commented on KAFKA-16644:
-

Sorry. Wrong link. Fixed -> https://issues.apache.org/jira/browse/KAFKA-14748 

> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16644:

Description: 
We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 

  was:
We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a 
left-hand side record is deleted, the join now emits two tombstone records 
instead of one.

The problem was not detected via unit test, because the tests use a `Map` 
instead of a `List` when verifying the result topic records 
([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]

We should update all test cases to use `List` when reading from the output 
topic, and of course fix the introduced bug: The 
`SubscriptionSendProcessorSupplier` is sending two subscription records instead 
of just a single one: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]

 


> FK join emits duplicate tombstone on left-side delete
> -
>
> Key: KAFKA-16644
> URL: https://issues.apache.org/jira/browse/KAFKA-16644
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>        Reporter: Matthias J. Sax
>Priority: Major
>
> We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a 
> left-hand side record is deleted, the join now emits two tombstone records 
> instead of one.
> The problem was not detected via unit test, because the tests use a `Map` 
> instead of a `List` when verifying the result topic records 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).]
> We should update all test cases to use `List` when reading from the output 
> topic, and of course fix the introduced bug: The 
> `SubscriptionSendProcessorSupplier` is sending two subscription records 
> instead of just a single one: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: outerjoin not joining after window

2024-04-30 Thread Matthias J. Sax

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.


That gives some hope :)




However, they never get into the join.


How do you know this?


Did you check the metric for dropped records? Maybe records are 
considered malformed and dropped? Are you using the same records in 
production and in your local test?




Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as 
long as data is flowing, it should not impact the result you see.




Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any 
impact.



If I increase the log level for the streams API would that

shed some light on what is happening?


I don't think it would help much. The code in question is 
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it 
does not do any logging except WARN for the already mentioned "dropping 
malformed" records that is also recorded via JMX.



WARN: "Skipping record due to null key or value. "



If you can identify a specific record from the input which would produce 
an output, but does not, maybe you can try to feed it into your local 
test env and try to re-produce the issue?



-Matthias

On 4/30/24 11:38 AM, Chad Preisler wrote:

Matthias,

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.

I'm not sure why the join is not working as expected when running against
our actual brokers. We are peeking at the records for the streams and we
are seeing the records get pulled. However, they never get into the join.
It's been over 24 hours since the expected records were created, and there
has been plenty of traffic to advance the stream time. Only records that
have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join?

Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it
locally there is a lot more data going through the app when running on our
actual servers. If I increase the log level for the streams API would that
shed some light on what is happening? Does anyone know if there are
specific packages that I should increase the log level for? Any specific
log message I can hone in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad


On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax  wrote:


I expect the join to
execute after the 25 with one side of the join containing a record and
the other being null


Given that you also have a grace period of 5 minutes, the result will
only be emitted after the grace-period passed and the window is closed
(not when window end time is reached).


One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages?


It's an internal store, that stores all records which are subject to be
emitted as left/right join result, ie, if there is no inner join result.
The format used is internal:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java

Also note: time is based on event-time, ie, if the input stream stops to
send new records, "stream-time" will stop to advance and the result
might not be emitted because the window does not get closed.

(Last, there is some internal wall-clock time delay of one second to
emit results for performance reasons...)

HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:

Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes
and 5 minutes of grace.  When I get a record for one side of the join, but
don't get a record on the other side of the join, I expect the join to
execute after the 25 with one side of the join containing a record and the
other being null. Is that correct?  If it is correct, it's not working for
me.

I was poking around on the broker and saw some internal topics. I see the
key I expected to execute the join on some of those topics. One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages? If I decode the message will it help me see when
the join should have been executed?

I also see the key on a topic with the naming convention
"KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this
issue?







Re: outerjoin not joining after window

2024-04-30 Thread Matthias J. Sax

I expect the join to

execute after the 25 with one side of the join containing a record and the
other being null


Given that you also have a grace period of 5 minutes, the result will 
only be emitted after the grace-period passed and the window is closed 
(not when window end time is reached).
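
For reference, a sketch of the join setup being described (stream names and serdes are made up; the point is that a left/right result for a one-sided record can only be emitted once the 25-minute window plus the 5-minute grace period has passed in stream-time):

KStream<String, String> joined = leftStream.outerJoin(
    rightStream,
    (leftValue, rightValue) -> leftValue + "/" + rightValue,   // ValueJoiner
    JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(25), Duration.ofMinutes(5)),
    StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));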



One has a

naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages?


It's an internal store, that stores all records which are subject to be 
emitted as left/right join result, ie, if there is no inner join result. 
The format used is internal: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java


Also note: time is based on event-time, ie, if the input stream stops to 
send new records, "stream-time" will stop to advance and the result 
might not be emitted because the window does not get closed.


(Last, there is some internal wall-clock time delay of one second to 
emit results for performance reasons...)


HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:

Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes
and 5 minutes of grace.  When I get a record for one side of the join, but
don't get a record on the other side of the join, I expect the join to
execute after the 25 with one side of the join containing a record and the
other being null. Is that correct?  If it is correct, it's not working for
me.

I was poking around on the broker and saw some internal topics. I see the
key I expected to execute the join on some of those topics. One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages? If I decode the message will it help me see when
the join should have been executed?

I also see the key on a topic with the naming convention
"KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this
issue?

Thanks,
Chad



[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842423#comment-17842423
 ] 

Matthias J. Sax commented on KAFKA-16382:
-

Not yet from our side... Working on other things atm. Not sure when we will be 
able to pick it up, or if anybody from the community wants to take it.

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* 2 messages "A1:anull, 
> A1:ab"
>  # Expected output *3 messages* "A1:anull, A1:ab, {*}A1:{*}"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16645) CVEs in 3.7.0 docker image

2024-04-30 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842419#comment-17842419
 ] 

Matthias J. Sax commented on KAFKA-16645:
-

I believe fixing these CVEs should be a blocker for 3.7.1 and 3.8.0? Thoughts?

> CVEs in 3.7.0 docker image
> --
>
> Key: KAFKA-16645
> URL: https://issues.apache.org/jira/browse/KAFKA-16645
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>
> Our [Docker Image CVE 
> Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub 
> action reports 2 high CVEs in our base image:
> apache/kafka:3.7.0 (alpine 3.19.1)
> ==
> Total: 2 (HIGH: 2, CRITICAL: 0)
> Library | Vulnerability | Severity | Status | Installed Version | Fixed Version | Title
> libexpat | CVE-2023-52425 | HIGH | fixed | 2.5.0-r2 | 2.6.0-r0 | expat: parsing large tokens can trigger a denial of service (https://avd.aquasec.com/nvd/cve-2023-52425)
> libexpat | CVE-2024-28757 | HIGH | fixed | 2.5.0-r2 | 2.6.2-r0 | expat: XML Entity Expansion (https://avd.aquasec.com/nvd/cve-2024-28757)
> Looking at the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?]
>  that introduced the docker images, it seems we should release a bugfix when 
> high CVEs are detected. It would be good to investigate and assess whether 
> Kafka is impacted or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16645) CVEs in 3.7.0 docker image

2024-04-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16645:

Priority: Blocker  (was: Major)

> CVEs in 3.7.0 docker image
> --
>
> Key: KAFKA-16645
> URL: https://issues.apache.org/jira/browse/KAFKA-16645
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 3.7.0
>Reporter: Mickael Maison
>Priority: Blocker
>
> Our [Docker Image CVE 
> Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub 
> action reports 2 high CVEs in our base image:
> apache/kafka:3.7.0 (alpine 3.19.1)
> ==
> Total: 2 (HIGH: 2, CRITICAL: 0)
> Library | Vulnerability | Severity | Status | Installed Version | Fixed Version | Title
> libexpat | CVE-2023-52425 | HIGH | fixed | 2.5.0-r2 | 2.6.0-r0 | expat: parsing large tokens can trigger a denial of service (https://avd.aquasec.com/nvd/cve-2023-52425)
> libexpat | CVE-2024-28757 | HIGH | fixed | 2.5.0-r2 | 2.6.2-r0 | expat: XML Entity Expansion (https://avd.aquasec.com/nvd/cve-2024-28757)
> Looking at the 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?]
>  that introduced the docker images, it seems we should release a bugfix when 
> high CVEs are detected. It would be good to investigate and assess whether 
> Kafka is impacted or not.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

