[jira] [Resolved] (KAFKA-16903) Task should consider producer error previously occurred for different task
[ https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-16903. - Resolution: Fixed > Task should consider producer error previously occurred for different task > -- > > Key: KAFKA-16903 > URL: https://issues.apache.org/jira/browse/KAFKA-16903 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Major > Fix For: 3.8.0 > > > A task does not consider a producer error that occurred for a different task. > The following log messages show the issue. > Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records > with an {{InvalidTxnStateException}}: > {code:java} > [2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | > i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread > [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered > sending record to topic stream-soak-test-node-name-repartition for task 0_2 > due to: > org.apache.kafka.common.errors.InvalidTxnStateException: The producer > attempted a transactional operation in an invalid state. > Exception handler choose to FAIL the processing, no more records would be > sent. (org.apache.kafka.streams.processor.internals.RecordCollectorImpl) > org.apache.kafka.common.errors.InvalidTxnStateException: The producer > attempted a transactional operation in an invalid state. > [2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] > stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream > task 0_2 due to the following error: > (org.apache.kafka.streams.processor.internals.TaskExecutor) > org.apache.kafka.streams.errors.StreamsException: Error encountered sending > record to topic stream-soak-test-node-name-repartition for task 0_2 due to: > org.apache.kafka.common.errors.InvalidTxnStateException: The producer > attempted a transactional operation in an invalid state. > Exception handler choose to FAIL the processing, no more records would be > sent. 
> at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285) > at > org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627) > at java.util.ArrayList.forEach(ArrayList.java:1259) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612) > at java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250) > at java.lang.Thread.run(Thread.java:750) > Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The > producer attempted a transactional operation in an invalid state. > {code} > Just before the exception of task 0_2 also task 0_0 encountered an > exception while producing: > {code:java} > [2024-05-
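For context, the shape of the problem and fix can be sketched as follows: under EOSv2 all tasks on a StreamThread share one producer, so a fatal send error hit by one task must be visible to every other task before it sends. This is an illustrative sketch only (names and structure are not the actual patch):
{code:java}
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

// Illustrative sketch: one send-error slot shared by ALL tasks that use the
// same EOSv2 producer, instead of one slot per task.
class SharedSendErrorGuard {
    private final AtomicReference<KafkaException> sendException = new AtomicReference<>();

    void send(final Producer<byte[], byte[]> producer, final ProducerRecord<byte[], byte[]> record) {
        final KafkaException previous = sendException.get();
        if (previous != null) {
            throw previous; // a different task already hit a fatal producer error
        }
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                sendException.compareAndSet(null, new KafkaException(exception));
            }
        });
    }
}
{code}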
[jira] [Updated] (KAFKA-16903) Task should consider producer error previously occurred for different task
[ https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16903: Fix Version/s: 3.8.0
[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track
[ https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852531#comment-17852531 ] Matthias J. Sax commented on KAFKA-16811: - Let's move the discussion to the PR > Punctuate Ratio metric almost impossible to track > - > > Key: KAFKA-16811 > URL: https://issues.apache.org/jira/browse/KAFKA-16811 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sebastien Viale >Assignee: Ganesh Sadanala >Priority: Minor > Time Spent: 48h > Remaining Estimate: 0h > > The Punctuate ratio metric is returned after the last record of the poll > loop. It is recomputed in every poll loop. > After a punctuate, the value is close to 1, but there is little chance that > the metric is sampled at this time. > So its value is almost always 0. > A solution could be to apply a kind of "sliding window" to it and report the > value for the last x seconds. -- This message was sent by Atlassian Jira (v8.20.10#820010)
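For illustration, here is a minimal sketch of the suggested sliding-window approach in plain Java (not the actual Kafka metrics machinery): keep timestamped samples and report the ratio over the last x seconds instead of the instantaneous value.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: report the punctuate ratio over a trailing time window instead of
// the instantaneous value recomputed at the end of each poll loop.
class SlidingWindowRatio {
    private final long windowMs;
    private final Deque<long[]> samples = new ArrayDeque<>(); // {timeMs, punctuateNanos, totalNanos}

    SlidingWindowRatio(final long windowMs) {
        this.windowMs = windowMs;
    }

    synchronized void record(final long nowMs, final long punctuateNanos, final long totalNanos) {
        samples.addLast(new long[] {nowMs, punctuateNanos, totalNanos});
        while (!samples.isEmpty() && samples.peekFirst()[0] < nowMs - windowMs) {
            samples.removeFirst(); // drop samples that fell out of the window
        }
    }

    synchronized double ratio() {
        long punctuate = 0;
        long total = 0;
        for (final long[] sample : samples) {
            punctuate += sample[1];
            total += sample[2];
        }
        return total == 0 ? 0.0 : (double) punctuate / total;
    }
}
{code}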
[jira] [Commented] (KAFKA-16811) Punctuate Ratio metric almost impossible to track
[ https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852231#comment-17852231 ] Matthias J. Sax commented on KAFKA-16811: - [~ganesh_6] – WordCountProcessorDemo does not contain any punctuation calls, right? Or did you modify the code and add punctuations? > Punctuate Ratio metric almost impossible to track > - > > Key: KAFKA-16811 > URL: https://issues.apache.org/jira/browse/KAFKA-16811 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sebastien Viale >Assignee: Ganesh Sadanala >Priority: Minor > Time Spent: 48h > Remaining Estimate: 0h > > The Punctuate ratio metric is returned after the last record of the poll > loop. It is recomputed in every poll loop. > After a punctuate, the value is close to 1, but there is little chance that > the metric is sampled at this time. > So its value is almost always 0. > A solution could be to apply a kind of "sliding window" to it and report the > value for the last x seconds. -- This message was sent by Atlassian Jira (v8.20.10#820010)
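For reference, punctuations are registered from a processor's init() method along these lines (a sketch; whether the demo does this is exactly the question above):
{code:java}
// Inside Processor#init (sketch): schedule a wall-clock punctuation every
// 10 seconds; the work done in the callback is what the punctuate-ratio
// metric measures. Assumes java.time.Duration and
// org.apache.kafka.streams.processor.PunctuationType are imported.
@Override
public void init(final ProcessorContext<String, String> context) {
    context.schedule(
        Duration.ofSeconds(10),
        PunctuationType.WALL_CLOCK_TIME,
        timestamp -> {
            // periodic work goes here
        });
}
{code}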
[jira] [Updated] (KAFKA-16874) Remove old TaskAssignor interface
[ https://issues.apache.org/jira/browse/KAFKA-16874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16874: Component/s: streams > Remove old TaskAssignor interface > - > > Key: KAFKA-16874 > URL: https://issues.apache.org/jira/browse/KAFKA-16874 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > Once we have the new HAAssignor that implements the new TaskAssignor > interface, we can remove the old TaskAssignor interface. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15905) Restarts of MirrorCheckpointTask should not permanently interrupt offset translation
[ https://issues.apache.org/jira/browse/KAFKA-15905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15905: Fix Version/s: 3.8.0 (was: 3.8) > Restarts of MirrorCheckpointTask should not permanently interrupt offset > translation > > > Key: KAFKA-15905 > URL: https://issues.apache.org/jira/browse/KAFKA-15905 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > Executive summary: When the MirrorCheckpointTask restarts, it loses the state > of checkpointsPerConsumerGroup, which limits offset translation to records > mirrored after the latest restart. > For example, if 1000 records are mirrored and the OffsetSyncs are read by > MirrorCheckpointTask, the emitted checkpoints are cached, and translation can > happen at the ~500th record. If MirrorCheckpointTask restarts, and 1000 more > records are mirrored, translation can happen at the ~1500th record, but no > longer at the ~500th record. > Context: > Before KAFKA-13659, MM2 made translation decisions based on the > incompletely-initialized OffsetSyncStore, and the checkpoint could appear to > go backwards temporarily during restarts. To fix this, we forced the > OffsetSyncStore to initialize completely before translation could take place, > ensuring that the latest OffsetSync had been read, and thus providing the > most accurate translation. > Before KAFKA-14666, MM2 translated offsets only off of the latest OffsetSync. > Afterwards, an in-memory sparse cache of historical OffsetSyncs was kept, to > allow for translation of earlier offsets. This came with the caveat that the > cache's sparseness allowed translations to go backwards permanently. To > prevent this behavior, a cache of the latest Checkpoints was kept in the > MirrorCheckpointTask#checkpointsPerConsumerGroup variable, and offset > translation remained restricted to the fully-initialized OffsetSyncStore. > Effectively, the MirrorCheckpointTask ensures that it translates based on an > OffsetSync emitted during its lifetime, to ensure that no previous > MirrorCheckpointTask emitted a later sync. If we can read the checkpoints > emitted by previous generations of MirrorCheckpointTask, we can still ensure > that checkpoints are monotonic, while allowing translation further back in > history. -- This message was sent by Atlassian Jira (v8.20.10#820010)
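For illustration, the monotonicity guard described above amounts to something like this sketch (names are hypothetical, not the actual MirrorMaker2 code):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: suppress any checkpoint that would move a group's
// translated offset backwards, which is what caching the latest emitted
// checkpoints achieves within a MirrorCheckpointTask's lifetime.
class MonotonicCheckpointGuard {
    private final Map<String, Long> lastEmitted = new HashMap<>(); // group -> downstream offset

    boolean shouldEmit(final String group, final long translatedOffset) {
        final Long previous = lastEmitted.get(group);
        if (previous != null && translatedOffset < previous) {
            return false; // would go backwards -> suppress the checkpoint
        }
        lastEmitted.put(group, translatedOffset);
        return true;
    }
}
{code}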
[jira] [Updated] (KAFKA-16622) MirrorMaker2 first Checkpoint not emitted until consumer group fully catches up once
[ https://issues.apache.org/jira/browse/KAFKA-16622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16622: Fix Version/s: 3.8.0 (was: 3.8) > MirrorMaker2 first Checkpoint not emitted until consumer group fully catches > up once > --- > > Key: KAFKA-16622 > URL: https://issues.apache.org/jira/browse/KAFKA-16622 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.0, 3.6.2, 3.8.0 >Reporter: Edoardo Comar >Assignee: Edoardo Comar >Priority: Major > Fix For: 3.8.0, 3.7.1 > > Attachments: connect.log.2024-04-26-10.zip, > edo-connect-mirror-maker-sourcetarget.properties > > > We observed an excessively delayed emission of the MM2 Checkpoint record. > It only gets created when the source consumer reaches the end of a topic. > This does not seem reasonable. > In a very simple setup: > Tested with a standalone single-process MirrorMaker2 mirroring between two > single-node kafka clusters (mirrormaker config attached) with quick refresh > intervals (eg 5 sec) and a small offset.lag.max (eg 10) > create a single topic in the source cluster > produce data to it (e.g. 1 records) > start a slow consumer - e.g. fetching 50 records/poll and pausing 1 sec > between polls, committing after each poll > watch the Checkpoint topic in the target cluster > bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \ > --topic source.checkpoints.internal \ > --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \ > --from-beginning > -> no record appears in the checkpoint topic until the consumer reaches the > end of the topic (ie its consumer group lag gets down to 0). -- This message was sent by Atlassian Jira (v8.20.10#820010)
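For reference, the slow consumer from the reproduction steps could look like this sketch (bootstrap server and topic name are placeholders):
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch of the slow consumer: at most 50 records per poll, a 1-second
// pause between polls, and a commit after each poll.
public class SlowConsumer {
    public static void main(final String[] args) throws InterruptedException {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic")); // placeholder topic
            while (true) {
                consumer.poll(Duration.ofSeconds(5)); // fetches at most 50 records
                consumer.commitSync();                // commit after each poll
                Thread.sleep(1000);                   // pause between polls
            }
        }
    }
}
{code}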
[jira] [Updated] (KAFKA-16757) Fix broker re-registration issues around MV 3.7-IV2
[ https://issues.apache.org/jira/browse/KAFKA-16757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16757: Fix Version/s: 3.8.0 (was: 3.8) > Fix broker re-registration issues around MV 3.7-IV2 > --- > > Key: KAFKA-16757 > URL: https://issues.apache.org/jira/browse/KAFKA-16757 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.8.0, 3.7.1 > > > When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend > the broker registration, so that the controller can record the storage > directories. The current code for doing this has several problems, however. > One is that it tends to trigger even in cases where we don't actually need > it. Another is that when re-registering the broker, the broker is marked as > fenced. > This PR moves the handling of the re-registration case out of > BrokerMetadataPublisher and into BrokerRegistrationTracker. The > re-registration code there will only trigger in the case where the broker > sees an existing registration for itself with no directories set. This is > much more targeted than the original code. > Additionally, in ClusterControlManager, when re-registering the same broker, > we now preserve its fencing and shutdown state, rather than clearing those. > (There isn't any good reason re-registering the same broker should clear > these things... this was purely an oversight.) Note that we can tell the > broker is "the same" because it has the same IncarnationId. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16692: Fix Version/s: 3.8.0 (was: 3.8) > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.7.0, 3.6.1, 3.8 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > Fix For: 3.8.0, 3.7.1, 3.6.3 > > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this : > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers, generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. > From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. 
> On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNe
[jira] [Updated] (KAFKA-15045) Move Streams task assignor to public configs
[ https://issues.apache.org/jira/browse/KAFKA-15045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15045: Fix Version/s: 3.8.0 > Move Streams task assignor to public configs > > > Key: KAFKA-15045 > URL: https://issues.apache.org/jira/browse/KAFKA-15045 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > Labels: kip > Fix For: 3.8.0 > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16875) Replace ClientState with TaskAssignment when creating individual consumer Assignments
[ https://issues.apache.org/jira/browse/KAFKA-16875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16875: Component/s: streams > Replace ClientState with TaskAssignment when creating individual consumer > Assignments > - > > Key: KAFKA-16875 > URL: https://issues.apache.org/jira/browse/KAFKA-16875 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > In the initial implementation of KIP-924 in version 3.8, we converted from > the new TaskAssignor's output type (TaskAssignment) into the old > ClientState-based assignment representation. This allowed us to plug in a > custom assignor without converting all the internal mechanisms that take the > KafkaStreams client-level assignment and process it into a consumer-level > assignment. > However, we ultimately want to get rid of ClientState altogether, so we need > to invert this logic: we instead convert the ClientState into a > TaskAssignment and then use the TaskAssignment to process the assigned tasks > into consumer Assignments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16873) Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS
[ https://issues.apache.org/jira/browse/KAFKA-16873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16873: Component/s: streams > Remove StreamsConfig.INTERNAL_TASK_ASSIGNOR_CLASS > - > > Key: KAFKA-16873 > URL: https://issues.apache.org/jira/browse/KAFKA-16873 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > Once we have all the out-of-the-box assignors implementing the new > TaskAssignor interface that corresponds to the new public task assignor > config, we can remove the old internal task assignor config altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16872) Remove ClientState class
[ https://issues.apache.org/jira/browse/KAFKA-16872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16872: Component/s: streams > Remove ClientState class > > > Key: KAFKA-16872 > URL: https://issues.apache.org/jira/browse/KAFKA-16872 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > One of the end-state goals of KIP-924 is to remove the ClientState class > altogether. There are some blockers to this such as the removal of the old > internal task assignor config and the old HAAssignor, so this ticket will > probably be one of the very last KAFKA-16868 subtasks to be tackled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16871) Clean up internal AssignmentConfigs class in Streams
[ https://issues.apache.org/jira/browse/KAFKA-16871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16871: Component/s: streams > Clean up internal AssignmentConfigs class in Streams > > > Key: KAFKA-16871 > URL: https://issues.apache.org/jira/browse/KAFKA-16871 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: newbie, newbie++ > > In KIP-924 we added a new public AssignmentConfigs class to hold all of the, > you guessed it, assignment related configs. > However, there is an existing config class of the same name and largely the > same contents but that's in an internal package, specifically inside the > AssignorConfiguration class. > We should remove the old AssignmentConfigs class that's in > AssignorConfiguration and replace any usages of it with the new public > AssignmentConfigs that we added in KIP-924 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16868) Post KIP-924 StreamsPartitionAssignor code cleanup
[ https://issues.apache.org/jira/browse/KAFKA-16868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16868: Component/s: streams > Post KIP-924 StreamsPartitionAssignor code cleanup > -- > > Key: KAFKA-16868 > URL: https://issues.apache.org/jira/browse/KAFKA-16868 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > Making an umbrella task for all of the tech debt and code consolidation > cleanup work that can/should be done following the implementation of > [KIP-924: customizable task assignment for > Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams] > Most of this revolves around deduplicating code once it's no longer needed, > including classes like the ClientState, StandbyTaskAssignor and related > elements, and the old TaskAssignor interface along with its implementations. > Note that in 3.8, the first version in which KIP-924 was released, we just > added the new public config and new TaskAssignor interface but did not get > rid of the old internal config or old TaskAssignor interface. If neither > config is set in 3.8 we still default to the old HAAssignor, as a kind of > opt-in feature flag, and internally will convert the output of the new > TaskAssignor into the old style of ClientState-based assignment tracking. We > intend to clean up all of the old code and eventually support only the new > TaskAssignor interface as well as converting everything internally from the > ClientState to the TaskAssignment/KafkaStreamsAssignment style output -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-1049: Add config log.summary.interval.ms to Kafka Streams
Jiang, Thanks for the KIP. I think it makes sense. I agree with Sophie that the KIP writeup should be improved a little bit, with regard to the public API (ie, config) which is changed. The only other idea I had to avoid this issue would be some internal change: we would introduce a new logger class allowing you to disable logging for this specific logger. However, not sure how we could make this a public contract you could rely on? -- Also, given Sophie's comment about potential other logs we might want to configure, it seems like a good idea to add this config. @Sophie: what other logs did you have in mind? One more nit: rejected alternatives lists `log.summary.interval.ms` as rejected -- seems this needs to be removed? -Matthias On 5/29/24 12:56 AM, Sophie Blee-Goldman wrote: Sure, as I said I'm supportive of this KIP. Just wanted to mention how the issue could be mitigated in the meantime since the description made it sound like you were suffering from excessive logs right now. Apologies if I misinterpreted that. I do think it would be nice to have a general setting for log intervals in Streams. There are some other places where a regular summary log might be nice. The config name you proposed is generic enough that we could reuse it for other areas where we'd like to log summaries, so this seems like a good config to introduce. My only question/request is that the KIP doesn't mention where this config is being added. I assume from the context and Motivation section that you're proposing to add this to StreamsConfig, which makes sense to me. But please update the KIP to say this somewhere. Otherwise the KIP LGTM. Anyone else have thoughts on this? On Thu, May 23, 2024 at 12:19 AM jiang dou wrote: Thank you for your reply. I do not recommend setting the log level to WARN, because INFO-level logs should be useful. Sophie Blee-Goldman wrote on Thu, May 23, 2024 at 04:30: Thanks for the KIP! I'm not against adding a config for this per se, but if this is causing you trouble right now you should be able to disable it via log4j configuration so you don't need to wait for a fix in Kafka Streams itself. Putting something like this in your log4j will shut off the offending log: log4j.logger.org.apache.kafka.streams.processor.internals.StreamThread=WARN On Wed, May 22, 2024 at 6:46 AM jiang dou wrote: Hi, I would like to propose a change to the Kafka Streams summary log. Currently the stream-thread summary is logged every two minutes, with no way to disable it or change the interval. While the application is running this can be unnecessary and even harmful, since it fills the logs (and thus storage space) with unwanted data. I propose adding a configuration to control the output interval or disable it. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1049%3A+Add+config+log.summary.interval.ms+to+Kafka+Streams
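For reference, a sketch of how the proposed config might be used once it exists in StreamsConfig -- the config name below is only the KIP's proposal, not an existing setting, and `topology` is assumed to be built elsewhere:

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Proposed by KIP-1049: interval (in ms) between summary log lines;
    // hypothetically, a non-positive value could disable them entirely.
    props.put("log.summary.interval.ms", "600000");
    KafkaStreams streams = new KafkaStreams(topology, props);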
Re: [DISCUSS] KIP-655: Add deduplication processor in kafka-streams
Ayoub, thanks for resurrecting this KIP. I think a built-in de-duplication operator will be very useful. Couple of questions: 100: `deduplicationKeySelector` Is this the best name? It might indicate that we select a "key", which is an overloaded term... Maybe we could use `Field` or `Id` or `Attribute` instead of `Key` in the name? Just brainstorming. If we think `Key` is the best word, I am also ok with it. 101: `Deduplicated` class You propose to add static methods `keySerde()` and `valueSerde()` -- in other config classes, we use only `with(keySerde, valueSerde)` as we try to use the "builder" pattern, and avoid too many overloads. I would prefer to omit both methods you suggest and just use a single `with` for both serdes. Similarly, I think we don't want to add `with(...)` which takes all parameters at once (which should only be 3 parameters, not 4 as it's currently in the KIP)? 102: Usage of `WindowedStore`: Would this be efficient? The physical byte layout is "" for the store key, so it would be difficult to do an efficient lookup for a given "de-duplication key" to discard duplicates, as we don't know the timestamp of the first record with the same "de-duplication key". This boils down to the actual de-duplication logic (some more comments below), but what you propose seems to require expensive range scans, which could be cost-prohibitive in practice. I think we need to find a way to use efficient key-point-lookups to make this work. 103: "Processing logic": Might need some updates (cf. 102 comment). I am not sure if I fully understand the logic: cf. 105 below. 104: If no entries found → forward the record + save the record in the store This part is critical, and we should discuss in detail. In the end, de-duplication only makes sense when EOS is used, and we might want to call this out (eg, in the JavaDocs)? But if used with ALOS, it's very difficult to ensure that we never lose data... Your proposal to first-forward goes in the right direction, but does not really solve the problem entirely: Even if we forward the message first, all downstream processing happens, `context.forward()` returns and we update the state store, we could still crash w/o committing offsets. For this case, we have no guarantee that the result records were published (as we did not flush the producer yet), but when re-reading from the input topic, we would find the record in the store and incorrectly drop it as a duplicate... I think the only solution to make this work would be to use TX-state stores in combination with ALOS as proposed via KIP-892? Using an in-memory store won't help much either: the producer could have sent the write into the changelog topic, but not into the result topic, and thus we could still not guarantee ALOS...? How do we want to go about this? We could also say, this new operator only works with EOS. Would this be too restrictive? -- At least for now, until KIP-892 lands, and we could relax it then? 105: "How to detect late records" In the end, it seems to boil down to determining which of the records to forward and which record to drop, for (1) the regular case and (2) the out-of-order data case. Regular case (no out-of-order data): For this case, offset and ts order is the same, and we can forward the first record we get. All later records within the "de-duplication period" with the same "de-duplication key" would be dropped. 
If a record with the same "de-duplication key" arrives after "de-duplication period" passed, we cannot drop it any longer, but would still forward it, as by the contract of the operator / de-duplication period. For the out-of-order case: The first question we need to answer is, do we want to forward the record with the smallest offset or the record with the smallest ts? Logically, forwarding with the smallest ts might be "more correct", however, it implies we could only forward it after "de-duplication period" passed, what might be undesired latency? Would this be desired/acceptable? In contrast, if we forward record with the smallest offset (this is what you seem to propose) we don't have a latency issue, but of course the question what records to drop is more tricky to answer: it seems you propose to compare the time difference of the stored record to the current record, but I am wondering why? Would it not be desired to drop all duplicates independent of their ts, as long as we find a record in the store? Would be good to get some more motivation and tradeoffs discussed about the different strategies we could use. You also propose to drop _any_ late record: I am also not sure if that's desired? Could this not lead to data loss? Assume we get a late record, but in fact there was never a duplicate? Why would we want to drop it? If there is a late record which is indeed a duplicate, but we purged the original record from the store already, it seems to be the same case as for the "
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Regards, Nick On Wed, 29 May 2024 at 09:28, Nick Telford wrote: Hi everyone, Sorry I haven't got around to updating the KIP yet. Now that I've wrapped up KIP-989, I'm going to be working on 1035 starting today. I'll update the KIP first, and then call a vote. Regards, Nick On Wed, 29 May 2024 at 07:25, Bruno Cadonna wrote: Totally agree on moving forward and starting the VOTE! However, the KIP should be updated with the new info before starting the VOTE. Best, Bruno On 5/29/24 2:36 AM, Matthias J. Sax wrote: Sounds like a good plan. -- I think we are still wrapping up the 3.8 release, but would also like to move forward with this one. Should we start a VOTE? For merging PRs we need to wait until after code freeze, and the 3.8 branch was cut. But we could start reviewing PRs before this already. -Matthias On 5/17/24 3:05 AM, Nick Telford wrote: Hi everyone, As discussed on the Zoom call, we're going to handle rebalance meta-data by: - On start-up, Streams will open each store and read its changelog offsets into an in-memory cache. This cache will be shared among all StreamThreads. - On rebalance, the cache will be consulted for Task offsets for any Task that is not active on any instance-local StreamThreads. If the Task is active on *any* instance-local StreamThread, we will report the Task lag as "up to date" (i.e. -1), because we know that the local state is currently up-to-date. We will avoid caching offsets across restarts in the legacy ".checkpoint" file, so that we can eliminate the logic for handling this class. If performance of opening/closing many state stores is poor, we can parallelise it by forking off a thread for each Task directory when reading the offsets. I'll update the KIP later today to reflect this design, but I will try to keep it high-level, so that the exact implementation can vary. Regards, Nick On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman < sop...@responsive.dev> wrote: 103: I like the idea of immediately deprecating #managesOffsets and aiming to make offset management mandatory in the long run. I assume we would also log a warning for any custom stores that return "false" from this method to encourage custom store implementations to start doing so? My only question/concern is that if we want folks to start managing their own offsets then we should make this transition easy for them, perhaps by exposing some public utility APIs for things that are currently handled by Kafka Streams such as reading/writing checkpoint files. Maybe it would be useful to include a small example in the KIP of what it would actually mean to "manage your own offsets" -- I know (all too well) that plugging in custom storage implementations is not easy and most people who do this are probably fairly advanced users, but offset management will be a totally new ballgame to most people and this kind of feels like throwing them off the deep end. We should at least provide a lifejacket via some kind of utility API and/or example. 200. There's been a lot of back and forth on the rebalance metadata/task lag computation question, so forgive me if I missed any part of this, but I think we've landed at the right idea here. To summarize: the "tl;dr" explanation is that we'll write the checkpoint file only on close and will account for hard-crash scenarios by opening up the stores on startup and writing a checkpoint file for any missing tasks. Does that sound about right? 
A few clarifications: I think we're all more or less on the same page here but just to be absolutely clear, the task lags for each task directory found on disk will be reported by only one of the StreamThreads, and each StreamThread will report lags only for tasks that it already owns or that are not assigned to any other StreamThread in the client. In other words, we only need to get the task lag for completely unassigned/unlocked tasks, which means if there is a checkpoint file at all then it must be up-to-date, because there is no other StreamThread actively writing to that state store (if so, then only that StreamThread would report lag for that particular task). This still leaves the "no checkpoint at all" case, which, as previously mentioned, can occur after a hard crash. Luckily we only have to worry about this once, after starting up again following said hard crash. We can simply open up each of the state stores before ever joining the group, get the offsets from rocksdb, and write them to a new checkpoint file. After that, we can depend on the checkpoints written at close and won't have to open up any stores that aren't already assigned, for the reasons laid out in the paragraph above. As for the specific mechanism and which thread does what, since there were some questions, this is how I'm imagining the process: 1. The general idea is that
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
favor of option #2 (pull/15959 <https://github.com/apache/kafka/pull/15959>) as I believe including general task metadata may be useful and this API would be easy to evolve if we wanted to add anything else in a future KIP. The current KIP was updated using this option, although nothing related to the rack ids has been merged yet. We're happy to defer to anyone with a strong preference for either of these options, or a new suggestion of their own. As always, let us know if you have any questions or concerns or feedback of any kind. Thanks! On Mon, May 6, 2024 at 1:33 PM Sophie Blee-Goldman < sop...@responsive.dev> wrote: Thanks guys. Updated the error codes in both the code and the explanation under "Public Changes". To sum up, here are the error codes listed in the KIP: enum AssignmentError { NONE, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES, ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS, INVALID_STANDBY_TASK, UNKNOWN_PROCESS_ID, UNKNOWN_TASK_ID } Anything missing? (also updated all the code block headings, thanks for noticing that Bruno) On Fri, May 3, 2024 at 9:33 AM Matthias J. Sax wrote: 117f: Good point by Bruno. We should check for this, and could have an additional `INVALID_STANDBY_TASK` error code? -Matthias On 5/3/24 5:52 AM, Guozhang Wang wrote: Hi Sophie, Re: As for the return type of the TaskAssignmentUtils, I think that makes sense. LGTM. On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna wrote: Hi Sophie, 117f: I think removing the STATEFUL and STATELESS types is not enough to avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes the information whether a task is stateless or stateful into the task assignor. However, the task assignor can return a standby task for a stateless task, which is inconsistent. Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error. nit: The titles of some code blocks in the KIP are not consistent with their content, e.g., KafkaStreamsState <-> NodeState Best, Bruno On 5/3/24 2:43 AM, Matthias J. Sax wrote: Thanks Sophie. My bad. You are of course right about `TaskAssignment` and the StreamsPartitionAssignor's responsibility to map tasks of an instance to consumers. When I wrote my reply, I forgot about this detail. Seems you did not add the `UNKNOWN_TASK_ID` error yet as proposed by Guozhang? Otherwise LGTM. -Matthias On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote: Guozhang: 117. All three additions make sense to me. However, while thinking about how users would actually produce an assignment, I realized that it seems silly to make it their responsibility to distinguish between a stateless and stateful task when they return the assignment. The StreamsPartitionAssignor already knows which tasks are stateful vs stateless, so there's no need to add this extra step for users to figure it out themselves, and potentially make a mistake. 117f: So, rather than add a new error type for "inconsistent task types", I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE" and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether. Any objections? - -Thanks, fixed the indentation of headers under "User APIs" and "Read-Only APIs" -As for the return type of the TaskAssignmentUtils methods, I don't personally feel too strongly about this, but the reason for the return type being a Map rather than a TaskAssignment is because they are meant to be used iteratively/to create a part of the full assignment, and not necessarily a full assignment for each. 
Notice that they all have an input parameter of the same type: Map<ProcessId, KafkaStreamsAssignment>. The idea is you can take the output of any of these and pass it in to another to generate or optimize another piece of the overall assignment. For example, if you want to perform the rack-aware optimization on both active and standby tasks, you would need to call #optimizeRackAwareActiveTasks and then forward the output to #optimizeRackAwareStandbyTasks to get the final assignment. If we return a TaskAssignment, it will usually need to be unwrapped right away. Perhaps more importantly, I worry that returning a TaskAssignment will make it seem like each of these utility methods returns a "full" and final assignment that can just be returned as-is from the TaskAssignor's #assign method. Whereas they are each just a single step in the full assignment process, and not the final product. Does that make sense? On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman wrote: Matthias: Thanks for the naming suggestions for the error codes. I was definitely not happy with my original naming but couldn't think of anything better. I like your proposals though, will update the KIP names. I
Re: Hello world - please subscribe me!
Chris, Subscription is self-service. Please follow the instructions on the web page: https://kafka.apache.org/contact -Matthias On 6/3/24 1:59 AM, Chris wrote:
[jira] [Assigned] (KAFKA-16248) Kafka consumer should cache leader offset ranges
[ https://issues.apache.org/jira/browse/KAFKA-16248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-16248: --- Assignee: Alieh Saeedi > Kafka consumer should cache leader offset ranges > > > Key: KAFKA-16248 > URL: https://issues.apache.org/jira/browse/KAFKA-16248 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Brutschy >Assignee: Alieh Saeedi >Priority: Critical > > We noticed a streams application received an OFFSET_OUT_OF_RANGE error > following a network partition and streams task rebalance and subsequently > reset its offsets to the beginning. > Inspecting the logs, we saw multiple consumer log messages like: > {code:java} > Setting offset for partition tp to the committed offset > FetchPosition{offset=1234, offsetEpoch=Optional.empty...) > {code} > Inspecting the streams code, it looks like kafka streams calls `commitSync` > passing through an explicit OffsetAndMetadata object but does not populate > the offset leader epoch. > The offset leader epoch is required in the offset commit to ensure that all > consumers in the consumer group have coherent metadata before fetching. > Otherwise after a consumer group rebalance, a consumer may fetch with a stale > leader epoch with respect to the committed offset and get an offset out of > range error from a zombie partition leader. > An example of where this can cause issues: > 1. We have a consumer group with consumer 1 and consumer 2. Partition P is > assigned to consumer 1 which has up-to-date metadata for P. Consumer 2 has > stale metadata for P. > 2. Consumer 1 fetches partition P with offset 50, epoch 8, and commits offset > 50 without an epoch. > 3. The consumer group rebalances and P is now assigned to consumer 2. > Consumer 2 has a stale leader epoch for P (let's say leader epoch 7). > Consumer 2 will now try to fetch with leader epoch 7, offset 50. If we have a > zombie leader due to a network partition, the zombie leader may accept > consumer 2's fetch leader epoch and return an OFFSET_OUT_OF_RANGE to consumer > 2. > If in step 1, consumer 1 committed the leader epoch for the message, then > when consumer 2 receives assignment P it would force a metadata refresh to > discover a sufficiently new leader epoch for the committed offset. > Kafka Streams cannot fully determine the leader epoch of the offsets it wants > to commit - in EOS mode, streams commits the offset after the last control > records (to avoid always having a lag of >0), but the leader epoch of the > control record is not known to streams (since only non-control records are > returned from Consumer.poll). > A fix discussed with [~hachikuji] is to have the consumer cache leader epoch > ranges, similar to how the broker maintains a leader epoch cache. > This ticket was split from the original ticket > https://issues.apache.org/jira/browse/KAFKA-15344 which was described as a > streams fix, but the problem cannot be fully fixed in streams. -- This message was sent by Atlassian Jira (v8.20.10#820010)
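For illustration, committing with the leader epoch populated looks like this on the plain consumer (partition, offset, and epoch values are placeholders; the ticket's point is that Streams often cannot know the epoch):
{code:java}
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitWithEpoch {
    // Commit offset 50 for a partition WITH the leader epoch (here 8), so other
    // group members force a metadata refresh before fetching from a stale leader.
    static void commit(final KafkaConsumer<?, ?> consumer) {
        final TopicPartition tp = new TopicPartition("topic", 0); // placeholder
        final OffsetAndMetadata offset = new OffsetAndMetadata(50L, Optional.of(8), "");
        consumer.commitSync(Map.of(tp, offset));
    }
}
{code}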
[jira] [Commented] (KAFKA-16863) Consider removing `default.` prefix for exception handler config
[ https://issues.apache.org/jira/browse/KAFKA-16863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850869#comment-17850869 ] Matthias J. Sax commented on KAFKA-16863: - I think `traffic_cost` was on purpose... But I really don't feel strongly about it at all. In general, I am always in favor of cleaning up stuff; we also just did KIP-1020. Not sure if we should do a single KIP though. It can become very convoluted quickly. I would rather do multiple smaller KIPs. Not sure what other issues there might be? Do you have a list? > Consider removing `default.` prefix for exception handler config > > > Key: KAFKA-16863 > URL: https://issues.apache.org/jira/browse/KAFKA-16863 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Matthias J. Sax >Priority: Trivial > Labels: need-kip > > Kafka Streams has a set of configs with `default.` prefix. The intent for the > default-prefix is to make a distinction between, well, the default, and > in-place overwrites in the code. Eg, users can specify ts-extractors on a > per-topic basis. > However, for the deserialization- and production-exception handlers, no such > overwrites are possible, and thus, `default.` does not really make sense, > because there is just one handler overall. Via KIP-1033 we added a new > processing-exception handler w/o a default-prefix, too. > Thus, we should consider deprecating the two existing config names and adding > them back w/o the `default.` prefix. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16863) Consider removing `default.` prefix for exception handler config
Matthias J. Sax created KAFKA-16863: --- Summary: Consider removing `default.` prefix for exception handler config Key: KAFKA-16863 URL: https://issues.apache.org/jira/browse/KAFKA-16863 Project: Kafka Issue Type: Improvement Components: streams Reporter: Matthias J. Sax Kafka Streams has a set of configs with `default.` prefix. The intent for the default-prefix is to make a distinction between, well, the default, and in-place overwrites in the code. Eg, users can specify ts-extractors on a per-topic basis. However, for the deserialization- and production-exception handlers, no such overwrites are possible, and thus, `default.` does not really make sense, because there is just one handler overall. Via KIP-1033 we added a new processing-exception handler w/o a default-prefix, too. Thus, we should consider deprecating the two existing config names and adding them back w/o the `default.` prefix. -- This message was sent by Atlassian Jira (v8.20.10#820010)
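For illustration, these are the two configs in question today, together with the names the ticket suggests (the unprefixed names are only a proposal, not existing configs, and `props` is assumed to be defined):
{code:java}
// Current names, with the misleading `default.` prefix:
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class);     // "default.deserialization.exception.handler"
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
    DefaultProductionExceptionHandler.class);  // "default.production.exception.handler"

// Proposed (hypothetical) replacements after dropping the prefix:
//   "deserialization.exception.handler"
//   "production.exception.handler"
{code}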
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
ries on disk and attempts to lock them one by one. If it obtains the lock, it checks whether there is state but no checkpoint, and writes the checkpoint if needed. If it can't grab the lock, then we know one of the other StreamThreads must be handling the checkpoint file for that task directory, and we can move on. Don't really feel too strongly about which approach is best: doing it in KafkaStreams#start is certainly the simplest, while doing it in the StreamThread's startup is more efficient. If we're worried about adding too much weight to KafkaStreams#start then the 2nd option is probably best, though slightly more complicated. Thoughts? On Tue, May 14, 2024 at 10:02 AM Nick Telford wrote: Hi everyone, Sorry for the delay in replying. I've finally now got some time to work on this. Addressing Matthias's comments: 100. Good point. As Bruno mentioned, there's already AbstractReadWriteDecorator which we could leverage to provide that protection. I'll add details on this to the KIP. 101,102. It looks like these points have already been addressed by Bruno. Let me know if anything here is still unclear or you feel needs to be detailed more in the KIP. 103. I'm in favour of anything that gets the old code removed sooner, but wouldn't deprecating an API that we expect (some) users to implement cause problems? I'm thinking about implementers of custom StateStores, as they may be confused by managesOffsets() being deprecated, especially since they would have to mark their implementation as @Deprecated in order to avoid compile warnings. If deprecating an API *while it's still expected to be implemented* is something that's generally done in the project, then I'm happy to do so here. 104. I think this is technically possible, but at the cost of considerable additional code to maintain. Would we ever have a pathway to remove this downgrade code in the future? Regarding rebalance metadata: Opening all stores on start-up to read and cache their offsets is an interesting idea, especially if we can avoid re-opening the stores once the Tasks have been assigned. Scalability shouldn't be too much of a problem, because typically users have a fairly short state.cleanup.delay, so the number of on-disk Task directories should rarely exceed the number of Tasks previously assigned to that instance. An advantage of this approach is that it would also simplify StateStore implementations, as they would only need to guarantee that committed offsets are available when the store is open. I'll investigate this approach this week for feasibility and report back. I think that covers all the outstanding feedback, unless I missed anything? Regards, Nick On Mon, 6 May 2024 at 14:06, Bruno Cadonna wrote: Hi Matthias, I see what you mean. To sum up: With this KIP the .checkpoint file is written when the store closes. That is when: 1. a task moves away from the Kafka Streams client 2. the Kafka Streams client shuts down A Kafka Streams client needs the information in the .checkpoint file 1. on startup because it does not have any open stores yet. 2. during rebalances for non-empty state directories of tasks that are not assigned to the Kafka Streams client. With hard crashes, i.e., when the Streams client is not able to close its state stores and write the .checkpoint file, the .checkpoint file might be quite stale. That influences the next rebalance after failover negatively. My conclusion is that Kafka Streams either needs to open the state stores at start up or we write the checkpoint file more often.
Writing the .checkpoint file during processing more often without controlling the flush to disk would work. However, Kafka Streams would checkpoint offsets that are not yet persisted on disk by the state store. That is, with a hard crash, the offsets in the .checkpoint file might be larger than the offsets checkpointed in the state store. That might not be a problem if Kafka Streams uses the .checkpoint file only to compute the task lag. The downside is that it makes managing checkpoints more complex because now we have to maintain two checkpoints: one for restoration and one for computing the task lag. I think we should explore the option where Kafka Streams opens the state stores at start up to get the offsets. I also checked when Kafka Streams needs the checkpointed offsets to compute the task lag during a rebalance. Turns out Kafka Streams needs them before sending the join request. Now, I am wondering if opening the state stores of unassigned tasks whose state directory exists locally is actually such a big issue, given the expected higher latency, since it happens before the Kafka Streams client joins the rebalance. Best, Bruno On 5/4/24 12:05 AM, Matthias J. Sax wrote: Those are good questions... I could think of a few approaches, but I admit it might all be a littl
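As a rough illustration of the directory-scan idea from this thread (file names follow Streams conventions, but this is not the actual Streams code; also note that within one JVM, tryLock on an already-held file throws instead of returning null, so real code would track in-process owners separately):
{code:java}
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

// Hypothetical sketch of the start-up scan discussed above: iterate task
// directories, try to lock each one, and write a checkpoint if the directory
// holds state but no checkpoint file. A lock held elsewhere means another
// StreamThread owns the task directory and will handle the checkpoint.
public class CheckpointScanSketch {

    public static void scan(final File stateDir) throws Exception {
        final File[] taskDirs = stateDir.listFiles(File::isDirectory);
        if (taskDirs == null) {
            return;
        }
        for (final File taskDir : taskDirs) {
            try (RandomAccessFile lockFile = new RandomAccessFile(new File(taskDir, ".lock"), "rw");
                 FileLock lock = lockFile.getChannel().tryLock()) {
                if (lock == null) {
                    continue; // another process/thread owns this task directory
                }
                final String[] state = taskDir.list((dir, name) -> !name.startsWith("."));
                final File checkpoint = new File(taskDir, ".checkpoint");
                if (state != null && state.length > 0 && !checkpoint.exists()) {
                    // read committed offsets from the store(s) and write them -- elided
                }
            }
        }
    }
}
{code}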
Re: kindly add me in community
Mailing list subscription is self-service. Please follow the instructions from the web page: https://kafka.apache.org/contact -Matthias On 5/21/24 2:00 AM, Prashant Lohia wrote: thanks prashant lohia prash...@gsl.in
Re: outerjoin not joining after window
Can someone confirm that each partition has its own stream time and that the stream time for a partition only advances when a record is written to the partition after the window closes? That's correct. On 5/21/24 10:11 AM, Chad Preisler wrote: After reviewing the logs, I think I understand what happens with the repartition topics. Looks like they will be assigned to one or more instances. In my example I ran three instances of the application (A, B, C). Looks like the two repartition topics got assigned to A and B. The six partitions from the input topics got split evenly across all three running instances A, B, and C. Since the repartitioned streams are what I'm joining on, I guess the join will run on two instances, and any input topic processing will run across all three. Is that correct? Still would like clarification regarding some records appearing to not get processed: I think the issue is related to certain partitions not getting records to advance stream time (because of low volume). Can someone confirm that each partition has its own stream time and that the stream time for a partition only advances when a record is written to the partition after the window closes? On Tue, May 21, 2024 at 10:27 AM Chad Preisler wrote: See one small edit below... On Tue, May 21, 2024 at 10:25 AM Chad Preisler wrote: Hello, I think the issue is related to certain partitions not getting records to advance stream time (because of low volume). Can someone confirm that each partition has its own stream time and that the stream time for a partition only advances when a record is written to the partition after the window closes? If I use the repartition method on each input topic to reduce the number of partitions for those streams, how many instances of the application will process records? For example, if the input topics each have 6 partitions, and I use the repartition method to set the number of partitions for the streams to 2, how many instances of the application will process records? Thanks, Chad On Wed, May 1, 2024 at 6:47 PM Matthias J. Sax wrote: How do you know this? First thing we do is write a log message in the value joiner. We don't see the log message for the missed records. Well, for left/right join results, the ValueJoiner would only be called when the window is closed... And for invalid input (or late records, i.e., records which arrive out-of-order after their window was already closed), records would be dropped right away. So you cannot really infer that a record did make it into the join or not, or what happens if it did make it into the `Processor`. -> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring `dropped-records-total` is the name of the metric. -Matthias On 5/1/24 11:35 AM, Chad Preisler wrote: Hello, We did some testing in our test environment today. We are seeing some records processed where only one side of the join has a record. So that's good. However, we are still seeing some records get skipped. They never hit the value joiner (we write a log message first thing in the value joiner). During the test we were putting some load on the system, so stream time was advancing. We did notice that the join windows were taking much longer than 30 minutes to close and process records. Thirty minutes is the window plus grace. How do you know this? First thing we do is write a log message in the value joiner. We don't see the log message for the missed records. I will try pushing the same records locally.
However, we don't see any errors in our logs and the stream does process one-sided joins after the skipped record. Do you have any docs on the "dropped records" metric? I did a Google search and didn't find many good results for that. Thanks, Chad On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax wrote: Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally. That gives some hope :) However, they never get into the join. How do you know this? Did you check the metric for dropped records? Maybe records are considered malformed and dropped? Are you using the same records in production and in your local test? Are there any settings for the stream client that would affect the join? Not that I can think of... There is one more internal config, but as long as data is flowing, it should not impact the result you see. Are there any settings on the broker side that would affect the join? No. The join is computed client side. Broker configs should not have any impact. If I increase the log level for the streams API would that shed some light on what is happening? I don't think it would help much. The code in question is org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it does not do any log
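To make the window/grace arithmetic from this thread concrete, a minimal sketch (topic names made up) of an outer join with a 25-minute window and 5-minute grace, i.e. "thirty minutes is the window plus grace"; one-sided results for a window are only emitted once stream time, which advances per partition as new records arrive, passes the window end plus grace:
{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> left = builder.stream("left-topic");
KStream<String, String> right = builder.stream("right-topic");

left.outerJoin(
        right,
        (l, r) -> {
            // First thing: log, as described in the thread; l or r is null
            // for a one-sided result.
            System.out.println("joining: " + l + " / " + r);
            return l + "|" + r;
        },
        JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(25), Duration.ofMinutes(5)),
        StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()))
    .to("joined-topic");
{code}
With this setup, a low-volume partition whose stream time never advances past the window end plus grace will hold back its one-sided results, matching the behavior described above.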
[jira] [Commented] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
[ https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848476#comment-17848476 ] Matthias J. Sax commented on KAFKA-16586: - I did suspect a bug, too, and did include the seed :) – Just did not know that we also need the strategy? Given that we don't have a lot of strategies, seems we could rerun each strategy with the given seed and one should just fail? > Test TaskAssignorConvergenceTest failing > > > Key: KAFKA-16586 > URL: https://issues.apache.org/jira/browse/KAFKA-16586 > Project: Kafka > Issue Type: Test > Components: streams, unit tests > Reporter: Matthias J. Sax >Priority: Major > > {code:java} > java.lang.AssertionError: Assertion failed in randomized test. Reproduce > with: `runRandomizedScenario(-538095696758490522)`. at > org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545) > at > org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code} > This might expose an actual bug (or incorrect test setup) and should be > reproducible (did not try it myself yet). -- This message was sent by Atlassian Jira (v8.20.10#820010)
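A hedged sketch of the re-run idea (the strategy values mirror the rack-aware assignment config options, and the reproduction call is a stand-in; `runRandomizedScenario` is private in the real test class):
{code:java}
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

// Hypothetical reproduction harness: re-run the failing seed once per
// strategy, so the failing combination identifies itself.
class SeedReproductionSketch {

    private static final long FAILING_SEED = -538095696758490522L;

    @ParameterizedTest
    @ValueSource(strings = {"none", "min_traffic", "balance_subtopology"})
    void rerunFailingSeedPerStrategy(final String strategy) {
        runRandomizedScenario(strategy, FAILING_SEED);
    }

    private void runRandomizedScenario(final String strategy, final long seed) {
        // Stand-in for TaskAssignorConvergenceTest#runRandomizedScenario, which
        // builds a randomized cluster from the seed and asserts convergence.
    }
}
{code}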
[jira] [Updated] (KAFKA-16586) Test TaskAssignorConvergenceTest failing
[ https://issues.apache.org/jira/browse/KAFKA-16586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16586: Description: {code:java} java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545) at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code} This might expose an actual bug (or incorrect test setup) and should be reproducible (did not try it myself yet). was: {code:java} java.lang.AssertionError: Assertion failed in randomized test. Reproduce with: `runRandomizedScenario(-538095696758490522)`.at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545) at org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code} This might expose an actual bug (or incorrect test setup) and should be reproducible (die not try it myself yet). > Test TaskAssignorConvergenceTest failing > > > Key: KAFKA-16586 > URL: https://issues.apache.org/jira/browse/KAFKA-16586 > Project: Kafka > Issue Type: Test > Components: streams, unit tests > Reporter: Matthias J. Sax >Priority: Major > > {code:java} > java.lang.AssertionError: Assertion failed in randomized test. Reproduce > with: `runRandomizedScenario(-538095696758490522)`. at > org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.runRandomizedScenario(TaskAssignorConvergenceTest.java:545) > at > org.apache.kafka.streams.processor.internals.assignment.TaskAssignorConvergenceTest.randomClusterPerturbationsShouldConverge(TaskAssignorConvergenceTest.java:479){code} > This might expose an actual bug (or incorrect test setup) and should be > reproducible (did not try it myself yet). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext
I was not aware of `InternalFixedKeyRecordFactory`. As the name indicates, it's considered an internal class, so not sure if we should recommend using it in tests... I understand why this class is required, and why it was put into a public package; the way Java works enforces this. Not sure if we could find a better solution. Might be good to hear from others. -Matthias On 5/21/24 3:57 PM, Shashwat Pandey wrote: Looking at the ticket and the sample code, I think it would be possible to continue using `InternalFixedKeyRecordFactory` as the avenue to create `FixedKeyRecord`s in tests. As long as there was a MockFixedKeyProcessorContext, I think we would be able to test FixedKeyProcessors without a Topology. I created a sample repo with the `MockFixedKeyProcessorContext`; here is what I think the tests would look like: https://github.com/s7pandey/kafka-processor-tests/blob/main/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java On Mon, May 20, 2024 at 9:05 PM Matthias J. Sax wrote: Had a discussion on https://issues.apache.org/jira/browse/KAFKA-15242 and it was pointed out that we also need to do something about `FixedKeyRecord`. It does not have a public constructor (which is correct; it should not have one). However, this makes testing `FixedKeyProcessor` impossible w/o extending `FixedKeyRecord` manually, which does not seem right (too clumsy). It seems we either need some helper builder method (but not clear to me where to add it in an elegant way) which would provide us with a `FixedKeyRecord`, or add some sub-class to the test-utils module which would extend `FixedKeyRecord`? -- Or maybe an even better solution? I could not think of something else so far. Thoughts? On 5/3/24 9:46 AM, Matthias J. Sax wrote: Please also update the KIP. To get a wiki account created, please request it via a comment on this ticket: https://issues.apache.org/jira/browse/INFRA-25451 After you have the account, please share your wiki id, and we can give you write permission on the wiki. -Matthias On 5/3/24 6:30 AM, Shashwat Pandey wrote: Hi Matthias, Sorry this fell off my radar for a bit. Revisiting the topic, I think you’re right and we accept the duplicated nesting as an appropriate solution to not affect the larger public API. I can update my PR with the change. Regards, Shashwat Pandey On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax wrote: Any updates on this KIP? On 3/28/24 4:11 AM, Matthias J. Sax wrote: It seems that `MockRecordMetadata` is a private class, and thus not part of the public API. If there are any changes required, we don't need to discuss on the KIP. For `CapturedPunctuator` and `CapturedForward` it's a little bit more tricky. My gut feeling is that the classes might not need to be changed, but if we use them within `MockProcessorContext` and `MockFixedKeyProcessorContext` it might be weird to keep the current nesting... The problem I see is that it's not straightforward how to move the classes w/o breaking compatibility, nor if we duplicate them as standalone classes w/o a larger "splash radius". (We would need to add new overloads for MockProcessorContext#scheduledPunctuators() and MockProcessorContext#forwarded()). Might be good to hear from others if we think it's worth these larger changes to get rid of the nesting, or just accept the somewhat not ideal nesting as it technically is not a real issue? -Matthias On 3/15/24 1:47 AM, Shashwat Pandey wrote: Thanks for the feedback Matthias!
The reason I proposed the extension of MockProcessorContext was more to do with the internals of the class (MockRecordMetadata, CapturedPunctuator and CapturedForward). However, I do see your point; I would then think to split MockProcessorContext and MockFixedKeyProcessorContext, and some of the internal classes should also be extracted, i.e., MockRecordMetadata, CapturedPunctuator and probably a new CapturedFixedKeyForward. Let me know what you think! Regards, Shashwat Pandey On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax wrote: Thanks for the KIP Shashwat. Closing this testing gap is great! It did come up a few times already... One question: why do you propose to `extend MockProcessorContext`? Given how the actual runtime context classes are set up, it seems that the regular context and fixed-key-context are distinct, and thus I believe both mock-context classes should be distinct, too? What I mean is that FixedKeyProcessorContext does not extend ProcessorContext. Both classes have a common parent ProcessINGContext (note the very similar but different names), but they are "siblings" only, so why make the mock processor a parent-child relationship? It seems better to do public class MockFixedKeyProcessorContext implements FixedKeyProcessorContext, RecordCollector.Supplier Of course, if there is code we can share between both mock-contexts we should do this, but it should not leak into the public API?
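For illustration, a compilable shape sketch of the mock context discussed here, not the KIP's final API: it implements `FixedKeyProcessorContext` directly (the sibling relationship described above) and is declared abstract so the skeleton compiles without spelling out every inherited `ProcessingContext` method.
{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// Shape sketch only: capture forwarded records instead of sending them
// downstream, so a test can assert on them.
public abstract class MockFixedKeyProcessorContextSketch<KForward, VForward>
        implements FixedKeyProcessorContext<KForward, VForward> {

    private final List<FixedKeyRecord<? extends KForward, ? extends VForward>> forwarded =
            new ArrayList<>();

    @Override
    public <K extends KForward, V extends VForward> void forward(final FixedKeyRecord<K, V> record) {
        forwarded.add(record);
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(final FixedKeyRecord<K, V> record,
                                                                 final String childName) {
        forwarded.add(record);
    }

    public List<FixedKeyRecord<? extends KForward, ? extends VForward>> forwarded() {
        return Collections.unmodifiableList(forwarded);
    }
}
{code}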
Re: Permission to contribute to Apache Kafka
You should be all set. On 5/21/24 8:30 AM, Harry Fallows wrote: Hello, I am following the [Getting Started guide for writing KIPs](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals). Could someone give me the permissions to write a KIP please? My Wiki ID is harryfallows, my Jira ID is hfallows, and my email address is harryfall...@protonmail.com. Thanks, Harry
Re: Request for Authorization to Create KIP
You should be all set. On 5/21/24 6:58 PM, 黃竣陽 wrote: I want to create a KIP; my wiki ID is m1a2st and my Jira ID is m1a2st. Thanks for your help. On May 22, 2024 at 9:01 AM, jiang dou wrote: You should send your Jira ID and wiki ID. Please refer to this address: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals On Tue, May 21, 2024 at 22:42, 黃竣陽 wrote: I am writing to request authorization to create a KIP. Currently, I do not have the necessary permissions to access the 'Create KIP' function. My account email is s7133...@gmail.com. Could you please grant me the required permissions to create a KIP? Thanks for your help.
Re: Request to be added to kafka contributors list
Ok. Hopefully it's working now. Sorry for the hiccup. -Matthias On 5/21/24 1:14 AM, Fan Yang wrote: Hi Matthias, I tried signing out and signing in, but still can't find the "Assign" button. My JIRA ID is fanyan, could you help me set it again? Best, Fan ________ From: Matthias J. Sax Sent: Saturday, May 18, 2024 4:06 To: users@kafka.apache.org Subject: Re: Re: Request to be added to kafka contributors list Did you sign out and sign in again? On 5/17/24 9:49 AM, Yang Fan wrote: Thanks Matthias, I still can't find the "Assign to me" button beside Assignee and Reporter. Could you help me set it again? Best regards, Fan ____________ From: Matthias J. Sax Sent: May 17, 2024 2:24 To: users@kafka.apache.org Subject: Re: Request to be added to kafka contributors list Thanks for reaching out Yang. You should be all set. -Matthias On 5/16/24 7:40 AM, Yang Fan wrote: Dear Apache Kafka Team, I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I kindly request to be added to the contributors list for Apache Kafka. Being part of this list would allow me to be assigned to JIRA tickets and work on them. Thank you for considering my request. Best regards, Fan
[jira] [Resolved] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-15242. - Assignee: (was: Alexander Aghili) Resolution: Duplicate > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Priority: Major > > Using mock processor context to get the forwarded message doesn't work. > Also there is not a well documented way for testing FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro] > but most important piece is test file with runtime and compile time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: Fwd: Request to be added to kafka contributors list
Done. You should be all set :) -Matthias On 5/20/24 10:10 AM, bou...@ulukai.net wrote: Dear Apache Kafka Team, I hope I am posting in the right place: my name is Franck LEDAY, under Apache-Jira ID "handfreezer". I opened an Improvement issue, KAFKA-16707, but failed to assign it to myself. May I ask to be added to the contributors list for Apache Kafka? As I have already done the improvement work, I would like to be assigned to the ticket to complete my contribution. Thank you for considering my request. Best regards, Franck.
Re: Request for contributor list
What is your Jira ID? -Matthias On 5/20/24 9:55 AM, Brenden Deluna wrote: Hello, I am requesting to be added to the contributor list to take care of some tickets. Thank you.
Re: Release plan required
Zookeeper is already deprecated (since 3.5): https://kafka.apache.org/documentation/#zk_depr It's planned to be fully removed in the 4.0 release. It's not confirmed yet, but there is a high probability that there won't be a 3.9 release, and that 4.0 will follow 3.8. -Matthias On 5/20/24 2:11 AM, Sahil Sharma D wrote: Hello, When is ZooKeeper planned to be deprecated from Kafka, and in which release is this deprecation planned? Regards, Sahil -Original Message- From: Sanskar Jhajharia Sent: Monday, May 20, 2024 1:38 PM To: users@kafka.apache.org Subject: Re: Release plan required Hey Sahil, You can find the complete details of the releases and bug fix releases here: https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan The next release in the pipeline currently is 3.8.0 ( https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0). There is also a bugfix release 3.7.1 scheduled here ( https://cwiki.apache.org/confluence/display/KAFKA/Release+plan+3.7.1) Hope that answers your question! Cheers. Sanskar Jhajharia On Mon, May 20, 2024 at 1:31 PM Sahil Sharma D wrote: Hi team, We need the Kafka release plan for our Kafka upgrade planning. Kindly share the latest release plan, or let us know when the next release is planned and which version it will be? Regards, Sahil
[jira] [Commented] (KAFKA-16801) Streams upgrade :test target doesn't find any junit tests
[ https://issues.apache.org/jira/browse/KAFKA-16801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848022#comment-17848022 ] Matthias J. Sax commented on KAFKA-16801: - These packages contain code for system tests. We put the code under `src/test/java/...`; there is no `src/main/java/...`, and the code is not unit test code either. What would be the right way to address this? > Streams upgrade :test target doesn't find any junit tests > - > > Key: KAFKA-16801 > URL: https://issues.apache.org/jira/browse/KAFKA-16801 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Greg Harris >Priority: Major > Labels: newbie > > No test executed. This behavior has been deprecated. > This will fail with an error in Gradle 9.0. > There are test sources present but no test was executed. Please check your > test configuration. > [Documentation|https://docs.gradle.org/8.7/userguide/upgrading_version_8.html#test_task_fail_on_no_test_executed] > > 23 usages > > Task::streams:upgrade-system-tests-0100:test > Task::streams:upgrade-system-tests-0101:test > Task::streams:upgrade-system-tests-0102:test > Task::streams:upgrade-system-tests-0110:test > Task::streams:upgrade-system-tests-10:test > Task::streams:upgrade-system-tests-11:test > Task::streams:upgrade-system-tests-20:test > Task::streams:upgrade-system-tests-21:test > Task::streams:upgrade-system-tests-22:test > Task::streams:upgrade-system-tests-23:test > Task::streams:upgrade-system-tests-24:test > Task::streams:upgrade-system-tests-25:test > Task::streams:upgrade-system-tests-26:test > Task::streams:upgrade-system-tests-27:test > Task::streams:upgrade-system-tests-28:test > Task::streams:upgrade-system-tests-30:test > Task::streams:upgrade-system-tests-31:test > Task::streams:upgrade-system-tests-32:test > Task::streams:upgrade-system-tests-33:test > Task::streams:upgrade-system-tests-34:test > Task::streams:upgrade-system-tests-35:test > Task::streams:upgrade-system-tests-36:test > Task::streams:upgrade-system-tests-37:test -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16794) Can't open videos in streams documentation
[ https://issues.apache.org/jira/browse/KAFKA-16794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16794: Component/s: streams > Can't open videos in streams documentation > -- > > Key: KAFKA-16794 > URL: https://issues.apache.org/jira/browse/KAFKA-16794 > Project: Kafka > Issue Type: Bug > Components: docs, streams >Reporter: Kuan Po Tseng >Priority: Minor > Attachments: IMG_4445.png, image.png > > > Can't open videos in page [https://kafka.apache.org/documentation/streams/] > Open console in chrome browser and it shows error message: > {{Refused to frame 'https://www.youtube.com/' because it violates the > following Content Security Policy directive: "frame-src 'self'".}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext
Had a discussion on https://issues.apache.org/jira/browse/KAFKA-15242 and it was pointed out that we also need to do something about `FixedKeyRecord`. It does not have a public constructor (which is correct; it should not have one). However, this makes testing `FixedKeyProcessor` impossible w/o extending `FixedKeyRecord` manually, which does not seem right (too clumsy). It seems we either need some helper builder method (but not clear to me where to add it in an elegant way) which would provide us with a `FixedKeyRecord`, or add some sub-class to the test-utils module which would extend `FixedKeyRecord`? -- Or maybe an even better solution? I could not think of something else so far. Thoughts? On 5/3/24 9:46 AM, Matthias J. Sax wrote: Please also update the KIP. To get a wiki account created, please request it via a comment on this ticket: https://issues.apache.org/jira/browse/INFRA-25451 After you have the account, please share your wiki id, and we can give you write permission on the wiki. -Matthias On 5/3/24 6:30 AM, Shashwat Pandey wrote: Hi Matthias, Sorry this fell off my radar for a bit. Revisiting the topic, I think you’re right and we accept the duplicated nesting as an appropriate solution to not affect the larger public API. I can update my PR with the change. Regards, Shashwat Pandey On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax wrote: Any updates on this KIP? On 3/28/24 4:11 AM, Matthias J. Sax wrote: It seems that `MockRecordMetadata` is a private class, and thus not part of the public API. If there are any changes required, we don't need to discuss on the KIP. For `CapturedPunctuator` and `CapturedForward` it's a little bit more tricky. My gut feeling is that the classes might not need to be changed, but if we use them within `MockProcessorContext` and `MockFixedKeyProcessorContext` it might be weird to keep the current nesting... The problem I see is that it's not straightforward how to move the classes w/o breaking compatibility, nor if we duplicate them as standalone classes w/o a larger "splash radius". (We would need to add new overloads for MockProcessorContext#scheduledPunctuators() and MockProcessorContext#forwarded()). Might be good to hear from others if we think it's worth these larger changes to get rid of the nesting, or just accept the somewhat not ideal nesting as it technically is not a real issue? -Matthias On 3/15/24 1:47 AM, Shashwat Pandey wrote: Thanks for the feedback Matthias! The reason I proposed the extension of MockProcessorContext was more to do with the internals of the class (MockRecordMetadata, CapturedPunctuator and CapturedForward). However, I do see your point; I would then think to split MockProcessorContext and MockFixedKeyProcessorContext, and some of the internal classes should also be extracted, i.e., MockRecordMetadata, CapturedPunctuator and probably a new CapturedFixedKeyForward. Let me know what you think! Regards, Shashwat Pandey On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax wrote: Thanks for the KIP Shashwat. Closing this testing gap is great! It did come up a few times already... One question: why do you propose to `extend MockProcessorContext`? Given how the actual runtime context classes are set up, it seems that the regular context and fixed-key-context are distinct, and thus I believe both mock-context classes should be distinct, too? What I mean is that FixedKeyProcessorContext does not extend ProcessorContext.
Both classes have a common parent ProcessINGContext (note the very similar but different names), but they are "siblings" only, so why make the mock processor a parent-child relationship? It seems better to do public class MockFixedKeyProcessorContext implements FixedKeyProcessorContext, RecordCollector.Supplier Of course, if there is code we can share between both mock-contexts we should do this, but it should not leak into the public API? -Matthias On 3/11/24 5:21 PM, Shashwat Pandey wrote: Hi everyone, I would like to start the discussion on https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils library. Regards, Shashwat Pandey
[jira] [Commented] (KAFKA-15143) MockFixedKeyProcessorContext is missing from test-utils
[ https://issues.apache.org/jira/browse/KAFKA-15143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848021#comment-17848021 ] Matthias J. Sax commented on KAFKA-15143: - As pointed out in the comments on KAFKA-15242, we also need to do something about `FixedKeyRecord`, because it does not have (and should not have) a public constructor. I'll point this out on the DISCUSS thread of the KIP, too. > MockFixedKeyProcessorContext is missing from test-utils > --- > > Key: KAFKA-15143 > URL: https://issues.apache.org/jira/browse/KAFKA-15143 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils >Affects Versions: 3.5.0 >Reporter: Tomasz Kaszuba >Assignee: Shashwat Pandey >Priority: Major > Labels: needs-kip > > I am trying to test a ContextualFixedKeyProcessor but it is not possible to > call the init method from within a unit test since the MockProcessorContext > doesn't implement > {code:java} > FixedKeyProcessorContext {code} > but only > {code:java} > ProcessorContext > {code} > Shouldn't there also be a *MockFixedKeyProcessorContext* in the test utils? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848020#comment-17848020 ] Matthias J. Sax commented on KAFKA-15242: - I don't think that `TestRecord` has anything to do with it, because `TestRecord` is not used in combination with `MockProcessorContext`, but only in combination with the `TopologyTestDriver` (and corresponding `TestInputTopic` and `TestOutputTopic`). I agree, though, that we need an additional helper class, because `FixedKeyRecord` objects cannot be instantiated directly (no public constructor). Thanks for the call out – the KIP needs to be extended accordingly – we would have missed this... This ticket did not have this dependency in its description either though. I think we can still close it as a duplicate, and add anything missing to the other ticket? > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Assignee: Alexander Aghili >Priority: Major > > Using mock processor context to get the forwarded message doesn't work. > Also there is not a well documented way for testing FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro] > but most important piece is test file with runtime and compile time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
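For context on the distinction drawn above, a minimal sketch (topic names made up) of how `TestInputTopic`/`TestOutputTopic` piping works with `TopologyTestDriver`; `MockProcessorContext` plays no role here:
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyTestDriverExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(v -> v.toUpperCase())
               .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in =
                driver.createInputTopic("in", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> out =
                driver.createOutputTopic("out", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("key", "value");
            System.out.println(out.readKeyValue()); // KeyValue(key, VALUE)
        }
    }
}
{code}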
Re: Re: Request to be added to kafka contributors list
Did you sign out and sign in again? On 5/17/24 9:49 AM, Yang Fan wrote: Thanks Matthias, I still can't find the "Assign to me" button beside Assignee and Reporter. Could you help me set it again? Best regards, Fan ____ From: Matthias J. Sax Sent: May 17, 2024 2:24 To: users@kafka.apache.org Subject: Re: Request to be added to kafka contributors list Thanks for reaching out Yang. You should be all set. -Matthias On 5/16/24 7:40 AM, Yang Fan wrote: Dear Apache Kafka Team, I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I kindly request to be added to the contributors list for Apache Kafka. Being part of this list would allow me to be assigned to JIRA tickets and work on them. Thank you for considering my request. Best regards, Fan
[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847064#comment-17847064 ] Matthias J. Sax commented on KAFKA-16774: - Ah. Nice. Glad it's not a bug. Thanks for the PR Bruno. Approved. > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Assignee: Bruno Cadonna >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
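As a generic illustration of the failure mode in the stack trace above (not the actual fix that went in with the PR): streaming over a live `HashMap` while another code path mutates it throws `ConcurrentModificationException`; iterating a snapshot avoids it.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class SnapshotIterationExample {
    public static void main(String[] args) {
        Map<Integer, String> tasks = new HashMap<>();
        tasks.put(1, "a");
        tasks.put(2, "b");

        // Unsafe: removing entries while streaming over the live key set
        // can throw ConcurrentModificationException:
        // tasks.keySet().stream().forEach(tasks::remove);

        // Safe: take a snapshot first, then mutate the map.
        for (Integer key : new ArrayList<>(tasks.keySet())) {
            tasks.remove(key);
        }
        System.out.println(tasks); // {}
    }
}
{code}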
[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17847063#comment-17847063 ] Matthias J. Sax commented on KAFKA-15242: - The example ([https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]) tries to use `MockProcessorContext` to test `FixedKeyProcessor` – this does not work. Adding `MockFixedKeyProcessorContext` should allow testing `FixedKeyProcessor` using this newly added class. What other issues does this ticket include that are not covered? Can you elaborate? > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Assignee: Alexander Aghili >Priority: Major > > Using mock processor context to get the forwarded message doesn't work. > Also there is not a well documented way for testing FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro] > but most important piece is test file with runtime and compile time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
SGTM to use nano-seconds across the board. On 5/16/24 7:12 AM, Nick Telford wrote: Actually, one other point: our existing state store operation metrics are measured in nanoseconds[1]. Should iterator-duration-(avg|max) also be measured in nanoseconds, for consistency, or should we keep them milliseconds, as the KIP currently states? 1: https://docs.confluent.io/platform/current/streams/monitoring.html#state-store-metrics On Thu, 16 May 2024 at 12:15, Nick Telford wrote: Good point! I've updated it to "Improved StateStore Iterator metrics for detecting leaks" - let me know if you have a better suggestion. This shouldn't affect the voting imo, as nothing of substance has changed. Regards, Nick On Thu, 16 May 2024 at 01:39, Sophie Blee-Goldman wrote: One quick thing -- can you update the title of this KIP to reflect the decision to implement these metrics for all state store implementations rather than just RocksDB? On Tue, May 14, 2024 at 1:36 PM Nick Telford wrote: Woops! Thanks for the catch Lucas. Given this was just a typo, I don't think this affects the voting. Cheers, Nick On Tue, 14 May 2024 at 18:06, Lucas Brutschy wrote: Hi Nick, you are still referring to oldest-open-iterator-age-ms in the `Proposed Changes` section. Cheers, Lucas On Thu, May 2, 2024 at 4:00 PM Lucas Brutschy wrote: Hi Nick! I agree, the age variant is a bit nicer since the semantics are very clear from the name. If you'd rather go for the simple implementation, how about calling it `oldest-iterator-open-since-ms`? I believe this could be understood without docs. Either way, I think we should be able to open the vote for this KIP because nobody raised any major / blocking concerns. Looking forward to getting this voted on soon! Cheers Lucas On Sun, Mar 31, 2024 at 5:23 PM Nick Telford < nick.telf...@gmail.com> wrote: Hi Matthias, For the oldest iterator metric, I would propose something simple like `iterator-opened-ms` and it would just be the actual timestamp when the iterator was opened. I don't think we need to compute the actual age, but users can do this computation themselves? That works for me; it's easier to implement like that :-D I'm a little concerned that the name "iterator-opened-ms" may not be obvious enough without reading the docs. If we think reporting the age instead of just the timestamp is better, I would propose `iterator-max-age-ms`. It should be sufficient to call out (as it's kinda "obvious" anyway) that the metric applies to open iterators only. While I think it's preferable to record the timestamp, rather than the age, this does have the benefit of a more obvious metric name. Nit: the KIP says it's a store-level metric, but I think it would be good to say explicitly that it's recorded with DEBUG level only? Yes, I've already updated the KIP with this information in the table. Regards, Nick On Sun, 31 Mar 2024 at 10:53, Matthias J. Sax wrote: The time window thing was just an idea. Happy to drop it. For the oldest iterator metric, I would propose something simple like `iterator-opened-ms` and it would just be the actual timestamp when the iterator was opened. I don't think we need to compute the actual age, but users can do this computation themselves? If we think reporting the age instead of just the timestamp is better, I would propose `iterator-max-age-ms`. It should be sufficient to call out (as it's kinda "obvious" anyway) that the metric applies to open iterators only.
And yes, I was hoping that the code inside MeteredXxxStore might already be set up in a way that custom stores would inherit the iterator metrics automatically -- I am just not sure, and left it as an exercise for somebody to confirm :) Nit: the KIP says it's a store-level metric, but I think it would be good to say explicitly that it's recorded with DEBUG level only? -Matthias On 3/28/24 2:52 PM, Nick Telford wrote: Quick addendum: My suggested metric "oldest-open-iterator-age-seconds" should be "oldest-open-iterator-age-ms". Milliseconds is obviously a better granularity for such a metric. Still accepting suggestions for a better name. On Thu, 28 Mar 2024 at 13:41, Nick Telford < nick.telf...@gmail.com wrote: Hi everyone, Sorry for leaving this for so long. So much for "3 weeks until KIP freeze"! On Sophie's comments: 1. Would Matthias's suggestion of a separate metric tracking the age of the oldest open iterator (within the tag set) satisfy this? That way we can keep iterator-duration-(avg|max) for closed iterators, which can be useful for performance debugging for iterators that don't leak. I'm not sure what we'd call this metric, maybe: "oldest-open-iterator-age-seconds"? Seems like a mouthful. 2. Y
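An illustration only (not the KIP's implementation) of how an "oldest open iterator" gauge could be backed: record each iterator's open timestamp, drop it on close, and report the smallest one still present. Timestamp collisions would need a multiset in practice.
{code:java}
import java.util.concurrent.ConcurrentSkipListSet;

public class OpenIteratorTracker {
    private final ConcurrentSkipListSet<Long> openSince = new ConcurrentSkipListSet<>();

    public long iteratorOpened(final long nowMs) {
        openSince.add(nowMs); // caller keeps nowMs to report the close
        return nowMs;
    }

    public void iteratorClosed(final long openedAtMs) {
        openSince.remove(openedAtMs);
    }

    // Backing value for an "oldest-iterator-open-since-ms" style gauge;
    // -1 signals that no iterator is currently open.
    public long oldestOpenSinceMs() {
        final Long oldest = openSince.ceiling(Long.MIN_VALUE); // smallest element, or null
        return oldest == null ? -1L : oldest;
    }
}
{code}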
Re: Kafka streams stores key in multiple state store instances
Hello Kay, What you describe is "by design" -- unfortunately. The problem is that when we build the `Topology` we don't know the partition count of the input topics, and thus, StreamsBuilder cannot insert a repartition topic for this case (we always assume that the partition count is the same for all input topics). To work around this, you would need to rewrite the program to use either `groupBy((k,v) -> k)` instead of `groupByKey()`, or do a `.repartition().groupByKey()`. Does this make sense? -Matthias On 5/16/24 2:11 AM, Kay Hannay wrote: Hi, we have a Kafka streams application which merges (merge, groupByKey, aggregate) a few topics into one topic. The application is stateful, of course. There are currently six instances of the application running in parallel. We had an issue where one new topic for aggregation had a different partition count than all the other topics. This caused data corruption in our application. We expected that a re-partitioning topic would be created automatically by Kafka streams or that we would get an error. But this did not happen. Instead, some of the keys (all merged topics share the same key schema) found their way into at least two different instances of the application. One key is available in more than one local state store. Can you explain why this happened? As already said, we would have expected to get an error or a re-partitioning topic in this case. Cheers Kay
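A minimal sketch of the two workarounds (topic names made up); both insert a repartition topic so the merged stream is co-partitioned before the aggregation:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class RepartitionWorkaroundSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> merged =
            builder.<String, String>stream("topic-a")
                   .merge(builder.stream("topic-b"));

        // Option 1: groupBy((k, v) -> k) keeps the key unchanged but marks the
        // stream for repartitioning, so a repartition topic is inserted:
        KTable<String, Long> counts1 = merged.groupBy((k, v) -> k).count();

        // Option 2: explicit repartition(), then groupByKey():
        KTable<String, Long> counts2 = merged.repartition().groupByKey().count();
    }
}
{code}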
Re: Request to be added to kafka contributors list
Thanks for reaching out Yang. You should be all set. -Matthias On 5/16/24 7:40 AM, Yang Fan wrote: Dear Apache Kafka Team, I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I kindly request to be added to the contributors list for Apache Kafka. Being part of this list would allow me to be assigned to JIRA tickets and work on them. Thank you for considering my request. Best regards, Fan
[jira] [Assigned] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)
[ https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-16448: --- Assignee: Loïc Greffier > Add Kafka Streams exception handler for exceptions occuring during processing > (KIP-1033) > > > Key: KAFKA-16448 > URL: https://issues.apache.org/jira/browse/KAFKA-16448 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina >Assignee: Loïc Greffier >Priority: Minor > > Jira to follow work on KIP: [KIP-1033: Add Kafka Streams exception handler > for exceptions occuring during > processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16333) Removed Deprecated methods KTable#join
[ https://issues.apache.org/jira/browse/KAFKA-16333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846821#comment-17846821 ] Matthias J. Sax commented on KAFKA-16333: - In general, yes. But we should wait until a final decision is made on whether the release after 3.8 will be 3.9 or 4.0. – So we should hold off a few more weeks. > Removed Deprecated methods KTable#join > -- > > Key: KAFKA-16333 > URL: https://issues.apache.org/jira/browse/KAFKA-16333 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > KTable#join() methods taking a `Named` parameter got deprecated in 3.1 > release via https://issues.apache.org/jira/browse/KAFKA-13813 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16329) Remove Deprecated Task/ThreadMetadata classes and related methods
[ https://issues.apache.org/jira/browse/KAFKA-16329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846820#comment-17846820 ] Matthias J. Sax commented on KAFKA-16329: - In general, yes. But we should wait until a final decision is made on whether the release after 3.8 will be 3.9 or 4.0. – So we should hold off a few more weeks. > Remove Deprecated Task/ThreadMetadata classes and related methods > - > > Key: KAFKA-16329 > URL: https://issues.apache.org/jira/browse/KAFKA-16329 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > Deprecated in AK 3.0 via > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-744%3A+Migrate+TaskMetadata+and+ThreadMetadata+to+an+interface+with+internal+implementation] > > * > org.apache.kafka.streams.processor.TaskMetadata > * org.apache.kafka.streams.processor.ThreadMetadata > * org.apache.kafka.streams.KafkaStreams#localThreadsMetadata > * org.apache.kafka.streams.state.StreamsMetadata > * org.apache.kafka.streams.KafkaStreams#allMetadata > * org.apache.kafka.streams.KafkaStreams#allMetadataForStore > This is related to https://issues.apache.org/jira/browse/KAFKA-16330 and both > tickets should be worked on together. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16330) Remove Deprecated methods/variables from TaskId
[ https://issues.apache.org/jira/browse/KAFKA-16330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846819#comment-17846819 ] Matthias J. Sax commented on KAFKA-16330: - In general, yes. But we should wait until a final decision is made on whether the release after 3.8 will be 3.9 or 4.0. – So we should hold off a few more weeks. > Remove Deprecated methods/variables from TaskId > --- > > Key: KAFKA-16330 > URL: https://issues.apache.org/jira/browse/KAFKA-16330 > Project: Kafka > Issue Type: Sub-task > Components: streams > Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > Cf > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306557] > > This ticket relates to https://issues.apache.org/jira/browse/KAFKA-16329 and > both should be worked on together. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occuring during processing (KIP-1033)
[ https://issues.apache.org/jira/browse/KAFKA-16448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846818#comment-17846818 ] Matthias J. Sax commented on KAFKA-16448: - [~muralibasani] – Thanks for your interest. This ticket is already WIP and the corresponding KIP-1033 was accepted, and there is already a PR... [~Dabz] should we assign this ticket to you or Loic (not sure if he has a Jira account – if not, we should create one so we can assign the ticket to him?) > Add Kafka Streams exception handler for exceptions occuring during processing > (KIP-1033) > > > Key: KAFKA-16448 > URL: https://issues.apache.org/jira/browse/KAFKA-16448 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Damien Gasparina >Priority: Minor > > Jira to follow work on KIP: [KIP-1033: Add Kafka Streams exception handler > for exceptions occuring during > processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846815#comment-17846815 ] Matthias J. Sax commented on KAFKA-16774: - Wondering if this is actually a flaky test, or if it exposes a real bug? > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16774: Labels: flaky-test (was: ) > fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled > - > > Key: KAFKA-16774 > URL: https://issues.apache.org/jira/browse/KAFKA-16774 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > java.util.ConcurrentModificationException > at > java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) > at > org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686) > at > org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79) > at > org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408) > at > java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
[ https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16774: Component/s: streams, unit tests
[jira] [Comment Edited] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846814#comment-17846814 ] Matthias J. Sax edited comment on KAFKA-15242 at 5/16/24 5:32 AM: -- There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP – ie, there is KIP-1027. I believe it would cover this ticket? Wondering if we can close this ticket as a duplicate? was (Author: mjsax): There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP – ie, there is KIP-1027. I believe it would cover this ticket? > FixedKeyProcessor testing is unusable > - > > Key: KAFKA-15242 > URL: https://issues.apache.org/jira/browse/KAFKA-15242 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Zlstibor Veljkovic >Assignee: Alexander Aghili >Priority: Major > > Using the mock processor context to get the forwarded message doesn't work. > Also, there is no well-documented way for testing FixedKeyProcessors. > Please see the repo at [https://github.com/zveljkovic/kafka-repro]; > the most important piece is the test file with runtime and compile-time errors: > [https://github.com/zveljkovic/kafka-repro/blob/master/src/test/java/com/example/demo/MyFixedKeyProcessorTest.java]
[jira] [Commented] (KAFKA-15242) FixedKeyProcessor testing is unusable
[ https://issues.apache.org/jira/browse/KAFKA-15242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846814#comment-17846814 ] Matthias J. Sax commented on KAFKA-15242: - There is https://issues.apache.org/jira/browse/KAFKA-15143 which is already WIP – ie, there is KIP-1027. I believe it would cover this ticket?
Re: Query regarding groupbykey in streams
If I read this correctly, your upstream producer which writes into the input topic of your KS app is using a custom partitioner? If you do a `groupByKey()` and change the key upstream, it would result in a repartition step, which would fall back to the default partitioner. If you want to use a custom partitioner in KS, you should implement `StreamPartitioner` instead of the producer partitioner interface, and pass it into the relevant methods. `groupByKey()` does not allow setting a partitioner (seems this is a gap we should close...) -- as a workaround you could add `repartition()` before the grouping to pass in your custom partitioner; see the sketch below. For IQ, you would also need to pass your `StreamPartitioner` to allow KS to find the correct partition to query. HTH. -Matthias On 5/13/24 4:35 PM, Dev Lover wrote: Hi All, I have a custom partitioner to distribute the data across partitions in my cluster. My setup looks like below: Version - 3.7.0, Kafka - 3 broker setup, Partition count - 10, Stream server pods - 2, Stream threads in each pod - 10, Deployed in Kubernetes, Custom partitioner on producer end. I am doing a groupByKey. Is it correct to use it when I have a custom partitioner on the producer end? I recently migrated to 3.7 from 3.5.1. I am observing that partitions are not evenly distributed across my 2 stream pods. Also, my remote query is failing with the host being unavailable. But if I restart streams, it works fine for a certain time and then starts erroring out again. Am I doing something wrong? Regards
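To make the suggested workaround concrete, here is a minimal sketch (topic name, String types, and the partitioning logic are assumptions for illustration; plug in the same logic your upstream producer uses):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Repartitioned;
    import org.apache.kafka.streams.processor.StreamPartitioner;

    StreamsBuilder builder = new StreamsBuilder();

    // example partitioning logic only
    StreamPartitioner<String, String> myPartitioner =
        (topic, key, value, numPartitions) -> Math.abs(key.hashCode()) % numPartitions;

    builder.<String, String>stream("input-topic")
        .repartition(Repartitioned.<String, String>as("custom-repartition")
            .withStreamPartitioner(myPartitioner))
        .groupByKey()
        .count();

This keeps the repartition topic partitioned the same way as the input data, so the grouping does not silently fall back to the default partitioner.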
Re: [VOTE] KIP-989: RocksDB Iterator Metrics
+1 (binding) On 5/14/24 9:19 AM, Lucas Brutschy wrote: Hi Nick! Thanks for the KIP. +1 (binding) On Tue, May 14, 2024 at 5:16 PM Nick Telford wrote: Hi everyone, I'd like to call a vote on the Kafka Streams KIP-989: RocksDB Iterator Metrics: https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics All of the points in the discussion thread have now been addressed. Regards, Nick
Re: [VOTE] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
+1 (binding) On 5/13/24 5:54 PM, Sophie Blee-Goldman wrote: Thanks for the KIP guys! +1 (binding) On Mon, May 13, 2024 at 6:02 AM Bill Bejeck wrote: Thanks for the KIP, this will be a great addition! +1 (binding) -Bill On Fri, May 3, 2024 at 4:48 AM Bruno Cadonna wrote: Hi Damien, Sébastien, and Loïc, Thanks for the KIP! +1 (binding) Best, Bruno On 4/26/24 4:00 PM, Damien Gasparina wrote: Hi all, We would like to start a vote for KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing. The KIP is available on https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing If you have any suggestions or feedback, feel free to participate in the discussion thread: https://lists.apache.org/thread/1nhhsrogmmv15o7mk9nj4kvkb5k2bx9s Best regards, Damien, Sébastien and Loïc
[jira] [Commented] (KAFKA-16584) Make log processing summary configurable or debug
[ https://issues.apache.org/jira/browse/KAFKA-16584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845111#comment-17845111 ] Matthias J. Sax commented on KAFKA-16584: - I am on the PMC and can help you. :) After your wiki account is created, please share your wiki id and we can give you write access to the Kafka wiki space, so you can prepare a KIP. The goal of this ticket is to add a new config for the logging interval, so it should not be controversial. An example of another already-approved KIP that also added a new config is [https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+repartition.purge.interval.ms+to+Kafka+Streams] – this should help you to write your KIP for this ticket. > Make log processing summary configurable or debug > - > > Key: KAFKA-16584 > URL: https://issues.apache.org/jira/browse/KAFKA-16584 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.2 >Reporter: Andras Hatvani >Assignee: dujian >Priority: Major > Labels: needs-kip, newbie > > Currently, *every two minutes for every stream thread* statistics will be > logged at INFO log level. > {code} > 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 > total records, ran 0 punctuators, and committed 0 total tasks since the last > update {code} > This is absolutely unnecessary and even harmful, since it fills the logs and > thus storage space with unwanted and useless data. Otherwise the INFO logs > are useful and helpful, therefore it's not an option to raise the log level > to WARN. > Please make the logProcessingSummary > * either a DEBUG-level log or > * configurable so that it can be disabled. > This is the relevant code: > https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073
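For illustration, the requested change could end up looking roughly like this from the user's side — note the property name {{log.summary.interval.ms}} and its value here are assumptions for this example, not a name the KIP has settled on:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "service");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// hypothetical new config: raise the processing-summary log interval from
// the hard-coded 2 minutes to 10 minutes (or set it very high to silence it)
props.put("log.summary.interval.ms", "600000");
{code}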
Re: Creating kafka wiki id
Self-service to create an account is currently not working. Please reply on https://issues.apache.org/jira/browse/INFRA-25451 to request a wiki account. I'll update the wiki page for now until the issue is resolved. -Matthias On 5/7/24 8:25 AM, 黃竣陽 wrote: Hello, I want to create a KIP, but I don't have a Kafka wiki id. I went to the page (https://cwiki.apache.org/confluence/signup.action) but it doesn't have a button to register an account. Please help me to create an account. Thank you
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
Thanks Sophie! Makes it much clearer where you are coming from. About the type unsafety: isn't this also an issue for the `handleSerializationException` case, because the handler is used for all sink topics, and thus key/value types are not really known w/o taking the sink topic into account? -- So I am not sure if having two handler methods really helps much with regard to type safety? Just want to make this small comment for completeness. Let's hear what others think. Given that we both don't have a strong opinion but just a personal preference, we should be able to come to a conclusion quickly and get this KIP approved for 3.8 :) -Matthias On 5/8/24 3:12 PM, Sophie Blee-Goldman wrote: Well I definitely don't feel super strongly about it, and more importantly, I'm not a user. So I will happily defer to the preference of anyone who will actually be using this feature. But I'll explain my reasoning: There *is* a relevant distinction between these two callbacks -- because the passed-in record will have a different type depending on whether it was a serialization exception or something else. Even if we combined them into a single #handle method, users will still end up implementing two distinct branches depending on whether it was a serialization exception or not, since that determines the type of the ProducerRecord passed in. Not to mention they'll need to cast it to a ProducerRecord<byte[], byte[]> when we could have just passed it in as this type via a dedicated callback. And note that because of the generics, they can't do an instanceof check to make sure that the record type is ProducerRecord<byte[], byte[]> and will have to suppress the "unchecked cast" warning. So if we combined the two callbacks, their handler will look something like this:

    @SuppressWarnings("unchecked")
    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                     final ProducerRecord record,
                                                     final Exception exception) {
        if (exception instanceof RecordSerializationException) {
            if (((RecordSerializationException) exception).origin().equals(KEY)) {
                log.error("Failed to serialize key", exception);
            } else {
                log.error("Failed to serialize value", exception);
            }
        } else {
            final ProducerRecord<byte[], byte[]> serializedRecord = (ProducerRecord<byte[], byte[]>) record;
            log.error("Failed to produce record with serialized key={} and serialized value={}",
                      serializedRecord.key(), serializedRecord.value());
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

That seems like the most basic case, and it still has two branches with distinct logic even if they ultimately handle exceptions the same way. And looking forward to KIP-1034: Dead-letter queues, it seems all the more likely that the actual handling response might be different depending on whether it's a serialization exception or not: a serialized record can probably be retried/sent to a DLQ, whereas a record that can't be serialized should not (can't, really) be forwarded to a DLQ. So if they're going to have completely different implementations depending on whether it's a serialization exception, why not just give them two separate callbacks? And that's all assuming the user is perfectly aware of the different exception types and their implications for the type of the ProducerRecord. Many people might just miss the existence of the RecordSerializationException altogether -- there are already so many different exception types, ESPECIALLY when it comes to the Producer. Not to mention they'll need to understand the nuances of how the ProducerRecord type changes depending on the type of exception that's passed in.
And on top of all that, they'll need to know that there is metadata stored in the RecordSerializationException regarding the origin of the error. Whereas if we just passed in the SerializationExceptionOrigin to a #handleSerializationException callback, well, that's pretty impossible to miss. That all just seems like a lot for most people to have to understand to implement a ProductionExceptionHandler, which imo is not at all an advanced feature and should be as straightforward and easy to use as possible. Lastly -- I don't think it's quite fair to compare this to the RecordDeserializationException. We have a dedicated handler that's just for deserialization exceptions specifically, hence there's no worry about users having to be aware of the different exception types they might have to deal with in the DeserializationExceptionHandler. Whereas serialization exceptions are just a subset of what might get passed in to the ProductionExceptionHandler... Just explaining my reasoning -- in the end I leave it up to the KIP authors and anyone who will actually be using this feature in their applications :) On Tue, May 7, 2024 at 8:35 PM Matthias J. Sax wrote: @Loic, yes, what you describe is exactly what I had in mind. @Sophie, can you elaborate a little bit? First of all, I agree that it makes sense ...
Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer
Thank you all for the feedback! Addressing the main concern: The KIP is about giving the user the ability to handle producer exceptions, but to be more conservative and avoid future issues, we decided to limit it to a short list of exceptions. I included *RecordTooLargeException* and *UnknownTopicOrPartitionException*. Open to suggestions for adding some more ;-) KIP updates: - clarified the way that the user should configure the Producer to use the custom handler. I think adding a producer config property is the cleanest option. - changed the *ClientExceptionHandler* to *ProducerExceptionHandler* to be closer to what we are changing. - added the ProducerRecord as the input parameter of the handle() method as well. - increased the response types to 3 to have fail and two types of continue. - The default behaviour is having no custom handler, with the corresponding config parameter set to null. Therefore, the KIP provides no default implementation of the interface. - We follow the interface solution as described in the Rejected Alternatives section. Cheers, Alieh On Thu, Apr 18, 2024 at 8:11 PM Matthias J. Sax <mj...@apache.org> wrote: Thanks for the KIP Alieh! It addresses an important case for error handling. I agree that using this handler would be an expert API, as mentioned by a few people. But I don't think it would be a reason to not add it. It's always a tricky tradeoff what to expose to users and to avoid foot guns, but we added similar handlers to Kafka Streams, and have good experience with it. Hence, I understand, but don't share the concern raised. I also agree that there is some responsibility by the user to understand how such a handler should be implemented to not drop data by accident. But it seems unavoidable and acceptable. While I understand that a "simpler / reduced" API (eg via configs) might also work, I personally prefer a full handler. Configs have the same issue that they could be misused, potentially leading to incorrectly dropped data, but at the same time are less flexible (and thus maybe even harder to use correctly...?). Based on my experience, there are also often weird corner cases for which it makes sense to also drop records for other exceptions, and a full handler has the advantage of full flexibility and "absolute power!". To be fair: I don't know the exact code paths of the producer in detail, so please keep me honest. But my understanding is, that the KIP aims to allow users to react to internal exceptions, and decide to keep retrying internally, swallow the error and drop the record, or raise the error? Maybe the KIP would need to be a little bit more precise about which errors we want to cover -- I don't think this list must be exhaustive, as we can always do a follow-up KIP to also apply the handler to other errors to expand the scope of the handler. The KIP does mention examples, but it might be good to explicitly state for what cases the handler gets applied? I am also not sure if CONTINUE and FAIL are enough options? Don't we need three options? Or would `CONTINUE` have different meaning depending on the type of error? Ie, for a retryable error `CONTINUE` would mean keep retrying internally, but for a non-retryable error `CONTINUE` means swallow the error and drop the record? This semantic overload seems tricky to reason about by users, so it might be better to split `CONTINUE` into two cases -> `RETRY` and `SWALLOW` (or some better names). Additionally, should we just ship a `DefaultClientExceptionHandler` which would return `FAIL`, for backward compatibility.
Or don't have any default handler to begin with and allow it to be `null`? I don't see the need for a specific `TransactionExceptionHandler`. To me, the goal should be to not modify the default behavior at all, but to just allow users to change the default behavior if there is a need. What is missing in the KIP, though, is how the handler is passed into the producer. Would we need a new config which allows setting a custom handler? And/or would we allow passing in an instance via the constructor or add a new method to set a handler? -Matthias On 4/18/24 10:02 AM, Andrew Schofield wrote: Hi Alieh, Thanks for the KIP. Exception handling in the Kafka producer and consumer is really not ideal. It’s even harder working out what’s going on with the consumer. I’m a bit nervous about this KIP and I agree with Chris that it could do with additional motivation. This would be an expert-level interface given how complicated the exception handling for Kafka has become. 7. The application is not really aware of the batching being done on its behalf. The ProduceResponse can actually return an array of records which failed per batch. If
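To make the proposal easier to picture, here is a rough sketch of the handler shape being discussed — all names below are illustrative, taken from this thread, and not a final API:

    import java.io.Closeable;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.Configurable;

    // hypothetical shape of the KIP-1038 handler; not the final interface
    public interface ProducerExceptionHandler extends Configurable, Closeable {

        enum HandlerResponse {
            FAIL,    // raise the error to the application
            RETRY,   // keep retrying internally (retriable errors)
            SWALLOW  // swallow the error and drop the record
        }

        // invoked for the covered errors, e.g. RecordTooLargeException
        // and UnknownTopicOrPartitionException
        HandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception);
    }

    // wired up via a new producer config property, e.g. (name made up):
    // props.put("custom.exception.handler", MyProducerExceptionHandler.class.getName());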
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
@Loic, yes, what you describe is exactly what I had in mind. @Sophie, can you elaborate a little bit? First of all, I agree that it makes sense to maintain the two separate callbacks for the ProductionExceptionHandler, since one of them is specifically for serialization exceptions while the other is used for everything/anything else. What makes a serialization exception special compared to other errors, that it's valuable to treat it differently? Why can we put "everything else" into a single bucket? By your train of thought, should we not split out the "everything else" bucket into a different callback method for every different error? If not, why only for serialization errors? From what I believe to remember, historically, we added the ProductionExceptionHandler, and kinda just missed the serialization error case. And later, when we extended the handler, we just could not re-use the existing callback as it was typed with `<byte[], byte[]>` and it would have been an incompatible change; so it was rather a workaround to add the second method to the handler, but not really intended design? It's of course only my personal opinion that I believe a single callback method is simpler/cleaner compared to sticking with two, and adding the new exception type to make it backward compatible seems worth it. It also kinda introduces the same pattern we use elsewhere (cf KIP-1036), which I actually think is an argument for introducing `RecordSerializationException`, to unify user experience across the board. Would be great to hear from others about this point. It's not that I strongly object to having two methods, and I would not block this KIP on this question. -Matthias On 5/7/24 3:40 PM, Sophie Blee-Goldman wrote: First of all, I agree that it makes sense to maintain the two separate callbacks for the ProductionExceptionHandler, since one of them is specifically for serialization exceptions while the other is used for everything/anything else. I also think we can take advantage of this fact to simplify things a bit and cut down on the amount of new stuff added to the API by just adding a parameter to the #handleSerializationException callback and use that to pass in the SerializationExceptionOrigin enum to distinguish between key vs value. This way we wouldn't need to introduce yet another exception type (the RecordSerializationException) just to pass in this information. Thoughts? On Tue, May 7, 2024 at 8:33 AM Loic Greffier wrote: Hi Matthias, To sum up the proposed changes to the ProductionExceptionHandler callback methods (106). A new method ProductionExceptionHandler#handle is added with the following signature:

    ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                              final ProducerRecord record,
                                              final Exception exception);

The ProducerRecord parameter has changed to accept a serialized or non-serialized record. Thus, the new ProductionExceptionHandler#handle method can handle both production exceptions and serialization exceptions. Both old ProductionExceptionHandler#handle and ProductionExceptionHandler#handleSerializationException methods are now deprecated. The old ProductionExceptionHandler#handle method gets a default implementation, so users do not have to implement a deprecated method. To handle backward compatibility, the new ProductionExceptionHandler#handle method gets a default implementation.
    default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                      final ProducerRecord record,
                                                      final Exception exception) {
        if (exception instanceof RecordSerializationException) {
            return this.handleSerializationException(record, exception.getCause());
        }
        return handle((ProducerRecord) record, exception);
    }

The default implementation either invokes #handleSerializationException or #handle depending on the type of the exception, thus users still relying on deprecated ProductionExceptionHandler#handle or ProductionExceptionHandler#handleSerializationException custom implementations won't break. The new ProductionExceptionHandler#handle method is now invoked in case of serialization exceptions:

    public <K, V> void send(final String topic, final K key, final V value, ...) {
        try {
            keyBytes = keySerializer.serialize(topic, headers, key);
            ...
        } catch (final ClassCastException exception) {
            ...
        } catch (final Exception exception) {
            try {
                response = productionExceptionHandler.handle(context, record,
                    new RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
            } catch (final Exception e) {
                ...
            }
        }
    }

To wrap the origin serialization exception and determine whether it comes from the key or the value, a new exception class is created:

    public class RecordSerializationException extends SerializationException {
        public enum SerializationExceptionOrigin {
            KEY,
            VALUE
        }

        public RecordSerializationException(final SerializationExceptionOrigin origin, final Exception cause) { ... }
    }
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Those are good questions... I could think of a few approaches, but I admit it might all be a little bit tricky to code up... However, if we don't solve this problem, I think this KIP does not really solve the core issue we are facing? In the end, if we rely on the `.checkpoint` file to compute a task assignment, but the `.checkpoint` file can be arbitrarily stale after a crash because we only write it on a clean close, there would still be a huge gap that this KIP does not close? For the case in which we keep the checkpoint file, this KIP would still help for "soft errors" in which KS can recover, and roll back the store. A significant win for sure. -- But hard crashes would still be a problem? We might assign tasks to the "wrong" instance, ie, one which is not most up to date, as the checkpoint information could be very outdated? Would we end up with a half-baked solution? Would this be good enough to justify the introduced complexity? In the end, for soft failures it's still a win. Just want to make sure we understand the limitations and make an educated decision. Or do I miss something? -Matthias On 5/3/24 10:20 AM, Bruno Cadonna wrote: Hi Matthias, 200: I like the idea in general. However, it is not clear to me how the behavior should be with multiple stream threads in the same Kafka Streams client. What stream thread opens which store? How can a stream thread pass an open store to another stream thread that got the corresponding task assigned? How does a stream thread know that a task was not assigned to any of the stream threads of the Kafka Streams client? I have the feeling we should just keep the .checkpoint file on close for now to unblock this KIP and try to find a solution to get totally rid of it later. Best, Bruno On 5/3/24 6:29 PM, Matthias J. Sax wrote: 101: Yes, but what I am saying is, that we don't need to flush the .position file to disk periodically, but only maintain it in main memory, and only write it to disk on close() to preserve it across restarts. This way, it would never be ahead, but might only lag? But with my better understanding about (102) it might be moot anyway... 102: Thanks for clarifying. Looked into the code now. Makes sense. Might be something worth calling out explicitly in the KIP writeup. -- Now that I realize that the position is tracked inside the store (not outside as the changelog offsets), it makes much more sense to pull position into RocksDB itself. In the end, it's actually a "store implementation" detail how it tracks the position (and kinda a leaky abstraction currently, that we re-use the checkpoint file mechanism to track it and flush to disk). 200: I was thinking about this a little bit more, and maybe it's not too bad? When KS starts up, we could open all stores we find on local disk pro-actively, and keep them all open until the first rebalance finishes: For tasks we get assigned, we hand in the already opened store (this would amortize the cost to open the store before the rebalance) and for non-assigned tasks, we know the offset information won't change and we could just cache it in-memory for later reuse (ie, next rebalance) and close the store to free up resources? -- Assuming that we would get a large percentage of opened stores assigned as tasks anyway, this could work? -Matthias On 5/3/24 1:29 AM, Bruno Cadonna wrote: Hi Matthias, 101: Let's assume a RocksDB store, but I think the following might be true also for other store implementations.
With this KIP, if Kafka Streams commits the offsets, the committed offsets will be stored in an in-memory data structure (i.e. the memtable) and stay there until RocksDB decides that it is time to persist its in-memory data structure. If Kafka Streams writes its position to the .position file during a commit and a crash happens before RocksDB persists the memtable, then the position in the .position file is ahead of the persisted offset. If IQ is done between the crash and the state store having fully restored the changelog, the position might tell IQ that the state store is more up-to-date than it actually is. In contrast, if Kafka Streams handles persisting positions the same as persisting offsets, the position should always be consistent with the offset, because they are persisted together. 102: I am confused about your confusion, which tells me that we are talking about two different things. You asked "Do you intend to add this information [i.e. position] to the map passed via commit(final Map changelogOffsets)?" and with what I wrote I meant that we do not need to pass the position into the implementation of the StateStore interface, since the position is updated within the implementation of the StateStore interface (e.g. RocksDBStore [1]). My statement describes the behavior now, not the change proposed in this KIP, so it does not contradict the KIP. 200: This is about Matthias' main concern about rebalance metadata. As far as I understand the KIP, Kafka Streams will only use the .checkpoint files to compute the task lag for unassigned tasks whose state is locally available. For assigned tasks, it will use the offsets managed by the open state store.
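A rough sketch of the startup idea discussed above — the class and the helper methods are made up for illustration, this is not proposed API:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.TaskId;

    abstract class StartupStoreOpener {
        // hypothetical helpers, not real Kafka APIs
        abstract StateStore openStore(TaskId id);
        abstract long changelogOffsetOf(StateStore store);
        abstract void handOverToStreamThread(TaskId id, StateStore store);

        // open every store found on local disk before the first rebalance;
        // keep assigned ones open, cache offsets of the rest and close them
        Map<TaskId, Long> openAndCache(Set<TaskId> tasksOnDisk, Set<TaskId> assignedTasks) {
            Map<TaskId, Long> cachedOffsets = new HashMap<>();
            for (TaskId id : tasksOnDisk) {
                StateStore store = openStore(id);
                if (assignedTasks.contains(id)) {
                    handOverToStreamThread(id, store); // reuse the already-open store
                } else {
                    // offsets cannot change while the task is unassigned,
                    // so cache them for later lag computation and free resources
                    cachedOffsets.put(id, changelogOffsetOf(store));
                    store.close();
                }
            }
            return cachedOffsets;
        }
    }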
Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001
Can you file a ticket for it: https://issues.apache.org/jira/browse/KAFKA On 5/3/24 3:34 AM, Penumarthi Durga Prasad Chowdary wrote: With Kafka versions 3.5.1 and 3.7.0, we're still encountering persistent issues. The Kafka Streams library is aligned with these Kafka versions. Upon analysis of the logs, it seems that the problem may occur when a Kafka node disconnects from Kafka Streams processes. This suspicion is supported by the abundance of network messages indicating disconnections, such as org.apache.kafka.clients.NetworkClient ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9 Message: [Consumer clientId=kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9-consumer, groupId=kafka-streams-exec-0-test-store] Node 102 disconnected. On Mon, Apr 22, 2024 at 7:16 AM Matthias J. Sax wrote: Not sure either, but it sounds like a bug to me. Can you reproduce this reliably? What version are you using? It would be best if you could file a Jira ticket and we can take it from there. -Matthias On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote: Hi, I have an issue in kafka-streams while constructing kafka-streams state store windows (TimeWindow and SessionWindow). While kafka-streams is processing data, sometimes an intermittent kafka-streams process throws the below error ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9 TraceID: unknown CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164 Message: stream-client [kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams uncaught exception handler org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-AGGREGATE-0000000001 at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115) at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986) at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271) at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716) at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) Caused by: java.lang.NullPointerException at org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46) at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138) at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107) ... 7 more Here my understanding is that the state store is null, and at that time stateStore.flush() gets invoked to send the data to the state store, which leads to the above error. This error can be caught inside kafka-streams setUncaughtExceptionHandler:

    streams.setUncaughtExceptionHandler(throwable -> {
        LOGGER.error("Exception in streams", throwable);
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    });

I'm uncertain about the exact reason for this issue.
Everything seems to be in order, including the Kafka cluster, and there are no errors in Kafka Streams except for a few logs indicating node disconnections. Is there a better way to handle this error? When can this issue happen? I would like to express my gratitude in advance for any assistance provided.
Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext
Please also update the KIP. To get a wiki account created, please request it via a comment on this ticket: https://issues.apache.org/jira/browse/INFRA-25451 After you have the account, please share your wiki id, and we can give you write permission on the wiki. -Matthias On 5/3/24 6:30 AM, Shashwat Pandey wrote: Hi Matthias, Sorry this fell off my radar for a bit. Revisiting the topic, I think you’re right and we accept the duplicated nesting as an appropriate solution to not affect the larger public API. I can update my PR with the change. Regards, Shashwat Pandey On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax wrote: Any updates on this KIP? On 3/28/24 4:11 AM, Matthias J. Sax wrote: It seems that `MockRecordMetadata` is a private class, and thus not part of the public API. If there are any changes required, we don't need to discuss them on the KIP. For `CapturedPunctuator` and `CapturedForward` it's a little bit more tricky. My gut feeling is that the classes might not need to be changed, but if we use them within `MockProcessorContext` and `MockFixedKeyProcessorContext` it might be weird to keep the current nesting... The problem I see is that it's not straightforward how to move the classes w/o breaking compatibility, nor if we duplicate them as standalone classes w/o a larger "splash radius". (We would need to add new overloads for MockProcessorContext#scheduledPunctuators() and MockProcessorContext#forwarded()). Might be good to hear from others if we think it's worth this larger change to get rid of the nesting, or just accept the somewhat not ideal nesting, as it technically is not a real issue? -Matthias On 3/15/24 1:47 AM, Shashwat Pandey wrote: Thanks for the feedback Matthias! The reason I proposed the extension of MockProcessorContext was more to do with the internals of the class (MockRecordMetadata, CapturedPunctuator and CapturedForward). However, I do see your point; I would then think to split MockProcessorContext and MockFixedKeyProcessorContext, and some of the internal classes should also be extracted, i.e. MockRecordMetadata, CapturedPunctuator and probably a new CapturedFixedKeyForward. Let me know what you think! Regards, Shashwat Pandey On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax wrote: Thanks for the KIP Shashwat. Closing this testing gap is great! It did come up a few times already... One question: why do you propose to `extend MockProcessorContext`? Given how the actual runtime context classes are set up, it seems that the regular context and fixed-key-context are distinct, and thus I believe both mock-context classes should be distinct, too? What I mean is that FixedKeyProcessorContext does not extend ProcessorContext. Both classes have a common parent ProcessINGContext (note the very similar but different names), but they are "siblings" only, so why make the mock processor a parent-child relationship? It seems better to do

    public class MockFixedKeyProcessorContext
        implements FixedKeyProcessorContext,
                   RecordCollector.Supplier

Of course, if there is code we can share between both mock-contexts we should do this, but it should not leak into the public API? -Matthias On 3/11/24 5:21 PM, Shashwat Pandey wrote: Hi everyone, I would like to start the discussion on https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils library. Regards, Shashwat Pandey
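Spelled out with the generics that tend to get lost in plain-text mail, the proposed declaration would look roughly like this (type parameter names are assumptions; declared abstract here only so the sketch compiles without the omitted method bodies):

    import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
    import org.apache.kafka.streams.processor.internals.RecordCollector;

    public abstract class MockFixedKeyProcessorContext<KForward, VForward>
            implements FixedKeyProcessorContext<KForward, VForward>,
                       RecordCollector.Supplier {
        // a standalone mock, a sibling of MockProcessorContext rather than a subclass;
        // shared internals (MockRecordMetadata, CapturedPunctuator, and a new
        // CapturedFixedKeyForward) would be extracted instead of staying nested
    }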
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
117f: Good point by Bruno. We should check for this, and could have an additional `INVALID_STANDBY_TASK` error code? -Matthias On 5/3/24 5:52 AM, Guozhang Wang wrote: Hi Sophie, Re: As for the return type of the TaskAssignmentUtils, I think that makes sense. LGTM. On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna wrote: Hi Sophie, 117f: I think removing the STATEFUL and STATELESS types is not enough to avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes the information whether a task is stateless or stateful into the task assignor. However, the task assignor can return a standby task for a stateless task, which is inconsistent. Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error. nit: The titles of some code blocks in the KIP are not consistent with their content, e.g., KafkaStreamsState <-> NodeState Best, Bruno On 5/3/24 2:43 AM, Matthias J. Sax wrote: Thanks Sophie. My bad. You are of course right about `TaskAssignment` and the StreamsPartitionAssignor's responsibility to map tasks of an instance to consumers. When I wrote my reply, I forgot about this detail. Seems you did not add the `UNKNOWN_TASK_ID` error yet, as proposed by Guozhang? Otherwise LGTM. -Matthias On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote: Guozhang: 117. All three additions make sense to me. However, while thinking about how users would actually produce an assignment, I realized that it seems silly to make it their responsibility to distinguish between a stateless and stateful task when they return the assignment. The StreamsPartitionAssignor already knows which tasks are stateful vs stateless, so there's no need to add this extra step for users to figure it out themselves, and potentially make a mistake. 117f: So, rather than add a new error type for "inconsistent task types", I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE" and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether. Any objections? - -Thanks, fixed the indentation of headers under "User APIs" and "Read-Only APIs" -As for the return type of the TaskAssignmentUtils methods, I don't personally feel too strongly about this, but the reason for the return type being a Map rather than a TaskAssignment is because they are meant to be used iteratively/to create a part of the full assignment, and not necessarily a full assignment for each. Notice that they all have an input parameter of the same type: Map. The idea is you can take the output of any of these and pass it in to another to generate or optimize another piece of the overall assignment. For example, if you want to perform the rack-aware optimization on both active and standby tasks, you would need to call #optimizeRackAwareActiveTasks and then forward the output to #optimizeRackAwareStandbyTasks to get the final assignment. If we return a TaskAssignment, it will usually need to be unwrapped right away. Perhaps more importantly, I worry that returning a TaskAssignment will make it seem like each of these utility methods returns a "full" and final assignment that can just be returned as-is from the TaskAssignor's #assign method. Whereas they are each just a single step in the full assignment process, and not the final product. Does that make sense? On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman wrote: Matthias: Thanks for the naming suggestions for the error codes. I was definitely not happy with my original naming but couldn't think of anything better. I like your proposals though, will update the KIP names.
I'll also add a "NONE" option as well -- much better than just passing in null for no error. OVERLAPPING_CLIENT: multiple KafkaStreams clients assigned with the same active task Would also be an error if assigned to two consumers of the same client... Needs to be rephrased. Well, the TaskAssignor only assigns tasks to KafkaStreams clients, it's not responsible for the assignment of tasks to consumers within a KafkaStreams. It would be a bug in the StreamsPartitionAssignor if it received a valid assignment from the TaskAssignor with only one copy of a task assigned to a single KafkaStreams client, and then somehow ended up assigning that task to multiple consumers on the KafkaStreams client. It wouldn't be the TaskAssignor's fault, so imo it would not make sense to include this case in the OVERLAPPING_CLIENT error (or as it's now called, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES). Not to mention, if there was a bug that caused the StreamsPartitionAssignor to assign a task to multiple consumers, it presumably wouldn't even notice since it's a bug -- if it did notice, it can just fix the issue. The error codes are about communicating unfixable issues due to the TaskAssignor itself returning an invalid assignment. The phrasing is inte
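Collecting the error codes floated across this thread in one place, the enum would look roughly like this — the names and the exact set are still under discussion, so treat it as a sketch only:

    public enum AssignmentError {
        NONE,
        ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
        ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT,
        INVALID_STANDBY_TASK,
        UNKNOWN_PROCESS_ID,
        UNKNOWN_TASK_ID
    }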
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
101: Yes, but what I am saying is, that we don't need to flush the .position file to disk periodically, but only maintain it in main memory, and only write it to disk on close() to preserve it across restarts. This way, it would never be ahead, but might only lag? But with my better understanding about (102) it might be moot anyway... 102: Thanks for clarifying. Looked into the code now. Makes sense. Might be something worth calling out explicitly in the KIP writeup. -- Now that I realize that the position is tracked inside the store (not outside as the changelog offsets), it makes much more sense to pull position into RocksDB itself. In the end, it's actually a "store implementation" detail how it tracks the position (and kinda a leaky abstraction currently, that we re-use the checkpoint file mechanism to track it and flush to disk). 200: I was thinking about this a little bit more, and maybe it's not too bad? When KS starts up, we could open all stores we find on local disk pro-actively, and keep them all open until the first rebalance finishes: For tasks we get assigned, we hand in the already opened store (this would amortize the cost to open the store before the rebalance) and for non-assigned tasks, we know the offset information won't change and we could just cache it in-memory for later reuse (ie, next rebalance) and close the store to free up resources? -- Assuming that we would get a large percentage of opened stores assigned as tasks anyway, this could work? -Matthias On 5/3/24 1:29 AM, Bruno Cadonna wrote: Hi Matthias, 101: Let's assume a RocksDB store, but I think the following might be true also for other store implementations.
Best, Bruno [1] https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397 On 5/1/24 3:00 AM, Matthias J. Sax wrote: Thanks Bruno. 101: I think I understand this better now. But just want to make sure I do. What do you mean by "they can diverge" and "Recovering after a failure might load inconsistent offsets and positions."? The checkpoint is the offset from the changelog, while the position is the offset from the upstream source topic, right? -- In the end, the position is about IQ, and if we fail to update it, it only means that there is some gap when we might not be able to query a standby task, because we think it's not up-to-date enough even if it is, which would resolve itself soon? Ie, the position might "lag", but it's not "inconsistent". Do we believe that this lag would be highly problematic? 102: I am confused. The position is maintained inside the state store, but is persisted in the .position file when the state store closes. This contradicts the KIP: "these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file". My main concern is currently about rebalance metadata -- opening RocksDB stores seems to be very expensive, but if we follow the KIP: We will do this under EOS by updating the .checkpoint file whenever a store is closed.
Re: [VOTE] KIP-1036: Extend RecordDeserializationException exception
+1 (binding) On 5/3/24 8:52 AM, Federico Valeri wrote: Hi Fred, this is a useful addition. +1 (non-binding) Thanks On Fri, May 3, 2024 at 4:11 PM Andrew Schofield wrote: Hi Fred, Thanks for the KIP. It’s turned out nice and elegant I think. Definitely a worthwhile improvement. +1 (non-binding) Thanks, Andrew On 30 Apr 2024, at 14:02, Frédérik Rouleau wrote: Hi all, As there has been no more activity for a while on the discuss thread, I think we can start a vote. The KIP is available on https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception If you have some feedback or suggestions, please participate in the discussion thread: https://lists.apache.org/thread/or85okygtfywvnsfd37kwykkq5jq7fy5 Best regards, Fred
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing
What about (106), to unify both existing callback methods of `ProductionExceptionHandler` into a single one, instead of adding two new ones? Damien's last reply about it was: I will think about unifying, I do agree it would be cleaner. There was no follow-up on this question, and the KIP right now still proposes to add two new methods, which I believe we could (should?) unify to:

    default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                      final ProducerRecord record,
                                                      final Exception exception) {

Ie, we drop the generics `<byte[], byte[]>` on `ProducerRecord`, which allows you to also pass in a non-serialized ProducerRecord of any type for the serialization error case. Btw: wondering if we also want to pass in a flag/enum about key vs value serialization errors, similar to what was proposed in KIP-1036? The only "oddity" would be that we call the handler for other error cases, too, not just for serialization exceptions. But we could tackle this by introducing a new class `RecordSerializationException`, which would include the flag and would ensure that KS hands this exception into the handler. This would keep the handler interface/method itself clean. -Matthias On 5/3/24 2:15 AM, Loic Greffier wrote: Hi Bruno, Good catch, KIP has been updated accordingly. Regards, Loïc
Re: [VOTE] KIP-924: customizable task assignment for Streams
I left one more nit on the discuss thread. But overall LGTM. +1 (binding) Thanks Rohan and Sophie for driving this KIP. -Matthias On 4/29/24 2:07 PM, Sophie Blee-Goldman wrote: +1 (binding) thanks for driving this KIP! On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams As this KIP has been open for a while, and gone through a couple rounds of review/revision, I'm calling a vote to get it approved.
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
folks would come up with :) If any of these errors are detected, the StreamsPartitionAssignor will immediately "fail" the rebalance and retry it by scheduling an immediate followup rebalance. I'm also a bit concerned here, as such endless retry loops have happened in the past in my memory. Given that we would likely see most of the user implementations be deterministic, I'm also leaning towards failing the app immediately and letting the crowd educate us if there are some very interesting scenarios out there that are not on our radar to re-consider this, rather than getting hard-to-debug cases in the dark. - And here are just some nits about the KIP writing itself: * I think some bullet points under `User APIs` and `Read-only APIs` should have a lower-level indentation? It caught me for a sec until I realized there are just two categories. * In TaskAssignmentUtils, why not let those util functions return `TaskAssignment` (to me it feels more consistent with the user APIs), but instead return a Map? Guozhang On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax wrote: I like the idea of error codes. Not sure if the names are ideal? UNKNOWN_PROCESS_ID makes sense, but the other two seem a little bit difficult to understand? Should we be very descriptive (and also try to avoid coupling it to the threading model -- important for the first error code): - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE) I think we also need to add NONE as an option, or make the error parameter an `Optional`? OVERLAPPING_CLIENT: multiple KafkaStreams clients assigned with the same active task Would also be an error if assigned to two consumers of the same client... Needs to be rephrased. If any of these errors are detected, the StreamsPartitionAssignor will immediately "fail" the rebalance and retry it by scheduling an immediate followup rebalance. Does this make sense? If we assume that the task assignment is deterministic, we would end up with an infinite retry loop? Also, assuming that a client leaves the group, we cannot assign some tasks any longer... I would rather throw a StreamsException and let the client crash. -Matthias On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote: One last thing: I added an error code enum to be returned from the #onAssignmentComputed method in case of an invalid assignment. I created one code for each of the invalid cases we described above. The downside is that this means we'll have to go through a deprecation cycle if we want to loosen up the restrictions on any of the enforced cases. The upside is that we can very clearly mark what is an invalid assignment, and this will (hopefully) assist users who are new to customizing assignments by clearly denoting the requirements, and returning a clear error if they are not followed. Of course the StreamsPartitionAssignor will also do a "fallback & retry" in this case by returning the same assignment to the consumers and scheduling a followup rebalance. I've added all of this to the TaskAssignor and #onAssignmentComputed javadocs, and added a section under "Public Changes" as well. Please let me know if there are any concerns, or if you have suggestions for how else we can handle an invalid assignment On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote: Thanks guys! I agree with what Lucas said about 117c, we can always loosen a restriction later and I don't want to do anything now that might get in the way of the new threading models.
With that I think we're all in agreement on 117. I'll update the KIP to include what we've discussed (and will fix the remaining #finalAssignment mention as well, thanks Bruno. Glad to have such good proof readers! :P) On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna wrote: Hi again, I forgot to ask whether you could add the agreement about handling invalid assignment to the KIP. Best, Bruno On 4/30/24 2:00 PM, Bruno Cadonna wrote: Hi all, I think we are converging! 117 a) fail: Since it is an invalid consumer assignment b) pass: I agree that not assigning a task might be reasonable in some situations c) fail: For the reasons Lucas pointed out. I am missing a good use case here. d) fail: It is invalid Somewhere in the KIP you still use finalAssignment() instead of the wonderful method name onAssignmentComputed() ;-) "... interface also includes a method named finalAssignment which is called with the final computed GroupAssignment ..." Best, Bruno On 4/30/24 1:04 PM, Lucas Brutschy wrote: Hi, Looks like a great KIP to me! I'm late, so I'm only going to comment on the last open point 117. I'm against any fallbacks like "use the default assignor if the custom assignment is invalid", as it's just going to hide bugs. For the 4 cases m
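To illustrate the iterative use of the TaskAssignmentUtils methods described earlier in the thread, a simplified sketch — the exact signatures are still being settled in the KIP, so the parameter lists here are assumptions:

    // each util returns a Map so its output can feed the next optimization step
    Map<ProcessId, KafkaStreamsAssignment> assignment =
        TaskAssignmentUtils.optimizeRackAwareActiveTasks(applicationState, previousAssignment);
    assignment =
        TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState, assignment);
    // wrap only at the very end, when returning from TaskAssignor#assign
    return new TaskAssignment(assignment.values());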
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Component/s: clients
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Description: {{window.size.ms}} and {{window.inner.serde.class}} are not a true KafkaStreams config, and are ignored when set from a KStreams application. Both belong on the client. KIP-1020: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] was: {{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client. KIP-1020: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Description: {{indwindow.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client. KIP-1020: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] was: {{window.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client. KIP-1020: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804]
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Description: {{window.size.ms}} and `is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client. KIP-1020: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] was: {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client. [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] > Deprecate window.size.ms and inner.serde.class in StreamsConfig > --- > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucia Cerchie >Assignee: Lucia Cerchie >Priority: Major > Labels: KIP > > {{window.size.ms}} and `is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client. > KIP-1020: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and window.inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Summary: Deprecate window.size.ms and window.inner.serde.class in StreamsConfig (was: Deprecate window.size.ms and inner.serde.class in StreamsConfig) > Deprecate window.size.ms and window.inner.serde.class in StreamsConfig > -- > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucia Cerchie >Assignee: Lucia Cerchie >Priority: Major > Labels: KIP > > {{window.size.ms}} and `is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client. > KIP-1020: > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms and inner.serde.class in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Summary: Deprecate window.size.ms and inner.serde.class in StreamsConfig (was: Deprecate window.size.ms in StreamsConfig) > Deprecate window.size.ms and inner.serde.class in StreamsConfig > --- > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucia Cerchie >Assignee: Lucia Cerchie >Priority: Major > Labels: KIP > > {{window.size.ms}} is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client. > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Description: {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client. [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] was:{{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client. > Deprecate window.size.ms in StreamsConfig > - > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucia Cerchie >Assignee: Lucia Cerchie >Priority: Major > Labels: KIP > > {{window.size.ms}} is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client. > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=290982804] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16260) Deprecate window.size.ms in StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Labels: KIP (was: needs-kip) > Deprecate window.size.ms in StreamsConfig > - > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Lucia Cerchie >Assignee: Lucia Cerchie >Priority: Major > Labels: KIP > > {{window.size.ms}} is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext
Any updates on this KIP? On 3/28/24 4:11 AM, Matthias J. Sax wrote: It seems that `MockRecordMetadata` is a private class, and thus not part of the public API. If there are any changes required, we don't need to discuss them on the KIP. For `CapturedPunctuator` and `CapturedForward` it's a little bit more tricky. My gut feeling is that the classes might not need to be changed, but if we use them within `MockProcessorContext` and `MockFixedKeyProcessorContext` it might be weird to keep the current nesting... The problem I see is that it's not straightforward how to move the classes w/o breaking compatibility, nor how to duplicate them as standalone classes w/o a larger "splash radius". (We would need to add new overloads for MockProcessorContext#scheduledPunctuators() and MockProcessorContext#forwarded().) Might be good to hear from others if we think it's worth these larger changes to get rid of the nesting, or just accept the somewhat non-ideal nesting, as it technically is not a real issue? -Matthias On 3/15/24 1:47 AM, Shashwat Pandey wrote: Thanks for the feedback Matthias! The reason I proposed the extension of MockProcessorContext was more to do with the internals of the class (MockRecordMetadata, CapturedPunctuator and CapturedForward). However, I do see your point. I would then think to split MockProcessorContext and MockFixedKeyProcessorContext; some of the internal classes should also be extracted, i.e. MockRecordMetadata, CapturedPunctuator and probably a new CapturedFixedKeyForward. Let me know what you think! Regards, Shashwat Pandey On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax wrote: Thanks for the KIP Shashwat. Closing this testing gap is great! It did come up a few times already... One question: why do you propose to `extend MockProcessorContext`? Given how the actual runtime context classes are set up, it seems that the regular context and fixed-key-context are distinct, and thus I believe both mock-context classes should be distinct, too? What I mean is that FixedKeyProcessorContext does not extend ProcessorContext. Both classes have a common parent ProcessINGContext (note the very similar but different names), but they are "siblings" only, so why give the mock contexts a parent-child relationship? It seems better to do `public class MockFixedKeyProcessorContext implements FixedKeyProcessorContext, RecordCollector.Supplier`. Of course, if there is code we can share between both mock-contexts we should do this, but it should not leak into the public API? -Matthias On 3/11/24 5:21 PM, Shashwat Pandey wrote: Hi everyone, I would like to start the discussion on https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils library. Regards, Shashwat Pandey
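To make the suggested shape concrete, here is a rough sketch along the lines of Matthias' proposal; the captured-records list and the method bodies are illustrative assumptions (not the KIP's API), and the class is kept abstract so the remaining ProcessingContext methods can be elided:
{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.internals.RecordCollector;

// A sibling of MockProcessorContext rather than a subclass, mirroring how
// FixedKeyProcessorContext and ProcessorContext only share ProcessingContext.
public abstract class MockFixedKeyProcessorContextSketch<KForward, VForward>
        implements FixedKeyProcessorContext<KForward, VForward>, RecordCollector.Supplier {

    // hypothetical holder, playing the role CapturedForward plays today
    private final List<FixedKeyRecord<? extends KForward, ? extends VForward>> forwarded =
            new ArrayList<>();

    @Override
    public <K extends KForward, V extends VForward> void forward(final FixedKeyRecord<K, V> record) {
        forwarded.add(record); // capture instead of sending downstream
    }

    @Override
    public <K extends KForward, V extends VForward> void forward(final FixedKeyRecord<K, V> record,
                                                                 final String childName) {
        forwarded.add(record);
    }

    public List<FixedKeyRecord<? extends KForward, ? extends VForward>> forwarded() {
        return forwarded;
    }

    @Override
    public RecordCollector recordCollector() {
        return null; // tests that don't write directly to topics can live with null
    }
}
{code}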
Re: outerjoin not joining after window
How do you know this? First thing we do is write a log message in the value joiner. We don't see the log message for the missed records. Well, for left/right join results, the ValueJoiner would only be called when the window is closed... And for invalid input (or late records, i.e., records which arrive out-of-order after their window was already closed), records would be dropped right away. So you cannot really infer whether a record made it into the join or not, or what happened if it did make it into the `Processor`. -> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring `dropped-records-total` is the name of the metric. -Matthias On 5/1/24 11:35 AM, Chad Preisler wrote: Hello, We did some testing in our test environment today. We are seeing some records processed where only one side of the join has a record. So that's good. However, we are still seeing some records get skipped. They never hit the value joiner (we write a log message first thing in the value joiner). During the test we were putting some load on the system, so stream time was advancing. We did notice that the join windows were taking much longer than 30 minutes to close and process records. Thirty minutes is the window plus grace. How do you know this? First thing we do is write a log message in the value joiner. We don't see the log message for the missed records. I will try pushing the same records locally. However, we don't see any errors in our logs and the stream does process one-sided joins after the skipped record. Do you have any docs on the "dropped records" metric? I did a Google search and didn't find many good results for that. Thanks, Chad On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax wrote: Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally. That gives some hope :) However, they never get into the join. How do you know this? Did you check the metric for dropped records? Maybe records are considered malformed and dropped? Are you using the same records in production and in your local test? Are there any settings for the stream client that would affect the join? Not that I can think of... There is one more internal config, but as long as data is flowing, it should not impact the result you see. Are there any settings on the broker side that would affect the join? No. The join is computed client side. Broker configs should not have any impact. If I increase the log level for the streams API would that shed some light on what is happening? I don't think it would help much. The code in question is org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it does not do any logging except WARN for the already mentioned "dropping malformed" records that is also recorded via JMX. WARN: "Skipping record due to null key or value. " If you can identify a specific record from the input which would produce an output, but does not, maybe you can try to feed it into your local test env and try to reproduce the issue? -Matthias On 4/30/24 11:38 AM, Chad Preisler wrote: Matthias, Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally. I'm not sure why the join is not working as expected when running against our actual brokers. We are peeking at the records for the streams and we are seeing the records get pulled.
However, they never get into the join. It's been over 24 hours since the expected records were created, and there has been plenty of traffic to advance the stream time. Only records that have both a left and right side match are getting processed by the join. Are there any settings for the stream client that would affect the join? Are there any settings on the broker side that would affect the join? The outer join is just one part of the topology. Compared to running it locally there is a lot more data going through the app when running on our actual servers. If I increase the log level for the streams API would that shed some light on what is happening? Does anyone know if there are specific packages that I should increase the log level for? Any specific log message I can hone in on to tell me what is going on? Basically, I'm looking for some pointers on where I can start looking. Thanks, Chad On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax wrote: I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null Given that you also have a grace period of 5 minutes, the result will only be emitted after the grace-period passed and the window is closed (not when window end time is reached). One has a naming convention of
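For anyone following along, the metric Matthias references can be read from the embedded client metrics. A minimal sketch (class and method names are mine) that sums dropped-records-total across all tasks of a KafkaStreams instance:
{code:java}
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class DroppedRecordsCheck {

    // Sums the task-level dropped-records-total metric across all tasks of
    // the given KafkaStreams instance.
    public static double droppedRecordsTotal(final KafkaStreams streams) {
        double total = 0.0;
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if (entry.getKey().name().equals("dropped-records-total")) {
                total += ((Number) entry.getValue().metricValue()).doubleValue();
            }
        }
        return total;
    }
}
{code}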
[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.
[ https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842805#comment-17842805 ] Matthias J. Sax commented on KAFKA-16514: - Thanks for the background! Makes sense. > Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup > flag. > --- > > Key: KAFKA-16514 > URL: https://issues.apache.org/jira/browse/KAFKA-16514 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Sal Sorrentino >Priority: Minor > > Working with Kafka Streams 3.7.0, but may affect earlier versions as well. > When attempting to shutdown a streams application and leave the associated > consumer group, the supplied `leaveGroup` option seems to have no effect. > Sample code: > {code:java} > CloseOptions options = new CloseOptions().leaveGroup(true); > stream.close(options);{code} > The expected behavior here is that the group member would shutdown and leave > the group, immediately triggering a consumer group rebalance. In practice, > the rebalance happens after the appropriate timeout configuration has expired. > I understand the default behavior in that there is an assumption that any > associated StateStores would be persisted to disk and that in the case of a > rolling restart/deployment, the rebalance delay may be preferable. However, > in our application we are using in-memory state stores and standby replicas. > There is no benefit in delaying the rebalance in this setup and we are in > need of a way to force a member to leave the group when shutting down. > The workaround we found is to set an undocumented internal StreamConfig to > enforce this behavior: > {code:java} > props.put("internal.leave.group.on.close", true); > {code} > To state the obvious, this is less than ideal. > Additional configuration details: > {code:java} > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId"); > props.put( > StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); > props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors); > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE_V2);{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-16644. - Resolution: Duplicate > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 > Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
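To illustrate the test gap described above with a self-contained sketch (not the actual test code): keying the output by record key collapses the duplicate tombstones, while a list keeps both.
{code:java}
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapVsListVerification {
    public static void main(String[] args) {
        // Two tombstones (null values) for the same key, as the buggy join emits them:
        List<SimpleEntry<String, String>> output = List.of(
                new SimpleEntry<>("k1", null),
                new SimpleEntry<>("k1", null));

        // Map-based verification collapses the duplicates, so the bug is invisible:
        Map<String, String> asMap = new HashMap<>();
        output.forEach(e -> asMap.put(e.getKey(), e.getValue()));
        System.out.println(asMap.size()); // prints 1

        // List-based verification keeps both records, so the bug is caught:
        System.out.println(output.size()); // prints 2
    }
}
{code}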
Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing
Thanks for the update. I am wondering if we should use `ReadOnlyHeaders` instead of `ImmutableHeaders` as interface name? Also, the returned `Header` interface is technically not immutable either, because `Header#value()` returns a mutable byte-array... Would we need a `ReadOnlyHeader` interface? If yes, it seems that `ReadOnlyHeaders` should not be a super-interface of `Headers` but it would rather be a standalone interface, and a wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some immutable type instead of `byte[]` for the value()? An alternative would be to deep-copy the value byte-array, which would not be free, but given that we are talking about exception handling, it would not be on the hot code path, and thus might be acceptable? The above seems to increase the complexity significantly though. Hence, I have second thoughts on the immutability question: Do we really need to worry about mutability after all, because in the end, the KS runtime won't read the Headers instance after the handler was called, and if a user modifies the passed-in headers, there won't be any actual damage (i.e., no side effects)? For this case, it might even be ok to also not add `ImmutableHeaders` to begin with? Sorry for the forth and back (yes, forth and back, because back and forth does not make sense -- it's not logical -- just trying to fix English :D) as I did bring up the immutability question in the first place... -Matthias On 4/25/24 5:56 AM, Loic Greffier wrote: Hi Matthias, I have updated the KIP regarding points 103 and 108. 103. I have suggested a new `ImmutableHeaders` interface to deal with the immutability concern of the headers, which is basically the `Headers` interface without the write accesses. public interface ImmutableHeaders { Header lastHeader(String key); Iterable<Header> headers(String key); Header[] toArray(); } The `Headers` interface can be updated accordingly: public interface Headers extends ImmutableHeaders, Iterable<Header> { //… } Loïc
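For illustration, the standalone-wrapper alternative raised in this thread could look roughly like this (the interface name and the static factory are placeholders from the discussion, not a committed API):
{code:java}
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

// A standalone read-only view wrapping a Headers instance, instead of a
// super-interface of Headers.
public interface ReadOnlyHeaders {

    Header lastHeader(String key);

    Iterable<Header> headers(String key);

    Header[] toArray();

    static ReadOnlyHeaders wrap(final Headers headers) {
        return new ReadOnlyHeaders() {
            @Override
            public Header lastHeader(final String key) {
                return headers.lastHeader(key);
            }

            @Override
            public Iterable<Header> headers(final String key) {
                return headers.headers(key);
            }

            @Override
            public Header[] toArray() {
                // true immutability would additionally require deep-copying each
                // header's value byte-array, with the cost discussed above
                return headers.toArray();
            }
        };
    }
}
{code}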
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Thanks Bruno. 101: I think I understand this better now. But just want to make sure I do. What do you mean by "they can diverge" and "Recovering after a failure might load inconsistent offsets and positions."? The checkpoint is the offset from the changelog, while the position is the offset from the upstream source topic, right? -- In the end, the position is about IQ, and if we fail to update it, it only means that there is some gap when we might not be able to query a standby task, because we think it's not up-to-date enough even if it is, which would resolve itself soon? I.e., the position might "lag", but it's not "inconsistent". Do we believe that this lag would be highly problematic? 102: I am confused. The position is maintained inside the state store, but is persisted in the .position file when the state store closes. This contradicts the KIP: these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file My main concern is currently about rebalance metadata -- opening RocksDB stores seems to be very expensive, but if we follow the KIP: We will do this under EOS by updating the .checkpoint file whenever a store is close()d. It seems, having the offset inside RocksDB does not help us at all? In the end, when we crash, we don't want to lose the state, but when we update the .checkpoint only on a clean close, the .checkpoint might be stale (i.e., it still contains the offsets from when we opened the store after the task was assigned). -Matthias On 4/30/24 2:40 AM, Bruno Cadonna wrote: Hi all, 100 I think we already have such a wrapper. It is called AbstractReadWriteDecorator. 101 Currently, the position is checkpointed when an offset checkpoint is written. If we let the state store manage the committed offsets, we need to let the state store also manage the position, otherwise they might diverge. State store managed offsets can get flushed (i.e. checkpointed) to disk when the state store decides to flush its in-memory data structures, but the position is only checkpointed at commit time. Recovering after a failure might load inconsistent offsets and positions. 102 The position is maintained inside the state store, but is persisted in the .position file when the state store closes. The only public interface that uses the position is IQv2 in a read-only mode. So the position is only updated within the state store and read from IQv2. No need to add anything to the public StateStore interface. 103 Deprecating managesOffsets() right away might be a good idea. 104 I agree that we should try to support downgrades without wipes. At least Nick should state in the KIP why we do not support it. Best, Bruno On 4/23/24 8:13 AM, Matthias J. Sax wrote: Thanks for splitting out this KIP. The discussion shows that it is a complex beast by itself, so worth discussing on its own. Couple of questions / comments: 100 `StateStore#commit()`: The JavaDoc says "must not be called by users" -- I would propose to put a guard in place for this, by either throwing an exception (preferable) or adding a no-op implementation (at least for our own stores, by wrapping them -- we cannot enforce it for custom stores I assume), and document this contract explicitly. 101 adding `.position` to the store: Why do we actually need this? The KIP says "To ensure consistency with the committed data and changelog offsets" but I am not sure if I can follow? Can you elaborate why leaving the `.position` file as-is won't work?
If it's possible at all, it will need to be done by creating temporary StateManagers and StateStores during rebalance. I think it is possible, and probably not too expensive, but the devil will be in the detail. This sounds like a significant overhead to me. We know that opening a single RocksDB takes about 500ms, and thus opening RocksDB to get this information might slow down rebalances significantly. 102: It's unclear to me how `.position` information is added. The KIP only says: "position offsets will be stored in RocksDB, in the same column family as the changelog offsets". Do you intend to add this information to the map passed via `commit(final Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe this in more detail. Also, if my assumption is correct, we might want to rename the parameter and also have a better JavaDoc description? 103: Should we make it mandatory (long-term) that all stores (including custom stores) manage their offsets internally? Maintaining both options and thus both code paths puts a burden on everyone and makes the code messy. I would strongly prefer if we could have a mid-term path to get rid of supporting both. -- For this case, we should deprecate the newly added `managesOffsets()` method right away, to point
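Regarding the guard proposed in point 100, a minimal sketch of the idea (the class is hypothetical; a real implementation would live in a decorator like the internal AbstractReadWriteDecorator Bruno mentions, and commit() here stands in for the method KIP-1035 proposes):
{code:java}
import org.apache.kafka.streams.processor.StateStore;

// Hypothetical guard around a store handle given to user code.
public class CommitGuard {

    private final StateStore wrapped;

    public CommitGuard(final StateStore wrapped) {
        this.wrapped = wrapped;
    }

    // Stands in for the commit() method proposed by KIP-1035; user-facing
    // store handles would throw, since only the runtime may commit.
    public void commit() {
        throw new UnsupportedOperationException(
            "commit() may only be called by the Kafka Streams runtime, not by user code");
    }

    public StateStore wrapped() {
        return wrapped;
    }
}
{code}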
Re: [DISCUSS] KIP-924: customizable task assignment for Streams
I like the idea of error codes. Not sure if the names are ideal, though? UNKNOWN_PROCESS_ID makes sense, but the other two seem a little bit difficult to understand? Should we be very descriptive (and also try to avoid coupling it to the threading model -- important for the first error code): - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE) I think we also need to add NONE as an option, or make the error parameter an `Optional`? OVERLAPPING_CLIENT: multiple KafkaStreams clients assigned with the same active task Would also be an error if assigned to two consumers of the same client... Needs to be rephrased. If any of these errors are detected, the StreamsPartitionAssignor will immediately "fail" the rebalance and retry it by scheduling an immediate followup rebalance. Does this make sense? If we assume that the task-assignment is deterministic, we would end up with an infinite retry loop? Also, assuming that a client leaves the group, we cannot assign some tasks any longer... I would rather throw a StreamsException and let the client crash. -Matthias On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote: One last thing: I added an error code enum to be returned from the #onAssignmentComputed method in case of an invalid assignment. I created one code for each of the invalid cases we described above. The downside is that this means we'll have to go through a deprecation cycle if we want to loosen up the restrictions on any of the enforced cases. The upside is that we can very clearly mark what is an invalid assignment and this will (hopefully) assist users who are new to customizing assignments by clearly denoting the requirements, and returning a clear error if they are not followed. Of course the StreamsPartitionAssignor will also do a "fallback & retry" in this case by returning the same assignment to the consumers and scheduling a followup rebalance. I've added all of this to the TaskAssignor and #onAssignmentComputed javadocs, and added a section under "Public Changes" as well. Please let me know if there are any concerns, or if you have suggestions for how else we can handle an invalid assignment On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman wrote: Thanks guys! I agree with what Lucas said about 117c, we can always loosen a restriction later and I don't want to do anything now that might get in the way of the new threading models. With that I think we're all in agreement on 117. I'll update the KIP to include what we've discussed (and will fix the remaining #finalAssignment mention as well, thanks Bruno. Glad to have such good proofreaders! :P) On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna wrote: Hi again, I forgot to ask whether you could add the agreement about handling invalid assignments to the KIP. Best, Bruno On 4/30/24 2:00 PM, Bruno Cadonna wrote: Hi all, I think we are converging! 117 a) fail: Since it is an invalid consumer assignment b) pass: I agree that not assigning a task might be reasonable in some situations c) fail: For the reasons Lucas pointed out. I am missing a good use case here. d) fail: It is invalid Somewhere in the KIP you still use finalAssignment() instead of the wonderful method name onAssignmentComputed() ;-) "... interface also includes a method named finalAssignment which is called with the final computed GroupAssignment ..." Best, Bruno On 4/30/24 1:04 PM, Lucas Brutschy wrote: Hi, Looks like a great KIP to me! I'm late, so I'm only going to comment on the last open point 117.
I'm against any fallbacks like "use the default assignor if the custom assignment is invalid", as it's just going to hide bugs. For the 4 cases mentioned by Sophie: 117a) I'd fail immediately here, as it's an implementation bug, and should not lead to a valid consumer group assignment. 117b) Agreed. This is a useful assignment and should be allowed. 117c) This is the tricky case. However, I'm leaning towards not allowing this, unless we have a concrete use case. This will block us from potentially using a single consumer for active and standby tasks in the future. It's easier to drop the restriction later if we have a concrete use case. 117d) Definitely fail immediately, as you said. Cheers, Lucas On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman wrote: Yeah I think that sums it up well. Either you computed a *possible* assignment, or you returned something that makes it literally impossible for the StreamsPartitionAssignor to decipher/translate into an actual group assignment, in which case it should just fail That's more or less it for the open questions that have been raised so far, so I just want to remind folks that there's already a voting thread for this. I cast my vote a few minutes ago so it should resurface in everyone's inbox :) On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai wrote: 117: as Sophie laid out, there are two cases here right: 1. cases that are considered
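Pulling the naming suggestions from this thread together, the enum could look like this (a sketch using the names proposed above, not the final API):
{code:java}
// Error codes the StreamsPartitionAssignor could attach when it rejects a
// custom assignment; NONE covers the valid case so the error parameter
// does not need to be an Optional.
public enum AssignmentError {
    NONE,
    UNKNOWN_PROCESS_ID,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT
}
{code}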
[jira] [Commented] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842514#comment-17842514 ] Matthias J. Sax commented on KAFKA-16644: - Sorry. Wrong link. Fixed -> https://issues.apache.org/jira/browse/KAFKA-14748 > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16644) FK join emits duplicate tombstone on left-side delete
[ https://issues.apache.org/jira/browse/KAFKA-16644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16644: Description: We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a left-hand side record is deleted, the join now emits two tombstone records instead of one. The problem was not detected via unit test, because the tests use a `Map` instead of a `List` when verifying the result topic records ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] We should update all test cases to use `List` when reading from the output topic, and of course fix the introduced bug: The `SubscriptionSendProcessorSupplier` is sending two subscription records instead of just a single one: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] was: We introduced a regression bug in 3.7.0 release via KAFKA-14778. When a left-hand side record is deleted, the join now emits two tombstone records instead of one. The problem was not detected via unit test, because the tests use a `Map` instead of a `List` when verifying the result topic records ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] We should update all test cases to use `List` when reading from the output topic, and of course fix the introduced bug: The `SubscriptionSendProcessorSupplier` is sending two subscription records instead of just a single one: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > FK join emits duplicate tombstone on left-side delete > - > > Key: KAFKA-16644 > URL: https://issues.apache.org/jira/browse/KAFKA-16644 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 > Reporter: Matthias J. Sax >Priority: Major > > We introduced a regression bug in 3.7.0 release via KAFKA-14748. When a > left-hand side record is deleted, the join now emits two tombstone records > instead of one. > The problem was not detected via unit test, because the tests use a `Map` > instead of a `List` when verifying the result topic records > ([https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java#L250-L255).] > We should update all test cases to use `List` when reading from the output > topic, and of course fix the introduced bug: The > `SubscriptionSendProcessorSupplier` is sending two subscription records > instead of just a single one: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L133-L136] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: outerjoin not joining after window
Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally. That gives some hope :) However, they never get into the join. How do you know this? Did you check the metric for dropped records? Maybe records are considered malformed and dropped? Are you using the same records in production and in your local test? Are there any settings for the stream client that would affect the join? Not that I can think of... There is one more internal config, but as long as data is flowing, it should not impact the result you see. Are there any settings on the broker side that would affect the join? No. The join is computed client side. Broker configs should not have any impact. If I increase the log level for the streams API would that shed some light on what is happening? I don't think it would help much. The code in question is org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it does not do any logging except WARN for the already mentioned "dropping malformed" records that is also recorded via JMX. WARN: "Skipping record due to null key or value. " If you can identify a specific record from the input which would produce an output, but does not, maybe you can try to feed it into your local test env and try to reproduce the issue? -Matthias On 4/30/24 11:38 AM, Chad Preisler wrote: Matthias, Thanks for the information. I ran the code using Kafka locally. After submitting some records inside and outside of the time window and grace, the join performed as expected when running locally. I'm not sure why the join is not working as expected when running against our actual brokers. We are peeking at the records for the streams and we are seeing the records get pulled. However, they never get into the join. It's been over 24 hours since the expected records were created, and there has been plenty of traffic to advance the stream time. Only records that have both a left and right side match are getting processed by the join. Are there any settings for the stream client that would affect the join? Are there any settings on the broker side that would affect the join? The outer join is just one part of the topology. Compared to running it locally there is a lot more data going through the app when running on our actual servers. If I increase the log level for the streams API would that shed some light on what is happening? Does anyone know if there are specific packages that I should increase the log level for? Any specific log message I can hone in on to tell me what is going on? Basically, I'm looking for some pointers on where I can start looking. Thanks, Chad On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax wrote: I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null Given that you also have a grace period of 5 minutes, the result will only be emitted after the grace-period passed and the window is closed (not when window end time is reached). One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages? It's an internal store that stores all records which are subject to be emitted as left/right join results, i.e., if there is no inner join result.
The format used is internal: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java Also note: time is based on event-time, i.e., if the input stream stops sending new records, "stream-time" will stop advancing and the result might not be emitted because the window does not get closed. (Last, there is some internal wall-clock time delay of one second to emit results for performance reasons...) HTH. -Matthias On 4/30/24 6:51 AM, Chad Preisler wrote: Hello, I have a KStream to KStream outer join with a time difference of 25 minutes and 5 minutes of grace. When I get a record for one side of the join, but don't get a record on the other side of the join, I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null. Is that correct? If it is correct, it's not working for me. I was poking around on the broker and saw some internal topics. I see the key I expected to execute the join on some of those topics. One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages? If I decode the message will it help me see when the join should have been executed? I also see the key on a topic with the naming convention "KSTREAM_OUTERTHIS". Are there any other topics that I should be looking at to troubleshoot this issue? Thanks, Chad
Re: outerjoin not joining after window
I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null Given that you also have a grace period of 5 minutes, the result will only be emitted after the grace-period passed and the window is closed (not when window end time is reached). One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages? It's an internal store that stores all records which are subject to be emitted as left/right join results, i.e., if there is no inner join result. The format used is internal: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java Also note: time is based on event-time, i.e., if the input stream stops sending new records, "stream-time" will stop advancing and the result might not be emitted because the window does not get closed. (Last, there is some internal wall-clock time delay of one second to emit results for performance reasons...) HTH. -Matthias On 4/30/24 6:51 AM, Chad Preisler wrote: Hello, I have a KStream to KStream outer join with a time difference of 25 minutes and 5 minutes of grace. When I get a record for one side of the join, but don't get a record on the other side of the join, I expect the join to execute after the 25 minutes with one side of the join containing a record and the other being null. Is that correct? If it is correct, it's not working for me. I was poking around on the broker and saw some internal topics. I see the key I expected to execute the join on some of those topics. One has a naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm not sure how to decode that message to see what is in it. What is the purpose of those messages? If I decode the message will it help me see when the join should have been executed? I also see the key on a topic with the naming convention "KSTREAM_OUTERTHIS". Are there any other topics that I should be looking at to troubleshoot this issue? Thanks, Chad
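For concreteness, the join under discussion looks roughly like this (topic names are placeholders, and serde configuration is omitted):
{code:java}
import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class OuterJoinSketch {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left = builder.stream("left-topic");   // placeholder topic
        final KStream<String, String> right = builder.stream("right-topic"); // placeholder topic

        // 25-minute window plus 5-minute grace: a one-sided (left/right) result is
        // only emitted after window end + grace has passed in event-time, i.e. once
        // newer input records advance stream-time far enough to close the window.
        left.outerJoin(
                right,
                (l, r) -> l + "/" + r, // l or r is null for a one-sided result
                JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMinutes(25), Duration.ofMinutes(5)))
            .to("joined-topic");
    }
}
{code}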
[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset
[ https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842423#comment-17842423 ] Matthias J. Sax commented on KAFKA-16382: - Not yet from our side... Working on other things atm. Not sure when we will be able to pick it up, or if anybody from the community wants to take it. > Kafka Streams drop NULL values after reset > -- > > Key: KAFKA-16382 > URL: https://issues.apache.org/jira/browse/KAFKA-16382 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Stanislav Spiridonov >Priority: Major > > Kafka Streams (KTable) drops null values after full reset. > See > [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java] > for sample topology > Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics) > # Start example - 1st round > # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull" > # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab" > # Stop application > # Run kafka-streams-application-reset > {code:java} > call bin/windows/kafka-streams-application-reset --application-id > nullproblem-example^ > --input-topics "NULL-IN,NULL-IN-AUX"^ > --bootstrap-server "localhost:9092" > {code} > # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app > running yet) > # Start example - 2nd round > # After initialization -> NULL-OUT *still contains* 2 messages "A1:anull, > A1:ab" > # Expected output *3 messages* "A1:anull, A1:ab, {*}A1:{*}" > The issue is NOT reproduced if application just restarted (skip step 5). > The issue is NOT reproduced if internal cache is disabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16645) CVEs in 3.7.0 docker image
[ https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842419#comment-17842419 ] Matthias J. Sax commented on KAFKA-16645: - I believe fixing these CVEs should be a blocker for 3.7.1 and 3.8.0? Thoughts? > CVEs in 3.7.0 docker image > -- > > Key: KAFKA-16645 > URL: https://issues.apache.org/jira/browse/KAFKA-16645 > Project: Kafka > Issue Type: Task >Affects Versions: 3.7.0 >Reporter: Mickael Maison >Priority: Blocker > Fix For: 3.8.0, 3.7.1 > > > Our [Docker Image CVE Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub action reports 2 high CVEs in our base image: > apache/kafka:3.7.0 (alpine 3.19.1) > Total: 2 (HIGH: 2, CRITICAL: 0) > Library: libexpat (installed version 2.5.0-r2) > - CVE-2023-52425 (HIGH, fixed in 2.6.0-r0): expat: parsing large tokens can trigger a denial of service (https://avd.aquasec.com/nvd/cve-2023-52425) > - CVE-2024-28757 (HIGH, fixed in 2.6.2-r0): expat: XML Entity Expansion (https://avd.aquasec.com/nvd/cve-2024-28757) > Looking at the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?] that introduced the docker images, it seems we should release a bugfix when high CVEs are detected. It would be good to investigate and assess whether Kafka is impacted or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16645) CVEs in 3.7.0 docker image
[ https://issues.apache.org/jira/browse/KAFKA-16645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16645: Priority: Blocker (was: Major) > CVEs in 3.7.0 docker image > -- > > Key: KAFKA-16645 > URL: https://issues.apache.org/jira/browse/KAFKA-16645 > Project: Kafka > Issue Type: Task >Affects Versions: 3.7.0 >Reporter: Mickael Maison >Priority: Blocker > > Our [Docker Image CVE Scanner|https://github.com/apache/kafka/actions/runs/874393] GitHub action reports 2 high CVEs in our base image: > apache/kafka:3.7.0 (alpine 3.19.1) > Total: 2 (HIGH: 2, CRITICAL: 0) > Library: libexpat (installed version 2.5.0-r2) > - CVE-2023-52425 (HIGH, fixed in 2.6.0-r0): expat: parsing large tokens can trigger a denial of service (https://avd.aquasec.com/nvd/cve-2023-52425) > - CVE-2024-28757 (HIGH, fixed in 2.6.2-r0): expat: XML Entity Expansion (https://avd.aquasec.com/nvd/cve-2024-28757) > Looking at the [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka#KIP975:DockerImageforApacheKafka-WhatifweobserveabugoracriticalCVEinthereleasedApacheKafkaDockerImage?] that introduced the docker images, it seems we should release a bugfix when high CVEs are detected. It would be good to investigate and assess whether Kafka is impacted or not. -- This message was sent by Atlassian Jira (v8.20.10#820010)