[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-04-30 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17842293#comment-17842293
 ] 

Stanislav Spiridonov commented on KAFKA-16382:
--

Any updates or estimates on a fix?

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* 2 messages "A1:anull, 
> A1:ab"
>  # Expected output *3 messages* "A1:anull, A1:ab, {*}A1:{*}"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-26 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841079#comment-17841079
 ] 

Stanislav Spiridonov commented on KAFKA-16585:
--

OK, let's not make simple things complicated. If the regular Processor is more suitable 
and does not (and will not) add extra overhead, it is the best option for implementing 
such things.

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-25 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840664#comment-17840664
 ] 

Stanislav Spiridonov commented on KAFKA-16585:
--

# I can use the regular Processor, but as I understand it adds some overhead 
compared to the FixedKeyProcessor (see the sketch after this list).
 # Actually, I think the FixedKeyProcessor does not need to "ensure that the key is 
not changed". IMHO it is enough for the key to come from the same partition. So, 
if you provide a way to generate a *FixedKeyRecord* from any local 
store, that will be enough.
 # As a variant, the FixedKeyProcessor could keep its own internal (optional) store of 
allowed keys, share it with the punctuators, and generate the *FixedKeyRecord* 
from it.
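
As a point of reference for item 1, here is a minimal sketch (String types assumed, all names made up for this sketch) of the regular Processor alternative: it can forward from a punctuator today because Record, unlike FixedKeyRecord, has a public constructor.
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Illustrative sketch: forwarding from a punctuator works with the regular
// Processor API because new Record<>(...) is public.
public class PunctuatingProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        // Every 10 seconds, emit a record that is not derived from any input record.
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp ->
            context.forward(new Record<>("some-key", "some-value", timestamp)));
    }

    @Override
    public void process(final Record<String, String> record) {
        context.forward(record); // pass-through for normal records
    }
}
{code}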

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-22 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839791#comment-17839791
 ] 

Stanislav Spiridonov commented on KAFKA-16585:
--

If you check the example at 
[https://developer.confluent.io/tutorials/kafka-streams-schedule-operations/kstreams.html]
 you will see the same situation: *transform* vs {*}transformValues{*}. 
{noformat}
I used transform in this tutorial as it makes for a better example because you 
can use the ProcessorContext.forward method.{noformat}

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-22 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838979#comment-17838979
 ] 

Stanislav Spiridonov edited comment on KAFKA-16585 at 4/22/24 4:47 PM:
---

The case is relatively simple. I have a KTable with entities that have to be 
enriched with an icon attribute from a side service. So, the processor maintains 
an internal store with the entity keys and periodically asks the service for 
updates for the registered ids. If an icon has changed, it forwards a message with 
the new icon. The key of the record is the entity key (String), the value is the 
icon (String).


was (Author: foal):
The case is relatively simple. I have KTable with entities that have to be 
enrichment with icon attribute from side service. So, the processor maintains 
the internal store with entities keys and periodically ask the service for 
update for registered ids. If icon has changes it forward the message with new 
icon. The key of record is entity key (String), value is a icon (String).

 

BTW I faced into strange behaviour - if I forward new record from another 
thread it arrived to incorrect processor. So now I just update store from icon 
KTable instead of forward the record.   

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-19 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838979#comment-17838979
 ] 

Stanislav Spiridonov commented on KAFKA-16585:
--

The case is relatively simple. I have a KTable with entities that have to be 
enriched with an icon attribute from a side service. So, the processor maintains 
an internal store with the entity keys and periodically asks the service for 
updates for the registered ids. If an icon has changed, it forwards a message with 
the new icon. The key of the record is the entity key (String), the value is the 
icon (String).

 

BTW, I ran into strange behaviour: if I forward a new record from another 
thread, it arrives at the wrong processor. So now I just update the store from the 
icon KTable instead of forwarding the record.
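
For illustration, a rough sketch of what such an enrichment processor could look like with the regular Processor API (which can forward from a punctuator). The store name, the IconService interface, and the String types are assumptions made up for this sketch, not taken from the reporter's code.
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical side service that returns the current icon for an entity id.
interface IconService {
    String currentIcon(String entityId);
}

public class IconEnrichmentProcessor implements Processor<String, String, String, String> {

    private final IconService iconService;
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> icons; // entity key -> last known icon

    public IconEnrichmentProcessor(final IconService iconService) {
        this.iconService = iconService;
    }

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
        this.icons = context.getStateStore("icon-store"); // store name assumed; must be attached to this processor
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::refreshIcons);
    }

    @Override
    public void process(final Record<String, String> record) {
        // Register the entity key; the icon is not known yet.
        icons.putIfAbsent(record.key(), "");
    }

    private void refreshIcons(final long timestamp) {
        try (KeyValueIterator<String, String> it = icons.all()) {
            while (it.hasNext()) {
                final KeyValue<String, String> entry = it.next();
                final String latest = iconService.currentIcon(entry.key);
                if (latest != null && !latest.equals(entry.value)) {
                    icons.put(entry.key, latest);
                    // Forwarding works here because Record has a public constructor.
                    context.forward(new Record<>(entry.key, latest, timestamp));
                }
            }
        }
    }
}
{code}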

> No way to forward message from punctuation method in the FixedKeyProcessor
> --
>
> Key: KAFKA-16585
> URL: https://issues.apache.org/jira/browse/KAFKA-16585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.2
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> The FixedKeyProcessorContext can forward only FixedKeyRecord. This class 
> doesn't have a public constructor and can be created based on existing 
> records. But such record usually is absent in the punctuation method.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16585) No way to forward message from punctuation method in the FixedKeyProcessor

2024-04-18 Thread Stanislav Spiridonov (Jira)
Stanislav Spiridonov created KAFKA-16585:


 Summary: No way to forward message from punctuation method in the 
FixedKeyProcessor
 Key: KAFKA-16585
 URL: https://issues.apache.org/jira/browse/KAFKA-16585
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.2
Reporter: Stanislav Spiridonov


The FixedKeyProcessorContext can forward only a FixedKeyRecord. This class 
doesn't have a public constructor and can only be created from an existing record. 
But such a record is usually absent in the punctuation method.
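
For illustration only, a minimal sketch (String types assumed) of the dead end described above: inside a punctuator there is no incoming record to derive a FixedKeyRecord from, and the class has no public constructor, so there is nothing that can be passed to forward().
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class MyFixedKeyProcessor implements FixedKeyProcessor<String, String, String> {

    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // Dead end: forward() only accepts a FixedKeyRecord, which has no public
            // constructor and can only be obtained via withValue()/withTimestamp() on
            // an existing record - and no input record is available in a punctuator.
        });
    }

    @Override
    public void process(final FixedKeyRecord<String, String> record) {
        // Inside process() this works fine: derive a new record from the incoming one.
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}
{code}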



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-28 Thread Stanislav Spiridonov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Spiridonov updated KAFKA-16382:
-
Description: 
Kafka Streams (KTable) drops null values after a full reset.

See 
[https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
 for a sample topology.

Steps to reproduce (requires the NULL-IN, NULL-IN-AUX, and NULL-OUT topics):
 # Start the example - 1st round
 # Send "A1:a" to NULL-IN -> NULL-OUT "A1:anull"
 # Send "A1:b" to NULL-IN-AUX -> NULL-OUT "A1:anull, A1:ab"
 # Stop the application
 # Run kafka-streams-application-reset
{code:java}
call bin/windows/kafka-streams-application-reset --application-id nullproblem-example^
 --input-topics "NULL-IN,NULL-IN-AUX"^
 --bootstrap-server "localhost:9092"
{code}

 # Send "A1:" to NULL-IN-AUX -> NULL-OUT "A1:anull, A1:ab" - this is OK (no app 
running yet)
 # Start the example - 2nd round
 # After initialization -> NULL-OUT *still contains* 2 messages: "A1:anull, 
A1:ab"
 # Expected output: *3 messages* "A1:anull, A1:ab, {*}A1:{*}"

The issue is NOT reproduced if the application is just restarted (skip step 5).

The issue is NOT reproduced if the internal cache is disabled.

  was:
Kafka Streams (KTable) drops null values after full reset.

See 
[https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
 for sample topology

Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
 # Start example - 1st round
 # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
 # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
 # Stop application 
 # Run kafka-streams-application-reset 
{code:java}
call bin/windows/kafka-streams-application-reset --application-id 
nullproblem-example^
 --input-topics "NULL-IN,NULL-IN-AUX"^
 --bootstrap-server "localhost:9092"
{code}

 # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
running yet)
 # Start example - 2nd round
 # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
 # Expected output "A1:anull, A1:ab, {*}A1:{*}"

The issue is NOT reproduced if application just restarted (skip step 5). 

The issue is NOT reproduced if internal cache is disabled.


> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* 2 messages "A1:anull, 
> A1:ab"
>  # Expected output *3 messages* "A1:anull, A1:ab, {*}A1:{*}"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-28 Thread Stanislav Spiridonov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Spiridonov updated KAFKA-16382:
-
Description: 
Kafka Streams (KTable) drops null values after a full reset.

See 
[https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
 for a sample topology.

Steps to reproduce (requires the NULL-IN, NULL-IN-AUX, and NULL-OUT topics):
 # Start the example - 1st round
 # Send "A1:a" to NULL-IN -> NULL-OUT "A1:anull"
 # Send "A1:b" to NULL-IN-AUX -> NULL-OUT "A1:anull, A1:ab"
 # Stop the application
 # Run kafka-streams-application-reset
{code:java}
call bin/windows/kafka-streams-application-reset --application-id nullproblem-example^
 --input-topics "NULL-IN,NULL-IN-AUX"^
 --bootstrap-server "localhost:9092"
{code}

 # Send "A1:" to NULL-IN-AUX -> NULL-OUT "A1:anull, A1:ab" - this is OK (no app 
running yet)
 # Start the example - 2nd round
 # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
 # Expected output: "A1:anull, A1:ab, {*}A1:{*}"

The issue is NOT reproduced if the application is just restarted (skip step 5).

The issue is NOT reproduced if the internal cache is disabled.

  was:
Kafka Streams (KTable) drops null values after full reset.

See 
[https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
 for sample topology

Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
 # Start example - 1st round
 # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
 # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
 # Stop application 
 # Run kafka-streams-application-reset 
{code:java}
call bin/windows/kafka-streams-application-reset --application-id 
nullproblem-example^
 --input-topics "NULL-IN,NULL-IN-AUX"^
 --bootstrap-server "localhost:9092"
{code}

 # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
running yet)
 # Start example - 2nd round
 # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
 # Expected output "A1:anull, A1:ab, A1:"

The issue is NOT reproduced if application just restarted (skip step 5). 

The issue is NOT reproduced if internal cache is disabled.


> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, {*}A1:{*}"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831228#comment-17831228
 ] 

Stanislav Spiridonov edited comment on KAFKA-16382 at 3/27/24 8:57 AM:
---

Wait, why is it not a bug? I have a real scenario. 

Good scenario:
 # Some events happen. The output topic contains the mapped result.
 # An event ends ({_}*null*{_} body). The output topic contains a delete message for 
the event. 

Wrong scenario:
 # Some events happen. The output topic contains the mapped result.
 # We stop the application, make some updates + a {*}full reset{*}. 
 # Meanwhile an event ends ({*}_null_{*} body). 
 # We start the application again; it processes the input topic from scratch but 
"optimises" the nulls away internally. The output topic *still* contains the mapped 
result. The delete message *never* reaches the output topic.

As a workaround we can clear the output topic before the application starts. But in 
our system the output topic is the input of another Kafka Streams application, so we 
would need to reset all subsequent Kafka Streams applications to correct this 
behaviour.

Another workaround is, for each delete message (null body), to generate a synthetic 
message with a synthetic key and some value in the body (so it is not optimised away), 
detect such messages when writing to the output topic, and turn them back into delete 
messages there. But that also looks like a hack.

 


was (Author: foal):
Wait, why is it not a bug? I have the real scenario. 

Good scenario:
 # Some events happened. Output topic contains mapped result.
 # Event ends ({_}*null*{_} body).  Output topic contains delete message for 
the event. 

Wrong scenario:
 # Some events happened. Output topic contains mapped result.
 # We stopped Kafka, Make some updates + {*}full rest{*}. 
 # Meanwhile event ends ({*}_null_{*} body).  
 # We start Kafka, it process the input topic from scratch but "optimise" 
internally nulls. The output topic *still* contains mapped result. The delete 
message *never* reach the output topic.

As work around we can clear the output topic before Kafka start. But in out 
system the output topic is input for another Kafka Stream application so we 
need to reset all subsequent Kafka Stream applications to correct this 
behaviour.

Another workaround is on each null body generate another synthetic message with 
another key and some value in the body (it will not optimised) and check for 
such messages on write to output topic and generate back the delete message, 
but it is also looks as a hack.

 

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831228#comment-17831228
 ] 

Stanislav Spiridonov edited comment on KAFKA-16382 at 3/27/24 8:55 AM:
---

Wait, why is it not a bug? I have a real scenario. 

Good scenario:
 # Some events happen. The output topic contains the mapped result.
 # An event ends ({_}*null*{_} body). The output topic contains a delete message for 
the event. 

Wrong scenario:
 # Some events happen. The output topic contains the mapped result.
 # We stop the application, make some updates + a {*}full reset{*}. 
 # Meanwhile an event ends ({*}_null_{*} body). 
 # We start the application again; it processes the input topic from scratch but 
"optimises" the nulls away internally. The output topic *still* contains the mapped 
result. The delete message *never* reaches the output topic.

As a workaround we can clear the output topic before the application starts. But in 
our system the output topic is the input of another Kafka Streams application, so we 
would need to reset all subsequent Kafka Streams applications to correct this 
behaviour.

Another workaround is, for each null body, to generate another synthetic message with 
another key and some value in the body (so it is not optimised away), detect such 
messages when writing to the output topic, and emit the delete message back. But that 
also looks like a hack.

 


was (Author: foal):
Wait, why is it not a bug? I have the real scenario. 

Good scenario:
 # Some events happened. Output topic contains mapped result.
 # Event ends ({_}*null*{_} body).  Output topic contains delete message for 
the event. 

Wrong scenario:
 # Some events happened. Output topic contains mapped result.
 # We stopped Kafka, Make some updates + {*}full rest{*}. 
 # Meanwhile event ends ({*}_null_{*} body).  
 # We start Kafka, it process the input topic from scratch but "optimise" 
internally nulls. The output topic *still* contains mapped result. The delete 
message *never* reach the output topic.

As work around we can clear the output topic, But in out system the output 
topic is input for another Kafka Stream application so we need to reset all 
subsequent Kafka Stream applications to correct this behaviour.

Another workaround is on each null body generate another synthetic message with 
another key and some value in the body (it will not optimised) and check for 
such messages on write to output topic and generate back the delete message, 
but it is also looks as a hack.

 

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831230#comment-17831230
 ] 

Stanislav Spiridonov commented on KAFKA-16382:
--

BTW, if you take the wrong scenario from the previous comment, but instead of a 
*delete* message the input topic contains an *update* message for the event, 
everything works correctly: the output topic will contain the updated version of the 
event.

So, as you can see, {*}update{*} as well as *add* work correctly after a full reset; 
only *delete* is broken.

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831228#comment-17831228
 ] 

Stanislav Spiridonov commented on KAFKA-16382:
--

Wait, why is it not a bug? I have a real scenario. 

Good scenario:
 # Some events happen. The output topic contains the mapped result.
 # An event ends ({_}*null*{_} body). The output topic contains a delete message for 
the event. 

Wrong scenario:
 # Some events happen. The output topic contains the mapped result.
 # We stop the application, make some updates + a {*}full reset{*}. 
 # Meanwhile an event ends ({*}_null_{*} body). 
 # We start the application again; it processes the input topic from scratch but 
"optimises" the nulls away internally. The output topic *still* contains the mapped 
result. The delete message *never* reaches the output topic.

As a workaround we can clear the output topic. But in our system the output topic is 
the input of another Kafka Streams application, so we would need to reset all 
subsequent Kafka Streams applications to correct this behaviour.

Another workaround is, for each null body, to generate another synthetic message with 
another key and some value in the body (so it is not optimised away), detect such 
messages when writing to the output topic, and emit the delete message back. But that 
also looks like a hack.
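
Since the issue notes that the problem does not reproduce with the internal cache disabled, a possible mitigation (only a sketch of a workaround, not a fix for the underlying bug) is to turn off record caching, either globally or per store. The application id, topic, and store names below are illustrative assumptions.
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class DisableCachingSketch {
    public static void main(String[] args) {
        // Option 1: disable the record cache globally (config name as of Kafka Streams 3.4+).
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "nullproblem-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);

        // Option 2: disable caching only for one KTable's backing store (store name assumed).
        final StreamsBuilder builder = new StreamsBuilder();
        builder.table("NULL-IN",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("null-in-store")
                .withCachingDisabled());
    }
}
{code}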

 

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-18 Thread Stanislav Spiridonov (Jira)
Stanislav Spiridonov created KAFKA-16382:


 Summary: Kafka Streams drop NULL values after reset
 Key: KAFKA-16382
 URL: https://issues.apache.org/jira/browse/KAFKA-16382
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.6.1
Reporter: Stanislav Spiridonov


Kafka Streams (KTable) drops null values after a full reset.

See 
[https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
 for a sample topology.

Steps to reproduce (requires the NULL-IN, NULL-IN-AUX, and NULL-OUT topics):
 # Start the example - 1st round
 # Send "A1:a" to NULL-IN -> NULL-OUT "A1:anull"
 # Send "A1:b" to NULL-IN-AUX -> NULL-OUT "A1:anull, A1:ab"
 # Stop the application
 # Run kafka-streams-application-reset
{code:java}
call bin/windows/kafka-streams-application-reset --application-id nullproblem-example^
 --input-topics "NULL-IN,NULL-IN-AUX"^
 --bootstrap-server "localhost:9092"
{code}

 # Send "A1:" to NULL-IN-AUX -> NULL-OUT "A1:anull, A1:ab" - this is OK (no app 
running yet)
 # Start the example - 2nd round
 # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
 # Expected output: "A1:anull, A1:ab, A1:"

The issue is NOT reproduced if the application is just restarted (skip step 5).

The issue is NOT reproduced if the internal cache is disabled.
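
For orientation, here is a rough approximation of the shape of the topology described above (two input tables joined and written to NULL-OUT). This is not the code from the linked repository; the serdes, the join logic, and the store handling are simplified assumptions, so see NullProblemExample.java for the actual example.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class NullProblemTopologySketch {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<String, String> in =
            builder.table("NULL-IN", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> aux =
            builder.table("NULL-IN-AUX", Consumed.with(Serdes.String(), Serdes.String()));

        // Left join: "A1:a" with no AUX value yet yields "a" + null = "anull",
        // and "A1:b" on AUX then yields "ab", matching steps 2 and 3 above.
        in.leftJoin(aux, (v, auxV) -> v + auxV)
          .toStream()
          .to("NULL-OUT", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
{code}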



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10659) Cogroup topology generation fails if input streams are repartitioned

2024-01-23 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809859#comment-17809859
 ] 

Stanislav Spiridonov commented on KAFKA-10659:
--

The same issue here. The workaround of naming the grouping with Grouped.with(...) 
works, but I need to create these topics manually.
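
A sketch of that workaround, based on the note at the end of the quoted issue (naming each grouping so its repartition gets a distinct name). The topic names, serdes, and aggregators below are illustrative assumptions.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;

public class CogroupNamingWorkaround {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KGroupedStream<String, String> groupedA = builder
            .stream("topic-a", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((k, v) -> v)   // re-keying forces a repartition before cogroup
            .groupByKey(Grouped.with("grouped-a", Serdes.String(), Serdes.String()));

        final KGroupedStream<String, String> groupedB = builder
            .stream("topic-b", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((k, v) -> v)
            .groupByKey(Grouped.with("grouped-b", Serdes.String(), Serdes.String()));

        // Each grouping now gets its own "<name>-repartition" topic instead of both
        // reusing the aggregate store name, which appears to be what triggers the
        // "Processor ... is already added" exception in the report.
        groupedA.cogroup((key, value, agg) -> agg + value)
                .cogroup(groupedB, (key, value, agg) -> agg + value)
                .aggregate(() -> "");
    }
}
{code}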

> Cogroup topology generation fails if input streams are repartitioned
> 
>
> Key: KAFKA-10659
> URL: https://issues.apache.org/jira/browse/KAFKA-10659
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.5.1
>Reporter: blueedgenick
>Priority: Major
>
> Example to reproduce:
>  
> {code:java}
> KGroupedStream<String, A> groupedA = builder
>   .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>   .selectKey((aKey, aVal) -> aVal.someId)
>   .groupByKey();
> KGroupedStream<String, B> groupedB = builder
>   .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>   .selectKey((bKey, bVal) -> bVal.someId)
>   .groupByKey();
> KGroupedStream<String, C> groupedC = builder
>   .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>   .selectKey((cKey, cVal) -> cVal.someId)
>   .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(aggregatorA)
>   .cogroup(groupedB, aggregatorB)
>   .cogroup(groupedC, aggregatorC);
> // Aggregate all streams of the cogroup
> KTable<String, ABC> agg = cogroup.aggregate(
>   () -> new ABC(),
>   Named.as("my-agg-proc-name"),
>   Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
>     .withKeySerde(Serdes.String())
>     .withValueSerde(serdeABC)
> );
> {code}
>  
>  
> This throws an exception during topology generation: 
>  
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: 
> Processor abc-agg-store-repartition-filter is already added. at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
>  at 
> org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
>  at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
>  at ...
> {code}
>  
> The same exception is observed if the `selectKey(...).groupByKey()`  pattern 
> is replaced with `groupBy(...)`.
> This behavior is observed with topology optimization at default state, 
> explicitly set off, or explicitly set on.
> Interestingly the problem is avoided, and a workable topology produced, if 
> the grouping step is named by passing a `Grouped.with(...)` expression to 
> either `groupByKey` or `groupBy`.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)