[jira] [Comment Edited] (KAFKA-10722) Timestamped store is used even if not desired

2020-11-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17232993#comment-17232993
 ] 

Matthias J. Sax edited comment on KAFKA-10722 at 11/16/20, 6:59 PM:


Every record in Kafka Streams has a timestamp, and aggregate() needs to set a 
timestamp for its output records. It computes the output record timestamps as 
"max" over all input records. That is why it needs a timestamped key-value 
store to track the maximum timestamp.

Unfortunately, we cannot deprecate the API easily because of Java type 
erasure... I guess we could log a warning message though... Feel free to do a 
PR for it. We could log when we create the 
`KeyValueToTimestampedKeyValueByteStoreAdapter`.
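To make the erasure point concrete, here is a minimal, self-contained sketch (plain Java, not Kafka code): the two `List` parameterizations below are the same class at runtime, which is why an API cannot single out one generic parameterization of the same signature for deprecation.

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        List<Integer> numbers = new ArrayList<>();
        // Type parameters are erased at compile time, so both lists share
        // one runtime class. An overload (or deprecation) keyed only on the
        // generic type parameter therefore cannot exist.
        System.out.println(strings.getClass() == numbers.getClass()); // true
    }
}
```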

And yes, you always get a timestamped key-value store, so you can simplify your 
code accordingly. (Note, though, that if you provide a non-timestamped store, 
the timestamp won't really be stored, because the above-mentioned adapter will 
just drop the timestamp before storing the data in the provided store – on 
read, the adapter will just set `-1`, i.e., unknown, as the timestamp.)
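For illustration, a toy stand-in for this behaviour (a hypothetical class, not the real adapter): the timestamp is discarded on the write path, and `-1` comes back on the read path.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

// Toy sketch of the adapter idea: it wraps a plain (non-timestamped) store.
public class AdapterSketch {
    private final Map<String, String> plainStore = new HashMap<>();

    // Write path: the timestamp is dropped before the data reaches the store.
    public void put(String key, String value, long timestamp) {
        plainStore.put(key, value);
    }

    // Read path: no timestamp was kept, so -1 ("unknown") is returned.
    public SimpleEntry<String, Long> get(String key) {
        String value = plainStore.get(key);
        return value == null ? null : new SimpleEntry<>(value, -1L);
    }

    public static void main(String[] args) {
        AdapterSketch store = new AdapterSketch();
        store.put("k", "v", 1605549540000L); // timestamp silently discarded
        System.out.println(store.get("k").getValue()); // -1
    }
}
```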

Thus, I would actually recommend passing in a timestamped key-value store to 
begin with. On the other hand, why do you pass in a store at all? It seems you 
actually only want to set a name (to be able to access the store from the other 
`Processor`), which you can do via `Materialized.as("MyStore")` – passing in a 
`StoreSupplier` should only be used if you want to pass in your own custom 
store implementation. As you create the store using `Stores` anyway, you can 
just let the Kafka Streams DSL create the store for you.
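As a hedged sketch of the two alternatives (topology fragment only, not runnable on its own; assumes Kafka Streams 2.3+ and a `KGroupedStream<String, String> grouped` in scope; store names, serdes, and the aggregation logic are placeholders):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

// Option 1 (simplest): only name the store and let the DSL create the
// (timestamped) store for you.
KTable<String, Long> counts = grouped.aggregate(
    () -> 0L,
    (key, value, agg) -> agg + 1L,
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("MyStore")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));

// Option 2: if you do pass a supplier, pass a timestamped one explicitly.
KTable<String, Long> counts2 = grouped.aggregate(
    () -> 0L,
    (key, value, agg) -> agg + 1L,
    Materialized.<String, Long>as(Stores.persistentTimestampedKeyValueStore("MyStore2"))
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));
```

With either option, `context.getStateStore("MyStore")` in a downstream processor can then be used as a `TimestampedKeyValueStore` without a ClassCastException.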

I am also open to improving our docs to point out this issue better. At the 
moment, it seems we only documented it in the upgrade guide when the feature 
was added: 
[https://kafka.apache.org/26/documentation/streams/upgrade-guide#streams_api_changes_230]



> Timestamped store is used even if not desired
> -
>
> Key: KAFKA-10722
> URL: https://issues.apache.org/jira/browse/KAFKA-10722
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1, 2.6.0
>Reporter: fml2
>Priority: Major
>
> I have a stream which I then group and aggregate (this results in a KTable). 
> When aggregating, I explicitly specify that the result table be materialized 
> using a regular (non-timestamped) store.
> After that, the KTable is filtered and streamed. This stream is processed by 
> a processor that accesses the store.
> The problem/bug is that even if I specify a non-timestamped store, a 
> timestamped one is used, which leads to a ClassCastException in the processor 
> (it iterates over the store and expects the items to be of type "KeyValue", 
> but they are of type "ValueAndTimestamp").
> Here is the code (schematically).
> First, I define the topology:
> {code:java}
> KTable table = ...aggregate(
>   initializer, // initializer for the KTable row
>   aggregator,  // aggregator
>   Materialized.as(Stores.persistentKeyValueStore("MyStore")) // <-- non-timestamped!
>     .withKeySerde(...).withValueSerde(...));
> table.toStream().process(theProcessor);
> {code}
> In the class for the processor:
> {code:java}
> public void init(ProcessorContext context) {
>   var store = context.getStateStore("MyStore"); // Returns a TimestampedKeyValueStore!
> }
> {code}
> A timestamped store is returned even if I explicitly told to use a 
> non-timestamped one!
>  
> I tried to find the cause for this behaviour and think that I've found it. It 
> lies in this line: 
> [https://github.com/apache/kafka/blob/cfc813537e955c267106eea989f6aec4879e14d7/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java#L241]
> There, TimestampedKeyValueStoreMaterializer is used regardless of whether the 
> materialization supplier is a timestamped one or not.
> I think this is a bug.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)



[jira] [Comment Edited] (KAFKA-10722) Timestamped store is used even if not desired

2020-11-14 Thread fml2 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17231972#comment-17231972
 ] 

fml2 edited comment on KAFKA-10722 at 11/14/20, 10:29 AM:
--

Hello, thank you [~mjsax] for the quick response! This makes perfect sense. 
Could you please explain why the aggregate operation requires a timestamped 
store? The operation is not windowed, if I understand correctly. If it were 
windowed (by time), then I'd understand it. Why is the timestamp needed?

And one more thing (a proposal): if such usage is discouraged, wouldn't it make 
sense to log a warning that such usage is no longer good? Or mark the API as 
deprecated so that IDEs warn developers.

Because, as of now, it misleads developers into wrong usage.






--
This message was sent by Atlassian Jira
(v8.3.4#803005)