[jira] [Commented] (KAFKA-12925) prefixScan missing from intermediate interfaces
[ https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361062#comment-17361062 ]

Michael Viamari commented on KAFKA-12925:
-----------------------------------------

I can flesh out a larger example if needed, but the basic usage for me was getting a reference to the state store using {{context.getStateStore()}} inside {{Transformer#init}}. The exception was thrown when I then attempted to use {{TimestampedKeyValueStore#prefixScan}}.

{code:java}
public class TransformerPrefixScan<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private ProcessorContext context;
    private TimestampedKeyValueStore<K, V> lookupStore;

    public TransformerPrefixScan() {}

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // lookupStoreName is defined elsewhere
        lookupStore = context.getStateStore(lookupStoreName);
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {
        // extractPrefix is a helper defined elsewhere
        String keyPrefix = extractPrefix(key);
        // prefixScan expects a Serializer for the prefix, not a Serde
        try (KeyValueIterator<K, ValueAndTimestamp<V>> lookupIterator =
                 lookupStore.prefixScan(keyPrefix, Serdes.String().serializer())) {
            //handle results
        }
        return null;
    }

    @Override
    public void close() { }
}
{code}

> prefixScan missing from intermediate interfaces
> -----------------------------------------------
>
>                 Key: KAFKA-12925
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12925
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Michael Viamari
>            Assignee: Sagar Rao
>            Priority: Major
>             Fix For: 3.0.0, 2.8.1
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
> and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648]
> introduced support for {{prefixScan}} to StateStores.
>
> It seems that many of the intermediate {{StateStore}} interfaces are missing
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
>
> For example, when accessing the state stores through the processor context,
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not
> define {{prefixScan}} and it falls back to the default implementation in
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Updated] (KAFKA-12925) prefixScan missing from intermediate interfaces
[ https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Viamari updated KAFKA-12925:
-----------------------------------
    Summary: prefixScan missing from intermediate interfaces  (was: StateStore::prefixScan missing from intermediate interfaces)

> prefixScan missing from intermediate interfaces
> -----------------------------------------------
>
>                 Key: KAFKA-12925
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12925
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Michael Viamari
>            Priority: Major
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
> and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648]
> introduced support for {{prefixScan}} to StateStores.
>
> It seems that many of the intermediate {{StateStore}} interfaces are missing
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
>
> For example, when accessing the state stores through the processor context,
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not
> define {{prefixScan}} and it falls back to the default implementation in
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.
[jira] [Created] (KAFKA-12925) StateStore::prefixScan missing from intermediate interfaces
Michael Viamari created KAFKA-12925:
------------------------------------

             Summary: StateStore::prefixScan missing from intermediate interfaces
                 Key: KAFKA-12925
                 URL: https://issues.apache.org/jira/browse/KAFKA-12925
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.8.0
            Reporter: Michael Viamari

[KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores] and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] introduced support for {{prefixScan}} to StateStores.

It seems that many of the intermediate {{StateStore}} interfaces are missing a definition for {{prefixScan}}, and as such it is not accessible in all cases.

For example, when accessing the state stores through the processor context, the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not define {{prefixScan}} and it falls back to the default implementation in {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.
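The fallback described above can be reproduced without Kafka. The following is a minimal sketch, not the actual Kafka classes: {{Store}}, {{InMemoryStore}}, {{PartialDecorator}} and {{FullDecorator}} are hypothetical names chosen for illustration. It shows how a wrapper that does not override a default interface method silently inherits the throwing default, even though the wrapped store supports the operation.

```java
import java.util.ArrayList;
import java.util.List;

public class DecoratorFallbackSketch {

    // Hypothetical stand-in for a store interface with a newer, optional operation.
    interface Store {
        void put(String key);
        List<String> all();

        // Throwing default, analogous to the default prefixScan described in the issue.
        default List<String> prefixScan(String prefix) {
            throw new UnsupportedOperationException("prefixScan not supported");
        }
    }

    // Concrete store that does support prefix scans.
    static class InMemoryStore implements Store {
        private final List<String> keys = new ArrayList<>();

        @Override public void put(String key) { keys.add(key); }
        @Override public List<String> all() { return new ArrayList<>(keys); }

        @Override
        public List<String> prefixScan(String prefix) {
            List<String> out = new ArrayList<>();
            for (String k : keys) {
                if (k.startsWith(prefix)) {
                    out.add(k);
                }
            }
            return out;
        }
    }

    // Decorator that forwards only the older methods; prefixScan falls back to the default.
    static class PartialDecorator implements Store {
        protected final Store inner;

        PartialDecorator(Store inner) { this.inner = inner; }

        @Override public void put(String key) { inner.put(key); }
        @Override public List<String> all() { return inner.all(); }
    }

    // Decorator that also forwards prefixScan to the wrapped store.
    static class FullDecorator extends PartialDecorator {
        FullDecorator(Store inner) { super(inner); }

        @Override
        public List<String> prefixScan(String prefix) { return inner.prefixScan(prefix); }
    }

    public static void main(String[] args) {
        Store base = new InMemoryStore();
        base.put("ab");
        base.put("abc");
        base.put("b");

        System.out.println(new FullDecorator(base).prefixScan("a"));
        try {
            new PartialDecorator(base).prefixScan("a");
        } catch (UnsupportedOperationException e) {
            System.out.println("partial decorator: " + e.getMessage());
        }
    }
}
```

In this sketch the only difference between the two wrappers is the single forwarding override, which is the kind of definition the issue reports as missing.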
[jira] [Created] (KAFKA-9694) Reduce String Operations during WindowStore Operations
Michael Viamari created KAFKA-9694:
-----------------------------------

             Summary: Reduce String Operations during WindowStore Operations
                 Key: KAFKA-9694
                 URL: https://issues.apache.org/jira/browse/KAFKA-9694
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.4.0
            Reporter: Michael Viamari

During most (all?) window store operations, whenever a timestamp is required a call to {{ApiUtils.validateMillisecond}} is used to validate the inputs. This involves a call to {{ApiUtils.prepareMillisCheckFailMsgPrefix}}, which builds part of a string that is used for any necessary error messages. The string is constructed whether or not it is used, which incurs overhead penalties for the WindowStore operation. This has a nominally minimal impact, but can add up in scenarios that involve a lot of WindowStore operations, where performance is at a premium.

To reduce this overhead, {{ApiUtils.prepareMillisCheckFailMsgPrefix}} could return a {{Supplier<String>}} instead, so that the string operations only occur when the string is needed.
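The proposal above can be sketched in isolation. The code below is not the {{ApiUtils}} implementation; the class, method names, and message text are hypothetical, and a counter is added only to make the difference observable. It contrasts building the message prefix eagerly on every call with wrapping it in a {{Supplier<String>}} that is evaluated only on the failure path.

```java
import java.util.function.Supplier;

public class LazyMessageSketch {

    // Counts how many times the prefix string is actually built (for illustration only).
    static int prefixBuilds = 0;

    // Eager variant: the string is concatenated on every call, even if never used.
    static String eagerPrefix(Object value, String name) {
        prefixBuilds++;
        return "Invalid value " + value + " for parameter \"" + name + "\": ";
    }

    // Lazy variant: returns a Supplier, so concatenation happens only when get() is called.
    static Supplier<String> lazyPrefix(Object value, String name) {
        return () -> {
            prefixBuilds++;
            return "Invalid value " + value + " for parameter \"" + name + "\": ";
        };
    }

    static long validateEager(long millis, String prefix) {
        if (millis < 0) {
            throw new IllegalArgumentException(prefix + "value must not be negative");
        }
        return millis;
    }

    static long validateLazy(long millis, Supplier<String> prefix) {
        if (millis < 0) {
            // The prefix is built only here, on the failure path.
            throw new IllegalArgumentException(prefix.get() + "value must not be negative");
        }
        return millis;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            validateEager(i, eagerPrefix(i, "timeFrom"));
        }
        System.out.println("eager builds: " + prefixBuilds);

        prefixBuilds = 0;
        for (int i = 0; i < 1000; i++) {
            validateLazy(i, lazyPrefix(i, "timeFrom"));
        }
        System.out.println("lazy builds: " + prefixBuilds);
    }
}
```

The lazy variant still allocates a small lambda object per call, so whether it is a net win in practice would need to be measured.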
[jira] [Created] (KAFKA-9675) RocksDB metrics reported always at zero
Michael Viamari created KAFKA-9675:
-----------------------------------

             Summary: RocksDB metrics reported always at zero
                 Key: KAFKA-9675
                 URL: https://issues.apache.org/jira/browse/KAFKA-9675
             Project: Kafka
          Issue Type: Improvement
    Affects Versions: 2.4.0
            Reporter: Michael Viamari

The rocksdb metrics listed under {{stream-state-metrics}} are reported as zero for all metrics and all rocksdb instances. The metrics are present in JMX, but are always zero.

The streams state is configured with {{MetricsRecordingLevel}} at {{debug}}. I am able to see metrics with appropriate values in the {{stream-rocksdb-window-state-metrics}}, {{stream-record-cache-metrics}}, {{stream-task-metrics}}, and {{stream-processor-node-metrics}}.

Additionally, my DEBUG logs show the appropriate messages for recording events, i.e.

{{org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder [RocksDB Metrics Recorder for agg-store] Recording metrics for store agg-store}}

It happens that all of my rocksdb instances are windowed stores, not key value stores, so I haven't been able to check if this issue is unique to windowed stores.
[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values
[ https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044719#comment-17044719 ]

Michael Viamari commented on KAFKA-9533:
----------------------------------------

Ok. Thanks for the detailed explanation. I'll take a look at the changes you're proposing here as well.

> ValueTransform forwards `null` values
> -------------------------------------
>
>                 Key: KAFKA-9533
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9533
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.0.0, 0.10.2.2, 0.11.0.3, 1.1.1, 2.0.1, 2.2.2, 2.4.0, 2.3.1
>            Reporter: Michael Viamari
>            Assignee: Michael Viamari
>            Priority: Major
>             Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
> According to the documentation for `KStream#transformValues`, nulls returned
> from `ValueTransformer#transform` are not forwarded. (see
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-])
> However, this does not appear to be the case. In
> `KStreamTransformValuesProcessor#process` the result of the transform is
> forwarded directly.
> {code:java}
> @Override
> public void process(final K key, final V value) {
>     context.forward(key, valueTransformer.transform(key, value));
> }
> {code}
[jira] [Created] (KAFKA-9551) Alternate WindowKeySchema Implementations
Michael Viamari created KAFKA-9551:
-----------------------------------

             Summary: Alternate WindowKeySchema Implementations
                 Key: KAFKA-9551
                 URL: https://issues.apache.org/jira/browse/KAFKA-9551
             Project: Kafka
          Issue Type: Improvement
            Reporter: Michael Viamari

Currently, the {{WindowKeySchema}} used by all {{WindowStore}} implementations serializes the key with window information as {{keyBytes + timestampBytes + seqNumByte}}. This is optimal for iterations and queries that have a fixed key with a variable time window (which I think is leveraged in KStream-KStream join windows).

In cases where the time window is fixed but the key range is variable, there is a significant overhead for iteration: all time windows for a given key must be traversed. The iteration only uses 1 out of every N keys, where N is the number of windows. A key serialization format that is structured as {{timestampBytes + keyBytes + seqNumByte}} would be much more efficient when iterating over keys in a fixed window.

Implementing a custom {{KeySchema}} is not easy at the moment. Currently, {{WindowKeySchema}} is instantiated when supplying a RocksDB instance in {{RocksDbWindowBytesStoreSupplier}}, but most/all other references to a {{KeySchema}} use static functions on {{WindowKeySchema}}. This makes supporting an alternate {{KeySchema}} very challenging.

Additionally, making a custom implementation of {{KeySchema}} is complicated by the fact that although {{RocksDBSegmentedBytesStore.KeySchema}} is a public interface, the interface depends on {{HasNextCondition}}, which is package-private to {{org.apache.kafka.streams.state.internals}}.
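The iteration overhead described above can be illustrated with a sorted set standing in for the ordered RocksDB keyspace. This is a simplified sketch under stated assumptions: keys and timestamps are fixed-width strings rather than bytes, the sequence number is omitted, and the class and method names are hypothetical. It counts how many stored entries a range scan over all keys in one fixed window has to traverse under each layout.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

public class WindowKeyLayoutSketch {

    // Fixed-width encodings so that lexicographic order matches numeric order.
    static String ts(int window) { return String.format("%04d", window); }
    static String key(int k) { return String.format("k%02d", k); }

    // Composite key in either key-first (key + timestamp) or time-first (timestamp + key) order.
    static String composite(int k, int window, boolean timeFirst) {
        return timeFirst ? ts(window) + key(k) : key(k) + ts(window);
    }

    // Populates a sorted store with every (key, window) combination.
    static NavigableSet<String> build(int keys, int windows, boolean timeFirst) {
        NavigableSet<String> store = new TreeSet<>();
        for (int k = 0; k < keys; k++) {
            for (int w = 0; w < windows; w++) {
                store.add(composite(k, w, timeFirst));
            }
        }
        return store;
    }

    // Number of entries traversed by a range scan over all keys within one fixed window.
    static int visited(int keys, int windows, int window, boolean timeFirst) {
        NavigableSet<String> store = build(keys, windows, timeFirst);
        String from = composite(0, window, timeFirst);
        String to = composite(keys - 1, window, timeFirst);
        return store.subSet(from, true, to, true).size();
    }

    public static void main(String[] args) {
        int keys = 10;
        int windows = 5;
        int window = 2;
        System.out.println("matching entries:   " + keys);
        System.out.println("key-first visited:  " + visited(keys, windows, window, false));
        System.out.println("time-first visited: " + visited(keys, windows, window, true));
    }
}
```

With the key-first layout the scan range spans the entries of every window for the intermediate keys, while with the time-first layout the entries for one window are contiguous.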
[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values
[ https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036291#comment-17036291 ]

Michael Viamari commented on KAFKA-9533:
----------------------------------------

Ok. Great. I'll address the adaptor code separately if necessary.

I cannot yet assign this to myself.

> ValueTransform forwards `null` values
> -------------------------------------
>
>                 Key: KAFKA-9533
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9533
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Michael Viamari
>            Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned
> from `ValueTransformer#transform` are not forwarded. (see
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-])
> However, this does not appear to be the case. In
> `KStreamTransformValuesProcessor#process` the result of the transform is
> forwarded directly.
> {code:java}
> @Override
> public void process(final K key, final V value) {
>     context.forward(key, valueTransformer.transform(key, value));
> }
> {code}
[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values
[ https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035927#comment-17035927 ]

Michael Viamari commented on KAFKA-9533:
----------------------------------------

Sure. I can take a look at it. Before I get started:

1) How should I think about the case where someone might unintentionally (or intentionally) be relying on the buggy behavior? Is that something I should handle in code, or is it handled elsewhere?

2) I noticed that KStream#transform uses an adaptor to conform to KStream#flatTransform. Should I do something similar for KStream#transformValues and KStream#flatTransformValues, since I will be modifying that area of code already?

> ValueTransform forwards `null` values
> -------------------------------------
>
>                 Key: KAFKA-9533
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9533
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Michael Viamari
>            Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned
> from `ValueTransformer#transform` are not forwarded. (see
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-])
> However, this does not appear to be the case. In
> `KStreamTransformValuesProcessor#process` the result of the transform is
> forwarded directly.
> {code:java}
> @Override
> public void process(final K key, final V value) {
>     context.forward(key, valueTransformer.transform(key, value));
> }
> {code}
[jira] [Updated] (KAFKA-9533) ValueTransform forwards `null` values
[ https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Viamari updated KAFKA-9533:
----------------------------------
    Description:
According to the documentation for `KStream#transformValues`, nulls returned from `ValueTransformer#transform` are not forwarded. (see [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]) However, this does not appear to be the case. In `KStreamTransformValuesProcessor#process` the result of the transform is forwarded directly.
{code:java}
@Override
public void process(final K key, final V value) {
    context.forward(key, valueTransformer.transform(key, value));
}
{code}

  was:
According to the documentation for `KStream#transformValues`, nulls returned from `ValueTransformer#transform` are not forwarded. (see [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]) However, this does not appear to be the case. In `KStreamTransformValuesProcessor#transform` the result of the transform is forwarded directly.
{code:java}
@Override
public void process(final K key, final V value) {
    context.forward(key, valueTransformer.transform(key, value));
}
{code}

> ValueTransform forwards `null` values
> -------------------------------------
>
>                 Key: KAFKA-9533
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9533
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: Michael Viamari
>            Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned
> from `ValueTransformer#transform` are not forwarded. (see
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-])
> However, this does not appear to be the case. In
> `KStreamTransformValuesProcessor#process` the result of the transform is
> forwarded directly.
> {code:java}
> @Override
> public void process(final K key, final V value) {
>     context.forward(key, valueTransformer.transform(key, value));
> }
> {code}
[jira] [Created] (KAFKA-9533) ValueTransform forwards `null` values
Michael Viamari created KAFKA-9533:
-----------------------------------

             Summary: ValueTransform forwards `null` values
                 Key: KAFKA-9533
                 URL: https://issues.apache.org/jira/browse/KAFKA-9533
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.0
            Reporter: Michael Viamari

According to the documentation for `KStream#transformValues`, nulls returned from `ValueTransformer#transform` are not forwarded. (see [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]) However, this does not appear to be the case. In `KStreamTransformValuesProcessor#transform` the result of the transform is forwarded directly.

{code:java}
@Override
public void process(final K key, final V value) {
    context.forward(key, valueTransformer.transform(key, value));
}
{code}
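The gap between the documented and observed behavior can be shown with a small stand-alone sketch. It does not use the Kafka Streams classes and is not necessarily how the issue was resolved: the class and method names are hypothetical, a {{BiFunction}} stands in for the value transformer, and a list stands in for {{context.forward}}. One method forwards the transform result directly, as in the snippet above; the other skips {{null}} results, as the documentation describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class NullSkippingForwardSketch {

    // Forwards whatever the transformer returns, including null (the observed behavior).
    static <K, V, R> void forwardDirect(K key, V value,
                                        BiFunction<K, V, R> transformer, List<R> sink) {
        sink.add(transformer.apply(key, value));
    }

    // Forwards only non-null results (the behavior the documentation describes).
    static <K, V, R> void forwardNonNull(K key, V value,
                                         BiFunction<K, V, R> transformer, List<R> sink) {
        R result = transformer.apply(key, value);
        if (result != null) {
            sink.add(result);
        }
    }

    public static void main(String[] args) {
        // Hypothetical transformer: drops odd values by returning null.
        BiFunction<String, Integer, String> evenOnly =
            (k, v) -> v % 2 == 0 ? "v" + v : null;

        List<String> direct = new ArrayList<>();
        List<String> filtered = new ArrayList<>();
        for (int v = 1; v <= 4; v++) {
            forwardDirect("key", v, evenOnly, direct);
            forwardNonNull("key", v, evenOnly, filtered);
        }
        System.out.println("direct:   " + direct);
        System.out.println("filtered: " + filtered);
    }
}
```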
[jira] [Created] (KAFKA-9222) StreamPartitioner for internal repartition topics does not match defaults for to() operation
Michael Viamari created KAFKA-9222:
-----------------------------------

             Summary: StreamPartitioner for internal repartition topics does not match defaults for to() operation
                 Key: KAFKA-9222
                 URL: https://issues.apache.org/jira/browse/KAFKA-9222
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.1
            Reporter: Michael Viamari

When a KStream has a Windowed key, different StreamPartitioners are selected depending on how the stream sink is generated.

When using `KStream#to()`, the topology uses a `StreamSinkNode`, which chooses a `WindowedStreamPartitioner` when no partitioner is provided when creating a `SinkNode` for the topology.

{code:java}
KTable<> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
aggResult.toStream().to(aggStreamTopic);
{code}

When an internal repartition is created before a stateful operation, an `OptimizableRepartitionNode` is used, which results in a `SinkNode` being added to the topology. This node is created with a null partitioner, which then would always use the Producer default partitioner. This becomes an issue when attempting to join a windowed stream/ktable with a stream that was mapped into a windowed key.

{code:java}
KTable<> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
windowedAgg.toStream().to(aggStreamTopic);

KStream<> windowedStream = inputStream.map((k, v) -> {
    Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
    Window minW = getMinWindow(w.values());
    return KeyValue.pair(new Windowed<>(k, minW), v);
});
windowedStream.leftJoin(windowedAgg, ...);
{code}

The only workaround I've found is to either use the default partitioner for the `KStream#to()` operation, or to use `KStream#through()` for the repartition operation.
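The partitioning mismatch described above comes down to which bytes are hashed. The following is a rough sketch, not Kafka's partitioner code: {{Arrays.hashCode}} is a stand-in for the hash Kafka actually uses, the windowed-key encoding (key bytes followed by an 8-byte window start) is an assumption for illustration, and all names are hypothetical. Hashing only the original key keeps a key on one partition across windows, while hashing the full serialized windowed key can spread it across partitions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WindowedPartitioningSketch {

    // Assumed serialized form of a windowed key: key bytes followed by the window start.
    static byte[] windowedKeyBytes(String key, long windowStart) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + Long.BYTES).put(k).putLong(windowStart).array();
    }

    // Stand-in hash partitioning; Kafka uses a different hash function.
    static int partition(byte[] bytes, int numPartitions) {
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // Partition chosen when only the original key is hashed; the window start is ignored.
    static int keyOnlyPartition(String key, long windowStart, int numPartitions) {
        return partition(key.getBytes(StandardCharsets.UTF_8), numPartitions);
    }

    // Partition chosen when the whole serialized windowed key is hashed.
    static int fullKeyPartition(String key, long windowStart, int numPartitions) {
        return partition(windowedKeyBytes(key, windowStart), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (long windowStart = 0; windowStart < 3; windowStart++) {
            System.out.println("window " + windowStart
                + " key-only=" + keyOnlyPartition("k0", windowStart, numPartitions)
                + " full-key=" + fullKeyPartition("k0", windowStart, numPartitions));
        }
    }
}
```

If one side of a join is written with the key-only scheme and the other with the full-key scheme, records for the same windowed key are not guaranteed to land on the same partition, which is the co-partitioning problem the issue describes.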
[jira] [Updated] (KAFKA-9222) StreamPartitioner for internal repartition topics does not match defaults for to() operation
[ https://issues.apache.org/jira/browse/KAFKA-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Viamari updated KAFKA-9222:
----------------------------------
    Priority: Minor  (was: Major)

> StreamPartitioner for internal repartition topics does not match defaults for
> to() operation
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-9222
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9222
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1
>            Reporter: Michael Viamari
>            Priority: Minor
>
> When a KStream has a Windowed key, different StreamPartitioners are selected
> depending on how the stream sink is generated.
> When using `KStream#to()`, the topology uses a `StreamSinkNode`, which
> chooses a `WindowedStreamPartitioner` when no partitioner is provided when
> creating a `SinkNode` for the topology.
> {code:java}
> KTable<> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
> aggResult.toStream().to(aggStreamTopic);
> {code}
> When an internal repartition is created before a stateful operation, an
> `OptimizableRepartitionNode` is used, which results in a `SinkNode` being
> added to the topology. This node is created with a null partitioner, which
> then would always use the Producer default partitioner. This becomes an issue
> when attempting to join a windowed stream/ktable with a stream that was
> mapped into a windowed key.
> {code:java}
> KTable<> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
> windowedAgg.toStream().to(aggStreamTopic);
> KStream<> windowedStream = inputStream.map((k, v) -> {
>     Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
>     Window minW = getMinWindow(w.values());
>     return KeyValue.pair(new Windowed<>(k, minW), v);
> });
> windowedStream.leftJoin(windowedAgg, ...);
> {code}
> The only workaround I've found is to either use the default partitioner for
> the `KStream#to()` operation, or to use `KStream#through()` for the
> repartition operation.