[jira] [Created] (KAFKA-16573) Streams does not specify where a Serde is needed

2024-04-17 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16573:
---

 Summary: Streams does not specify where a Serde is needed
 Key: KAFKA-16573
 URL: https://issues.apache.org/jira/browse/KAFKA-16573
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari


Example topology:
{code:scala}
 builder
   .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
   .groupBy((key, value) => new KeyValue(value, key))
   .count()
   .toStream()
   .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
 {code}
At runtime, we get the following exception:
{code:java}
Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
The error does not indicate which line or which processor caused the issue.

Here a Grouped was missing in the groupBy call. Because the groupBy API does
not require a Grouped, the omission is easy to make and can be difficult to
spot in a more complex topology.

This is also a problem for anyone who needs control over the serdes in a
topology and doesn't want to define default serdes.
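
For reference, a minimal Java sketch of the same topology with the missing Grouped supplied. It assumes kafka-streams is on the classpath; the class name GroupedSerdeExample and the method buildTopology are made up for illustration. Passing Grouped.with(...) to groupBy gives the repartition topic and the count store explicit serdes, so no default serde should be needed.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical example class; only the Kafka Streams calls are real API.
final class GroupedSerdeExample {

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            .table("input", Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy(
                // Swap key and value, as in the reported topology.
                (key, value) -> KeyValue.pair(value, key),
                // The serdes that were missing in the report.
                Grouped.with(Serdes.String(), Serdes.String()))
            .count()
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        // Printing the description helps map processor names to DSL calls.
        System.out.println(buildTopology().describe());
    }
}
```

Printing the topology description is only a manual aid for locating a processor. It does not address the request itself, which is for the exception to name the processor that lacks a serde.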

 

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16458) Add contains method in KeyValue store interface

2024-04-02 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16458:
---

 Summary: Add contains method in KeyValue store interface
 Key: KAFKA-16458
 URL: https://issues.apache.org/jira/browse/KAFKA-16458
 Project: Kafka
  Issue Type: Wish
  Components: streams
Reporter: Ayoub Omari


In some stream processors, we just want to check whether a key exists in the
state store.

 

I find calling .get() and checking whether the return value is null a little
verbose:
{code:java}
if (store.get(key) != null) {

}{code}
 

But I am not sure whether the method is absent on purpose, to keep the store
interface simple.
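
Until such a method exists, the null check can be wrapped in a small helper. The sketch below is stdlib-only: StoreKeys and contains are hypothetical names, and a HashMap stands in for the state store. With a real store, the same helper could presumably be called as contains(store::get, key), since it only needs a lookup function.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper class; not part of Kafka Streams.
final class StoreKeys {
    private StoreKeys() {}

    // Returns true when the lookup yields a non-null value for the key.
    static <K, V> boolean contains(Function<K, V> get, K key) {
        return get.apply(key) != null;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>(); // stand-in for a state store
        counts.put("a", 1L);
        System.out.println(contains(counts::get, "a")); // prints "true"
        System.out.println(contains(counts::get, "b")); // prints "false"
    }
}
```

This only hides the null check: the helper still performs a full get, so it saves typing rather than work.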





[jira] [Created] (KAFKA-16434) ForeignKey INNER join does not unset join result when FK becomes null

2024-03-27 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16434:
---

 Summary: ForeignKey INNER join does not unset join result when FK 
becomes null
 Key: KAFKA-16434
 URL: https://issues.apache.org/jira/browse/KAFKA-16434
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0, 2.8.2
Reporter: Ayoub Omari


We have two topics: _left-topic[String, LeftRecord]_ and
_right-topic[String, String]_

where _LeftRecord_ is:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field. The
resulting join value is the value in right-topic (same topology as in
KAFKA-16407).

 

*Scenario: Unset foreign key of a primary key*
{code:scala}
rightTopic.pipeInput("fk1", "1")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
{code}
 

*+Actual result+*

 
{code:java}
KeyValue(pk1, 3) {code}
 

*+Expected result+*

 
{code:java}
KeyValue(pk1, 3)
KeyValue(pk1, null) // This unsets the join between pk1 and fk1{code}
 

However, in {+}other cases{+} where the join result should be unset (e.g. the
primary key is deleted, or the foreign key changes to a non-existent FK), that
record is {+}correctly emitted{+}.
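
To make the expected behaviour concrete, here is a toy in-memory model of the output this report expects from an INNER foreign key join. It is not Kafka Streams code: InnerFkJoinModel, putRight and putLeft are hypothetical names, right-side updates do not re-emit, and a null fk stands for both a null foreign key and a deleted left record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the expected INNER FK-join output (not Kafka Streams code).
final class InnerFkJoinModel {
    private final Map<String, String> right = new HashMap<>();      // fk -> right-topic value
    private final Map<String, String> lastResult = new HashMap<>(); // pk -> last non-null join result
    final List<String> output = new ArrayList<>();

    void putRight(String fk, String value) {
        right.put(fk, value); // simplification: right-side updates do not re-emit
    }

    // fk == null models both a null foreign key and a deleted left record.
    void putLeft(String pk, String fk) {
        String joined = (fk == null) ? null : right.get(fk);
        if (joined != null) {
            lastResult.put(pk, joined);
            output.add("KeyValue(" + pk + ", " + joined + ")");
        } else if (lastResult.remove(pk) != null) {
            // A previous result exists, so it is unset with a null.
            output.add("KeyValue(" + pk + ", null)");
        }
    }

    public static void main(String[] args) {
        InnerFkJoinModel join = new InnerFkJoinModel();
        join.putRight("fk1", "right-value");
        join.putLeft("pk1", "fk1"); // join result is set
        join.putLeft("pk1", null);  // FK becomes null: result is unset
        join.output.forEach(System.out::println);
    }
}
```

In this model the second putLeft emits KeyValue(pk1, null), which is the record missing from the actual result above.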

 

Also, the importance of unsetting the join result is mentioned in the code: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L161C21-L163C36

 
{code:java}
//[...] Additionally, propagate null if no FK is found there,
// since we must "unset" any output set by the previous FK-join.
// This is true for both INNER and LEFT join. {code}
 

 





[jira] [Created] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2024-03-22 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16407:
---

 Summary: ForeignKey INNER join ignores FK change when its previous 
value is null
 Key: KAFKA-16407
 URL: https://issues.apache.org/jira/browse/KAFKA-16407
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari


We have two topics: _left-topic[String, LeftRecord]_ and
_right-topic[String, String]_

where _LeftRecord_ is:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple *INNER* foreign key join on left-topic's foreignKey field. The
resulting join value is the value in right-topic.

 

*Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
{code:scala}
rightTopic.pipeInput("fk", "1")

leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1){code}
 

*+Actual result+*
{code:scala}
# No output!

# Logs:

20:14:29,723 WARN  org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] topic=[left-topic] partition=[0] offset=[0]

20:14:29,728 WARN  org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] topic=[left-topic] partition=[0] offset=[1]
{code}
 

After looking into the code, I believe this is the line behind the issue:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147





[jira] [Created] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2024-03-20 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16394:
---

 Summary: ForeignKey LEFT join propagates null value on foreignKey 
change
 Key: KAFKA-16394
 URL: https://issues.apache.org/jira/browse/KAFKA-16394
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari
 Attachments: ForeignJoinTest.scala, JsonSerde.scala

We have two topics: _left-topic[String, LeftRecord]_ and
_right-topic[String, String]_

where _LeftRecord_ is:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We do a simple foreign key join on left-topic's foreignKey field, which returns
the value in right-topic.

 

+*Scenario 1: Change foreignKey*+

Input the following:
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2") 

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
 

A null is propagated to the join result when the foreign key changes.

 

+*Scenario 2: Delete PrimaryKey*+

Input the following:
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
 

*+Actual result+*
{code:java}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result.

 

This bug doesn't exist on versions 3.6.0 and below.

 

I believe the issue comes from the line 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]

where we propagate the deletion in the two scenarios above.

 

Attaching the topology I used.





[jira] [Created] (KAFKA-16343) Improve tests of streams foreignkey package

2024-03-04 Thread Ayoub Omari (Jira)
Ayoub Omari created KAFKA-16343:
---

 Summary: Improve tests of streams foreignkey package
 Key: KAFKA-16343
 URL: https://issues.apache.org/jira/browse/KAFKA-16343
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari
Assignee: Ayoub Omari


Some classes in the streams foreignkey package are not tested, such as
SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier.
Corresponding tests should be added.

The class ForeignTableJoinProcessorSupplierTest should be renamed, as it does
not test ForeignTableJoinProcessor but rather SubscriptionJoinProcessorSupplier.

 


