[jira] [Created] (KAFKA-16573) Streams does not specify where a Serde is needed
Ayoub Omari created KAFKA-16573:
---

             Summary: Streams does not specify where a Serde is needed
                 Key: KAFKA-16573
                 URL: https://issues.apache.org/jira/browse/KAFKA-16573
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Ayoub Omari


Example topology:
{code:scala}
builder
  .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
  .groupBy((key, value) => new KeyValue(value, key))
  .count()
  .toStream()
  .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
{code}
At runtime, we get the following exception:
{code:java}
Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
{code}
The error does not give any information about the line or the processor causing the issue.

Here a Grouped was missing inside the groupBy. Because the groupBy API does not require a Grouped, it is easy to omit, and the omission can be difficult to spot in a more complex topology. This matters in particular for someone who needs control over serdes in the topology and doesn't want to define default serdes.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
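
For reference, a minimal Java sketch of the same topology with the missing Grouped supplied, so that the repartition topic and the count store get explicit serdes. The class name GroupedSerdeExample and the method name buildTopology are illustrative only; the topic names are those from the report. It assumes kafka-streams is on the classpath and only builds the topology, it does not start a KafkaStreams instance.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

// Illustrative class name; not part of the report.
public class GroupedSerdeExample {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .table("input", Consumed.with(Serdes.String(), Serdes.String()))
            // Swap key and value, and state the serdes of the regrouped records explicitly.
            .groupBy(
                (key, value) -> KeyValue.pair(value, key),
                Grouped.with(Serdes.String(), Serdes.String()))
            .count()
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```

With Grouped in place, no default key serde should be needed for this topology; the request in the report, a clearer error when it is missing, still stands.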
[jira] [Created] (KAFKA-16458) Add contains method in KeyValue store interface
Ayoub Omari created KAFKA-16458:
---

             Summary: Add contains method in KeyValue store interface
                 Key: KAFKA-16458
                 URL: https://issues.apache.org/jira/browse/KAFKA-16458
             Project: Kafka
          Issue Type: Wish
          Components: streams
            Reporter: Ayoub Omari


In some stream processors, we just want to check whether a key exists in the state store. Calling .get() and checking whether the return value is null is a little verbose:
{code:java}
if (store.get(key) != null) {

}
{code}
But I am not sure whether the store interface is kept this simple on purpose.
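
One way the wish could look, sketched as a default method so that existing implementations would not need to change. SimpleStore and MapStore are hypothetical stand-ins written for this example; they are not Kafka's store interfaces, and the sketch assumes that a null return from get means the key is absent.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative class name; everything inside is hypothetical.
public class ContainsSketch {

    /** Hypothetical minimal read-only store; NOT Kafka's interface. */
    public interface SimpleStore<K, V> {
        V get(K key);

        /** Proposed convenience: true if the key currently maps to a non-null value. */
        default boolean contains(K key) {
            return get(key) != null;
        }
    }

    /** In-memory implementation used only to exercise the default method. */
    public static final class MapStore<K, V> implements SimpleStore<K, V> {
        private final Map<K, V> data = new HashMap<>();

        public void put(K key, V value) {
            data.put(key, value);
        }

        @Override
        public V get(K key) {
            return data.get(key);
        }
    }
}
```

A default method keeps the interface change source-compatible, but it still deserializes the value on every call; whether a cheaper store-specific override is worthwhile is left open here.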
[jira] [Created] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null
Ayoub Omari created KAFKA-16434:
---

             Summary: ForeignKey INNER join does not unset join result when FK becomes null
                 Key: KAFKA-16434
                 URL: https://issues.apache.org/jira/browse/KAFKA-16434
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0, 2.8.2
            Reporter: Ayoub Omari


We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ is:
{code:scala}
case class LeftRecord(foreignKey: String, name: String)
{code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic. (Same topology example as in KAFKA-16407.)

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}
*+Actual result+*
{code:java}
KeyValue(pk1, 1)
{code}
*+Expected result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1
{code}
However, in +other cases+ where the join result should be unset (e.g. the primary key is deleted, or the foreign key changes to a non-existing FK), that record is +correctly emitted+.

Also, the importance of unsetting the join result is mentioned in the code:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join. This is true for both INNER and LEFT join.
{code}
[jira] [Created] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null
Ayoub Omari created KAFKA-16407:
---

             Summary: ForeignKey INNER join ignores FK change when its previous value is null
                 Key: KAFKA-16407
                 URL: https://issues.apache.org/jira/browse/KAFKA-16407
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Ayoub Omari


We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ is:
{code:scala}
case class LeftRecord(foreignKey: String, name: String)
{code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field. The resulting join value is the value in right-topic.

*Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
{code:scala}
rightTopic.pipeInput("fk", "1")

leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1"))
{code}
*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
{code}
*+Actual result+*
{code:scala}
# No output !
# Logs:
20:14:29,723 WARN  org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier  - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] topic=[left-topic] partition=[0] offset=[0]
20:14:29,728 WARN  org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier  - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] topic=[left-topic] partition=[0] offset=[1]
{code}
After looking into the code, I believe this is the line behind the issue:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147
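
The expected INNER-join behavior described in this report and in KAFKA-16434 can be summarized with a small toy model in plain Java. This is not Kafka Streams code and does not mirror its internals; InnerFkJoinModel and its methods are hypothetical names, and the model only encodes the expectations stated in the two reports: emit the right-side value when the foreign key resolves, and emit a null once when a previously emitted result has to be unset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the EXPECTED output; not the Kafka Streams implementation.
public class InnerFkJoinModel {
    private final Map<String, String> right = new HashMap<>();      // foreign key -> right-side value
    private final Map<String, String> lastResult = new HashMap<>(); // primary key -> last emitted join value
    private final List<String> output = new ArrayList<>();

    /** Right-side update. Simplification: it does not re-trigger joins for existing left records. */
    public void pipeRight(String foreignKey, String value) {
        right.put(foreignKey, value);
    }

    /** Left-side update; a null foreignKey models a left record whose FK field is null. */
    public void pipeLeft(String primaryKey, String foreignKey) {
        String joined = (foreignKey == null) ? null : right.get(foreignKey);
        if (joined != null) {
            // FK resolves: emit the join result, whatever the previous FK was (including null).
            lastResult.put(primaryKey, joined);
            output.add("KeyValue(" + primaryKey + ", " + joined + ")");
        } else if (lastResult.remove(primaryKey) != null) {
            // FK no longer resolves but a result was emitted before: unset it once.
            output.add("KeyValue(" + primaryKey + ", null)");
        }
    }

    public List<String> output() {
        return output;
    }
}
```

Replaying the inputs of this report (a null FK followed by "fk") through the model yields a single KeyValue(pk1, 1); replaying a resolvable FK followed by a null FK, as in KAFKA-16434, yields the join value followed by one null.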
[jira] [Created] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change
Ayoub Omari created KAFKA-16394:
---

             Summary: ForeignKey LEFT join propagates null value on foreignKey change
                 Key: KAFKA-16394
                 URL: https://issues.apache.org/jira/browse/KAFKA-16394
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Ayoub Omari
         Attachments: ForeignJoinTest.scala, JsonSerde.scala


We have two topics: _left-topic[String, LeftRecord]_ and _right-topic[String, String]_

where _LeftRecord_ is:
{code:scala}
case class LeftRecord(foreignKey: String, name: String)
{code}
We do a simple foreign key join on left-topic's foreignKey field, which returns the value in right-topic.

+*Scenario 1: Change foreignKey*+

Input the following:
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2)
{code}
*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2)
{code}
A null is propagated to the join result when the foreign key changes.

+*Scenario 2: Delete PrimaryKey*+

Input:
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null)
{code}
*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
{code}
*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null)
{code}
An additional null is propagated to the join result.

This bug doesn't exist on versions 3.6.0 and below.

I believe the issue comes from the line
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
where we propagate the deletion in the two scenarios above.

Attaching the topology I used.
[jira] [Created] (KAFKA-16343) Improve tests of streams foreignkey package
Ayoub Omari created KAFKA-16343:
---

             Summary: Improve tests of streams foreignkey package
                 Key: KAFKA-16343
                 URL: https://issues.apache.org/jira/browse/KAFKA-16343
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Ayoub Omari
            Assignee: Ayoub Omari


Some classes in the streams foreignkey package are not tested, such as SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier. Corresponding tests should be added.

The class ForeignTableJoinProcessorSupplierTest should be renamed, as it is not testing ForeignTableJoinProcessor but rather SubscriptionJoinProcessorSupplier.