[jira] [Commented] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-10 Thread Michael Viamari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361062#comment-17361062
 ] 

Michael Viamari commented on KAFKA-12925:
-

I can flesh out a larger example if needed, but the basic usage for me was 
getting a reference to the state store with {{context.getStateStore()}} inside 
{{Transformer#init}}; the exception was then thrown when calling 
{{TimestampedKeyValueStore#prefixScan}}.
{code:java}
// lookupStoreName and extractPrefix(...) are defined elsewhere in the application.
public class TransformerPrefixScan<K, V> implements Transformer<K, V, KeyValue<K, V>> {

    private ProcessorContext context;
    private TimestampedKeyValueStore<String, V> lookupStore;

    public TransformerPrefixScan() {}

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        lookupStore = context.getStateStore(lookupStoreName);
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {

        String keyPrefix = extractPrefix(key);
        try (KeyValueIterator<String, ValueAndTimestamp<V>> lookupIterator =
                 lookupStore.prefixScan(keyPrefix, Serdes.String().serializer())) {
            // handle results
        }

        return null;
    }

    @Override
    public void close() {}
}
{code}

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Assignee: Sagar Rao
>Priority: Major
> Fix For: 3.0.0, 2.8.1
>
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}}, so it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12925) prefixScan missing from intermediate interfaces

2021-06-09 Thread Michael Viamari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Viamari updated KAFKA-12925:

Summary: prefixScan missing from intermediate interfaces  (was: 
StateStore::prefixScan missing from intermediate interfaces)

> prefixScan missing from intermediate interfaces
> ---
>
> Key: KAFKA-12925
> URL: https://issues.apache.org/jira/browse/KAFKA-12925
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Michael Viamari
>Priority: Major
>
> [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
>  and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] 
> introduced support for {{prefixScan}} to StateStores.
> It seems that many of the intermediate {{StateStore}} interfaces are missing 
> a definition for {{prefixScan}}, and as such it is not accessible in all cases.
> For example, when accessing the state stores through the processor context, 
> the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not 
> define {{prefixScan}}, so it falls back to the default implementation in 
> {{KeyValueStore}}, which throws {{UnsupportedOperationException}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12925) StateStore::prefixScan missing from intermediate interfaces

2021-06-09 Thread Michael Viamari (Jira)
Michael Viamari created KAFKA-12925:
---

 Summary: StateStore::prefixScan missing from intermediate 
interfaces
 Key: KAFKA-12925
 URL: https://issues.apache.org/jira/browse/KAFKA-12925
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: Michael Viamari


[KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]
 and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] introduced 
support for {{prefixScan}} to StateStores.

It seems that many of the intermediate {{StateStore}} interfaces are missing a 
definition for {{prefixScan}}, and as such it is not accessible in all cases.

For example, when accessing the state stores through the processor context, the 
{{KeyValueStoreReadWriteDecorator}} and associated interfaces do not define 
{{prefixScan}}, so it falls back to the default implementation in 
{{KeyValueStore}}, which throws {{UnsupportedOperationException}}.
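
For illustration, a rough sketch (hypothetical and simplified, not the actual 
Kafka internals) of the kind of override the decorator layer would need so that 
{{prefixScan}} reaches the underlying store instead of the default throwing 
implementation:
{code:java}
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical, simplified decorator: without an explicit override like this,
// prefixScan resolves to the default method inherited from the read-only store
// interface, which throws UnsupportedOperationException.
abstract class DelegatingKeyValueStore<K, V> implements KeyValueStore<K, V> {

    private final KeyValueStore<K, V> wrapped;

    DelegatingKeyValueStore(final KeyValueStore<K, V> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<K, V> prefixScan(final P prefix,
                                                                           final PS prefixKeySerializer) {
        // Forward to the wrapped store so the underlying implementation is reached.
        return wrapped.prefixScan(prefix, prefixKeySerializer);
    }
}
{code}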



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9694) Reduce String Operations during WindowStore Operations

2020-03-10 Thread Michael Viamari (Jira)
Michael Viamari created KAFKA-9694:
--

 Summary: Reduce String Operations during WindowStore Operations
 Key: KAFKA-9694
 URL: https://issues.apache.org/jira/browse/KAFKA-9694
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.4.0
Reporter: Michael Viamari


During most (all?) window store operations, whenever a timestamp is required, a 
call to {{ApiUtils.validateMillisecond}} is used to validate the inputs. 
This involves a call to {{ApiUtils.prepareMillisCheckFailMsgPrefix}}, which 
builds part of a string that is used for any necessary error messages. The 
string is constructed whether or not it is used, which incurs an overhead 
penalty for the WindowStore operation. This has a nominally minimal impact, 
but can add up in scenarios that involve a lot of WindowStore operations, where 
performance is at a premium.

To reduce this overhead, {{ApiUtils.prepareMillisCheckFailMsgPrefix}} could 
return a {{Supplier<String>}} instead, so that the string operations only occur 
when the string is needed.
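
For illustration, a rough sketch of the idea (simplified and hypothetical, not 
the actual {{ApiUtils}} code; method bodies and messages are illustrative):
{code:java}
import java.time.Duration;
import java.util.Objects;
import java.util.function.Supplier;

// Sketch: defer building the failure-message prefix until a validation actually fails.
final class LazyMillisCheck {

    // Returns a Supplier so the String.format only runs when the message is needed.
    static Supplier<String> prepareMillisCheckFailMsgPrefix(final Object value, final String name) {
        return () -> String.format("Invalid value for parameter \"%s\" (value was: %s).", name, value);
    }

    static long validateMillisecondDuration(final Duration duration, final Supplier<String> messagePrefix) {
        Objects.requireNonNull(duration, "duration must not be null");
        try {
            return duration.toMillis();
        } catch (final ArithmeticException e) {
            // The message is built only on the failure path.
            throw new IllegalArgumentException(messagePrefix.get() + " Duration is too large.", e);
        }
    }
}
{code}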



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9675) RocksDB metrics reported always at zero

2020-03-06 Thread Michael Viamari (Jira)
Michael Viamari created KAFKA-9675:
--

 Summary: RocksDB metrics reported always at zero
 Key: KAFKA-9675
 URL: https://issues.apache.org/jira/browse/KAFKA-9675
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.4.0
Reporter: Michael Viamari


The rocksdb metrics listed under {{stream-state-metrics}} are reported as zero 
for all metrics and all rocksdb instances. The metrics are present in JMX, but 
are always zero.

The streams state is configured with {{MetricsRecordingLevel}} at {{debug}}. I 
am able to see metrics with appropriate values in the 
{{stream-rocksdb-window-state-metrics}}, {{stream-record-cache-metrics}}, 
{{stream-task-metrics}}, and {{stream-processor-node-metrics}}.
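
For reference, a minimal sketch of the configuration in question (the 
application id and bootstrap servers below are placeholders, not values from 
this deployment):
{code:java}
import java.util.Properties;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsConfig;

public class DebugMetricsConfig {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // RocksDB state-store metrics are only recorded when the level is DEBUG.
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.name());
        return props;
    }
}
{code}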

Additionally, my DEBUG logs show the appropriate messages for recording events, 
i.e.

{{org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder 
[RocksDB Metrics Recorder for agg-store] Recording metrics for store agg-store}}

It happens that all of my rocksdb instances are windowed stores, not key value 
stores, so I haven't been able to check if this issue is unique to windowed 
stores.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-25 Thread Michael Viamari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17044719#comment-17044719
 ] 

Michael Viamari commented on KAFKA-9533:


Ok. Thanks for the detailed explanation. I'll take a look at the changes you're 
proposing here as well.

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.2.2, 0.11.0.3, 1.1.1, 2.0.1, 2.2.2, 
> 2.4.0, 2.3.1
>Reporter: Michael Viamari
>Assignee: Michael Viamari
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9551) Alternate WindowKeySchema Implementations

2020-02-13 Thread Michael Viamari (Jira)
Michael Viamari created KAFKA-9551:
--

 Summary: Alternate WindowKeySchema Implementations
 Key: KAFKA-9551
 URL: https://issues.apache.org/jira/browse/KAFKA-9551
 Project: Kafka
  Issue Type: Improvement
Reporter: Michael Viamari


Currently, the {{WindowKeySchema}} used by all {{WindowStore}} implementations 
serializes the key with window information as {{keyBytes + timestampBytes + 
seqNumByte}}. This is optimal for iterations and queries that have a fixed key 
with a variable time window (which I think is leveraged in KStream-KStream join 
windows).

 

In cases where the time-window is fixed, but the key range is variable, there 
is a significant overhead for iteration: all time-windows for a given key must 
be traversed. The iteration only uses 1 out of every N keys, where N is the 
number of windows.

A key serialization format that is structured as {{timestampBytes + keyBytes + 
seqNumByte}} would be much more efficient when iterating over keys in a fixed 
window.
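
For illustration, a rough sketch (not Kafka's actual {{WindowKeySchema}}; names 
and sizes here are illustrative) of what a time-first layout could look like:
{code:java}
import java.nio.ByteBuffer;

// Sketch of the proposed layout: putting the window start timestamp before the
// key bytes groups all keys of a given window together, so a fixed-window scan
// touches a contiguous byte range.
final class TimeFirstWindowKey {

    private static final int TIMESTAMP_SIZE = Long.BYTES;  // 8
    private static final int SEQNUM_SIZE = Integer.BYTES;  // 4

    // Layout: timestampBytes + keyBytes + seqNumBytes
    static byte[] toBinary(final byte[] keyBytes, final long windowStartMs, final int seqNum) {
        final ByteBuffer buf = ByteBuffer.allocate(TIMESTAMP_SIZE + keyBytes.length + SEQNUM_SIZE);
        buf.putLong(windowStartMs);
        buf.put(keyBytes);
        buf.putInt(seqNum);
        return buf.array();
    }

    static long extractWindowStart(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(0);
    }
}
{code}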

Implementing a custom {{KeySchema}} is not easy at the moment. Currently, 
{{WindowKeySchema}} is instantiated when supplying a RocksDB instance in 
{{RocksDbWindowBytesStoreSupplier}}, but most/all other references to a 
{{KeySchema}} use static functions on {{WindowKeySchema}}. This makes 
supporting an alternate {{KeySchema}} very challenging. Additionally, making a 
custom implementation of {{KeySchema}} is complicated by the fact that although 
{{RocksDBSegmentedBytesStore.KeySchema}} is a public interface, the interface 
depends on {{HasNextCondition}} which is package-private to 
{{org.apache.kafka.streams.state.internals}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-13 Thread Michael Viamari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036291#comment-17036291
 ] 

Michael Viamari commented on KAFKA-9533:


Ok. Great. I'll address the adaptor code separately if necessary.

I cannot yet assign this to myself.

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-12 Thread Michael Viamari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17035927#comment-17035927
 ] 

Michael Viamari commented on KAFKA-9533:


Sure. I can take a look at it. Before I get started:

1) How should I think about the case where someone might unintentionally (or 
intentionally) be relying on the buggy behavior? Is that something I should 
handle in code, or is it handled elsewhere?

2) I noticed that KStream#transform uses an adaptor to conform to 
KStream#flatTransform. Should I do something similar for 
KStream#transformValues and KStream#flatTransformValues, since I will be 
modifying that area of code already?

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-10 Thread Michael Viamari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Viamari updated KAFKA-9533:
---
Description: 
According to the documentation for `KStream#transformValues`, nulls returned 
from `ValueTransformer#transform` are not forwarded (see 
[KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]).

However, this does not appear to be the case. In 
`KStreamTransformValuesProcessor#process` the result of the transform is 
forwarded directly.
{code:java}
 @Override
 public void process(final K key, final V value) {
 context.forward(key, valueTransformer.transform(key, value));
 }
{code}

  was:
According to the documentation for `KStream#transformValues`, nulls returned 
from `ValueTransformer#transform` are not forwarded. (see 
[KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]

However, this does not appear to be the case. In 
`KStreamTransformValuesProcessor#transform` the result of the transform is 
forwarded directly.
{code:java}
 @Override
 public void process(final K key, final V value) {
 context.forward(key, valueTransformer.transform(key, value));
 }
{code}


> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-10 Thread Michael Viamari (Jira)
Michael Viamari created KAFKA-9533:
--

 Summary: ValueTransform forwards `null` values
 Key: KAFKA-9533
 URL: https://issues.apache.org/jira/browse/KAFKA-9533
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.0
Reporter: Michael Viamari


According to the documentation for `KStream#transformValues`, nulls returned 
from `ValueTransformer#transform` are not forwarded (see 
[KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]).

However, this does not appear to be the case. In 
`KStreamTransformValuesProcessor#transform` the result of the transform is 
forwarded directly.
{code:java}
 @Override
 public void process(final K key, final V value) {
 context.forward(key, valueTransformer.transform(key, value));
 }
{code}
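
For illustration, a rough sketch (not the committed patch; type parameter names 
are illustrative) of the documented behavior, where a null result is simply not 
forwarded:
{code:java}
@Override
public void process(final K key, final V value) {
    final V1 newValue = valueTransformer.transform(key, value);
    // Match the documented contract: a null result is not forwarded downstream.
    if (newValue != null) {
        context.forward(key, newValue);
    }
}
{code}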



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9222) StreamPartitioner for internal repartition topics does not match defaults for to() operation

2019-11-21 Thread Michael Viamari (Jira)
Michael Viamari created KAFKA-9222:
--

 Summary: StreamPartitioner for internal repartition topics does 
not match defaults for to() operation
 Key: KAFKA-9222
 URL: https://issues.apache.org/jira/browse/KAFKA-9222
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.1
Reporter: Michael Viamari


When a KStream has a Windowed key, different StreamPartitioner implementations 
are selected depending on how the stream sink is generated.

When using `KStream#to()`, the topology uses a `StreamSinkNode`, which chooses 
a `WindowedStreamPartitioner` when no partitioner is provided while creating the 
`SinkNode` for the topology.
{code:java}
KTable<> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
aggResult.toStream().to(aggStreamTopic);
{code}
When an internal repartition is created before a stateful operation, an 
`OptimizableRepartitionNode` is used, which results in a `SinkNode` being added 
to the topology. That node is created with a null partitioner, so the producer's 
default partitioner is always used. This becomes an issue when attempting to 
join a windowed stream/KTable with a stream that was mapped into a windowed key.
{code:java}
KTable<> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
windowedAgg.toStream().to(aggStreamTopic);

KStream<> windowedStream = inputStream.map((k, v) -> {
    Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
    Window minW = getMinWindow(w.values());
    return KeyValue.pair(new Windowed<>(k, minW), v);
});
windowedStream.leftJoin(windowedAgg, ...);
{code}
The only workaround I've found is to either use the default partitioner for 
the `KStream#to()` operation, or to use `KStream.through()` for the repartition 
operation.
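
For illustration, a rough sketch of the second workaround (API as of Kafka 
2.3/2.4, where `KStream#through()` is still available; the topic name, serdes, 
and joiner below are placeholders):
{code:java}
// Workaround sketch: send the mapped, windowed-key stream through an explicit
// topic with KStream#through(). Because through() uses the same sink path as
// to(), the windowed key is partitioned by WindowedStreamPartitioner on both
// sides of the join.
windowedStream
    .through("windowed-stream-repartition",
        Produced.with(windowedKeySerde, valueSerde))
    .leftJoin(windowedAgg, joiner);
{code}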



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9222) StreamPartitioner for internal repartition topics does not match defaults for to() operation

2019-11-21 Thread Michael Viamari (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Viamari updated KAFKA-9222:
---
Priority: Minor  (was: Major)

> StreamPartitioner for internal repartition topics does not match defaults for 
> to() operation
> 
>
> Key: KAFKA-9222
> URL: https://issues.apache.org/jira/browse/KAFKA-9222
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1
>Reporter: Michael Viamari
>Priority: Minor
>
> When a KStream has a Windowed key, different StreamPartitioner implementations 
> are selected depending on how the stream sink is generated.
> When using `KStream#to()`, the topology uses a `StreamSinkNode`, which 
> chooses a `WindowedStreamPartitioner` when no partitioner is provided when 
> creating a `SinkNode` for the topology.
> {code:java}
> KTable<> aggResult = inputStream.groupByKey().windowed(...).aggregate(...); 
> aggResult.toStream().to(aggStreamTopic)
> {code}
> When an internal repartition is created before a stateful operation, an 
> `OptimizableRepartitionNode` is used, which results in a `SinkNode` being 
> added to the topology. This node is created with a null partitioner, which 
> then would always use the Producer default partitioner. This becomes an issue 
> when attempting to join a windowed stream/ktable with a stream that was 
> mapped into a windowed key.
> {code:java}
> KTable<> windowedAgg = inputStream.groupByKey().windowed(...).aggregate(...); 
> windowedAgg.toStream().to(aggStreamTopic);
> KStream<> windowedStream = inputStream.map((k, v) -> {
> Map w = windows.windowsFor(v.getTimestamp());
> Window minW = getMinWindow(w.values());
> return KeyValue.pair(new Windowed<>(k, minW), v);
> });
> windowedStream.leftJoin(windowedAgg, );
> {code}
> The only workaround I've found is to either use the default partitioner for 
> the `KStream#to()` operation, or to use `KStream.through()` for the 
> repartition operation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)