[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551333#comment-17551333
 ] 

Matthias J. Sax commented on KAFKA-13963:
-

{quote}Is it worth updating the java doc to mention this?
{quote}
Updating docs can never hurt :) – are you interested in doing a PR?
{quote}if you use the internal RecordCollector, which I feel should be better 
hidden from the streams api users
{quote}
Yes, you should NEVER use internal stuff... Not sure how we could "better hide" 
it though? Seems not to be possible as long as we are using Java 8...
{quote}I can open up a separate bug for that if it makes sense.
{quote}
Don't think it's a bug? It's (unfortunately) how Java works.

> Topology Description ignores context.forward
> 
>
> Key: KAFKA-13963
> URL: https://issues.apache.org/jira/browse/KAFKA-13963
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.2
>Reporter: Tomasz Kaszuba
>Priority: Minor
>
> I have a simple topology:
> {code:java}
>       val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, 
> Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new RecordCollectorProcessor()
>           },
>           "source"
>         ) {code}
> And a simple processor that uses context.forward to forward messages:
> {code:java}
>   private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
>     override def process(key: String, value: String): Unit =
>       context().forward("key", "value", To.child("output"))
>     override def close(): Unit = ()
>   }
> {code}
> when I call topology.describe() I receive this:
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: source (topics: [input])
>       --> process
>     Processor: process (stores: [])
>       --> none
>       <-- source {noformat}
> Ignoring the fact that this will not run, since it will throw a runtime 
> exception, why is the To.child ignored?
> Taking it one step further, if I add multiple sinks to the topology like so:
> {code:java}
> val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, 
> Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new ContextForwardProcessor()
>           },
>           "source"
>         )
>         .addSink("sink", "output1", Serdes.stringSerde.serializer(), 
> Serdes.stringSerde.serializer(), "process")
>         .addSink("sink2", "output2", Serdes.stringSerde.serializer(), 
> Serdes.stringSerde.serializer(), "process")  {code}
> but have the processor only output to "output1" it is in no way reflected in 
> the described topology graph.
> I assume this is by design since it's a lot more work to interpret what the 
> context.forward is doing but when I tried to look for this information in the 
> java doc I couldn't find it.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

2022-06-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551271#comment-17551271
 ] 

Matthias J. Sax commented on KAFKA-13963:
-

TopologyDescription only describes the structure of your graph of operators. In 
your first example, you only added two nodes to the graph ("source" and 
"process") and there is no node "output", and thus it's not contained in the 
`TopologyDescription`.

It's not really possible to take the business logic (i.e., what `forward()` is 
doing) into account; at least I have no idea how this could be done with 
reasonable effort.

It's for sure not a bug. We should either close this ticket or change it into 
a feature request.
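
To illustrate the point with a toy model: the sketch below is plain Java and deliberately does not use the Kafka Streams classes (`MiniTopology` and all of its methods are hypothetical names). It only shows why a description built from declared nodes and edges cannot reflect what a processor's `forward()` logic does at runtime.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: MiniTopology is a hypothetical stand-in, not a Kafka Streams class.
final class MiniTopology {
    // node name -> names of declared child nodes, in insertion order
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    // Declares a node and connects it to already declared parents.
    MiniTopology addNode(String name, String... parents) {
        children.put(name, new ArrayList<>());
        for (String parent : parents) {
            List<String> siblings = children.get(parent);
            if (siblings == null) {
                throw new IllegalArgumentException("unknown parent: " + parent);
            }
            siblings.add(name);
        }
        return this;
    }

    // The description is derived from the declared edges only.
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, List<String>> e : children.entrySet()) {
            String targets = e.getValue().isEmpty() ? "none" : String.join(", ", e.getValue());
            sb.append(e.getKey()).append(" --> ").append(targets).append('\n');
        }
        return sb.toString();
    }

    // Runtime routing: forwarding to an undeclared child fails, and a valid
    // forward never changes what describe() returns.
    void forward(String from, String toChild) {
        List<String> declared = children.get(from);
        if (declared == null || !declared.contains(toChild)) {
            throw new IllegalStateException(from + " has no child named " + toChild);
        }
    }

    public static void main(String[] args) {
        MiniTopology t = new MiniTopology().addNode("source").addNode("process", "source");
        System.out.print(t.describe());
    }
}
```

In this model, adding two sinks under "process" lists both of them in the description, no matter which one the processing logic actually forwards to.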



Re: How it is safe to break message ordering but not idempotency after getting an OutOfOrderSequenceException?

2022-06-07 Thread Matthias J. Sax

Yes, the broker de-dupes using the sequence number.

But for example, if a sequence number is skipped, you could get this 
exception: the current batch of messages cannot be appended to the log, 
as one batch is missing, and the producer would need to re-send the 
previous/missing batch with lower sequence number before it can move to 
the "next" (ie current) batch.
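
A minimal sketch of that scenario, assuming a heavily simplified model: one producer, one partition, and one sequence number per batch. `ToyPartitionLog` and its `Result` values are hypothetical names for illustration and do not correspond to broker code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a hypothetical single-producer partition log, not broker code.
final class ToyPartitionLog {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    private final List<String> log = new ArrayList<>();
    private int lastSequence = -1; // sequence number of the last appended batch

    Result append(int sequence, String batch) {
        if (sequence <= lastSequence) {
            return Result.DUPLICATE;    // already in the log: the retry is dropped
        }
        if (sequence != lastSequence + 1) {
            return Result.OUT_OF_ORDER; // a batch in between is missing
        }
        log.add(batch);
        lastSequence = sequence;
        return Result.APPENDED;
    }

    List<String> contents() {
        return log;
    }

    public static void main(String[] args) {
        ToyPartitionLog p = new ToyPartitionLog();
        System.out.println(p.append(0, "a")); // first batch is accepted
        System.out.println(p.append(0, "a")); // retry of the same batch is de-duplicated
        System.out.println(p.append(2, "c")); // batch 1 is missing, so batch 2 is rejected
        System.out.println(p.append(1, "b")); // re-sending the missing batch succeeds
        System.out.println(p.append(2, "c")); // now the "next" batch can be appended
        System.out.println(p.contents());
    }
}
```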


Does this make sense?


-Matthias

On 5/27/22 10:43 AM, Gabriel Giussi wrote:

The docs say
"This exception indicates that the broker received an unexpected sequence
number from the producer, which means that data may have been lost. If the
producer is configured for idempotence only (i.e. if enable.idempotence is
set and no transactional.id is configured), it is possible to continue
sending with the same producer instance, but doing so risks reordering of
sent record"

Isn't the broker using the monotonically increasing sequence number to
dedup messages? So how can it break message ordering without breaking
idempotency?
I can't see an example scenario where this could happen. I guess
the OutOfOrderSequenceException can only happen
with max.in.flight.requests.per.connection > 1, but even in that case, why
would we not keep getting an OutOfOrderSequenceException, and instead get a
success that broke message ordering?

Thanks.



Re: Newbie how to get key/value pojo out of a stream?

2022-06-07 Thread Matthias J. Sax

`enable.auto.commit` is a Consumer config and does not apply to Kafka 
Streams.


In Kafka Streams, you basically always have auto commit enabled, and you 
can control how frequently commits happen via `commit.interval.ms`.


Also on `close()` Kafka Streams would commit offsets.
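
As a rough sketch of where that setting goes: the snippet below only builds a `java.util.Properties` object using the plain config key strings. The application id and bootstrap address are placeholder values, and `StreamsCommitConfigSketch` is a hypothetical helper name.

```java
import java.util.Properties;

// Sketch only: builds the configuration that would be handed to Kafka Streams.
final class StreamsCommitConfigSketch {
    static Properties streamsProps(long commitIntervalMs) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // placeholder value
        props.put("bootstrap.servers", "localhost:9092"); // placeholder value
        // Controls how frequently Kafka Streams commits offsets.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(10_000L);
        System.out.println(props.getProperty("commit.interval.ms"));
    }
}
```

Note that `enable.auto.commit` does not appear here at all, in line with the comment above.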


-Matthias

On 5/31/22 12:29 PM, Luca wrote:

Hi Andy,

The defaults are sensible enough that, under normal operational conditions, 
your app should pick up from where it left off. To dig a little more into this, I 
suggest you look into the `auto.offset.reset` and `enable.auto.commit` options.

In case you do need to reprocess everything, Kafka Streams comes with a handy 
reset tool. You can read about it here: 
https://kafka.apache.org/32/documentation/streams/developer-guide/app-reset-tool.html

Luca

On Tue, May 31, 2022, at 5:17 PM, andrew davidson wrote:

Thanks Luca

This is exactly what I was looking for.

On a related note, let's say I stop and restart my application. What would I 
have to do so that I do not reprocess events?

I am still working through the kstreams 101 tutorial. I have not gotten to the 
DSL tutorials yet

Andy

On 5/30/22, 11:16 PM, "Luca"  wrote:

 Hi Andy,

 If I understand your problem correctly, you want a "foreach" terminal 
operation. You can check out the API here: 
https://kafka.apache.org/32/documentation/streams/developer-guide/dsl-api.html

 Luca

 On Tue, May 31, 2022, at 6:37 AM, Andy wrote:
 > All the Kstream examples I have found demonstrate how to use map, filter,
 > and join on streams. In the last step they typically use to() to
 > publish/produce the results to a new stream
 >
 > How can I get the data out of the stream? For example I need to send the
 > data to a legacy data that can not use kafka. Or maybe I want to plot the
 > data,…
 >
 > I looked at the java doc and did not find anything
 >
 > Any idea what I should “google” to find a code example?
 >
 > Kind regards
 >
 > Andy
 >

 lucapette.me



lucapette.me



[jira] [Commented] (KAFKA-13936) Invalid consumer lag when monitoring from a kafka streams application

2022-06-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17546407#comment-17546407
 ] 

Matthias J. Sax commented on KAFKA-13936:
-

The docs are in the same repository as the code: 
[https://github.com/apache/kafka/tree/trunk/docs]

A good place might be the consumer monitoring section 
[https://kafka.apache.org/documentation/#consumer_fetch_monitoring] ?

> Invalid consumer lag when monitoring from a kafka streams application
> -
>
> Key: KAFKA-13936
> URL: https://issues.apache.org/jira/browse/KAFKA-13936
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Prashanth Joseph Babu
>Priority: Major
>
> I have a kafka streams application and I'm trying to monitor the consumer lag 
> via stream metrics.
> Here's some code snippet
> {code:java}
> metrics = streams.metrics();
> lag = 0;
> for (Metric m : metrics.values()) {
>     tags = m.metricName().tags();
>     if (m.metricName().name().equals(MONITOR_CONSUMER_LAG)
>             && tags.containsKey(MONTOR_TAG_TOPIC)
>             && tags.get(MONTOR_TAG_TOPIC).equals(inputTopic)) {
>         partitionLag = Float.valueOf(m.metricValue().toString()).floatValue();
>         if (!partitionLag.isNaN()) {
>             lag += partitionLag;
>         }
>     }
> }
> {code}
> Here MONITOR_CONSUMER_LAG is {{records-lag-max}}.
> However, these numbers don't match the consumer lag we see in the kafka UI. 
> Is records-lag-max the right metric to track for a kafka streams 
> application when the objective is to get consumer lag?





[jira] [Commented] (KAFKA-13936) Invalid consumer lag when monitoring from a kafka streams application

2022-06-02 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17545686#comment-17545686
 ] 

Matthias J. Sax commented on KAFKA-13936:
-

Might be worth documenting this case :) Would you be interested in doing a 
PR?



[jira] [Updated] (KAFKA-13945) Add task-level metrics to Streams for bytes/records Produced

2022-05-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13945:

Description: KIP-846: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093] 

> Add task-level metrics to Streams for bytes/records Produced
> 
>
> Key: KAFKA-13945
> URL: https://issues.apache.org/jira/browse/KAFKA-13945
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip
>
> KIP-846: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211886093] 





[jira] [Commented] (KAFKA-13936) Invalid consumer lag when monitoring from a kafka streams application

2022-05-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17544647#comment-17544647
 ] 

Matthias J. Sax commented on KAFKA-13936:
-

As mentioned above, offsets are committed every 30 seconds by default, and thus 
the difference between the committed offset and the end offset (as reported by 
Kafka CLI tools, and presumably by provectus) is expected to be larger than the 
metric reported directly by the consumer, which reports the difference between 
its current (not yet committed) position and the end offset.

If you reduce the commit interval (not necessarily recommended), the difference 
should be smaller as the CLI should report a smaller number.
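
A small worked example of the two computations, using made-up offsets (the numbers and the `LagSketch` helper are purely illustrative and do not come from the ticket):

```java
// Sketch only: the offsets used in main() are made-up numbers for illustration.
final class LagSketch {
    // Lag as the consumer metric sees it: end offset minus current position.
    static long positionLag(long endOffset, long position) {
        return endOffset - position;
    }

    // Lag as a tool working from committed offsets sees it.
    static long committedLag(long endOffset, long committedOffset) {
        return endOffset - committedOffset;
    }

    public static void main(String[] args) {
        long endOffset = 1000L; // latest offset in the partition
        long position = 990L;   // consumer has already processed up to here
        long committed = 700L;  // last commit happened some seconds ago

        System.out.println("consumer-reported lag: " + positionLag(endOffset, position));
        System.out.println("committed-offset lag:  " + committedLag(endOffset, committed));
    }
}
```

With these numbers the consumer-side lag is 10 while the committed-offset lag is 300. The gap closes each time a commit happens, which is why a shorter commit interval makes the two views agree more closely.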

Overall, it seems you are reporting expected behavior, and I don't see a bug. Of 
course, we _could_ add a new consumer metric that reports the difference between 
the committed offset and the end offset, but what would we gain? In the end, the 
metric reported by the consumer is more accurate than what the CLI reports.

Maybe you can explain why it is a problem that the numbers are not the same?



[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-05-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17544645#comment-17544645
 ] 

Matthias J. Sax commented on KAFKA-13939:
-

Thanks for reporting this issue. It sounds rather severe, so I bumped the priority 
to blocker.

As you already have a fix, would you like to open a PR on GitHub?

> Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
> -
>
> Key: KAFKA-13939
> URL: https://issues.apache.org/jira/browse/KAFKA-13939
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jackson Newhouse
>Priority: Blocker
>
> If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within 
> `flush()`, see 
> [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262.]
>  However, dirtyKeys is still written to in the loop within `evictWhile`. This 
> causes dirtyKeys to continuously grow for the life of the buffer. 
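
The pattern described above can be reproduced with a toy model. This is a simplified sketch and not the actual `InMemoryTimeOrderedKeyValueBuffer` source: `ToyBuffer`, `evict` and `dirtyKeyCount` are hypothetical names chosen for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: mirrors the pattern described in the ticket, not the Kafka source.
final class ToyBuffer {
    private final boolean loggingEnabled;
    private final Set<String> dirtyKeys = new HashSet<>();

    ToyBuffer(boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    // Eviction always records the key as dirty, regardless of logging.
    void evict(String key) {
        dirtyKeys.add(key);
    }

    // Flush clears the dirty set only on the logging path.
    void flush() {
        if (loggingEnabled) {
            // changelog writes would happen here
            dirtyKeys.clear();
        }
    }

    int dirtyKeyCount() {
        return dirtyKeys.size();
    }

    public static void main(String[] args) {
        ToyBuffer buffer = new ToyBuffer(false);
        for (int i = 0; i < 1000; i++) {
            buffer.evict("key-" + i);
            if (i % 100 == 99) {
                buffer.flush(); // has no effect when logging is disabled
            }
        }
        System.out.println("dirty keys retained: " + buffer.dirtyKeyCount());
    }
}
```

In this model the set keeps every evicted key when logging is disabled. Either skipping the `add` when logging is off or clearing the set unconditionally in `flush()` would stop the growth.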





[jira] [Updated] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-05-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13939:

Component/s: streams



[jira] [Updated] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer

2022-05-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13939:

Priority: Blocker  (was: Major)



[jira] [Commented] (KAFKA-13936) Invalid consumer lag when monitoring from a kafka streams application

2022-05-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17542359#comment-17542359
 ] 

Matthias J. Sax commented on KAFKA-13936:
-

> we see in the kafka UI 

There is no "Kafka UI" – at least not as part of Apache Kafka. If you are using 
some other "external" UI, it's unclear how they compute/display the lag.

Two theories:
 * They don't sum the lag over all partitions but take the max over all 
partitions?
 * They compute the lag based on committed offsets to end offset difference – 
because offsets are committed only every 30 seconds by default, this 
computation does not match what the consumer reports, because the consumer uses 
"current offset" (not committed offset) to compute the lag

I think we need to close this ticket? I don't see any bug in Kafka (ie, 
consumer or Kafka Streams) that we would need to fix?
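
To make the first theory concrete, here is a tiny sketch with made-up per-partition lag values (`LagAggregationSketch` is a hypothetical helper, and the numbers are not taken from the ticket):

```java
import java.util.Arrays;

// Sketch only: the per-partition lags in main() are made-up numbers.
final class LagAggregationSketch {
    // Total lag across partitions, as the reporter's loop computes it.
    static long sum(long[] partitionLags) {
        return Arrays.stream(partitionLags).sum();
    }

    // Largest single-partition lag, as a UI might display instead.
    static long max(long[] partitionLags) {
        return Arrays.stream(partitionLags).max().orElse(0L);
    }

    public static void main(String[] args) {
        long[] lags = {5L, 20L, 0L};
        System.out.println("sum over partitions: " + sum(lags));
        System.out.println("max over partitions: " + max(lags));
    }
}
```

The same three partitions yield 25 when summed and 20 when the maximum is taken, so two tools can disagree even if they read identical per-partition values.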



Re: kafka stream - sliding window - getting unexpected output

2022-05-20 Thread Matthias J. Sax

Not sure atm.

It seems you are printing the timestamp extracted from the payload:


out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));


Does this timestamp really map to the window?

You remove the window information so maybe you are looking at the wrong 
data?



.map((Windowed key, OutputPojo out) -> {
   return new KeyValue<>(key.key(),out) ;
 })



For the input: Do you use a custom timestamp extractor and use the 
payload timestamp? If not, does the record timestamp and the payload 
timestamp match?



-Matthias


On 5/18/22 11:32 PM, Shankar Mane wrote:

@Matthias J. Sax / All

Have added below line :


.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))




Here is the output (for uuid *2cbef750-325b-4a2f-ac39-b2c23fa0313f*, I was
expecting a single output, but that is not the case here). Which one is the final
output of those 2 rows for the same uuid?

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0,

strTime=2022-05-19 11:48:08.128, uuid=fb6bea5f-8fd0-4c03-8df3-aaf392f04a5a)


[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0,

strTime=2022-05-19 11:48:10.328, uuid=b4ab837f-b10a-452d-a663-719215d2992f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0,
strTime=2022-05-19 11:48:12.527, uuid=8fa1b621-c967-4770-9f85-9fd84999c97c)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:14.726, uuid=1fc21253-7859-45ef-969e-82ed596c4fa0)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:16.925, uuid=
*2cbef750-325b-4a2f-ac39-b2c23fa0313f)*
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:16.925, uuid=
*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:25.721, uuid=fbe62fa4-e7c4-437f-b976-0bb7ae0c4390)



On Wed, May 18, 2022 at 10:21 PM Matthias J. Sax  wrote:


Emitting intermediate results is by design.

If you don't want to get intermediate results, you can add `suppress()`
after the aggregation and configure it to only "emit on window close".

-Matthias

On 5/17/22 3:20 AM, Shankar Mane wrote:

Hi All,

Our use case is to use a sliding window. For example, at any point, whenever a
user performs any action at time [ t1 ], we would like to see their activity
in [ t1 - last 24 hours ], and use this to show the user some recommendations.

-- I have code ready and it works without any errors.
-- Aggregations happen as expected.
-- But the output generated is unexpected. As the window slides, I am
getting mixed output, which includes intermediate aggregated records
along with the final aggregated outputs.

Could someone please help me here? What can I do to get ONLY the final
aggregated output?


Code snippet :




builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
  .filter((k, v) -> v != null)
  .map((k,v) -> KeyValue.pair(v.getUserId(), v))
  //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
  .groupByKey()
  .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
  .aggregate(OutputPojo::new, (k, tr, out) -> {
      out.setUserId(tr.getUserId());
      out.setCount(out.getCount() +1);
      out.setSum(out.getSum() + tr.getInt4());
      out.setUuid(tr.getUuid());
      out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
      waitForMs(200); //added delay just for analysing output
      return out;
  }, Materialized.with(stringSerde, outputSerde))
  .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
  .toStream(

Re: Are timestamps available for records stored in Kafka Streams state stores?

2022-05-20 Thread Matthias J. Sax
Added. Feel free to update wiki pages with smaller details like this 
directly.



-Matthias

On 5/19/22 2:27 PM, James Cheng wrote:

Thanks Guozhang! Based on your comment, I searched through the repo and found 
the associated pull requests and JIRAs.

It looks like most of the support was added in 
https://issues.apache.org/jira/browse/KAFKA-6521 


Can you add that to the KIP page for KIP-258? It would make it easier for other 
people to find when/where the timestamp support was added.

Thanks!
-James


On May 19, 2022, at 1:24 PM, Guozhang Wang  wrote:

Hi James,

For kv / time-window stores, they have been geared with timestamps and you
can access them via the TimestampedKeyValueStore/TimestampedWindowStore.

What's not implemented yet are timestamped session stores.

Guozhang

On Thu, May 19, 2022 at 12:49 PM James Cheng  wrote:


Hi,

I'm trying to see if timestamps are available for records that are stored
in Kafka Streams state stores.

I saw "KIP-258: Allow to Store Record Timestamps in RocksDB"
https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB

But I am not sure if it has been fully implemented. The vote thread went
through, but it looks like the implementation is still in progress.
https://issues.apache.org/jira/browse/KAFKA-8382

The KIP page says "2.3.0 (partially implemented, inactive)"

Can someone share what the current thoughts are, around this KIP?

Thanks!

-James




--
-- Guozhang





[jira] [Commented] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-05-20 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17540294#comment-17540294
 ] 

Matthias J. Sax commented on KAFKA-13817:
-

Thanks for the PR! I put it into my review backlog. Might take some time until 
I get to it.

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Lim Qing Wei
>Priority: Minor
>  Labels: beginner, newbie
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.]
>  
> If this is just scheduled once, this can trigger emit every time if system 
> time jumps a lot suddenly.
>  
> For example, 
>  # nextTimeToEmit set to 1 and step is 1
>  # If next system time jumps to 100, we will always emit for next 100 records
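
The example above can be simulated with a few lines of plain Java. This is a sketch of the scenario only, not the `KStreamKStreamJoin` code: both method names are hypothetical, and all records are assumed to be processed at the same wall-clock time after the jump.

```java
// Sketch only: simulates the scheduling scenario from the ticket's example.
final class EmitScheduleSketch {
    // Advances the schedule by a fixed step from its previous (stale) value.
    static int emitsWithFixedStep(long nextTimeToEmit, long step, long now, int records) {
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (now >= nextTimeToEmit) {
                emits++;
                nextTimeToEmit += step; // still far behind `now` after a time jump
            }
        }
        return emits;
    }

    // Re-anchors the schedule to the current system time on every emit.
    static int emitsWithReschedule(long nextTimeToEmit, long step, long now, int records) {
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (now >= nextTimeToEmit) {
                emits++;
                nextTimeToEmit = now + step; // next emit is one step after `now`
            }
        }
        return emits;
    }

    public static void main(String[] args) {
        // nextTimeToEmit = 1, step = 1, system time jumps to 100, 150 records arrive.
        System.out.println("fixed step: " + emitsWithFixedStep(1L, 1L, 100L, 150));
        System.out.println("reschedule: " + emitsWithReschedule(1L, 1L, 100L, 150));
    }
}
```

Under these assumptions the fixed-step variant emits for each of the first 100 records, matching the ticket's example, while the rescheduling variant emits once.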





[jira] [Commented] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams

2022-05-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539718#comment-17539718
 ] 

Matthias J. Sax commented on KAFKA-13913:
-

There was some discussion about this in the past (cf KAFKA-3943 and KAFKA-4436).

There was also some push back on this idea. Might be good to revisit those 
arguments. (Not sure if it's contained in the Jira, or PR, or maybe mailing 
list.)

> Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams
> -
>
> Key: KAFKA-13913
> URL: https://issues.apache.org/jira/browse/KAFKA-13913
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Affects Versions: 3.2.0
>Reporter: François Rosière
>Assignee: François Rosière
>Priority: Major
>  Labels: kip
> Fix For: 3.3.0
>
>
> To have more flexibility, builders should be provided for the following 
> objects
>  * KafkaProducer
>  * KafkaConsumer
>  * KafkaStreams 
> These builders will give an easy way to construct these objects using 
> different arguments/combinations without having to add a new constructor 
> every time a new parameter is required.
> They will also allow using already configured dependencies coming from an 
> injection framework such as Spring (see 
> [https://github.com/spring-projects/spring-kafka/issues/2244]).
> From a user's point of view, builders would be used as follows:
> {noformat}
> KafkaProducer kafkaProducer = new KafkaProducerBuilder MyPojo()  
>   .withKeySerializer()  
>   .withValueSerializer()  
>   .withInterceptors()  
>   .withPartitioner() 
>   .withMetricsReporter()  
>   .build();  
> KafkaConsumer consumer = new KafkaConsumerBuilder MyPojo()  
>   .withKeyDeserializer()  
>   .withValueDeserializer()  
>   .withInterceptors()  
>   .withMetricsReporter()  
>   .build(); 
> KafkaStreams kafkaStreams = new KafkaStreamsBuilder(, 
> )  
>   .withProducerInterceptors()  
>   .withConsumerInterceptors()  
>   .withTime()  
>   .withKafkaClientSupplier()  
>   .withMetricsReporter()  
>   .build();{noformat}
> This KIP can be seen as the continuity of the KIP-832.
> More details can be found in the related KIP 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884640]





[jira] [Updated] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams

2022-05-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13913:

Labels: kip  (was: )



[jira] [Updated] (KAFKA-13913) Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams

2022-05-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13913:

Component/s: streams



Re: kafka stream - sliding window - getting unexpected output

2022-05-18 Thread Matthias J. Sax

Emitting intermediate results is by design.

If you don't want to get intermediate results, you can add `suppress()` 
after the aggregation and configure it to only "emit on window close".


-Matthias
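
To make the "emit on window close" idea concrete: in Kafka Streams the operator for it is `suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))`, whereas `Suppressed.untilTimeLimit(...)` only rate-limits updates. The sketch below is not Kafka code; it is a plain-Java model, with the hypothetical class `WindowCloseBuffer`, that keeps only the latest aggregate per window and releases it once the observed stream time has moved strictly past window end plus grace. The exact closing rule is an assumption made for the model.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "emit on window close"; NOT Kafka Streams code.
final class WindowCloseBuffer {
    private final long graceMs;
    // Highest record timestamp seen so far (the model's "stream time").
    private long streamTime = Long.MIN_VALUE;
    // Window end timestamp -> latest (possibly intermediate) aggregate.
    private final TreeMap<Long, Long> latestByWindowEnd = new TreeMap<>();

    WindowCloseBuffer(long graceMs) {
        this.graceMs = graceMs;
    }

    /** Buffers an updated aggregate; returns final results of windows that just closed. */
    List<Long> update(long windowEnd, long aggregate, long recordTimestamp) {
        latestByWindowEnd.put(windowEnd, aggregate); // overwrite the intermediate result
        streamTime = Math.max(streamTime, recordTimestamp);

        List<Long> finalResults = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = latestByWindowEnd.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            // Assumed rule: a window is closed once stream time is past end + grace.
            if (entry.getKey() + graceMs >= streamTime) {
                break; // entries are ordered by window end, so later windows are open too
            }
            finalResults.add(entry.getValue());
            it.remove();
        }
        return finalResults;
    }
}
```

With a grace of 0, two updates for the window ending at 10 produce no output; only a later record with a timestamp above 10 releases the last buffered value for that window.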

On 5/17/22 3:20 AM, Shankar Mane wrote:

Hi All,

Our use case is a sliding window. For example, at any point, whenever a
user performs an action at time [ t1 ], we would like to see their activity
in [ t1 - last 24 hours ] and use this to show the user some recommendations.




-- I have code ready and it works without any errors.
-- Aggregations happen as expected.
-- But the generated output is unexpected. As the window slides, I am
getting mixed output: intermediate aggregated records arrive along
with the final aggregated outputs.

Could someone please help me here? What can I do to get ONLY the final
aggregated output?


Code snippet :




builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
 .filter((k, v) -> v != null)
 .map((k, v) -> KeyValue.pair(v.getUserId(), v))
 //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
 .groupByKey()
 .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
 .aggregate(OutputPojo::new, (k, tr, out) -> {
     out.setUserId(tr.getUserId());
     out.setCount(out.getCount() + 1);
     out.setSum(out.getSum() + tr.getInt4());
     out.setUuid(tr.getUuid());
     out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
     waitForMs(200); // added delay just for analysing output
     return out;
 }, Materialized.with(stringSerde, outputSerde))
 .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
 .toStream()
 .map((Windowed key, OutputPojo out) -> {
     return new KeyValue<>(key.key(), out);
 })
 .print(Printed.toSysOut());
 //.to(aveTempOutputTopic, Produced.with(stringSerde, outputSerde))






Input data :

for i in {1..10}; do sleep 1s;python3 del.py 1001 10;sleep 1s; done

{'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid':
'64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
{'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid':
'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
{'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid':
'48d2b4ea-052d-42fa-a998-0216d928c034'}
{'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid':
'55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
{'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid':
'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
{'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid':
'135dc5cd-50cb-467b-9e63-300fdeedaf75'}
{'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid':
'66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
{'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid':
'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
{'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid':
'7baa4254-b9da-43dc-bbb7-4caede578aeb'}
{'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid':
'16541989-f3ba-49f6-bd31-bf8a75ba8eac'}






Output (*Unexpected*): the output below is captured at each sliding window of
1s duration (but input data is published at 2s intervals):

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0,
strTime=2022-05-17 15:31:28.263,
uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)  > seems older UUID
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0,
strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0,
strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0,
strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0,
strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0,
strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0,
strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0,
strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, 

[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2022-05-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17537860#comment-17537860
 ] 

Matthias J. Sax commented on KAFKA-6520:


It's still an open item – and it's complex (for details, see the KIP discussion 
thread).

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which lead me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded in the `connect()` and `pollSelectionKeys()` functions upon catching 
> the IOException / RuntimeException which indicates that the connection was lost.
>  * Users of Consumer / Streams can then monitor this metric, which 
> normally will only have close-to-zero values as we have transient 
> disconnects; if it is spiking, it means the brokers are consistently 
> unavailable, which indicates the disconnected state.
> [~Yohan123] WDYT?





[jira] [Commented] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-05-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17536926#comment-17536926
 ] 

Matthias J. Sax commented on KAFKA-13817:
-

> as it will eventually self-correct and continue to throttle, I believe how 
> fast it self-correct depends on the magnitude of clock-drift and the 
> emitIntervalMs value.

That's exactly the point. In this case we don't want throttling to resume only 
eventually; we want to keep throttling right away. Thus, instead of just computing 
"next = next + X" we want to compute "next = now + X" to quickly fast forward 
in case we missed an interval.

We do a similar thing in the windowed aggregation: 
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L260-L269]
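
The difference between the two update rules can be seen in a small model. This is only a sketch, not the actual join code: the class `EmitThrottle` and its members are hypothetical, and it replays the scenario from the ticket (next emit time 1, step 1, clock jumping to 100).

```java
// Hypothetical model of the two scheduling rules; not the KStreamKStreamJoin code.
final class EmitThrottle {
    private final long intervalMs;
    private final boolean rescheduleFromNow;
    private long nextTimeToEmit;

    EmitThrottle(long intervalMs, long firstEmitAt, boolean rescheduleFromNow) {
        this.intervalMs = intervalMs;
        this.nextTimeToEmit = firstEmitAt;
        this.rescheduleFromNow = rescheduleFromNow;
    }

    /** Returns true if an emit is triggered at system time `now`. */
    boolean maybeEmit(long now) {
        if (now < nextTimeToEmit) {
            return false; // still throttled
        }
        nextTimeToEmit = rescheduleFromNow
            ? now + intervalMs             // "next = now + X": fast forward past missed intervals
            : nextTimeToEmit + intervalMs; // "next = next + X": catches up one step per record
        return true;
    }

    /** Counts emits for `records` records that all arrive after the clock jumped to 100. */
    static int countEmits(boolean rescheduleFromNow, int records) {
        EmitThrottle throttle = new EmitThrottle(1, 1, rescheduleFromNow);
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (throttle.maybeEmit(100)) {
                emits++;
            }
        }
        return emits;
    }
}
```

In this model, 150 records arriving at time 100 trigger 100 emits with "next = next + X" but only a single emit with "next = now + X".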

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Lim Qing Wei
>Priority: Minor
>  Labels: beginner, newbie
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231].
>  
> If this is just scheduled once, this can trigger emit every time if system 
> time jumps a lot suddenly.
>  
> For example, 
>  # nextTimeToEmit set to 1 and step is 1
>  # If next system time jumps to 100, we will always emit for next 100 records





[jira] [Commented] (SUREFIRE-2076) BufferOverflowException when encoding message with null runMode

2022-05-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/SUREFIRE-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17535072#comment-17535072
 ] 

Matthias J. Sax commented on SUREFIRE-2076:
---

[~tibordigana] [~zoltan.meze] – Did you see my last comment? Is there some 
workaround or do we need to stick with `-M5` until `-M7` is released?

> BufferOverflowException when encoding message with null runMode
> ---
>
> Key: SUREFIRE-2076
> URL: https://issues.apache.org/jira/browse/SUREFIRE-2076
> Project: Maven Surefire
>  Issue Type: Bug
>  Components: Maven Surefire Plugin
>Affects Versions: 3.0.0-M6
>Reporter: Zoltan Meze
>Assignee: Tibor Digana
>Priority: Major
> Fix For: 3.0.0-M7
>
>
> Per 
> [#issuecomment-1099231382|https://github.com/apache/maven-surefire/pull/518#issuecomment-1099231382],
>  
> [#issuecomment-1099706229|https://github.com/apache/maven-surefire/pull/518#issuecomment-1099706229],
>  
> [#pullrequestreview-951134938|https://github.com/apache/maven-surefire/pull/518#pullrequestreview-951134938]
>  and 
> [#issuecomment-1108371215|https://github.com/apache/maven-surefire/pull/518#issuecomment-1108371215]
> RunMode can be null causing BufferOverflowException when encoding message.
> Related to similar issue with null testIds: 
> [SUREFIRE-2056|https://issues.apache.org/jira/browse/SUREFIRE-2056]
> [*AbstractStreamEncoder#encodeHeader*|https://github.com/apache/maven-surefire/blob/959c1e9cabb8d06c72f5ebd7eb6e56e9987eccf8/surefire-api/src/main/java/org/apache/maven/surefire/api/stream/AbstractStreamEncoder.java#L86]
>  stores runMode in at least 3 bytes:
> * 1-byte length
> * 1-byte delimiter
> * length-bytes runMode
> * 1-byte delimiter 
> In case of a null runMode the encoded part becomes *0::* (exactly 3 bytes 
> long).
> The issue is that 
> [*AbstractStreamEncoder#estimateBufferLength*|https://github.com/apache/maven-surefire/blob/959c1e9cabb8d06c72f5ebd7eb6e56e9987eccf8/surefire-api/src/main/java/org/apache/maven/surefire/api/stream/AbstractStreamEncoder.java#L184]
>  is not expecting/counting any bytes for the runMode part in case of a null runMode.
> This results in a BufferOverflowException because the byte size of the 
> message is underestimated.
> Exception thrown:
> {code:java}
> java.nio.BufferOverflowException
>   at java.nio.Buffer.nextPutIndex(Buffer.java:547)
>   at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:172)
>   at org.apache.maven.surefire.api.stream.AbstractStreamEncoder.encodeString(AbstractStreamEncoder.java:127)
>   at org.apache.maven.surefire.api.stream.AbstractStreamEncoder.encodeStringData(AbstractStreamEncoder.java:171)
>   at org.apache.maven.surefire.api.stream.AbstractStreamEncoder.encode(AbstractStreamEncoder.java:157)
>   at org.apache.maven.surefire.booter.spi.EventChannelEncoder.encodeMessage(EventChannelEncoder.java:398)
>   at org.apache.maven.surefire.booter.spi.EventChannelEncoder.setOutErr(EventChannelEncoder.java:188)
>   at org.apache.maven.surefire.booter.spi.EventChannelEncoder.testOutput(EventChannelEncoder.java:183)
>   at org.apache.maven.surefire.api.booter.ForkingRunListener.writeTestOutput(ForkingRunListener.java:113)
>   at org.apache.maven.surefire.api.booter.ForkingRunListener.writeTestOutput(ForkingRunListener.java:44)
>   at org.apache.maven.surefire.common.junit4.JUnit4RunListener.writeTestOutput(JUnit4RunListener.java:235)
>   at org.apache.maven.surefire.api.report.ConsoleOutputCapture$ForwardingPrintStream.println(ConsoleOutputCapture.java:144)
> {code}
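
The mismatch described in the ticket can be reproduced in isolation. The following is a simplified sketch, not the Surefire source: the class `RunModeHeaderSketch`, its methods, and the sample value "normal-run" are assumptions for illustration. It writes the runMode part as length byte, delimiter, bytes, delimiter, and compares an estimate that skips a null runMode with one that always counts the 3 fixed bytes.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical model of the header layout; not the Surefire source.
final class RunModeHeaderSketch {
    /** Writes 1-byte length, ':', the runMode bytes, ':'. A null runMode still takes 3 bytes. */
    static void encodeRunMode(ByteBuffer out, String runMode) {
        byte[] bytes = runMode == null
            ? new byte[0]
            : runMode.getBytes(StandardCharsets.US_ASCII);
        out.put((byte) bytes.length);
        out.put((byte) ':');
        out.put(bytes);
        out.put((byte) ':');
    }

    /** Estimate that reserves nothing for a null runMode (the behavior the ticket describes). */
    static int buggyEstimate(String runMode) {
        return runMode == null ? 0 : 3 + runMode.length();
    }

    /** Estimate that always reserves the length byte and both delimiters. */
    static int fixedEstimate(String runMode) {
        return 3 + (runMode == null ? 0 : runMode.length());
    }

    /** Returns true if encoding into a buffer of the given capacity overflows. */
    static boolean overflows(int capacity, String runMode) {
        try {
            encodeRunMode(ByteBuffer.allocate(capacity), runMode);
            return false;
        } catch (BufferOverflowException e) {
            return true;
        }
    }
}
```

In this model only the null case overflows with the first estimate; a non-null runMode fits with either one, which matches the report that the failure is specific to a null runMode.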





[jira] [Resolved] (KAFKA-7527) Enable Dependency Injection for Kafka Streams handlers (KIP-378)

2022-05-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7527.

Resolution: Fixed

> Enable Dependency Injection for Kafka Streams handlers (KIP-378)
> 
>
> Key: KAFKA-7527
> URL: https://issues.apache.org/jira/browse/KAFKA-7527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wladimir Schmidt
>Priority: Minor
>  Labels: kip, usability
>
> Implement solution proposed in the KIP-378 (Enable Dependency Injection for 
> Kafka Streams handlers).
> Link to 
> [KIP-378|https://cwiki.apache.org/confluence/display/KAFKA/KIP-378:+Enable+Dependency+Injection+for+Kafka+Streams+handlers]







Re: [VOTE] KIP-834: Pause / Resume KafkaStreams Topologies

2022-05-10 Thread Matthias J. Sax
I had one minor question on the discuss thread. It's mainly about 
clarifying and documenting the user contract. I am fine either way.


+1 (binding)


-Matthias

On 5/10/22 12:32 PM, Sophie Blee-Goldman wrote:

Thanks for the KIP! +1 (binding)

On Tue, May 10, 2022, 12:24 PM Bruno Cadonna  wrote:


Thanks Jim,

+1 (binding)

Best,
Bruno

On 10.05.22 21:19, John Roesler wrote:

Thanks Jim,

I’m +1 (binding)

-John

On Tue, May 10, 2022, at 14:05, Jim Hughes wrote:

Hi all,

I'm asking for a vote on KIP-834:


https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832


Thanks in advance!

Jim






Re: [DISCUSS] KIP-834: Pause / Resume KafkaStreams Topologies

2022-05-10 Thread Matthias J. Sax
ut that. I hope you don't find my long message offputting. I'm
fundamentally in favor of your KIP, and I think with a little more
explanation in the KIP, and a few small tweaks to the proposal, we'll
be able to provide good ergonomics to our users.



Thanks!

Jim



Thanks,
-John

On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:

I'm in favor of the "just pausing the instance itself" option as well. As
for EOS, the point is that when the processing is paused, we would not
trigger any `producer.send` during the time, and the transaction timeout is
sort of relying on that behavior, so my point was that it's probably better
to also commit the processing before we pause it.


Guozhang

On Fri, May 6, 2022 at 6:12 PM Jim Hughes wrote:

Hi Matthias,

Since the only thing which will be paused is processing the topology, I
think we can let commits happen naturally.

Good point about getting the paused state to new members; it is seeming
like the "building block" approach is a good one to keep things simple at
first.

Cheers,

Jim

On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <mj...@apache.org> wrote:

I think it's tricky to propagate a pauseAll() via the rebalance
protocol. New members joining the group would need to get paused, too?
Could there be weird race conditions with overlapping pauseAll() and
resumeAll() calls on different instances while there could be errors /
network partitions or similar?

I would argue that similar to IQ, we provide the basic building blocks,
and leave it to the users to implement cross instance management for a
pauseAll() scenario. -- Also, if there is really demand, we can always
add pauseAll()/resumeAll() as follow up work.

About named topologies: I agree with Jim to not include them in this KIP
as they are not a public feature yet. If we make named topologies
public, the corresponding KIP should extend the pause/resume feature
(ie, APIs) accordingly. Of course, the code can (and should) already be
set up to support it to be future proof.

Good call out about commit and EOS -- to simplify it, I think it might
be good to commit also for the at-least-once case?


-Matthias


On 5/6/22 1:05 PM, Jim Hughes wrote:

Hi Bill,

Great questions; I'll do my best to reply inline:

On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <bbej...@gmail.com> wrote:

Hi Jim,

Thanks for the KIP.  I have a couple of meta-questions as well:

1) Regarding pausing only a subset of running instances, I'm thinking there
may be a use case for pausing all of them.
Would it make sense to also allow for pausing all instances by adding a
method `pauseAll()` or something similar?

Honestly, I'm indifferent on this point.  Presently, I think what I have
proposed is the minimal change to get the ability to pause and resume
processing.  If adding a 'pauseAll()' is required, I'd be happy to do that!

From Guozhang's email, it sounds like this would require using the
rebalance protocol to trigger the coordination.  Would there be enough room
in that approach to indicate that a named topology is to be paused across
all nodes?



2) Would pausing affect standby tasks?  For example, imagine there are 3
instances A, B, and C.
A user elects to pause instance C only but it hosts the standby tasks
for A.
Would the standby tasks on the paused application continue to read from
the changelog topic?

Yes, standby tasks would continue reading from the changelog topic.  All
consumers would continue reading to avoid getting dropped from their
consumer groups.

Cheers,

Jim





Thanks!
Bill


On Fri, May 6, 2022 at 2:44 PM Jim Hughes wrote:

Hi Guozhang,

Thanks for the feedback; responses inline below:

On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Jim,

Thanks for the proposed KIP. I have some meta questions about it:

1) Would an instance always pause/resume all of its current owned
topologies (i.e. the named topologies), or are there any scenarios where we
only want to pause/resume a subset of them?

An instance may wish to pause some of its named topologies.  I was unsure
what to say about named topologies in the KIP since they seem to be an
internal detail at the moment.

I intend to add to KafkaStreamsNamedTopologyWrapper methods like:
 public void pauseNamedTopology(final String topologyToPause)
 public boolean isNamedTopologyPaused(final String topology)
 public void resumeNamedTopology(final String topologyToResume)





2) From a user's perspective, do we want to always issue a `pause/resume`
to all the instances or not? For example, we can define the semantics of
the function as "you only need to call this function on any of the
application's instances, and all instances would then pause (via the
rebalance error codes)", or as "

Re: [DISCUSS] KIP-834: Pause / Resume KafkaStreams Topologies

2022-05-06 Thread Matthias J. Sax
I think it's tricky to propagate a pauseAll() via the rebalance 
protocol. New members joining the group would need to get paused, too? 
Could there be weird race conditions with overlapping pauseAll() and 
resumeAll() calls on different instances while there could be errors / 
network partitions or similar?


I would argue that similar to IQ, we provide the basic building blocks, 
and leave it to the users to implement cross instance management for a 
pauseAll() scenario. -- Also, if there is really demand, we can always 
add pauseAll()/resumeAll() as follow up work.


About named topologies: I agree with Jim to not include them in this KIP 
as they are not a public feature yet. If we make named topologies 
public, the corresponding KIP should extend the pause/resume feature 
(ie, APIs) accordingly. Of course, the code can (and should) already be 
set up to support it to be future proof.


Good call out about commit and EOS -- to simplify it, I think it might 
be good to commit also for the at-least-once case?



-Matthias


On 5/6/22 1:05 PM, Jim Hughes wrote:

Hi Bill,

Great questions; I'll do my best to reply inline:

On Fri, May 6, 2022 at 3:21 PM Bill Bejeck  wrote:


Hi Jim,

Thanks for the KIP.  I have a couple of meta-questions as well:

1) Regarding pausing only a subset of running instances, I'm thinking there
may be a use case for pausing all of them.
Would it make sense to also allow for pausing all instances by adding a
method `pauseAll()` or something similar?



Honestly, I'm indifferent on this point.  Presently, I think what I have
proposed is the minimal change to get the ability to pause and resume
processing.  If adding a 'pauseAll()' is required, I'd be happy to do that!

 From Guozhang's email, it sounds like this would require using the
rebalance protocol to trigger the coordination.  Would there be enough room
in that approach to indicate that a named topology is to be paused across
all nodes?



2) Would pausing affect standby tasks?  For example, imagine there are 3
instances A, B, and C.
A user elects to pause instance C only but it hosts the standby tasks
for A.
Would the standby tasks on the paused application continue to read from
the changelog topic?



Yes, standby tasks would continue reading from the changelog topic.  All
consumers would continue reading to avoid getting dropped from their
consumer groups.

Cheers,

Jim





Thanks!
Bill


On Fri, May 6, 2022 at 2:44 PM Jim Hughes wrote:


Hi Guozhang,

Thanks for the feedback; responses inline below:

On Fri, May 6, 2022 at 1:09 PM Guozhang Wang  wrote:


Hello Jim,

Thanks for the proposed KIP. I have some meta questions about it:

1) Would an instance always pause/resume all of its current owned
topologies (i.e. the named topologies), or are there any scenarios where we
only want to pause/resume a subset of them?



An instance may wish to pause some of its named topologies.  I was unsure
what to say about named topologies in the KIP since they seem to be an
internal detail at the moment.

I intend to add to KafkaStreamsNamedTopologyWrapper methods like:
 public void pauseNamedTopology(final String topologyToPause)
 public boolean isNamedTopologyPaused(final String topology)
 public void resumeNamedTopology(final String topologyToResume)




2) From a user's perspective, do we want to always issue a `pause/resume`
to all the instances or not? For example, we can define the semantics of
the function as "you only need to call this function on any of the
application's instances, and all instances would then pause (via the
rebalance error codes)", or as "you would call this function for all the
instances of an application". Which one are you referring to?

My initial intent is that one would call this function on any instances of
the application that one wishes to pause.  This should allow more control
(in case one wanted to pause a portion of the instances).  On the other
hand, this approach would put more work on the implementer to coordinate
calling pause or resume across instances.

If the other option is more suitable, happy to do that instead.

3) With EOS, there's a transaction timeout which would determine how long a
transaction can stay idle before it's force-aborted on the broker side. I
think when a pause is issued, that means we'd need to immediately commit
the current transaction for EOS since we do not know how long we could
pause for. Is that right? If yes could you please clarify that in the doc
as well.

Good point.  My intent is for pause() to wait for the next iteration
through `runOnce()` and then only skip over the processing for paused tasks
in `taskManager.process(numIterations, time)`.

Do commits live inside that call or do they live across/outside of it? In
the former case, I think there shouldn't be any issues with EOS.
Otherwise, we may need to work through some details to get EOS right.

Once we figure that out, I can update the KIP.
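
The loop behavior discussed in this thread (keep polling so the consumers stay in their groups, but skip processing while paused) can be modeled in a few lines. This is a hedged sketch, not StreamThread code: the class `PausableLoopSketch` and its members are hypothetical, and commit and EOS handling, which the thread leaves open, are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model of the loop discussed above; not StreamThread code.
final class PausableLoopSketch {
    private final Queue<String> buffered = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    private boolean paused;
    private int polls;

    void pause() { paused = true; }
    void resume() { paused = false; }
    boolean isPaused() { return paused; }

    /** One iteration: always "poll" (keeps group membership), process only when not paused. */
    void runOnce(List<String> fetched) {
        polls++;
        buffered.addAll(fetched);
        if (paused) {
            return; // skip processing; buffered records wait for resume()
        }
        while (!buffered.isEmpty()) {
            processed.add(buffered.remove());
        }
    }

    int polls() { return polls; }
    List<String> processed() { return processed; }
}
```

In this model a pause takes effect at the next `runOnce()` iteration, and records fetched while paused are processed in order after `resume()`.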


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2022-05-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17532427#comment-17532427
 ] 

Matthias J. Sax commented on KAFKA-8769:


Just a "random" comment, based on some other discussion I just had: if we do 
track stream time per key, it could be an issue if a key "goes away" – 
stream-time for this key would stop advancing, and thus we could not close the 
corresponding window (for windowed aggregations and left/outer stream-stream 
join).

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.
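
The A@t0 ... B@t3 example in the description can be replayed with a small sketch. This is not Kafka Streams code: `StreamTimeSketch`, `Event`, and the simplified lateness check (`timestamp + grace < stream time`) are assumptions made only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams code: it only models "stream time"
// as the highest timestamp seen so far, per partition or per key.
final class StreamTimeSketch {

    // Hypothetical record type: a key plus an event timestamp.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Counts records that would be considered late when lateness is measured
    // against a single stream time shared by all keys of the partition.
    static int lateWithPartitionTime(List<Event> events, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        int late = 0;
        for (Event e : events) {
            streamTime = Math.max(streamTime, e.timestamp);
            if (e.timestamp + graceMs < streamTime) {
                late++;
            }
        }
        return late;
    }

    // Same check, but stream time is tracked independently for each key.
    static int lateWithPerKeyTime(List<Event> events, long graceMs) {
        Map<String, Long> streamTimes = new HashMap<>();
        int late = 0;
        for (Event e : events) {
            long streamTime = streamTimes.merge(e.key, e.timestamp, Long::max);
            if (e.timestamp + graceMs < streamTime) {
                late++;
            }
        }
        return late;
    }
}
```

With per-partition tracking and no grace period, most of B's records count as late; with per-key tracking none do. The per-key map also shows the concern from the comment: an entry for a key that "goes away" never advances again.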



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-05-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13873:

Description: 
In order to reduce resources used or modify data pipelines, users may want to 
pause processing temporarily.  Presently, this would require stopping the 
entire KafkaStreams instance (or instances).  

This work would add the ability to pause and resume topologies.  When the need 
to pause processing has passed, then users should be able to resume processing.

KIP-834: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832]

  was:
In order to reduce resources used or modify data pipelines, users may want to 
pause processing temporarily.  Presently, this would require stopping the 
entire KafkaStreams instance (or instances).  

This work would add the ability to pause and resume topologies.  When the need 
to pause processing has passed, then users should be able to resume processing.


> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: kip
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies.  When the 
> need to pause processing has passed, then users should be able to resume 
> processing.
> KIP-834: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-05-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13873:

Component/s: streams

> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies.  When the 
> need to pause processing has passed, then users should be able to resume 
> processing.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-05-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13873:

Labels: kip  (was: )

> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: kip
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies.  When the 
> need to pause processing has passed, then users should be able to resume 
> processing.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13870) support both Suppressed untilTimeLimit and maxBytes without using emitEarlyWhenFull()

2022-05-04 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13870:

Labels: needs-kip  (was: )

> support both Suppressed untilTimeLimit and maxBytes without using 
> emitEarlyWhenFull()
> -
>
> Key: KAFKA-13870
> URL: https://issues.apache.org/jira/browse/KAFKA-13870
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Anil
>Priority: Major
>  Labels: needs-kip
>
> My use case is to use *untilTimeLimit* with *maxBytes*, but when the 
> buffer is full, the application breaks. With 
> *{{emitEarlyWhenFull}}* the application does not break, but it 
> sends out a record for the same key multiple times in a particular window when the 
> buffer exceeds max bytes.
> For example:
> *Suppressed.untilTimeLimit(Duration.ofMinutes(15),Suppressed.BufferConfig.maxBytes(1).emitEarlyWhenFull())*
>  
> Message flow: (A,1) (A,2) (A,3) -> aggregation result: (A,6). Suppose the 
> buffer is full here, so (A,6) is sent downstream. Let's suppose (A,4) 
> now arrives in the same tumbling window.
>  
> Current behavior: the aggregation continues and eventually *(A,10)* is 
> emitted.
>  
> But our application expected *(A,4)*. The feature request is that a 
> window should end either when the window time (untilTimeLimit) elapses or when 
> the buffer (maxBytes) is full; when either of these two conditions is met, 
> the data should be emitted and a new window should be created.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13870) support both Suppressed untilTimeLimit and maxBytes without using emitEarlyWhenFull()

2022-05-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17531836#comment-17531836
 ] 

Matthias J. Sax commented on KAFKA-13870:
-

I think you mix up two concepts: windowing is about "grouping" records 
according to their timestamp. Thus, the _window-size_ you define via 
`TimeWindows.ofSizeAndGrace()` (or similar) defines which window a 
record falls into.

On the other hand, suppression has nothing to do with the definition of window 
bounds, but it's about when to emit (potentially partial) results for a window 
– however, if a partial result is emitted, the window is not closed.

It seems your request is more about a window definition rather than suppress, 
and you want some kind of "count based" window? It might be possible to add, 
but only if your requirements are clear. Note that the suppress buffer size has 
nothing to do with the window definition. The buffer size basically defines how 
many windows the buffer can hold before it needs to emit a partial result and 
drop a window from the buffer.

Thus, it's not clear to me what semantics you really need?

For now, you could still build it manually using the Processor API.
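
To make the difference concrete, here is a minimal plain-Java sketch of the two semantics for the (A,1) (A,2) (A,3) (A,4) example from the ticket. It does not use the Kafka Streams API; the class and method names are hypothetical, "buffer full" is modeled as a simple record count, and the last emission stands in for the time limit passing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the Kafka Streams API.
final class PartialResultVsCountWindowSketch {

    // Early emission: when `limit` records have been buffered, the running
    // aggregate is emitted as a partial result, but the window stays open and
    // the aggregate keeps growing.
    static List<Integer> emitPartialResults(int[] values, int limit) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        int buffered = 0;
        for (int value : values) {
            sum += value;
            buffered++;
            if (buffered == limit) {
                emitted.add(sum);
                buffered = 0;
            }
        }
        if (buffered > 0) {
            emitted.add(sum); // models the time limit passing
        }
        return emitted;
    }

    // "Count based" window: reaching `limit` closes the window, so the
    // aggregate is emitted and then reset for a new window.
    static List<Integer> emitCountWindows(int[] values, int limit) {
        List<Integer> emitted = new ArrayList<>();
        int sum = 0;
        int buffered = 0;
        for (int value : values) {
            sum += value;
            buffered++;
            if (buffered == limit) {
                emitted.add(sum);
                sum = 0;
                buffered = 0;
            }
        }
        if (buffered > 0) {
            emitted.add(sum); // models the time limit passing
        }
        return emitted;
    }
}
```

For the values 1, 2, 3, 4 and a limit of 3, the first method yields 6 and then 10 (what the reporter observes), the second yields 6 and then 4 (what the reporter seems to expect).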

> support both Suppressed untilTimeLimit and maxBytes without using 
> emitEarlyWhenFull()
> -
>
> Key: KAFKA-13870
> URL: https://issues.apache.org/jira/browse/KAFKA-13870
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Anil
>Priority: Major
>
> My use case is to use *untilTimeLimit* with *maxBytes*, but when the 
> buffer is full, the application breaks. With 
> *{{emitEarlyWhenFull}}* the application does not break, but it 
> sends out a record for the same key multiple times in a particular window when the 
> buffer exceeds max bytes.
> For example:
> *Suppressed.untilTimeLimit(Duration.ofMinutes(15),Suppressed.BufferConfig.maxBytes(1).emitEarlyWhenFull())*
>  
> Message flow: (A,1) (A,2) (A,3) -> aggregation result: (A,6). Suppose the 
> buffer is full here, so (A,6) is sent downstream. Let's suppose (A,4) 
> now arrives in the same tumbling window.
>  
> Current behavior: the aggregation continues and eventually *(A,10)* is 
> emitted.
>  
> But our application expected *(A,4)*. The feature request is that a 
> window should end either when the window time (untilTimeLimit) elapses or when 
> the buffer (maxBytes) is full; when either of these two conditions is met, 
> the data should be emitted and a new window should be created.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13813) left/outer joins can wait indefinitely for emitted record with spurious record fix

2022-05-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13813:

Summary: left/outer joins can wait indefinitely for emitted record with 
spurious record fix  (was: left/outer joins can wait indefinitely for emitted 
record with suprious record fix)

> left/outer joins can wait indefinitely for emitted record with spurious 
> record fix
> --
>
> Key: KAFKA-13813
> URL: https://issues.apache.org/jira/browse/KAFKA-13813
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Clive Cox
>Priority: Major
>
> With the fix in https://issues.apache.org/jira/browse/KAFKA-10847 records 
> will be emitted after the grace period but only when a new record is 
> processed. This means it's possible to wait for an arbitrarily long time for a 
> record to be emitted.
> This also means one cannot recreate the previous semantics of emitting 
> records immediately, or even guarantee emission right after the grace period.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13866) Support more advanced time retention policies

2022-05-02 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-13866:
---

 Summary: Support more advanced time retention policies
 Key: KAFKA-13866
 URL: https://issues.apache.org/jira/browse/KAFKA-13866
 Project: Kafka
  Issue Type: Improvement
  Components: config, core, log cleaner
Reporter: Matthias J. Sax


Time-based retention policy compares the record timestamp to broker wall-clock 
time. Those semantics are questionable and also lead to issues for data 
reprocessing: If one wants to re-process data older than the retention time, it's 
not possible as the broker expires those records aggressively and the user needs to 
increase the retention time accordingly.

Especially for Kafka Streams, we have seen many cases where users got bitten by the 
current behavior.

It would be best if Kafka tracked _two_ timestamps per record: the record 
event-time (as the broker does currently), plus the log append-time (which is 
currently only tracked if the topic is configured with "append-time" tracking, 
but the issue is that it overwrites the producer-provided record event-time).

Tracking both timestamps would allow setting a pure wall-clock time retention 
policy plus a pure event-time retention policy:
 * Wall-clock time: keep (at least) the data X days after writing
 * Event-time: keep (at max) X days' worth of event-time data

Comparing wall-clock time to wall-clock time and event-time to event-time 
provides much cleaner semantics. The idea is to combine both policies and only 
expire data if both policies trigger.

For the event-time policy, the broker would need to track "stream time" as the max 
event-timestamp it has seen per partition (similar to how Kafka Streams is 
tracking "stream time" client side).

Note the difference between "at least" and "at max" above: for the 
data-reprocessing case, the max-based event-time policy prevents the broker 
from keeping a huge history.

It would be part of a KIP discussion on the details of how wall-clock/event-time 
and min/max policies could be combined. For example, it might also be useful to 
have the following policy: keep at least X days worth of event-time history no 
matter how long the data is already stored (ie, there would only be an 
event-time based expiration but not wall-clock time). It could also be combined 
with a wall-clock time expiration: delete data only after it's at least X days 
old and stored for at least Y days.
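
A minimal sketch of the "expire only if both policies trigger" idea, assuming two hypothetical settings (`minStoredMs` for the wall-clock policy and `maxEventAgeMs` for the event-time policy). None of these names are existing broker configs; the details would be up to the KIP.

```java
// Hypothetical sketch of the proposed combined retention check; not broker code.
final class RetentionPolicySketch {

    // Wall-clock policy: compares append time to broker wall-clock time.
    static boolean wallClockExpired(long appendTimeMs, long nowMs, long minStoredMs) {
        return nowMs - appendTimeMs >= minStoredMs;
    }

    // Event-time policy: compares record event time to the partition's
    // "stream time" (max event timestamp seen so far).
    static boolean eventTimeExpired(long eventTimeMs, long streamTimeMs, long maxEventAgeMs) {
        return streamTimeMs - eventTimeMs > maxEventAgeMs;
    }

    // Data is only expired if both policies trigger.
    static boolean shouldExpire(long eventTimeMs, long appendTimeMs,
                                long streamTimeMs, long nowMs,
                                long minStoredMs, long maxEventAgeMs) {
        return wallClockExpired(appendTimeMs, nowMs, minStoredMs)
            && eventTimeExpired(eventTimeMs, streamTimeMs, maxEventAgeMs);
    }
}
```

In the reprocessing case, a record with an old event time that was appended only recently fails the wall-clock check, so it is kept until it has been stored long enough.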



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (SUREFIRE-2076) BufferOverflowException when encoding message with null runMode

2022-04-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/SUREFIRE-2076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17529670#comment-17529670
 ] 

Matthias J. Sax commented on SUREFIRE-2076:
---

Seems we are hitting this issue trying to upgrade from `-M5` to `-M6` (cf. 
[https://github.com/confluentinc/ksql/issues/9060])

Is there any workaround/config we can set to avoid this issue?

> BufferOverflowException when encoding message with null runMode
> ---
>
> Key: SUREFIRE-2076
> URL: https://issues.apache.org/jira/browse/SUREFIRE-2076
> Project: Maven Surefire
>  Issue Type: Bug
>  Components: Maven Surefire Plugin
>Affects Versions: 3.0.0-M6
>Reporter: Zoltan Meze
>Assignee: Tibor Digana
>Priority: Major
> Fix For: 3.0.0-M7
>
>
> Per 
> [#issuecomment-1099231382|https://github.com/apache/maven-surefire/pull/518#issuecomment-1099231382],
>  
> [#issuecomment-1099706229|https://github.com/apache/maven-surefire/pull/518#issuecomment-1099706229],
>  
> [#pullrequestreview-951134938|https://github.com/apache/maven-surefire/pull/518#pullrequestreview-951134938]
>  and 
> [#issuecomment-1108371215|https://github.com/apache/maven-surefire/pull/518#issuecomment-1108371215]
> RunMode can be null causing BufferOverflowException when encoding message.
> Related to similar issue with null testIds: 
> [SUREFIRE-2056|https://issues.apache.org/jira/browse/SUREFIRE-2056]
> [*AbstractStreamEncoder#encodeHeader*|https://github.com/apache/maven-surefire/blob/959c1e9cabb8d06c72f5ebd7eb6e56e9987eccf8/surefire-api/src/main/java/org/apache/maven/surefire/api/stream/AbstractStreamEncoder.java#L86]
>  stores runMode in at least 3 bytes:
> * 1-byte length
> * 1-byte delimiter
> * length-bytes runMode
> * 1-byte delimiter 
> In case of null runMode the encoded part becomes *0::* (exactly 3 bytes 
> long).
> The issue is that 
> [*AbstractStreamEncoder#estimateBufferLength*|https://github.com/apache/maven-surefire/blob/959c1e9cabb8d06c72f5ebd7eb6e56e9987eccf8/surefire-api/src/main/java/org/apache/maven/surefire/api/stream/AbstractStreamEncoder.java#L184]
>  is not expecting/counting any bytes for the runMode part in case of null runMode.
> This results in a BufferOverflowException because the byte size of the 
> message is underestimated.
> Exception thrown:
> {code:java}
> java.nio.BufferOverflowException
>   at java.nio.Buffer.nextPutIndex(Buffer.java:547)
>   at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:172)
>   at 
> org.apache.maven.surefire.api.stream.AbstractStreamEncoder.encodeString(AbstractStreamEncoder.java:127)
>   at 
> org.apache.maven.surefire.api.stream.AbstractStreamEncoder.encodeStringData(AbstractStreamEncoder.java:171)
>   at 
> org.apache.maven.surefire.api.stream.AbstractStreamEncoder.encode(AbstractStreamEncoder.java:157)
>   at 
> org.apache.maven.surefire.booter.spi.EventChannelEncoder.encodeMessage(EventChannelEncoder.java:398)
>   at 
> org.apache.maven.surefire.booter.spi.EventChannelEncoder.setOutErr(EventChannelEncoder.java:188)
>   at 
> org.apache.maven.surefire.booter.spi.EventChannelEncoder.testOutput(EventChannelEncoder.java:183)
>   at 
> org.apache.maven.surefire.api.booter.ForkingRunListener.writeTestOutput(ForkingRunListener.java:113)
>   at 
> org.apache.maven.surefire.api.booter.ForkingRunListener.writeTestOutput(ForkingRunListener.java:44)
>   at 
> org.apache.maven.surefire.common.junit4.JUnit4RunListener.writeTestOutput(JUnit4RunListener.java:235)
>   at 
> org.apache.maven.surefire.api.report.ConsoleOutputCapture$ForwardingPrintStream.println(ConsoleOutputCapture.java:144)
> {code}
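
The underestimation described above can be reproduced with a small standalone sketch. This is not the Surefire code: `RunModeEncodingSketch` and its methods are hypothetical, and the layout is simplified to a one-byte length, a `:` delimiter, the runMode bytes, and a closing `:` delimiter.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the bug pattern; not the actual AbstractStreamEncoder.
final class RunModeEncodingSketch {

    // Writes: 1-byte length, 1-byte delimiter, runMode bytes, 1-byte delimiter.
    // A null runMode is written as length 0, which still takes 3 bytes.
    static void encodeRunMode(ByteBuffer buffer, String runMode) {
        byte[] bytes = runMode == null
            ? new byte[0]
            : runMode.getBytes(StandardCharsets.US_ASCII);
        buffer.put((byte) bytes.length);
        buffer.put((byte) ':');
        buffer.put(bytes);
        buffer.put((byte) ':');
    }

    // Buggy estimate: reserves nothing at all for a null runMode.
    static int buggyEstimate(String runMode) {
        return runMode == null ? 0 : 3 + runMode.length();
    }

    // Fixed estimate: always reserves the 3 framing bytes.
    static int fixedEstimate(String runMode) {
        return 3 + (runMode == null ? 0 : runMode.length());
    }

    // Returns true if encoding into a buffer of the given capacity overflows.
    static boolean overflows(int capacity, String runMode) {
        try {
            encodeRunMode(ByteBuffer.allocate(capacity), runMode);
            return false;
        } catch (BufferOverflowException e) {
            return true;
        }
    }
}
```

With the buggy estimate, a null runMode leads to a zero-capacity buffer and the first `put` overflows; with the fixed estimate the three framing bytes fit.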



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-04-27 Thread Matthias J. Sax
Let's wait a couple of days to give Ivan a chance to reply. If he does 
not reply, feel free to pick it up.



-Matthias

On 4/26/22 3:58 AM, Levani Kokhreidze wrote:

Hi,

Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes 
dormant, I'd be interested in picking it up.

Levani


On 23. Apr 2022, at 02:43, Matthias J. Sax  wrote:

Ivan,

are you still interested in this KIP? I think it would be a good addition.


-Matthias

On 8/16/21 5:30 PM, Matthias J. Sax wrote:

Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head idea" or if we need
a "CompositeKey" interface? It seems we can just allow for any types and
we can be agnostic to it?

KStream stream = ...
KStream stream2 =
   stream.selectKey(/*set superkey*/)
 .markAsPartitioned()

We only need a `Function` without any restrictions on the type,
to map the "superkey" to the original "partition key"?


Do you propose to provide the "reverse mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?


However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitioned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but it seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, but leave it as the
responsibility for the user to not mess it up.


-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:

Hi Matthias and Sophie!

==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here, both Sophie's and Matthias' points
look convincing to me.

I think we should estimate the following: what is the probability that
we will ever need to extend `selectKey` etc. with a config for the
purposes other than `markAsPartitioned`?

If we find this probability high, then it's just a refactoring to
deprecate overloads with `Named` and introduce overloads with dedicated
configs, and we should do it this way.

If it's low or zero, maybe it's better not to mess with the existing
APIs and to introduce a single `markAsPartitioned()` method, which
itself can be easily deprecated if we find a better solution later!


==2. The IQ problem==


it then has to be the case that



Partitioner.partition(key) == Partitioner.partition(map(key))



Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

\forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))

or, by contraposition law,

\forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )


(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This
is actually what we often do before aggregation, and this mapping does
not require repartition.

But of course we can extract the original key from Tuple(key, anyValue),
and this can save IQ and joins!

This is what I'm talking about when I talk about 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples
(CompositeKeys) and uses only the 'head' to calculate the partition,

2. implement

selectCompositeKey(BiFunction tailSelector) {
   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
   //MARK_AS_PARTITIONED call here,
   //but this call is an implementation detail and we do not expose
   //markAsPartitioned publicly!
}

WDYT? (it's just a brainstorming idea)

09.08.2021 2:38, Matthias J. Sax wrote:

Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it
though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different plac

[jira] [Commented] (KAFKA-13647) RocksDb metrics 'number-open-files' is not correct

2022-04-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17528288#comment-17528288
 ] 

Matthias J. Sax commented on KAFKA-13647:
-

Should we close this ticket? \cc [~guozhang] [~cadonna] 

> RocksDb metrics 'number-open-files' is not correct
> --
>
> Key: KAFKA-13647
> URL: https://issues.apache.org/jira/browse/KAFKA-13647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Sylvain Le Gouellec
>Priority: Major
> Attachments: image-2022-02-07-16-06-25-304.png, 
> image-2022-02-07-16-06-39-821.png, image-2022-02-07-16-06-53-164.png
>
>
> We were looking at RocksDB metrics and noticed that the {{number-open-files}} 
> metric behaves like a counter, rather than a gauge. 
> Looking at the code, we think there is a small error in the type of metric 
> for that specific mbean (should be a value metric rather than a sum metric).
> See [ 
> https://github.com/apache/kafka/blob/ca5d6f9229c170beb23809159113037f05a1120f/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-04-22 Thread Matthias J. Sax

Ivan,

are you still interested in this KIP? I think it would be a good addition.


-Matthias

On 8/16/21 5:30 PM, Matthias J. Sax wrote:

Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head idea" or if we need
a "CompositeKey" interface? It seems we can just allow for any types and
we can be agnostic to it?

KStream stream = ...
KStream stream2 =
   stream.selectKey(/*set superkey*/)
 .markAsPartitioned()

We only need a `Function` without any restrictions on the type,
to map the "superkey" to the original "partition key"?


Do you propose to provide the "reverse mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?


However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitioned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but it seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, but leave it as the
responsibility for the user to not mess it up.


-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:

Hi Matthias and Sophie!

==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here, both Sophie's and Matthias' points
look convincing to me.

I think we should estimate the following: what is the probability that
we will ever need to extend `selectKey` etc. with a config for the
purposes other than `markAsPartitioned`?

If we find this probability high, then it's just a refactoring to
deprecate overloads with `Named` and introduce overloads with dedicated
configs, and we should do it this way.

If it's low or zero, maybe it's better not to mess with the existing
APIs and to introduce a single `markAsPartitioned()` method, which
itself can be easily deprecated if we find a better solution later!


==2. The IQ problem==


it then has to be the case that



Partitioner.partition(key) == Partitioner.partition(map(key))



Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

\forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))

or, by contraposition law,

\forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )


(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This
is actually what we often do before aggregation, and this mapping does
not require repartition.

But of course we can extract the original key from Tuple(key, anyValue),
and this can save IQ and joins!

This is what I'm talking about when I talk about 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples
(CompositeKeys) and uses only the 'head' to calculate the partition,

2. implement

selectCompositeKey(BiFunction tailSelector) {
   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
   //MARK_AS_PARTITIONED call here,
   //but this call is an implementation detail and we do not expose
   //markAsPartitioned publicly!
}

WDYT? (it's just a brainstorming idea)
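
A rough plain-Java sketch of the 'partitioner wrapper' from step 1, just to illustrate the idea. `CompositeKey` here is a hypothetical class, and the hash-based `partition` method is a simplified stand-in, not the Kafka partitioner.

```java
import java.util.Objects;

// Hypothetical sketch of the CompositeKey idea; not Kafka Streams code.
final class CompositeKeySketch {

    // A key made of the original key (head) plus an arbitrary extra part (tail).
    static final class CompositeKey<H, T> {
        final H head;
        final T tail;

        CompositeKey(H head, T tail) {
            this.head = head;
            this.tail = tail;
        }
    }

    // Partitioner wrapper: for a CompositeKey only the head decides the
    // partition, so key -> CompositeKey(key, anyValue) never moves a record.
    static int partition(Object key, int numPartitions) {
        Object partitionKey = (key instanceof CompositeKey)
            ? ((CompositeKey<?, ?>) key).head
            : key;
        return Math.floorMod(Objects.hashCode(partitionKey), numPartitions);
    }

    // The original key can always be recovered from the composite key,
    // which is what IQ and joins would need.
    static <H> H originalKey(CompositeKey<H, ?> key) {
        return key.head;
    }
}
```

Because the tail is ignored when choosing the partition, the required property holds: two keys that map to the same composite key necessarily share a head and therefore a partition.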

09.08.2021 2:38, Matthias J. Sax wrote:

Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it
though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to eg.
`selectKey().markAsPartitioned()` must always go together. If you have
some other place in the code that gets a `KStream` passed as input, it
would be "invalid" to blindly call `markAsPartitioned()` as you don't
know anything about the upstream code. Of course, it requires some

Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

2022-04-22 Thread Matthias J. Sax

Nick, how should we proceed?

On 3/29/22 9:21 PM, Chris Egerton wrote:

Hi all,

The Named-first variant seems pretty appealing:
 merge(Named named, KStream... streams)
It doesn't align with the existing merge methods, but it does at least
follow precedent set by the (now-deprecated) branch method [1].

A Collection-based alternative seems slightly less appealing, but only
because I'm guessing it'll be more common for the set of to-be-merged
streams to be known at compile time. In that case, the syntactic sugar
provided by a variadic method is preferable to having to wrap your set of
streams in a call to, e.g., Arrays::asList [2].

An issue I have with the split variant:
 merge(KStream first, Named named, KStream... rest)
is that it doesn't seem very intuitive to users who aren't familiar with
Streams and don't have the context of how/when the overloaded variants of
the merge method were introduced.

If we really want things to be consistent, one possibility is to add a
Named-first variant:
 merge(Named named, KStream... streams)
Deprecate the existing Named variant:
 merge(KStream stream, Named named)
And change the existing single-arg merge method:
 merge(KStream stream)
To become variadic (like proposed by Matthias earlier in the thread):
 merge(KStream... streams)

In the end, we'd have three methods, possibly reduced to two by the next
major release:
 merge(Named named, KStream... streams)
 merge(KStream... streams)
 merge(KStream stream, Named named) (deprecated)

RE supporting both a Collection-based and a variadic method: it doesn't
look like this is too common in the Streams API right now and considering
how trivial it is to convert from one style to another in most cases
(either with the aforementioned Arrays::asList to turn a static
compile-time set of streams into a Collection, or with Collection::toArray
[3] to turn a runtime Collection of streams into an array which can be
passed to a variadic method), it doesn't seem worth it to pollute the API
space with multiple methods that provide the exact same behavior.
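
As a small illustration of how cheap that conversion is, here is a self-contained Java sketch. The `merge(String...)` method is a hypothetical stand-in for a variadic merge (strings play the role of streams); only `Arrays.asList` and `Collection.toArray` are real JDK calls.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a variadic merge(...); strings play the role of streams.
final class VarargsVsCollection {
    static String merge(String... streams) {
        return String.join("+", streams);
    }

    public static void main(String[] args) {
        // Inputs known at compile time: the variadic call needs no wrapping.
        System.out.println(merge("a", "b", "c")); // a+b+c

        // Compile-time set turned into a Collection via Arrays.asList.
        List<String> asList = Arrays.asList("a", "b", "c");

        // Runtime Collection turned into an array for the variadic method.
        Collection<String> runtime = asList;
        System.out.println(merge(runtime.toArray(new String[0]))); // a+b+c
    }
}
```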

[1] -
https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/KStream.html#branch(org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Predicate...)
[2] -
https://docs.oracle.com/javase/8/docs/api/java/util/Arrays.html#asList-T...-
[3] -
https://docs.oracle.com/javase/8/docs/api/java/util/Collection.html#toArray-T:A-

Cheers,

Chris

On Tue, Mar 29, 2022 at 11:14 PM Matthias J. Sax  wrote:


My understanding was that the original proposal was to have:

merge(KStream stream);
merge(KStream... streams);

Maybe I misunderstood.


I am not really a fan of

merge(KStream stream, KStream... streams);

because it seems to break the `Collection` pattern. If I have a
collection of KStreams, I need to artificially extract one and pass it as
the first argument and pass all others using the second argument.

On the other hand, _if_ I have a collection, going back to the original
proposal of

merge(Collection streams);

would work, too. Maybe bottom line is, that we might want to have both
(`Collection` and vararg) to optimize for both cases? On the other hand
it feels rather redundant? Also not sure if both are compatible?



The issue with Named is interesting. Wondering if we should just flip
the argument order:

merge(Named name, KStream... streams);

Falling back to `Collection` would also avoid this issue.



-Matthias


On 3/29/22 1:43 AM, Nick Telford wrote:

Yeah, the Named parameter makes it a little trickier. My suggestion would
be to add an additional overload that looks like:

KStream merged(KStream first, Named named, KStream... rest);


It's not ideal having the Named parameter split the other parameters; we
could alternatively move the Named parameter to be first, but then that
wouldn't align with the rest of the API.

Nick

On Tue, 29 Mar 2022 at 05:20, Chris Egerton 

wrote:



Hi all,

Java permits the overload. Simple test class to demonstrate:

```
public class Test {
  private final String field;

  public Test(String field) {
    this.field = field;
  }

  public Test merge(Test that) {
    return new Test("Single-arg merge: " + this.field + ", " + that.field);
  }

  public Test merge(Test that, Test... those) {
    String newField = "Varargs merge: " + this.field + ", " + that.field;
    for (Test test : those) newField += ", " + test.field;
    return new Test(newField);
  }

  public static void main(String[] args) {
    Test t1 = new Test("t1"), t2 = new Test("t2"), t3 = new Test("t3");
    Test merge1 = t1.merge(t2), merge2 = t1.merge(t2, t3);
    System.out.println(merge1.field); // Single-arg merge: t1, t2
    System.out.println(merge2.field); // Varargs merge: t1, t2, t3
  }
}
```

There's a great StackOverflow wr

[jira] [Comment Edited] (KAFKA-12909) Allow users to opt-into spurious left/outer stream-stream join improvement

2022-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17522502#comment-17522502
 ] 

Matthias J. Sax edited comment on KAFKA-12909 at 4/21/22 9:13 PM:
--

{quote}It makes sense but it is still kind of hard to wrap my head around it.
{quote}
Assume you have the following input (and a join window of 10):

left: 

right:  

result: ,100> , 105>

The second result record is not a replacement of the first result record. The 
result is really two records.

Doing the same with left-join (and eager emit) you get: result: 
,95>, ,100> , 105> – for this case, the first 
"left-join" is clearly incorrect, right? But how can you know that the second 
record is an update to the first one, while the third record is _no_ update to 
the second one?

Maybe also check out: 
[https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/]


was (Author: mjsax):
{quote}It makes sense but it is still kind of hard to wrap my head around it.
{quote}
Assume you have the following input (and a join window of 10):

left: 

right:  

result: ,100> , 105>

The second result record is not a replacement of the first result record. The 
result is really two records.

Doing the same with left-join (and eager emit) you get: result: 
,95>, ,100> , 105> – for this case, the first 
"left-join" is clearly incorrect, right? But how can you know that the second 
record is an update to the first one, while the third record is _no_ update to 
the second one?

Maybe also check out: 
https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/

> Allow users to opt-into spurious left/outer stream-stream join improvement
> --
>
> Key: KAFKA-12909
> URL: https://issues.apache.org/jira/browse/KAFKA-12909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.1.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-10847 improves left/outer 
> stream-stream join, by not emitting left/outer results eagerly, but only 
> after the grace period passed.
> While this change is desired, there is an issue with regard to upgrades: if 
> users don't specify a grace period, we fall back to a 24h default. Thus, 
> left/outer join results would only be emitted 24h after the join window end. 
> This change in behavior could break existing applications when upgrading to 
> 3.0.0 release. – And even if users do set a grace period explicitly, it's 
> still unclear if the new delayed output behavior would work for them.
> Thus, we propose to disable the fix of KAFKA-10847 by default, and let users 
> opt into the fix explicitly instead.
> To allow users to enable the fix, we want to piggy-back on KIP-633 
> (https://issues.apache.org/jira/browse/KAFKA-8613) that deprecated the 
> existing `JoinWindows.of()` and `JoinWindows#grace()` methods in favor of 
> `JoinWindows.ofSizeAndGrace()` – if users don't update their code, we would 
> keep the fix disabled, and thus, if users upgrade their app nothing changes. 
> Only if users switch to the new `ofSizeAndGrace()` API, we enable the fix and 
> thus give users the opportunity to opt in explicitly and pick an appropriate 
> grace period for their application.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-12909) Allow users to opt-into spurious left/outer stream-stream join improvement

2022-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17526086#comment-17526086
 ] 

Matthias J. Sax commented on KAFKA-12909:
-

Ah sorry. That was my bad... Fixed.

> Allow users to opt-into spurious left/outer stream-stream join improvement
> --
>
> Key: KAFKA-12909
> URL: https://issues.apache.org/jira/browse/KAFKA-12909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>        Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.1.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-10847 improves left/outer 
> stream-stream join, by not emitting left/outer results eagerly, but only 
> after the grace period passed.
> While this change is desired, there is an issue with regard to upgrades: if 
> users don't specify a grace period, we fall back to a 24h default. Thus, 
> left/outer join results would only be emitted 24h after the join window end. 
> This change in behavior could break existing applications when upgrading to 
> 3.0.0 release. – And even if users do set a grace period explicitly, it's 
> still unclear if the new delayed output behavior would work for them.
> Thus, we propose to disable the fix of KAFKA-10847 by default, and let users 
> opt into the fix explicitly instead.
> To allow users to enable the fix, we want to piggy-back on KIP-633 
> (https://issues.apache.org/jira/browse/KAFKA-8613) that deprecated the 
> existing `JoinWindows.of()` and `JoinWindows#grace()` methods in favor of 
> `JoinWindows.ofSizeAndGrace()` – if users don't update their code, we would 
> keep the fix disabled, and thus, if users upgrade their app nothing changes. 
> Only if users switch to the new `ofSizeAndGrace()` API, we enable the fix and 
> thus give users the opportunity to opt in explicitly and pick an appropriate 
> grace period for their application.





[jira] [Updated] (KAFKA-13842) Add per-aggregation step before repartitioning

2022-04-20 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13842:

Description: 
Kafka Streams follows a continuous refinement model for aggregation. For this 
reason, we never implement a pre-aggregation step before data repartitioning, 
because it won't help much to reduce repartition cost (there is no natural 
boundary when a pre-aggregation is finished and when to emit it downstream for 
the actual aggregation roll-up).

With https://issues.apache.org/jira/browse/KAFKA-13785 we introduce a 
per-aggregation "emit final" feature (different to suppress()) that changes the 
continuous refinement model and thus it seems to be a good optimization to add 
a pre-aggregation step if this new feature is used.

We might want to give users control over inserting the pre-aggregation step 
because there is no free lunch... If we have X distinct keys, pre-aggregation 
implies that the upstream RocksDB store will need to store up to X rows to hold 
the pre-aggregate. Thus, given N input partitions, we need to hold N*X rows 
(upstream) plus X rows (in the final downstream aggregation). – In contrast, a 
direct repartition step only requires holding X rows downstream. It's a 
tradeoff between (much) higher disk usage and network/Kafka traffic.
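
As a back-of-the-envelope illustration of that tradeoff, the following Java sketch just evaluates the two row counts stated above. The class and method names are hypothetical, and the values of N and X in `main` are made-up example numbers, not measurements.

```java
// Rough row counts for the two options; N and X are as defined above.
final class PreAggregationCost {
    // With pre-aggregation: up to X rows per input partition upstream,
    // plus X rows in the final downstream aggregation.
    static long rowsWithPreAggregation(long inputPartitions, long distinctKeys) {
        return inputPartitions * distinctKeys + distinctKeys;
    }

    // With a direct repartition step: only the X downstream rows.
    static long rowsWithDirectRepartition(long distinctKeys) {
        return distinctKeys;
    }

    public static void main(String[] args) {
        long n = 10, x = 1_000_000; // example values, chosen for illustration only
        System.out.println(rowsWithPreAggregation(n, x));  // 11000000
        System.out.println(rowsWithDirectRepartition(x));  // 1000000
    }
}
```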

  was:
Kafka Streams follows a continuous refinement model for aggregation. For this 
reason, we never implement a pre-aggregation step before data repartitioning, 
because it won't help much to reduce repartition cost (there is no natural 
boundary when a pre-aggregation is finished and when to emit it downstream for 
the actual aggregation roll-up).

With https://issues.apache.org/jira/browse/KAFKA-13785 we introduce a 
per-aggregation "emit final" feature (different to suppress()) that changes the 
continuous refinement model and thus it seems to be a good optimization to add 
a pre-aggregation step if this new feature is used.


> Add per-aggregation step before repartitioning
> --
>
> Key: KAFKA-13842
> URL: https://issues.apache.org/jira/browse/KAFKA-13842
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Kafka Streams follows a continuous refinement model for aggregation. For this 
> reason, we never implement a pre-aggregation step before data repartitioning, 
> because it won't help much to reduce repartition cost (there is no natural 
> boundary when a pre-aggregation is finished and when to emit it downstream 
> for the actual aggregation roll-up).
> With https://issues.apache.org/jira/browse/KAFKA-13785 we introduce a 
> per-aggregation "emit final" feature (different to suppress()) that changes 
> the continuous refinement model and thus it seems to be a good optimization 
> to add a pre-aggregation step if this new feature is used.
> We might want to give users control over inserting the pre-aggregation step 
> because there is no free lunch... If we have X distinct keys, pre-aggregation 
> implies that the upstream RocksDB store will need to store up to X rows to 
> hold the pre-aggregate. Thus, given N input partitions, we need to hold N*X 
> rows (upstream) plus X rows (in the final downstream aggregation). – In 
> contrast, a direct repartition step only requires holding X rows 
> downstream. It's a tradeoff between (much) higher disk usage and network/Kafka 
> traffic.





[jira] [Created] (KAFKA-13842) Add per-aggregation step before repartitioning

2022-04-20 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-13842:
---

 Summary: Add per-aggregation step before repartitioning
 Key: KAFKA-13842
 URL: https://issues.apache.org/jira/browse/KAFKA-13842
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams follows a continuous refinement model for aggregation. For this 
reason, we never implement a pre-aggregation step before data repartitioning, 
because it won't help much to reduce repartition cost (there is no natural 
boundary when a pre-aggregation is finished and when to emit it downstream for 
the actual aggregation roll-up).

With https://issues.apache.org/jira/browse/KAFKA-13785 we introduce a 
per-aggregation "emit final" feature (different to suppress()) that changes the 
continuous refinement model and thus it seems to be a good optimization to add 
a pre-aggregation step if this new feature is used.





[jira] [Comment Edited] (KAFKA-13787) Failed to delete state store directory for it is not empty

2022-04-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516068#comment-17516068
 ] 

Matthias J. Sax edited comment on KAFKA-13787 at 4/15/22 8:56 PM:
--

`StateDirectory` should have all relevant code. Not sure why it does not delete 
the `kafka-streams-process-metadata` file – for the `.lock` file it's ok, as we 
hold the lock all the time and it will be deleted when the directory is deleted.

[~vvcephei] might know more about the file in question?


was (Author: mjsax):
`StateDirectory` should have all relevant code. Not sure why it does not delete 
the `kafka-streams-process-metadata` file – for the `.lock` file it's ok, as we 
hold the lock all the time and it will be deleted when the directory is deleted.

[~vvcephei] might now more about the file in question?

> Failed to delete state store directory for it is not empty
> --
>
> Key: KAFKA-13787
> URL: https://issues.apache.org/jira/browse/KAFKA-13787
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Nico Pommerening
>Priority: Major
> Attachments: bug-log.txt
>
>
> On Kafka Streams shutdown, the cleanup of state directories does not seem to work, 
> since the lock and metadata files do not seem to be deleted.
> Relevant WARN logging:
> 2022-03-31 10:34:41,689 WARN  [SpringApplicationShutdownHook] 
> org.apache.kafka.streams.processor.internals.StateDirectory: stream-thread 
> [SpringApplicationShutdownHook] Failed to delete state store directory of 
> /kafka-streams-statestore/555b9965-95e3-4c92-b467-1d283428da5d/test-test-run-kpi
>  for it is not empty
>  
> Left over files in directory:
>  * .lock
>  * kafka-streams-process-metadata
>  
> I'm not sure what the consequences of an unclean state cleanup are, but I 
> would like to get rid of the Warning.
> I attached a bigger log extract and I've already patched the StateDirectory 
> implementation which I'll try to contribute.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-12909) Allow users to opt-into spurious left/outer stream-stream join improvement

2022-04-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17522502#comment-17522502
 ] 

Matthias J. Sax edited comment on KAFKA-12909 at 4/14/22 7:12 PM:
--

{quote}It makes sense but it is still kind of hard to wrap my head around it.
{quote}
Assume you have the following input (and a join window of 10):

left: 

right:  

result: ,100> , 105>

The second result record is not a replacement of the first result record. The 
result is really two records.

Doing the same with left-join (and eager emit) you get: result: 
,95>, ,100> , 105> – for this case, the first 
"left-join" is clearly incorrect, right? But how can you know that the second 
record is an update to the first one, while the third record is _no_ update to 
the second one?

Maybe also check out: 
https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/


was (Author: mjsax):
{quote}It makes sense but it is still kind of hard to wrap my head around it.
{quote}
Assume you have the following input (and a join window of 10):

left: 

right:  

result: ,100> , 105>

The second result record is not a replacement of the first result record. The 
result is really two records.

Doing the same with left-join (and eager emit) you get: result: 
,95>, ,100> , 105> – for this case, the first 
"left-join" is clearly incorrect, right? But how can you know that the second 
record is an update to the first one, while the third record is _no_ update to 
the second one?

> Allow users to opt-into spurious left/outer stream-stream join improvement
> --
>
> Key: KAFKA-12909
> URL: https://issues.apache.org/jira/browse/KAFKA-12909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.1.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-10847 improves left/outer 
> stream-stream join, by not emitting left/outer results eagerly, but only 
> after the grace period passed.
> While this change is desired, there is an issue with regard to upgrades: if 
> users don't specify a grace period, we fall back to a 24h default. Thus, 
> left/outer join results would only be emitted 24h after the join window end. 
> This change in behavior could break existing applications when upgrading to 
> 3.0.0 release. – And even if users do set a grace period explicitly, it's 
> still unclear if the new delayed output behavior would work for them.
> Thus, we propose to disable the fix of KAFKA-10847 by default, and let users 
> opt into the fix explicitly instead.
> To allow users to enable the fix, we want to piggy-back on KIP-633 
> (https://issues.apache.org/jira/browse/KAFKA-8613) that deprecated the 
> existing `JoinWindows.of()` and `JoinWindows#grace()` methods in favor of 
> `JoinWindows.ofSizeAndGrace()` – if users don't update their code, we would 
> keep the fix disabled, and thus, if users upgrade their app nothing changes. 
> Only if users switch to the new `ofSizeAndGrace()` API, we enable the fix and 
> thus give users the opportunity to opt in explicitly and pick an appropriate 
> grace period for their application.





[jira] [Commented] (KAFKA-12909) Allow users to opt-into spurious left/outer stream-stream join improvement

2022-04-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17522502#comment-17522502
 ] 

Matthias J. Sax commented on KAFKA-12909:
-

{quote}It makes sense but it is still kind of hard to wrap my head around it.
{quote}
Assume you have the following input (and a join window of 10):

left: 

right:  

result: ,100> , 105>

The second result record is not a replacement of the first result record. The 
result is really two records.

Doing the same with left-join (and eager emit) you get: result: 
,95>, ,100> , 105> – for this case, the first 
"left-join" is clearly incorrect, right? But how can you know that the second 
record is an update to the first one, while the third record is _no_ update to 
the second one?

> Allow users to opt-into spurious left/outer stream-stream join improvement
> --
>
> Key: KAFKA-12909
> URL: https://issues.apache.org/jira/browse/KAFKA-12909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.1.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-10847 improves left/outer 
> stream-stream join, by not emitting left/outer results eagerly, but only 
> after the grace period passed.
> While this change is desired, there is an issue with regard to upgrades: if 
> users don't specify a grace period, we fall back to a 24h default. Thus, 
> left/outer join results would only be emitted 24h after the join window end. 
> This change in behavior could break existing applications when upgrading to 
> 3.0.0 release. – And even if users do set a grace period explicitly, it's 
> still unclear if the new delayed output behavior would work for them.
> Thus, we propose to disable the fix of KAFKA-10847 by default, and let users 
> opt into the fix explicitly instead.
> To allow users to enable the fix, we want to piggy-back on KIP-633 
> (https://issues.apache.org/jira/browse/KAFKA-8613) that deprecated the 
> existing `JoinWindows.of()` and `JoinWindows#grace()` methods in favor of 
> `JoinWindows.ofSizeAndGrace()` – if users don't update their code, we would 
> keep the fix disabled, and thus, if users upgrade their app nothing changes. 
> Only if users switch to the new `ofSizeAndGrace()` API, we enable the fix and 
> thus give users the opportunity to opt in explicitly and pick an appropriate 
> grace period for their application.





[jira] [Commented] (KAFKA-12909) Allow users to opt-into spurious left/outer stream-stream join improvement

2022-04-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17521985#comment-17521985
 ] 

Matthias J. Sax commented on KAFKA-12909:
-

This ticket is about left/outer join in particular and the "emit at window 
close" strategy is only applied to non-matching records. I.e., even if you have 
a left/outer join, all _inner_ join results of the operation are emitted right 
away. However, it's not "safe" to emit a left/right join result eagerly, as 
this record might actually find a join partner later – thus, we need to delay 
emitting "un-joined" records until the grace period has passed to ensure we 
compute the right result.
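
The emit strategy described above can be sketched as a toy simulation in plain Java. This is a simplified model under stated assumptions, not the Kafka Streams implementation: `LeftJoinEmitSketch`, `Rec`, and `leftJoin` are hypothetical names, records are assumed to arrive in timestamp order, buffers are never expired, and late records are not dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model (not Kafka Streams code): inner results are emitted immediately,
// un-joined left records only once window + grace has passed in stream time.
final class LeftJoinEmitSketch {
    static final class Rec {
        final boolean left; final String key; final String value; final long ts;
        Rec(boolean left, String key, String value, long ts) {
            this.left = left; this.key = key; this.value = value; this.ts = ts;
        }
    }

    static List<String> leftJoin(List<Rec> input, long window, long grace) {
        List<Rec> lefts = new ArrayList<>();
        List<Rec> rights = new ArrayList<>();
        List<Rec> pending = new ArrayList<>(); // left records with no match so far
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts);
            if (r.left) {
                lefts.add(r);
                boolean matched = false;
                for (Rec other : rights) {
                    if (other.key.equals(r.key) && Math.abs(other.ts - r.ts) <= window) {
                        out.add(r.key + ":" + r.value + "+" + other.value); // inner result, emitted eagerly
                        matched = true;
                    }
                }
                if (!matched) pending.add(r);
            } else {
                rights.add(r);
                for (Rec other : lefts) {
                    if (other.key.equals(r.key) && Math.abs(other.ts - r.ts) <= window) {
                        out.add(other.key + ":" + other.value + "+" + r.value);
                        pending.remove(other); // found a partner, so never emit it with null
                    }
                }
            }
            // Emit un-joined left records whose window plus grace period has closed.
            for (Iterator<Rec> it = pending.iterator(); it.hasNext();) {
                Rec p = it.next();
                if (p.ts + window + grace < streamTime) {
                    out.add(p.key + ":" + p.value + "+null");
                    it.remove();
                }
            }
        }
        return out;
    }
}
```

With a window of 10 and no grace, a left record at timestamp 100 followed by a matching right record at 105 yields only the inner result; if instead the next record is an unrelated one at timestamp 200, the left record is emitted with a null partner at that point.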

In the old implementation, we basically did not compute the correct left/outer 
join result, but a super-set of it. – Your DB argument does not really apply, 
because the result is a KStream and thus we should only emit _final_ result. If 
we emit an > eagerly and a second > later, the second one 
is _not_ and update to the first one (a KStream has no update semantics) – 
otherwise we would need to treat all results with the same key as _updates_ but 
if a record joins twice, the second join result is also not an update to the 
first one.

Does this make sense?

> Allow users to opt-into spurious left/outer stream-stream join improvement
> --
>
> Key: KAFKA-12909
> URL: https://issues.apache.org/jira/browse/KAFKA-12909
> Project: Kafka
>  Issue Type: Improvement
>          Components: streams
>        Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.1.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-10847 improves left/outer 
> stream-stream join, by not emitting left/outer results eagerly, but only 
> after the grace period passed.
> While this change is desired, there is an issue with regard to upgrades: if 
> users don't specify a grace period, we fall back to a 24h default. Thus, 
> left/outer join results would only be emitted 24h after the join window end. 
> This change in behavior could break existing applications when upgrading to 
> 3.0.0 release. – And even if users do set a grace period explicitly, it's 
> still unclear if the new delayed output behavior would work for them.
> Thus, we propose to disable the fix of KAFKA-10847 by default, and let users 
> opt into the fix explicitly instead.
> To allow users to enable the fix, we want to piggy-back on KIP-633 
> (https://issues.apache.org/jira/browse/KAFKA-8613) that deprecated the 
> existing `JoinWindows.of()` and `JoinWindows#grace()` methods in favor of 
> `JoinWindows.ofSizeAndGrace()` – if users don't update their code, we would 
> keep the fix disabled, and thus, if users upgrade their app nothing changes. 
> Only if users switch to the new `ofSizeAndGrace()` API, we enable the fix and 
> thus give users the opportunity to opt in explicitly and pick an appropriate 
> grace period for their application.





[jira] [Commented] (KAFKA-8318) Session Window Aggregations generate an extra tombstone

2022-04-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17521866#comment-17521866
 ] 

Matthias J. Sax commented on KAFKA-8318:


It seems [~ipasynkov] is no longer working on this. Feel free to pick up this 
ticket if you are interested in providing a fix.

> Session Window Aggregations generate an extra tombstone
> ---
>
> Key: KAFKA-8318
> URL: https://issues.apache.org/jira/browse/KAFKA-8318
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Ilia Pasynkov
>Priority: Minor
>  Labels: newbie++
>
> See the discussion 
> https://github.com/apache/kafka/pull/6654#discussion_r280231439
> The session merging logic generates a tombstone in addition to an update when 
> the session window already exists. It's not a correctness issue, just a small 
> performance hit, because that tombstone is immediately invalidated by the 
> update.





[jira] [Updated] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-04-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13817:

Labels: beginner newbie  (was: )

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>  Labels: beginner, newbie
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.]
>  
> If this is just scheduled once, this can trigger emit every time if system 
> time jumps a lot suddenly.
>  
> For example, 
>  # nextTimeToEmit set to 1 and step is 1
>  # If next system time jumps to 100, we will always emit for next 100 records
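
The example above can be reproduced with a small, simplified model in Java. This is a sketch of the scheduling logic only, not the actual `KStreamKStreamJoin` code; `NextTimeToEmitSketch`, `countEmits`, and the `resetToNow` flag are hypothetical names introduced for illustration.

```java
// Simplified model of the throttle: compares "advance by step" with
// "reschedule relative to the current system time" after a clock jump.
final class NextTimeToEmitSketch {
    // Counts emits over `records` records, all processed at wall-clock time `now`.
    // resetToNow == false: nextTimeToEmit only advances by `step` per emit.
    // resetToNow == true:  nextTimeToEmit is set to now + step on every emit.
    static int countEmits(long nextTimeToEmit, long step, long now, int records, boolean resetToNow) {
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (now < nextTimeToEmit) continue; // throttled: not time to emit yet
            emits++;
            nextTimeToEmit = resetToNow ? now + step : nextTimeToEmit + step;
        }
        return emits;
    }

    public static void main(String[] args) {
        // nextTimeToEmit = 1, step = 1, system time jumps to 100, 200 records follow.
        System.out.println(countEmits(1, 1, 100, 200, false)); // 100
        System.out.println(countEmits(1, 1, 100, 200, true));  // 1
    }
}
```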





[jira] [Updated] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-04-12 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13817:

Component/s: streams

> Schedule nextTimeToEmit to system time every time instead of just once
> --
>
> Key: KAFKA-13817
> URL: https://issues.apache.org/jira/browse/KAFKA-13817
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Priority: Minor
>
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231.]
>  
> If this is just scheduled once, this can trigger emit every time if system 
> time jumps a lot suddenly.
>  
> For example, 
>  # nextTimeToEmit set to 1 and step is 1
>  # If next system time jumps to 100, we will always emit for next 100 records





[jira] [Commented] (KAFKA-13813) left/outer joins can wait indefinitely for emitted record with suprious record fix

2022-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17521243#comment-17521243
 ] 

Matthias J. Sax commented on KAFKA-13813:
-

The change is intentional, as users wanted to get SQL join semantics, while 
the old behavior differs from SQL semantics.

Currently, you can still use the old API to get the old behavior. If we want to 
keep the old behavior, we need to find a way to give users control over it. – 
If we do this, we should do it before we remove the old and already deprecated 
API.

Seems, we might need a KIP for this. – While un-deprecating the old API might 
be the simplest way, it seems hard for users to reason about (i.e., to 
understand the difference between both, which would be very subtle). Thus, 
designing something new and more explicit might be a better way forward?

> left/outer joins can wait indefinitely for emitted record with suprious 
> record fix
> --
>
> Key: KAFKA-13813
> URL: https://issues.apache.org/jira/browse/KAFKA-13813
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Clive Cox
>Priority: Major
>
> With the fix in https://issues.apache.org/jira/browse/KAFKA-10847 records 
> will be emitted after the grace period but only when a new record is 
> processed. This means it's possible to wait an arbitrarily long time for a 
> record to be emitted.
> This also means one cannot recreate the previous semantics of emitting 
> records immediately, or even guarantee emission once the grace period has passed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13820) Add debug-level logs to explain why a store is filtered out during interactive query

2022-04-12 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17521234#comment-17521234
 ] 

Matthias J. Sax commented on KAFKA-13820:
-

Sounds related to 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors]
 that we never completed :( 

> Add debug-level logs to explain why a store is filtered out during 
> interactive query
> 
>
> Key: KAFKA-13820
> URL: https://issues.apache.org/jira/browse/KAFKA-13820
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Minor
>
> Currently Kafka Streams throws an InvalidStateStoreException when the desired 
> store is not present on the local instance. It also throws the same exception 
> with the same message when the store is present, but is not active (and stale 
> queries are disabled).
> This is an important distinction when debugging store unavailability, and a 
> debug-level log is an un-intrusive mechanism to expose the information.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13813) left/outer joins can wait indefinitely for emitted record with suprious record fix

2022-04-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13813:

Issue Type: Improvement  (was: Bug)

> left/outer joins can wait indefinitely for emitted record with suprious 
> record fix
> --
>
> Key: KAFKA-13813
> URL: https://issues.apache.org/jira/browse/KAFKA-13813
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Clive Cox
>Priority: Major
>
> With the fix in https://issues.apache.org/jira/browse/KAFKA-10847 records 
> will be emitted after the grace period but only when a new record is 
> processed. This means it's possible to wait an arbitrarily long time for a 
> record to be emitted.
> This also means one cannot recreate the previous semantics of emitting 
> records immediately, or even guarantee emission once the grace period has passed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13813) left/outer joins can wait indefinitely for emitted record with suprious record fix

2022-04-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13813:

Component/s: streams

> left/outer joins can wait indefinitely for emitted record with suprious 
> record fix
> --
>
> Key: KAFKA-13813
> URL: https://issues.apache.org/jira/browse/KAFKA-13813
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Clive Cox
>Priority: Major
>
> With the fix in https://issues.apache.org/jira/browse/KAFKA-10847 records 
> will be emitted after the grace period but only when a new record is 
> processed. This means it's possible to wait an arbitrarily long time for a 
> record to be emitted.
> This also means one cannot recreate the previous semantics of emitting 
> records immediately, or even guarantee emission once the grace period has passed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-04-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17519199#comment-17519199
 ] 

Matthias J. Sax commented on KAFKA-7509:


Thanks for staying on top of this. The filtering code is a partial solution to 
the problem. The prefixes like `producer.` et al. also help, but I am not sure 
they solve all issues. KAFKA-6793 actually illustrates the general problem 
we tried to solve.

However, IIRC KAFKA-6793 in particular was triggered by `StreamsConfig#retries`, 
which we needed to pass through the admin client into `StreamsPartitionAssignor`. 
`StreamsConfig#retries` was deprecated (it's still in the code, but unused and 
no longer set on the `AdminClientConfig`) via KIP-572 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams]
 -- thus, KAFKA-6793 might effectively be resolved as a side effect, but only 
because we stopped passing in an unrecognized config – thus, the underlying 
issue is still there.

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Chris Egerton
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.
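
The workaround in the last paragraph could look roughly like the sketch below. It is an illustration only: `KnownConfigFilter` and `retainKnown` are hypothetical names, and the set of known names is passed in by the caller. In a real client the set might come from something like `ProducerConfig.configNames()`, which is an assumption here rather than part of the ticket.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class KnownConfigFilter {

    /**
     * Returns a copy of the supplied properties that contains only the keys
     * listed in knownNames, so the client never sees an unrecognized config.
     */
    static Map<String, Object> retainKnown(Map<String, ?> supplied, Set<String> knownNames) {
        Map<String, Object> filtered = new HashMap<>();
        for (Map.Entry<String, ?> entry : supplied.entrySet()) {
            if (knownNames.contains(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

The filtered map would then be handed to the producer, consumer, or admin client constructor in place of the full worker configuration.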



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-12909) Allow users to opt-into spurious left/outer stream-stream join improvement

2022-04-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17519175#comment-17519175
 ] 

Matthias J. Sax commented on KAFKA-12909:
-

Note that _inner_ join results are still instant, even if you use `leftJoin` or 
`outerJoin` – there is no plan to emit left/right join results eagerly again, 
but rather to make the new methods mandatory.

Why would you want left/right join results to be emitted eagerly (and thus, 
potentially incorrectly)?

> Allow users to opt-into spurious left/outer stream-stream join improvement
> --
>
> Key: KAFKA-12909
> URL: https://issues.apache.org/jira/browse/KAFKA-12909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>            Reporter: Matthias J. Sax
>        Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.1.0
>
>
> https://issues.apache.org/jira/browse/KAFKA-10847 improves left/outer 
> stream-stream join, by not emitting left/outer results eagerly, but only 
> after the grace period passed.
> While this change is desired, there is an issue with regard to upgrades: if 
> users don't specify a grace period, we fall back to a 24h default. Thus, 
> left/outer join results would only be emitted 24h after the join window end. 
> This change in behavior could break existing applications when upgrading to 
> 3.0.0 release. – And even if users do set a grace period explicitly, it's 
> still unclear if the new delayed output behavior would work for them.
> Thus, we propose to disable the fix of KAFKA-10847 by default, and let users 
> opt into the fix explicitly instead.
> To allow users to enable the fix, we want to piggy-back on KIP-633 
> (https://issues.apache.org/jira/browse/KAFKA-8613) that deprecated the 
> existing `JoinWindows.of()` and `JoinWindows#grace()` methods in favor of 
> `JoinWindows.ofSizeAndGrace()` – if users don't update their code, we would 
> keep the fix disabled, and thus, if users upgrade their app nothing changes. 
> Only if users switch to the new `ofSizeAndGrace()` API do we enable the fix and 
> thus give users the opportunity to opt in explicitly and pick an appropriate 
> grace period for their application.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-04-04 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516979#comment-17516979
 ] 

Matthias J. Sax commented on KAFKA-7509:


Thanks for following up. – I briefly took a look. My first impression is that 
it might work with KS.

Adding `connect.kafka.cluster.id` would still require a KIP – it seems that 
adding a config and avoiding the warnings are two different things though – 
might be good to extract the changes for the logging into an independent PR (and 
maybe start using it in KS right away to verify that it works)?

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Chris Egerton
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13787) Failed to delete state store directory for it is not empty

2022-04-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17516068#comment-17516068
 ] 

Matthias J. Sax commented on KAFKA-13787:
-

`StateDirectory` should have all relevant code. Not sure why it does not delete 
the `kafka-streams-process-metadata` file – for the `.lock` file it's ok, as we 
hold the lock all the time and it will be deleted when the directory is deleted.

[~vvcephei] might know more about the file in question?

> Failed to delete state store directory for it is not empty
> --
>
> Key: KAFKA-13787
> URL: https://issues.apache.org/jira/browse/KAFKA-13787
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Nico Pommerening
>Priority: Major
> Attachments: bug-log.txt
>
>
> On Kafka Streams shutdown the Cleanup of state directories seems not to work, 
> since the lock and metadata file seem not to be deleted.
> Relevant WARN logging:
> 2022-03-31 10:34:41,689 WARN  [SpringApplicationShutdownHook] 
> org.apache.kafka.streams.processor.internals.StateDirectory: stream-thread 
> [SpringApplicationShutdownHook] Failed to delete state store directory of 
> /kafka-streams-statestore/555b9965-95e3-4c92-b467-1d283428da5d/test-test-run-kpi
>  for it is not empty
>  
> Left over files in directory:
>  * .lock
>  * kafka-streams-process-metadata
>  
> I'm not sure what the consequences of an unclean state cleanup are, but I 
> would like to get rid of the Warning.
> I attached a bigger log extract and I've already patched the StateDirectory 
> implementation which I'll try to contribute.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] KIP-813 Shared State Stores

2022-04-01 Thread Matthias J. Sax

+1 (binding)


On 4/1/22 6:47 AM, John Roesler wrote:

Thanks for the KIP, Daan!

I’m +1 (binding)

-John

On Tue, Mar 29, 2022, at 06:01, Daan Gertis wrote:

I would like to start a vote on this one:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores

Cheers,
D.


[DISCUSS] KIP-0422: Add Record Footers

2022-04-01 Thread Matthias J. Sax

Hi,

we added record header support to Kafka via KIP-82 many years ago. I 
think it's time to complement this feature with record footers.


Looking forward to your feedback.

https://tinyurl.com/43jubbaj


-Matthias


[jira] [Commented] (KAFKA-13787) Failed to delete state store directory for it is not empty

2022-03-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17515648#comment-17515648
 ] 

Matthias J. Sax commented on KAFKA-13787:
-

What file system are you using?

> Failed to delete state store directory for it is not empty
> --
>
> Key: KAFKA-13787
> URL: https://issues.apache.org/jira/browse/KAFKA-13787
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Nico Pommerening
>Priority: Major
> Attachments: bug-log.txt
>
>
> On Kafka Streams shutdown the Cleanup of state directories seems not to work, 
> since the lock and metadata file seem not to be deleted.
> Relevant WARN logging:
> 2022-03-31 10:34:41,689 WARN  [SpringApplicationShutdownHook] 
> org.apache.kafka.streams.processor.internals.StateDirectory: stream-thread 
> [SpringApplicationShutdownHook] Failed to delete state store directory of 
> /kafka-streams-statestore/555b9965-95e3-4c92-b467-1d283428da5d/test-test-run-kpi
>  for it is not empty
>  
> Left over files in directory:
>  * .lock
>  * kafka-streams-process-metadata
>  
> I'm not sure what the consequences of an unclean state cleanup are, but I 
> would like to get rid of the Warning.
> I attached a bigger log extract and I've already patched the StateDirectory 
> implementation which I'll try to contribute.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13785) Support emit final result for windowed aggregation

2022-03-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13785:

Labels: kip  (was: )

> Support emit final result for windowed aggregation
> --
>
> Key: KAFKA-13785
> URL: https://issues.apache.org/jira/browse/KAFKA-13785
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>  Labels: kip
>
> For KIP-825: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13785) Support emit final result for windowed aggregation

2022-03-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13785:

Component/s: streams

> Support emit final result for windowed aggregation
> --
>
> Key: KAFKA-13785
> URL: https://issues.apache.org/jira/browse/KAFKA-13785
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hao Li
>Assignee: Hao Li
>Priority: Major
>
> For KIP-825: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

2022-03-29 Thread Matthias J. Sax

My understanding was that the original proposal was to have:

  merge(KStream stream);
  merge(KStream... streams);

Maybe I misunderstood.


I am not really a fan of

  merge(KStream stream, KStream... streams);

because it seems to break the `Collection` pattern. If I have a 
collection of KStreams, I need to artificially extract one and pass as 
first argument and pass all others using the second argument.


On the other hand, _if_ I have a collection, going back to the original 
proposal of


  merge(Collection streams);

would work, too. Maybe bottom line is, that we might want to have both 
(`Collection` and vararg) to optimize for both cases? On the other hand 
it feels rather redundant? Also not sure if both are compatible?




The issue with Named is interesting. Wondering if we should just flip 
the argument order:


  merge(Named name, KStream... streams);

Falling back to `Collection` would also avoid this issue.
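
On the question of whether both overloads can coexist, a small sketch with a stand-in class (`FakeStream` is hypothetical, non-generic, and not the `KStream` interface) suggests that a `Collection` overload and a varargs overload resolve without ambiguity, with the varargs variant delegating to the `Collection` one. Generic varargs on the real interface may additionally produce unchecked warnings at call sites, which this sketch does not cover.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

final class FakeStream {
    final List<String> names; // names of all streams merged into this one

    FakeStream(List<String> names) {
        this.names = names;
    }

    static FakeStream of(String name) {
        return new FakeStream(Arrays.asList(name));
    }

    // Collection overload: convenient when the caller already holds a collection.
    FakeStream merge(Collection<FakeStream> others) {
        List<String> merged = new ArrayList<>(names);
        for (FakeStream other : others) {
            merged.addAll(other.names);
        }
        return new FakeStream(merged);
    }

    // Varargs overload: delegates to the Collection overload.
    // A List argument is not a FakeStream[], so the call below cannot recurse.
    FakeStream merge(FakeStream... others) {
        return merge(Arrays.asList(others));
    }
}
```

In this sketch `a.merge(b, c)`, `a.merge(Arrays.asList(b, c))`, and the zero-argument `a.merge()` all compile, and the last one returns a stream with only the names of `a`.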



-Matthias


On 3/29/22 1:43 AM, Nick Telford wrote:

Yeah, the Named parameter makes it a little trickier. My suggestion would
be to add an additional overload that looks like:

KStream merge(KStream first, Named named, KStream... rest);

It's not ideal having the Named parameter split the other parameters; we
could alternatively move the Named parameter to be first, but then that
wouldn't align with the rest of the API.

Nick

On Tue, 29 Mar 2022 at 05:20, Chris Egerton  wrote:


Hi all,

Java permits the overload. Simple test class to demonstrate:

```
public class Test {
    private final String field;

    public Test(String field) {
        this.field = field;
    }

    public Test merge(Test that) {
        return new Test("Single-arg merge: " + this.field + ", " + that.field);
    }

    public Test merge(Test that, Test... those) {
        String newField = "Varargs merge: " + this.field + ", " + that.field;
        for (Test test : those) newField += ", " + test.field;
        return new Test(newField);
    }

    public static void main(String[] args) {
        Test t1 = new Test("t1"), t2 = new Test("t2"), t3 = new Test("t3");
        Test merge1 = t1.merge(t2), merge2 = t1.merge(t2, t3);
        System.out.println(merge1.field); // Single-arg merge: t1, t2
        System.out.println(merge2.field); // Varargs merge: t1, t2, t3
    }
}
```

There's a great StackOverflow writeup on the subject [1], which explains
that during method resolution, priority is given to methods whose
signatures match the argument list without taking boxing/unboxing or
varargs into consideration:


The first phase performs overload resolution without permitting boxing or
unboxing conversion, or the use of variable arity method invocation. If no
applicable method is found during this phase then processing continues to
the second phase.

The second phase performs overload resolution while allowing boxing and
unboxing, but still precludes the use of variable arity method invocation.
If no applicable method is found during this phase then processing
continues to the third phase.

The third phase allows overloading to be combined with variable arity
methods, boxing, and unboxing.

I'm curious if it's worth keeping a variant that accepts a Named parameter?
Might be tricky to accommodate since variadic arguments have to be last.

[1] - https://stackoverflow.com/a/48850722

Cheers,

Chris

On Mon, Mar 28, 2022 at 11:46 PM Matthias J. Sax  wrote:


I think Java does not allow to have both overloads, because it would
result in ambiguity?

If you call `s1.merge(s2)` it's unclear which method you want to call.


-Matthias


On 3/28/22 7:20 AM, Nick Telford wrote:

Hi Matthias,

How about instead of changing the signature of the existing method to
variadic, we simply add a new overload which takes variadic args:

KStream merge(KStream first, KStream... rest);

That way, we maintain both source *and* binary compatibility for the
existing method, and we can enforce that there is always at least one
stream (argument) being merged.

I'm fine dropping the static methods. As you said, this is mostly all just
syntax sugar anyway, but I do think allowing multiple streams to be merged
together is a benefit. My motivation was that we generate diagrams for our
Topologies, and having several binary merges becomes quite messy when a
single n-ary merge is what you're really modelling.

Regards,

Nick

On Thu, 24 Mar 2022 at 21:24, Matthias J. Sax wrote:


Thanks for proposing this KIP.

I feel a little bit torn by the idea. In general, we try to keep the
surface area small, and only add APIs that deliver (significant) value.

It seems the current proposal is more or less about syntactic sugar,
which can still be valuable, but I am not really sure about it.

I am also wondering, if we could use a variadic argument instead of a
`Collection`:

   KStream merge(KStream... streams);

This way, we could just replac

Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

2022-03-28 Thread Matthias J. Sax
I think Java does not allow to have both overloads, because it would 
result in ambiguity?


If you call `s1.merge(s2)` it's unclear which method you want to call.


-Matthias


On 3/28/22 7:20 AM, Nick Telford wrote:

Hi Matthias,

How about instead of changing the signature of the existing method to
variadic, we simply add a new overload which takes variadic args:

KStream merge(KStream first, KStream... rest);

That way, we maintain both source *and* binary compatibility for the
existing method, and we can enforce that there is always at least one
stream (argument) being merged.

I'm fine dropping the static methods. As you said, this is mostly all just
syntax sugar anyway, but I do think allowing multiple streams to be merged
together is a benefit. My motivation was that we generate diagrams for our
Topologies, and having several binary merges becomes quite messy when a
single n-ary merge is what you're really modelling.

Regards,

Nick

On Thu, 24 Mar 2022 at 21:24, Matthias J. Sax  wrote:


Thanks for proposing this KIP.

I feel a little bit torn by the idea. In general, we try to keep the
surface area small, and only add APIs that deliver (significant) value.

It seems the current proposal is more or less about syntactic sugar,
which can still be valuable, but I am not really sure about it.

I am also wondering, if we could use a variadic argument instead of a
`Collection`:

  KStream merge(KStream... streams);

This way, we could just replace the existing method in a backward
compatible way (well, source code compatible only) and thus not increase
the surface area of the API while still achieving your goal?

A `merge()` with zero arguments would just be a no-op (same as for using
`Collection` I assume?).


For adding the static methods: It seems not to be a common pattern to
me? It might be better not to add them and leave it to users to write a
small helper method themselves if they have such a pattern?


-Matthias



On 1/31/22 7:35 AM, Nick Telford wrote:

Hi everyone,

I'd like to discuss KIP 819:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-819%3A+Merge+multiple+KStreams+in+one+operation


This is a simple KIP that adds/modifies the KStream#merge API to enable
many streams to be merged in a single graph node.

Regards,

Nick Telford







[jira] [Comment Edited] (KAFKA-4609) KTable/KTable join followed by groupBy and aggregate/count can result in duplicated results

2022-03-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959455#comment-16959455
 ] 

Matthias J. Sax edited comment on KAFKA-4609 at 3/25/22, 5:31 PM:
--

As you can see, the ticket is open. It was never addressed and thus it's not a 
surprise that you may still hit it in newer versions.


was (Author: mjsax):
As you can see, the ticket is open. It was never addressed and thus it's not a 
surprise that you may still hit it in never versions.

> KTable/KTable join followed by groupBy and aggregate/count can result in 
> duplicated results
> ---
>
> Key: KAFKA-4609
> URL: https://issues.apache.org/jira/browse/KAFKA-4609
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.0
>Reporter: Damian Guy
>Priority: Major
>  Labels: architecture
>
> When caching is enabled, KTable/KTable joins can result in duplicate values 
> being emitted. This will occur if there were updates to the same key in both 
> tables. Each table is flushed independently, and each table will trigger the 
> join, so you get two results for the same key. 
> If we subsequently perform a groupBy and then aggregate operation we will now 
> process these duplicates resulting in incorrect aggregated values. For 
> example count will be double the value it should be.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

2022-03-24 Thread Matthias J. Sax

Thanks for proposing this KIP.

I feel a little bit torn by the idea. In general, we try to keep the 
surface area small, and only add APIs that deliver (significant) value.


It seems the current proposal is more or less about syntactic sugar, 
which can still be valuable, but I am not really sure about it.


I am also wondering, if we could use a variadic argument instead of a 
`Collection`:


KStream merge(KStream... streams);

This way, we could just replace the existing method in a backward 
compatible way (well, source code compatible only) and thus not increase 
the surface area of the API while still achieving your goal?


A `merge()` with zero arguments would just be a no-op (same as for using 
`Collection` I assume?).



For adding the static methods: It seems not to be a common pattern to 
me? It might be better not to add them and leave it to users to write a 
small helper method themselves if they have such a pattern?



-Matthias



On 1/31/22 7:35 AM, Nick Telford wrote:

Hi everyone,

I'd like to discuss KIP 819:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-819%3A+Merge+multiple+KStreams+in+one+operation

This is a simple KIP that adds/modifies the KStream#merge API to enable
many streams to be merged in a single graph node.

Regards,

Nick Telford



Re: [VOTE] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-24 Thread Matthias J. Sax

+1 (binding)


On 3/24/22 1:52 PM, Hao Li wrote:

I hit reply on my phone in the mail app and changed the title and text
hoping it will start a new thread. Apparently it doesn't work...

On Thu, Mar 24, 2022 at 12:36 PM Bruno Cadonna  wrote:


Hi Hao,

Actually, this is the VOTE thread. Do you use GMail? Sometimes it is a
bit weird how it shows e-mails in threads.

Anyways, thank you for the KIP and your patience!

+1 (binding)

Best,
Bruno

On 24.03.22 17:36, Hao Li wrote:

Thanks John and Guozhang. Didn't realize I used this discussion thread for
voting. Let me start a new thread for voting. Will fix the KIP.

On Thu, Mar 24, 2022 at 9:28 AM Guozhang Wang wrote:


+1 (binding).

Thanks Hao!


Guozhang

On Thu, Mar 24, 2022 at 9:20 AM John Roesler wrote:


Thanks, Hao!

I'm +1 (binding)

-John

On Wed, 2022-03-23 at 22:25 -0700, Hao Li wrote:

Hi all,

I'd like to start a vote on Kafka Streams KIP-825:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced

--
-- Guozhang


[jira] [Commented] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2022-03-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17512018#comment-17512018
 ] 

Matthias J. Sax commented on KAFKA-13542:
-

SGTM.

> Utilize the new Consumer#enforceRebalance(reason) API in Streams
> 
>
> Key: KAFKA-13542
> URL: https://issues.apache.org/jira/browse/KAFKA-13542
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Hao Li
>Priority: Blocker
> Fix For: 3.2.0
>
>
> KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
> API, which will be passed in to a new field of the JoinGroup protocol. We 
> invoke this API throughout Streams for various reasons, which are very useful 
> for debugging the cause of rebalancing. Passing in the reason to this new API 
> would make it possible to figure out why a Streams client triggered a 
> rebalance from the broker logs, which are often the only logs available when 
> the client logs cannot be retrieved for whatever reason



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-23 Thread Matthias J. Sax
ugh for this KIP we could just add it in `XXXWindowedKStream.aggregate()`,
we may want to extend to other non-windowed operators in the future.
2. To be consistent with other control class names, I feel maybe we can
name it "Emitted", not "EmitConfig".
3. Following the first comment, I think we can have the static
constructor names as "onWindowClose" and "onEachUpdate".

The resulting code pattern would be like this:

stream
  .groupBy(..)
  .windowBy(TimeWindow..)
  .count(Emitted.onWindowClose)

WDYT?


On Wed, Mar 16, 2022 at 12:07 PM Matthias J. Sax <mj...@apache.org> wrote:


`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.


I see; making it a config seems better. Frankly, I am not even sure if
we need a config at all or if we can just hard code it? For the
stream-stream join left/outer join fix, there is only an internal
config but no public config either.

Option 1: Your proposal is?

stream
  .groupByKey()
  .windowBy(TimeWindow.ofSizeNoGrace(...))
  .configure(EmitConfig.emitFinal())
  .count()

Does not change my argument that it seems to be misplaced from an API
flow POV.

Option 1 seems to be the least desirable to me.

For option 2 and 3, I am not sure which one I like better. Might be good
if others could chime in, too. I think I slightly prefer option 2 over
option 3.


-Matthias

On 3/15/22 5:33 PM, Hao Li wrote:

Thanks for the feedback Matthias.

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.

For option 1, I intend to use `emitFinal` to configure how
`TimeWindowedKStream` should be outputted to `KTable` after aggregation.
But `emitFinal` is not an action to the `TimeWindowedKStream` interface.
Maybe adding `configure(EmitConfig config)` makes more sense?

For option 2, config can be created using `WindowConfig.emitFinal()` or
`EmitConfig.emitFinal`

For option 3, it will be something like `TimeWindows(..., EmitConfig
emitConfig)`.

For putting `EmitConfig` in aggregation operator, I think it doesn't
control how we do aggregation but how we output to `KTable`. That's why I
feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But
I'm also OK with option 2.

Hao




On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax <mj...@apache.org> wrote:



Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness`
parameter because the grace-period is defined on the window itself already?

(On the other hand, if I think about it once more, maybe the
`grace-period` is actually not a property of the window but a property
of the aggregation operator? _thinking_)

From an API flow point of view, option 1 might not be desirable IMHO:


 stream
   .groupByKey()
   .windowBy(TimeWindow.ofSizeNoGrace(...))
   .emitFinal()
   .count()

The call to `emitFinal()` seems not to be in the right place for this case?

Option 2 might work (I think we need to discuss a few details of the API
though):

 stream
   .groupByKey()
   .windowBy(
     TimeWindow.ofSizeNoGrace(...),
     EmitConfig.emitFinal() -- just made this up; it's not in the KIP
   )
   .count()

I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
unclear what API you have in mind? `EmitFinalConfig` has no public
constructor nor any builder method.


For option 3, I am not sure what you really have in mind. Can you give
a concrete example (similar to above) of how users would write their code?

Did you consider actually passing the `EmitConfig` into the
aggregation operator? In the end, it seems not to be a property of the
window definition or windowing step, but a property of the actual operator:


 stream
   .groupByKey()
   .windowBy(
 TimeWindow.ofSizeNoGrace(...)
   )
   .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this
case though...


-Matthias



On 3/14/22 9:21 PM, Hao Li wrote:

Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
fewer places. What do you think of option 1, which doesn't change the current
`windowedBy` api but configures `EmitConfig` separately? The benefit of
option 1 is if we need to configure something else later, w

[jira] [Commented] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2022-03-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17511537#comment-17511537
 ] 

Matthias J. Sax commented on KAFKA-13542:
-

Curious: did we figure out why the patch resulted in the perf regression?

> Utilize the new Consumer#enforceRebalance(reason) API in Streams
> 
>
> Key: KAFKA-13542
> URL: https://issues.apache.org/jira/browse/KAFKA-13542
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Hao Li
>Priority: Blocker
> Fix For: 3.2.0
>
>
> KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
> API, which will be passed in to a new field of the JoinGroup protocol. We 
> invoke this API throughout Streams for various reasons, which are very useful 
> for debugging the cause of rebalancing. Passing in the reason to this new API 
> would make it possible to figure out why a Streams client triggered a 
> rebalance from the broker logs, which are often the only logs available when 
> the client logs cannot be retrieved for whatever reason



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-22 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510997#comment-17510997
 ] 

Matthias J. Sax commented on KAFKA-7509:


Thanks for the detailed explanation. I don't know anything about the Connect code 
base, and am just realizing it's even more complicated than for the Streams case 
with regard to nesting... I guess I don't have a good proposal for how to address 
it. – In the end, it also seems a bigger problem for Connect than for Streams 
(for Streams, it falls into the "annoying but not harmful" category, but it seems 
that for Connect the noise ratio is much higher...).

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.
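
The workaround described at the end of the issue (pass each client only the properties its config definition knows about) can be sketched roughly as follows. This is a minimal illustration, not Connect's actual code: `KnownConfigFilter` and `onlyKnown` are hypothetical names, and the set of known names is passed in by the caller. In a real application that set could presumably come from something like `ProducerConfig.configNames()`, but that wiring is an assumption here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka: drops properties a client would not recognize.
final class KnownConfigFilter {

    private KnownConfigFilter() {}

    // Returns a copy of `configs` containing only the keys listed in `knownNames`.
    static Map<String, Object> onlyKnown(Map<String, ?> configs, Set<String> knownNames) {
        Map<String, Object> filtered = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            if (knownNames.contains(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

Because the client never sees the extra keys, it has nothing to warn about; the trade-off is that genuinely misspelled keys are dropped silently as well.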





Re: [VOTE} KIP-796: Interactive Query v2

2022-03-22 Thread Matthias J. Sax

The old API IQv1 is not deprecated yet, so I don't see a reason to revert.

We might not want to "advertise" the IQv2 in the release announcement 
though if it's not complete and unstable right now.


We might also not want to mention it in the docs? Not sure if there was 
already a docs PR. If yes, reverting the docs or adding a BIG disclaimer 
"under development / experimental" might be good.


-Matthias


On 3/22/22 9:04 AM, Guozhang Wang wrote:

Hi John,

Originally I was leaning towards making IQv2 APIs more or less stable while
being released for the first time, but after some second thoughts I'm now
feeling it's okay to take it in a more evolving manner. So I'm preferring
to keep it and be open for breaking changes in the future.


Guozhang

On Tue, Mar 22, 2022 at 8:27 AM John Roesler  wrote:


Thanks Bruno,

Yes, although IQv2 is not fully implemented (it doesn't
query global stores, and it doesn't support every query we
ultimately want), the queries that are implemented do work as
expected.

The main concern was that, right now, it's the job of the
Metered layer to translate queries and responses between
Java types and binary data. The suggestions that came in
after the KIP was approved were to consider moving to a
generic type-mapping API or to move to a lazy
de/serialization approach. I think either of those is a good
suggestion, but I haven't had time to really explore the
implementation to see how either would really pan out.

You are correct, though, all these APIs are marked
"Evolving", so we can always break compatibility later if we
feel like we have a much better approach.

Let's give the community a few days to chime in. If no one
votes to pull it out, I'll plan to just keep it in. If we do
remove it, then KIPs 805 and 806 would also be removed.

Thanks,
-John

On Tue, 2022-03-22 at 12:43 +0100, Bruno Cadonna wrote:

Hi John,

The already implemented query types work as expected, don't they?

I do not know the specific concerns, but it seems that this is the
situation that motivated you to mark the APIs as @Evolving. Keeping the
IQv2 API in 3.2 does not contradict the accepted KIP, right?

In case we decide to remove the IQv2 API from 3.2, would that mean we
also need to remove the existing query types specified in KIP-805 and
KIP-806?

BTW, I now realize that I mistakenly removed KIP-796 from the release
plan for 3.2. Sorry for that! I will re-add it to the release plan.

Best,
Bruno

On 22.03.22 02:50, John Roesler wrote:

Hello, all,

During the PR reviews for this KIP, there were several late concerns
raised about the IQv2 APIs. I filed tickets under KAFKA-13479 and promised
to revisit them before the API was released.

Unfortunately, I have not had time to circle back on those concerns.
Now that the 3.2 branch cut has happened, I can either remove the IQv2 API
from 3.2 and plan to address those concerns before 3.3, or we can go ahead
and release IQv2 as proposed and implemented.

Note that the APIs are all marked @Evolving, so we can technically
break compatibility if we do find a better way to do something later.


What is your preference? Release it, or wait?

Thanks,
John

On Mon, Nov 22, 2021, at 21:18, John Roesler wrote:

Thanks for voting and for the discussion, all!

The vote on KIP-796 passes with:
3 binding +1 (Bruno, Bill, and myself)
2 non-binding +1 (Patrick and Vasiliki)
no vetoes

The vote is now closed. If anyone has objections later on,
please raise them, though!

We will proceed with a series of pull requests to implement
the framework, and we will also propose one or more small
KIPs to propose specific queries.

Thanks again,
-John

On Mon, 2021-11-22 at 12:11 -0500, Bill Bejeck wrote:

Thanks for the well-detailed KIP, John.

It's a +1 (binding) from me.

I want to point out one thing which I think is an oversight. The "Example
Raw Query" scan includes a line using the `kafkaStreams.serdesForStore`
method, but it's listed in the "Rejected Alternatives" section.

Thanks,
Bill

On Mon, Nov 22, 2021 at 9:22 AM Bruno Cadonna wrote:



Thanks for the KIP, John!

+1 (binding)

Best,
Bruno

On 19.11.21 18:04, Vasiliki Papavasileiou wrote:

I think this KIP will greatly improve how we handle IQ in streams so +1
(non-binding) from me.

Thank you John!

On Thu, Nov 18, 2021 at 9:45 PM Patrick Stuedi wrote:


+1 (non-binding), thanks John!
-Patrick

On Thu, Nov 18, 2021 at 12:27 AM John Roesler <vvcep...@apache.org> wrote:



Hello all,

I'd like to open the vote for KIP-796, which proposes
a revamp of the Interactive Query APIs in Kafka Streams.

The proposal is here:
https://cwiki.apache.org/confluence/x/34xnCw

Thanks to all who reviewed the proposal, and thanks in
advance for taking the time to vote!

Thank you,
-John














[jira] [Commented] (KAFKA-13739) Sliding window without grace not working

2022-03-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510043#comment-17510043
 ] 

Matthias J. Sax commented on KAFKA-13739:
-

Thank you! – Let us know if you need any help.

> Sliding window without grace not working
> 
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: bounkong khamphousone
>Priority: Minor
>  Labels: beginner, newbie
>
> Hi everyone! I would like to understand why the Kafka Streams DSL offers the 
> ability to express a SlidingWindow with no grace period when it seems that it 
> doesn't work. [confluent's 
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
>  states that the grace period is required and that, with the deprecated method, it 
> defaults to 24 hours.
> Doing a basic sliding window with a count, if I set the grace period to 1 ms, 
> the expected output is produced. Based on the sliding window documentation, lower and 
> upper bounds are inclusive.
> If I set the grace period to 0 ms, I can see that the record is not skipped at 
> KStreamSlidingWindowAggregate (l.126), but when we try to create the window and 
> push the event in KStreamSlidingWindowAggregate#createWindows we call the 
> method updateWindowAndForward (l.417). This method (l.468) checks that 
> {{windowEnd > closeTime}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}} 
> (sliding window configuration).
> windowEnd is defined as {{inputRecordTimestamp}}.
>  
> For a first event with a record timestamp, we can assume that 
> observedStreamTime is equal to inputRecordTimestamp.
>  
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMs), which 
> results in {{inputRecordTimestamp}}.
> If we go back to the check done in the {{updateWindowAndForward}} method, then we 
> have inputRecordTimestamp > inputRecordTimestamp, which is always false. The 
> record is then skipped for the record's own window.
> Given that lower and upper bounds are inclusive, I would have expected the 
> event to be pushed to the store and forwarded. Hence, the check would be 
> {{windowEnd >= closeTime}}.
>  
> Is it a bug or is it intended?
> Thanks in advance for your explanations!
> Best regards!
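
The boundary arithmetic in the report above can be reproduced in isolation. The sketch below is not the actual `KStreamSlidingWindowAggregate` code; `WindowCloseCheck` and its methods are hypothetical names that only model the two comparisons described by the reporter, assuming `closeTime = observedStreamTime - gracePeriodMs` and `windowEnd = inputRecordTimestamp`.

```java
// Hypothetical model of the check described in the report; not Kafka Streams code.
final class WindowCloseCheck {

    private WindowCloseCheck() {}

    // closeTime as described in the report: observed stream time minus the grace period.
    static long closeTime(long observedStreamTime, long gracePeriodMs) {
        return observedStreamTime - gracePeriodMs;
    }

    // The strict comparison quoted in the report (windowEnd > closeTime).
    static boolean acceptedStrict(long windowEnd, long closeTime) {
        return windowEnd > closeTime;
    }

    // The inclusive comparison the reporter proposes (windowEnd >= closeTime).
    static boolean acceptedInclusive(long windowEnd, long closeTime) {
        return windowEnd >= closeTime;
    }
}
```

For a first record with timestamp `t` and a grace period of 0 ms, `closeTime` equals `t`, so the strict form rejects the record's own window while the inclusive form accepts it. With a grace period of 1 ms, `closeTime` is `t - 1` and both forms accept it, which matches the behavior described.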





[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509095#comment-17509095
 ] 

Matthias J. Sax commented on KAFKA-7509:


{quote} # Breaking backwards compatibility for a lot of Kafka components 
(producer/consumer interceptors, Connect REST extensions, etc.){quote}
Why? Could we not feature flag it (disable by default to preserve backward 
compatibility; maybe log a WARN message if not enabled; have a path forward to 
enable by default in some future major release)?
{quote} # Complicating the configuration behavior for applications with this 
type of "nested" configuration surface{quote}
Yes, it's a trade-off between flexibility and complexity – personally, I think 
that for most cases deep nesting won't be the case, and single-level nesting 
seems not to be too complicated and more flexible compared to a "config 
enable/disable warn-logging".
{quote} # Makes it impossible to have sub-configured components that are aware 
of their parent's configuration (for example, you could no longer have a 
Connect REST extension that's aware of the entire Connect worker's config 
without duplicating those same properties into the sub-config for that REST 
extension){quote}
This is a use-case I never encountered before. From an encapsulation point of 
view, I am wondering why an inner component would need the config of the outer 
one? Can you elaborate?
{quote}Ultimately, this still seems a little too aggressive of a change for the 
problem that it's trying to solve. If we were redesigning these APIs from the 
ground up, it would certainly be beneficial, but considering how much has been 
built on top of these APIs already and how much work it'd take for users to 
adjust to the proposed changes, it doesn't seem like a friendly tradeoff. Plus, 
for some situations (like the example with REST extensions), it's unclear how 
we'd want to proceed.
{quote}
Yeah, that's always tricky. Personally, I tend to prefer "the right" solution 
even if it might be more complex to get there. But I don't feel too strongly 
about it either. I agree it's "just" about log messages, so maybe not worth it. 
My personal concern with disabling (either completely via config or by moving to 
DEBUG) is that actually useful warnings would be affected, too. It just seems to 
be too coarse grained – for this case, I would rather have spurious / annoying 
WARNs than allow disabling them.

But I guess details would need to get discussed on a KIP anyway... 

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Priority: Major
>


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2022-03-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17507833#comment-17507833
 ] 

Matthias J. Sax commented on KAFKA-7509:


Personally, I think the best way would be to tackle this with a KIP across the 
board: not just for Connect. The idea would be to allow users to add a prefix to 
the configs they want to "forward" to "inner" classes. For example, if an 
interceptor needs a config, we could use `interceptor.foo.bar` instead of 
`foo.bar` – this way, we know that the consumer/producer can ignore this 
config, and when we forward the config into the interceptor, we actually remove 
the `interceptor` prefix.

The `interceptor` prefix would of course be quite specific (and it only works 
because interceptors are a Kafka concept). We could also generalize it using a 
prefix `forward.`.

In Kafka Streams, we are already using `main.consumer.`, `restore.consumer.`, 
and `producer.` as prefixes to allow users to configure individual clients and to 
avoid `StreamsConfig` logging warnings about "unknown" client configs.

The advantage of the prefix approach is that it is much more fine-grained 
compared to a config, and it can also be applied recursively. For example, in 
Kafka Streams we use an `AdminClient` inside the `ConsumerClient`. If we want 
to configure the admin client, we can provide 
`main.consumer.forward.actualAdminConfigName` and avoid that the consumer logs 
a warning about an unknown client.
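
A rough sketch of the prefix idea, applied recursively as described above. This is only an illustration under assumptions: `PrefixedConfigs` and `stripPrefix` are hypothetical names, the `forward.` prefix is the proposal from this comment rather than an existing Kafka convention, and the config keys are made up for the example. (Kafka's `AbstractConfig#originalsWithPrefix` performs a similar prefix-stripping step for existing prefixes.)

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: selects prefixed configs and removes the prefix.
final class PrefixedConfigs {

    private PrefixedConfigs() {}

    // Returns the entries whose key starts with `prefix`, keyed by the remainder of the name.
    static Map<String, Object> stripPrefix(Map<String, ?> configs, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

Calling `stripPrefix(all, "main.consumer.")` would yield the consumer's view of the configs, and a second call with `"forward."` on that result would yield what the consumer hands to its inner component, so each layer only ever sees keys it either knows or explicitly forwards.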

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Priority: Major
>


[jira] [Updated] (KAFKA-13739) Sliding window without grace not working

2022-03-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13739:

Labels: beginner newbie  (was: )

> Sliding window without grace not working
> 
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: bounkong khamphousone
>Priority: Minor
>  Labels: beginner, newbie
>


[jira] [Commented] (KAFKA-13739) Sliding window without grace not working

2022-03-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17507825#comment-17507825
 ] 

Matthias J. Sax commented on KAFKA-13739:
-

Thanks for chiming in [~lct45]
{quote}_if_ we want to support a grace period of 
{quote}
Given that we support `SlidingWindow.ofTimeDifferenceNoGrace()` we should 
support 0ms grace period :) 

> Sliding window without grace not working
> 
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: bounkong khamphousone
>Priority: Minor
>


Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-16 Thread Matthias J. Sax

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.


I see; making it a config seems better. Frankly, I am not even sure if 
we need a config at all or if we can just hard code it? For the 
stream-stream join left/outer join fix, there is only an internal config 
but no public config either.


Option 1: Your proposal is?

  stream
.groupByKey()
.windowBy(TimeWindow.ofSizeNoGrace(...))
.configure(EmitConfig.emitFinal())
.count()

That does not change my argument that it seems to be misplaced from an API 
flow POV.


Option 1 seems to be the least desirable to me.

For options 2 and 3, I am not sure which one I like better. Might be good 
if others could chime in, too. I think I slightly prefer option 2 over 
option 3.



-Matthias

On 3/15/22 5:33 PM, Hao Li wrote:

Thanks for the feedback Matthias.

`allowedLateness` may not be a good name. What I have in mind is to use
this to control how frequently we try to emit final results. Maybe it's
more flexible to be used as config in properties as we don't need to
recompile DSL to change it.

For option 1, I intend to use `emitFinal` to configure how
`TimeWindowedKStream` should be outputted to `KTable` after aggregation.
But `emitFinal` is not an action to the `TimeWindowedKStream` interface.
Maybe adding `configure(EmitConfig config)` makes more sense?

For option 2, config can be created using `WindowConfig.emitFinal()` or
`EmitConfig.emitFinal`

For option 3, it will be something like `TimeWindows(..., EmitConfig
emitConfig)`.

For putting `EmitConfig` in aggregation operator, I think it doesn't
control how we do aggregation but how we output to `KTable`. That's why I
feel option 1 makes more sense as it applies to `TimeWindowedKStream`. But
I'm also OK with option 2.

Hao




On Tue, Mar 15, 2022 at 4:48 PM Matthias J. Sax  wrote:


Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness`
parameter because the grace-period is defined on the window itself already?

(On the other hand, if I think about it once more, maybe the
`grace-period` is actually not a property of the window but a property
of the aggregation operator? _thinking_)

  From an API flow point of view, option 1 might not be desirable IMHO:

stream
  .groupByKey()
  .windowBy(TimeWindow.ofSizeNoGrace(...))
  .emitFinal()
  .count()

The call to `emitFinal()` seems not to be in the right place for this case?


Option 2 might work (I think we need to discuss a few details of the API
though):

stream
  .groupByKey()
  .windowBy(
TimeWindow.ofSizeNoGrace(...),
EmitConfig.emitFinal() -- just made this up; it's not in the KIP
  )
  .count()

I made up the `WindowConfig.emitFinal()` call -- from the KIP it's
unclear what API you have in mind? `EmitFinalConfig` has no public
constructor nor any builder method.


For option 3, I am not sure what you really have in mind. Can you give
a concrete example (similar to above) of how users would write their code?



Did you consider actually passing the `EmitConfig` into the
aggregation operator? In the end, it seems not to be a property of the
window definition or windowing step, but a property of the actual operator:

stream
  .groupByKey()
  .windowBy(
TimeWindow.ofSizeNoGrace(...)
  )
  .count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this
case though...


-Matthias



On 3/14/22 9:21 PM, Hao Li wrote:

Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
fewer places. What do you think of option 1, which doesn't change the current
`windowedBy` api but configures `EmitConfig` separately? The benefit of
option 1 is if we need to configure something else later, we don't need to
pile them on `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231.
But we can also create an internal API to do that without modifying
`Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang wrote:



Hello Hao,

Thanks for the proposal, I have some preference among the options here so I
will copy them here:

I'm now thinking if it's better to not add this new config on each of the
Window interfaces, but instead add that at the KGroupedStream#windowedBy
function. Also instead of adding just a boolean flag, maybe we can add a
Configured class like Grouped, Suppressed, etc, e.g. let's call it a
Emitted which for now would just have a single construct as
Emitted.atWindowClose whose semantics is the same as emitFinal == true. I
think the benefits are:

1) you do not need to modify
Re: [DISCUSS] KIP-825: introduce a new API to control when aggregated results are produced

2022-03-15 Thread Matthias J. Sax

Thanks for the KIP.

A general comment: it seems that we won't need any new `allowedLateness` 
parameter because the grace-period is defined on the window itself already?


(On the other hand, if I think about it once more, maybe the 
`grace-period` is actually not a property of the window but a property 
of the aggregation operator? _thinking_)


From an API flow point of view, option 1 might not be desirable IMHO:

  stream
.groupByKey()
.windowBy(TimeWindow.ofSizeNoGrace(...))
.emitFinal()
.count()

The call to `emitFinal()` seems not to be in the right place for this case?


Option 2 might work (I think we need to discuss a few details of the API 
though):


  stream
.groupByKey()
.windowBy(
  TimeWindow.ofSizeNoGrace(...),
  EmitConfig.emitFinal() -- just made this up; it's not in the KIP
)
.count()

I made up the `WindowConfig.emitFinal()` call -- from the KIP it's 
unclear what API you have in mind? `EmitFinalConfig` has no public 
constructor nor any builder method.



For option 3, I am not sure what you really have in mind. Can you give 
a concrete example (similar to above) of how users would write their code?




Did you consider actually passing the `EmitConfig` into the 
aggregation operator? In the end, it seems not to be a property of the 
window definition or windowing step, but a property of the actual operator:


  stream
.groupByKey()
.windowBy(
  TimeWindow.ofSizeNoGrace(...)
)
.count(EmitConfig.emitFinal())

The API surface area that needs to be updated might be larger for this 
case though...



-Matthias



On 3/14/22 9:21 PM, Hao Li wrote:

Thanks Guozhang!

1. I agree `EmitConfig` is better than `WindowConfig` and option 2 modifies
fewer places. What do you think of option 1, which doesn't change the current
`windowedBy` API but configures `EmitConfig` separately? The benefit of
option 1 is that if we need to configure something else later, we don't need to
pile it onto `windowedBy` but can add separate APIs.
2. I added it to `Stores` mainly to conform to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java#L227-L231.
But we can also create an internal API to do that without modifying
`Stores`.

Hao

On Mon, Mar 14, 2022 at 7:52 PM Guozhang Wang  wrote:


Hello Hao,

Thanks for the proposal, I have some preference among the options here so I
will copy them here:

I'm now thinking if it's better to not add this new config on each of the
Window interfaces, but instead add that at the KGroupedStream#windowedBy
function. Also instead of adding just a boolean flag, maybe we can add a
Configured class like Grouped, Suppressed, etc, e.g. let's call it a
Emitted which for now would just have a single construct as
Emitted.atWindowClose whose semantics is the same as emitFinal == true. I
think the benefits are:

1) you do not need to modify multiple Window classes, but just overload one
windowedBy function with a second param. This is less of a scope for now,
and also more extensible for any future changes.

2) With a config interface, we maintain its extensibility as well as being
able to reuse this Emitted interface for other operators if we wanted to
expand to.
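
A minimal sketch of the "Configured class" idea above, assuming a hypothetical `Emitted` type with a private constructor and static factories (in the style of `Grouped` or `Suppressed`). The `onEachUpdate()` factory, the `Mode` enum, and the toy `WindowedCounter` are illustration-only assumptions and are not taken from the KIP; the counter uses tumbling windows with zero grace and non-negative timestamps just to show what the config would control.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical config object; names are assumptions, not the KIP's API.
final class Emitted {
    enum Mode { ON_EACH_UPDATE, AT_WINDOW_CLOSE }

    private final Mode mode;

    private Emitted(Mode mode) { this.mode = mode; }

    // Same semantics as emitFinal == true in the boolean-flag variant.
    static Emitted atWindowClose() { return new Emitted(Mode.AT_WINDOW_CLOSE); }

    // Assumed name for the existing behaviour: emit a result on every update.
    static Emitted onEachUpdate() { return new Emitted(Mode.ON_EACH_UPDATE); }

    Mode mode() { return mode; }
}

// Toy tumbling-window counter (zero grace) that honours the config.
final class WindowedCounter {
    private final long sizeMs;
    private final Emitted emitted;
    private final TreeMap<Long, Long> counts = new TreeMap<>(); // windowStart -> count
    private long streamTime = Long.MIN_VALUE;

    WindowedCounter(long sizeMs, Emitted emitted) {
        this.sizeMs = sizeMs;
        this.emitted = emitted;
    }

    // Returns the results emitted for this record as "windowStart=count".
    List<String> process(long timestampMs) {
        List<String> out = new ArrayList<>();
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - (timestampMs % sizeMs);
        if (start + sizeMs <= streamTime) {
            return out; // window already closed: drop the late record
        }
        long count = counts.merge(start, 1L, Long::sum);
        if (emitted.mode() == Emitted.Mode.ON_EACH_UPDATE) {
            out.add(start + "=" + count);
        }
        // Close (and, if configured, emit) every window that ended at or before stream time.
        Iterator<Map.Entry<Long, Long>> it = counts.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (e.getKey() + sizeMs > streamTime) {
                break;
            }
            if (emitted.mode() == Emitted.Mode.AT_WINDOW_CLOSE) {
                out.add(e.getKey() + "=" + e.getValue());
            }
            it.remove();
        }
        return out;
    }
}
```

Because the factory methods are the only way to obtain an `Emitted`, further modes could be added later without touching the signature that accepts it, which is the extensibility argument made in point 2).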



So in general I'm leaning towards option 2). For that, some more detailed
comments:

1) If we want to reuse that config object for other non-window stateful
operations, I think naming it as `EmitConfig` is probably better than
`WindowConfig`.
2) I saw your PR (https://github.com/apache/kafka/pull/11892) that you are
also proposing to add new stores into the public factory Stores, but it's
not included in the KIP. Is that intentional? Personally I think that
although we may eventually want to add a new store type to the public APIs,
for this KIP maybe we do not have to add them but can delay for later after
we've learned the best way to layout. LMK what do you think?



Guozhang



On Fri, Mar 11, 2022 at 2:13 PM Hao Li  wrote:


Hi Dev team,

I'd like to start a discussion thread on Kafka Streams KIP-825:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced


This KIP aims to add new APIs to support outputting final aggregated
results for windowed aggregations. I listed several options there and I'm
looking forward to your feedback.

Thanks,
Hao




--
-- Guozhang






Re: [VOTE] KIP-820: Extend KStream process with new Processor API

2022-03-15 Thread Matthias J. Sax

+1 (binding)

Thanks for pushing this through. Was a difficult discussion!


-Matthias

On 3/15/22 10:01 AM, John Roesler wrote:

Thanks for the update, Jorge!

I’m still +1 (binding)

Thanks,
John

On Thu, Feb 17, 2022, at 12:57, Guozhang Wang wrote:

Thanks Jorge, overall looks good to me.

Maybe we can clarify a bit in the wiki that the reason we cannot
include the additional `final String... stateStoreNames` params in the new
`process` API is that we need overloaded functions which take
`ProcessorSupplier<...> ` where the output types are not `Void`, but due to
type erasure we cannot distinguish the new overloaded function signatures
from the old ones if they also include `final String... stateStoreNames`.
And the javadocs could explain that if users want to connect state stores to this
processor, they could use the `connectState` API instead.
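
The erasure constraint can be illustrated without Kafka on the classpath. This is only a sketch: `NewSupplier` and `StreamSketch` are hypothetical stand-ins, not the real `ProcessorSupplier` or `KStream` types. It uses reflection to show that the generic type arguments are absent from the runtime parameter list, which is why a second overload differing only in those arguments would clash.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a supplier type with input and output type parameters.
interface NewSupplier<KIn, VIn, KOut, VOut> { }

interface StreamSketch<K, V> {
    // Existing-style overload: output types fixed to Void.
    void process(NewSupplier<? super K, ? super V, Void, Void> supplier,
                 String... stateStoreNames);

    // An additional overload such as
    //   <KOut, VOut> StreamSketch<KOut, VOut> process(
    //       NewSupplier<? super K, ? super V, KOut, VOut> supplier,
    //       String... stateStoreNames);
    // is rejected by javac: both methods erase to process(NewSupplier, String[]).
}

final class ErasureDemo {
    // Returns the erased parameter types of StreamSketch#process as a comma-separated list.
    static String erasedProcessParams() {
        try {
            Method m = StreamSketch.class.getMethod("process", NewSupplier.class, String[].class);
            StringBuilder sb = new StringBuilder();
            for (Class<?> p : m.getParameterTypes()) {
                if (sb.length() > 0) {
                    sb.append(',');
                }
                sb.append(p.getSimpleName());
            }
            return sb.toString();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Dropping the varargs parameter from the new overload changes the erased parameter list, so the two methods no longer collide.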

Otherwise, I'm +1.

Guozhang

On Tue, Feb 15, 2022 at 11:54 AM John Roesler  wrote:


Thanks, Jorge!

I'm +1 (binding)

-John

On Tue, 2022-02-15 at 19:16 +, Jorge Esteban Quilcate
Otoya wrote:

Hi all,

I'd like to start a vote on KIP-820 which proposes extending KStream to use
the new Processor API


https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API


Thanks,
Jorge





--
-- Guozhang


Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-15 Thread Matthias J. Sax

Thanks.

It would be good to add the concrete interfaces of the new classes to 
the KIP, i.e.,


 - FixedKeyProcessorSupplier
 - FixedKeyProcessor
 - FixedKeyProcessorContext
 - FixedKeyRecord


-Matthias


On 3/10/22 3:15 PM, Jorge Esteban Quilcate Otoya wrote:

Thanks all!

I agree with Matthias and John on going forward with the new
`FixedKeyRecord` approach.
The KIP has been updated accordingly.

Feel free to add your vote or amend on the vote thread if needed.

Cheers,

On Mon, 7 Mar 2022 at 21:57, Matthias J. Sax  wrote:


I think it's ok that we cannot prevent users from mutating a given
read-only object. We have similar issues "all over the place" in the
API, because it's just how Java works unfortunately (eg,
`ValueMapperWithKey` and similar interfaces).

The point is that the API clearly expresses that the key should
not be changed, as `FixedKeyRecord` has no `withKey()` method, which is
much better than having `Record.withKey()` and thus incorrectly
indicating to users that it would be ok to set a new key.

I think it's worth adding the new interfaces.


-Matthias


On 3/7/22 11:46 AM, Guozhang Wang wrote:

Thanks John! I feel a bit ashamed of just thinking out loud here without trying
out prototypes myself :P

I think the FixedKeyProcessor/Record looks very good -- like you said,
since we are making a new set of APIs, why don't we reconsider more
boldly -- but at the same time I'd also like to make sure we agree on how
much "safety" we can achieve at runtime: even with the proposed APIs, we
cannot prevent users from doing something like:

---
process(FixedKeyRecord inputRecord) {
  // this is not preventable with runtime key validations either, since we
  // just check that the key object itself is not replaced
  inputRecord.key().modifyField(...);
  context.forward(inputRecord);
}

---

I.e. with either type-safety or runtime validation, we cannot be 100% sure
that users would not do anything wrong. This drives me to think about how much
we'd like to pay to "remind" (instead of say "enforce", since we cannot
really do it) users of the semantics of "processValue". Personally I felt that
adding the new set of APIs for that purpose only is a bit overkill, and
hence was leaning towards just the runtime validation. But I admit this is
purely subjective so I'm willing to yield to the group if others feel it's
worth doing so.
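
A small self-contained sketch of the limitation described above, assuming a hypothetical mutable key class and a runtime validation that only compares key references. None of these names come from Kafka. It shows that an in-place mutation passes such a check.

```java
// Hypothetical mutable key type, used only for illustration.
final class MutableKey {
    String id;

    MutableKey(String id) { this.id = id; }
}

final class KeyIdentityCheck {
    // Runtime validation of the kind discussed: passes if the key object was not replaced.
    static boolean sameKeyObject(Object before, Object after) {
        return before == after;
    }

    // Mutates the key in place and reports whether the validation still passes.
    static boolean mutationPassesValidation() {
        MutableKey key = new MutableKey("a");
        MutableKey seenBefore = key;
        key.id = "b"; // in-place change; the reference stays the same
        return sameKeyObject(seenBefore, key);
    }
}
```

The same holds for a type-level restriction: removing a `withKey()` method prevents replacing the key, but not mutating a mutable key object.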


Guozhang



On Mon, Mar 7, 2022 at 10:32 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


Thanks, John!
This looks very promising.

I will familiarize myself with this approach and update the KIP accordingly. From what
I can see so far, this should cover most of the open issues in this
proposal.

PS.


Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.


Agree. I was referring to the value transformers where `readOnlyKey` is
passed but not forwarded internally. As for the "forwarding disabled"
approach, you're totally right that it is a runtime validation.
Regardless, the approach proposed here will be a much better one.


On Sun, 6 Mar 2022 at 18:59, John Roesler  wrote:


Hello all,

It seems like we're making good progress on this discussion.
If I'm keeping track correctly, if we can resolve this
question about how to handle processValues(), then we should
be able to finalize the vote, right?

I share Matthias's preference for having a type-safe API.

Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.
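
To make the compile-time versus runtime distinction concrete, here is a minimal sketch assuming a hypothetical context interface. The interface, the class names, and the exception type are assumptions for illustration and do not mirror the actual Kafka classes. Code calling `forward` compiles fine and only fails when executed.

```java
// Hypothetical, simplified context interface.
interface ContextSketch<K, V> {
    void forward(K key, V value);
}

// "Forwarding disabled" variant: the method still exists, but throws when invoked.
final class ForwardingDisabledSketch<K, V> implements ContextSketch<K, V> {
    @Override
    public void forward(K key, V value) {
        // Exception type chosen for the sketch only.
        throw new UnsupportedOperationException("forward() is not supported in this context");
    }

    // Returns true if calling forward() fails at runtime.
    static boolean forwardThrows() {
        ContextSketch<String, String> context = new ForwardingDisabledSketch<>();
        try {
            context.forward("key", "value"); // compiles without complaint
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }
}
```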

However, the spirit of the "new processor api" line of work
is to clean up a lot of the cruft around the original
processor API, so this is a good opportunity to introduce a
type-safe version if we can.

Based on my experience adding the new processor API, I felt
like it should be possible to do what he suggests, but it
would be more involved than what he said. The biggest thing
I learned from that effort, though, is that you really have
to just try it to see what all the complications are.

With that in mind, I went ahead and implemented the
suggestion: https://github.com/apache/kafka/pull/11854

This is a functional prototype. It only adds processValues,
which takes a supplier of a new type, FixedKeyProcessor.
That processor only processes FixedKeyRecords, which have a
key that cannot be changed. FixedKeyProcessors have a
special context, a FixedKeyProcessorContext, which can only
forward FixedKeyRecords.

FixedKeyRecords have "fixed keys" because their key can only
be set in the constructor, and their constructor is
package-private.
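
A simplified sketch of the shape described above, assuming stand-in names (`FixedKeyRecordSketch`, `FixedKeyContextSketch`) rather than the classes in the linked PR. The key is set only in a package-private constructor, and the only "with" methods replace the value or the timestamp.

```java
// Stand-in for a record whose key cannot be replaced by user code.
final class FixedKeyRecordSketch<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;

    // Package-private: code in other packages cannot build a record with a new key.
    FixedKeyRecordSketch(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    K key() { return key; }

    V value() { return value; }

    long timestamp() { return timestamp; }

    // Deliberately no withKey(): only the value (and its type) can change.
    <NewV> FixedKeyRecordSketch<K, NewV> withValue(NewV newValue) {
        return new FixedKeyRecordSketch<>(key, newValue, timestamp);
    }

    FixedKeyRecordSketch<K, V> withTimestamp(long newTimestamp) {
        return new FixedKeyRecordSketch<>(key, value, newTimestamp);
    }
}

// Stand-in for a context that can only forward fixed-key records.
interface FixedKeyContextSketch<K, V> {
    void forward(FixedKeyRecordSketch<K, V> record);
}
```

With this shape, a processor that wants to forward a record has to derive it from the input record, so the key carries over by construction.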

As you can see

[jira] [Updated] (KAFKA-13739) Sliding window without grace not working

2022-03-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13739:

Priority: Minor  (was: Major)

> Sliding window without grace not working
> 
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: bounkong khamphousone
>Priority: Minor
>
> Hi everyone! I would like to understand why the Kafka Streams DSL offers the 
> ability to express a SlidingWindow with no grace period when it seems that it 
> doesn't work. [Confluent's 
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
>  states that a grace period is required, and with the deprecated method it 
> defaults to 24 hours.
> Doing a basic sliding window with a count, if I set the grace period to 1 ms, 
> the expected output is produced. Based on the sliding window documentation, lower and 
> upper bounds are inclusive.
> If I set the grace period to 0 ms, I can see that the record is not skipped at 
> KStreamSlidingWindowAggregate(l.126), but when we try to create the window and 
> push the event in KStreamSlidingWindowAggregate#createWindows we call the 
> method updateWindowAndForward(l.417). This method (l.468) checks that 
> {{{}windowEnd > closeTime{}}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}} 
> (Sliding window configuration)
> windowEnd is defined as {{{}inputRecordTimestamp{}}}.
>  
> For a first event with a record timestamp, we can assume that 
> observedStreamTime is equal to inputRecordTimestamp.
>  
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMs), which 
> results in {{{}inputRecordTimestamp{}}}.
> If we go back to the check done in the {{updateWindowAndForward}} method, we 
> have inputRecordTimestamp > inputRecordTimestamp, which is always false. The 
> record is then skipped for the record's own window.
> Given that lower and upper bounds are inclusive, I would have expected the 
> event to be pushed to the store and forwarded. Hence, the check should be 
> {{{}windowEnd >= closeTime{}}}.
>  
> Is it a bug or is it intended?
> Thanks in advance for your explanations!
> Best regards!
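
The arithmetic in the report can be reproduced with a small standalone sketch. The method names below are hypothetical and only restate the two comparisons from the description; this is not the code of `KStreamSlidingWindowAggregate`.

```java
final class SlidingWindowCloseCheck {
    // Check as described in the report: the window must end strictly after closeTime.
    static boolean acceptedStrict(long windowEnd, long observedStreamTime, long gracePeriodMs) {
        long closeTime = observedStreamTime - gracePeriodMs;
        return windowEnd > closeTime;
    }

    // Check suggested by the reporter: a window ending exactly at closeTime is still accepted.
    static boolean acceptedInclusive(long windowEnd, long observedStreamTime, long gracePeriodMs) {
        long closeTime = observedStreamTime - gracePeriodMs;
        return windowEnd >= closeTime;
    }
}
```

For a first record with timestamp 1000 (so `windowEnd == observedStreamTime == 1000`), the strict check rejects the record with a grace period of 0 ms and accepts it with 1 ms, while the inclusive check accepts it in both cases.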



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13739) Sliding window without grace not working

2022-03-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17507261#comment-17507261
 ] 

Matthias J. Sax commented on KAFKA-13739:
-

Thanks for reporting this. Good catch. I think your analysis makes sense. Do you 
want to do a PR to fix it? Seems we have some testing gap for "no grace 
period"... :( 

> Sliding window without grace not working
> 
>
> Key: KAFKA-13739
> URL: https://issues.apache.org/jira/browse/KAFKA-13739
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: bounkong khamphousone
>Priority: Major
>
> Hi everyone! I would like to understand why the Kafka Streams DSL offers the 
> ability to express a SlidingWindow with no grace period when it seems that it 
> doesn't work. [Confluent's 
> site|https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#sliding-time-windows]
>  states that a grace period is required, and with the deprecated method it 
> defaults to 24 hours.
> Doing a basic sliding window with a count, if I set the grace period to 1 ms, 
> the expected output is produced. Based on the sliding window documentation, lower and 
> upper bounds are inclusive.
> If I set the grace period to 0 ms, I can see that the record is not skipped at 
> KStreamSlidingWindowAggregate(l.126), but when we try to create the window and 
> push the event in KStreamSlidingWindowAggregate#createWindows we call the 
> method updateWindowAndForward(l.417). This method (l.468) checks that 
> {{{}windowEnd > closeTime{}}}.
> closeTime is defined as {{observedStreamTime - window.gracePeriodMs}} 
> (Sliding window configuration)
> windowEnd is defined as {{{}inputRecordTimestamp{}}}.
>  
> For a first event with a record timestamp, we can assume that 
> observedStreamTime is equal to inputRecordTimestamp.
>  
> Therefore, closeTime is {{inputRecordTimestamp - 0}} (gracePeriodMs), which 
> results in {{{}inputRecordTimestamp{}}}.
> If we go back to the check done in the {{updateWindowAndForward}} method, we 
> have inputRecordTimestamp > inputRecordTimestamp, which is always false. The 
> record is then skipped for the record's own window.
> Given that lower and upper bounds are inclusive, I would have expected the 
> event to be pushed to the store and forwarded. Hence, the check should be 
> {{{}windowEnd >= closeTime{}}}.
>  
> Is it a bug or is it intended?
> Thanks in advance for your explanations!
> Best regards!





Re: [VOTE] KIP-622 Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext

2022-03-15 Thread Matthias J. Sax

Just a quick FYI.

KIP-622 overlapped with KIP-478.

We added the new method to the new `api.ProcessorContext` via 
https://issues.apache.org/jira/browse/KAFKA-13699 for 3.2.0 release.


Please let us know if there are any concerns.

I updated the KIP accordingly.

-Matthias

On 3/5/21 8:42 PM, Rohit Deshpande wrote:

Hello all,
Based on the feedback on the PR https://github.com/apache/kafka/pull/9744, the
following changes have been made to the KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext>.

*ProcessorContext#currentSystemTimeMs()*

It is expected that this method will return the internally cached system
timestamp from the Kafka Streams runtime. Thus, it may return a different
value compared to System.currentTimeMillis(). The cached system time
represents the time when we start processing / punctuating, and it does
not change throughout the process / punctuate call. So this method will return
the current system time (also called wall-clock time) as known to the Kafka Streams
runtime.
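
A minimal sketch of the caching behaviour described above, assuming a hypothetical `TimeTrackingSketch` class with an injectable clock; it is not the Kafka Streams implementation. Wall-clock time is sampled once when processing of a record starts, and stream time is tracked as the maximum record timestamp seen so far. The initial stream-time value is an assumption of the sketch.

```java
import java.util.function.LongSupplier;

final class TimeTrackingSketch {
    private final LongSupplier wallClock;
    private long cachedSystemTimeMs;
    private long streamTimeMs = Long.MIN_VALUE; // assumed "nothing processed yet" value

    TimeTrackingSketch(LongSupplier wallClock) {
        this.wallClock = wallClock;
    }

    // Called once when processing (or punctuating) starts.
    void startProcessing(long recordTimestampMs) {
        cachedSystemTimeMs = wallClock.getAsLong();
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    // Returns the cached value, which may lag behind System.currentTimeMillis().
    long currentSystemTimeMs() {
        return cachedSystemTimeMs;
    }

    // Maximum timestamp of any record processed so far.
    long currentStreamTimeMs() {
        return streamTimeMs;
    }
}
```

In production code the clock would be `System::currentTimeMillis`; injecting it makes the caching visible in a test, which is the same need the proposed `MockProcessorContext` setters address.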

New methods to MockProcessorContext for testing purposes:

*MockProcessorContext#setRecordTimestamp*: set record timestamp

*MockProcessorContext#setCurrentSystemTimeMs:* set system timestamp

*MockProcessorContext#setCurrentStreamTimeMs*: set stream time

Deprecate method: MockProcessorContext#setTimestamp, as its name is
misleading and we are adding a new method
  MockProcessorContext#setRecordTimestamp which does the same work.

Please let me know if you have any thoughts or concerns with this change.

Thanks,
Rohit

On Fri, Dec 4, 2020 at 7:31 PM Rohit Deshpande 
wrote:


Hello all,
I am closing the vote for this KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext

Summary of the KIP:
Planning to add two new methods to ProcessorContext:
1. long currentSystemTimeMs() to fetch wall-clock time
2. long currentStreamTimeMs() to fetch maximum timestamp of any record yet
processed by the task

Thanks,
Rohit


On 2020/12/01 16:09:54, Bill Bejeck  wrote:

Sorry for jumping into this so late,

Thanks for the KIP, I'm a +1 (binding)

-Bill

On Sun, Jul 26, 2020 at 11:06 AM John Roesler  wrote:


Thanks William,

I’m +1 (binding)

Thanks,
John

On Fri, Jul 24, 2020, at 20:22, Sophie Blee-Goldman wrote:

Thanks all, +1 (non-binding)

Cheers,
Sophie

On Wed, Jul 8, 2020 at 4:02 AM Bruno Cadonna wrote:



Thanks Will and Piotr,

+1 (non-binding)

Best,
Bruno

On Wed, Jul 8, 2020 at 8:12 AM Matthias J. Sax wrote:


Thanks for the KIP.

+1 (binding)


-Matthias

On 7/7/20 11:48 AM, William Bottrell wrote:

Hi everyone,

I'd like to start a vote for adding two new time APIs to ProcessorContext.

Add currentSystemTimeMs and currentStreamTimeMs to ProcessorContext
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext>

Thanks everyone for the initial feedback and thanks for your time.


[jira] [Updated] (KAFKA-13699) ProcessorContext does not expose Stream Time

2022-03-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13699:

Affects Version/s: 3.0.0
   (was: 2.7.0)

> ProcessorContext does not expose Stream Time
> 
>
> Key: KAFKA-13699
> URL: https://issues.apache.org/jira/browse/KAFKA-13699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Shay Lin
>    Assignee: Matthias J. Sax
>Priority: Major
>  Labels: newbie
> Fix For: 3.2.0
>
>
> As a KS developer, I would like to leverage 
> [KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext]
>  and access stream time in Processor Context.
> _(Updated)_
> However, the methods currentStreamTimeMs and currentSystemTimeMs are missing 
> for KStreams 3.0+.
> Checked with [~mjsax] , the methods are absent from the Processor API , i.e.
>  * org.apache.kafka.streams.processor.api.ProcessorContext





[jira] [Assigned] (KAFKA-13699) ProcessorContext does not expose Stream Time

2022-03-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13699:
---

Assignee: Matthias J. Sax

> ProcessorContext does not expose Stream Time
> 
>
> Key: KAFKA-13699
> URL: https://issues.apache.org/jira/browse/KAFKA-13699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Shay Lin
>    Assignee: Matthias J. Sax
>Priority: Major
>  Labels: newbie
>
> As a KS developer, I would like to leverage 
> [KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext]
>  and access stream time in Processor Context.
> _(Updated)_
> However, the methods currentStreamTimeMs and currentSystemTimeMs are missing 
> for KStreams 3.0+.
> Checked with [~mjsax] , the methods are absent from the Processor API , i.e.
>  * org.apache.kafka.streams.processor.api.ProcessorContext





[jira] [Assigned] (KAFKA-13721) Left-join still emit spurious results in stream-stream joins in some cases

2022-03-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13721:
---

Assignee: Matthias J. Sax

> Left-join still emit spurious results in stream-stream joins in some cases
> --
>
> Key: KAFKA-13721
> URL: https://issues.apache.org/jira/browse/KAFKA-13721
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nollet
>    Assignee: Matthias J. Sax
>Priority: Major
>
> Stream-stream joins seem to still emit spurious results for some window 
> configurations.
> From my tests, it happened when setting before to 0 and having a grace period 
> smaller than the window duration. More precisely it seems to happen when 
> setting before and 
> window duration > grace period + before
> h2. how to reproduce
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.TestInputTopic;
> import org.apache.kafka.streams.TestOutputTopic;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.TopologyTestDriver;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.KStream;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
> import java.time.Duration;
> import java.time.Instant;
> import java.util.Properties;
>
> public class SpuriousLeftJoinTest {
>     static final Duration WINDOW_DURATION = Duration.ofMinutes(10);
>     static final Duration GRACE = Duration.ofMinutes(6);
>     static final Duration BEFORE = Duration.ZERO;
>     static final String LEFT_TOPIC_NAME = "LEFT_TOPIC";
>     static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC";
>     static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC";
>     private static TopologyTestDriver testDriver;
>     private static TestInputTopic<String, Integer> inputTopicLeft;
>     private static TestInputTopic<String, Integer> inputTopicRight;
>     private static TestOutputTopic<String, Integer> outputTopic;
>
>     public static Topology createTopology() {
>         StreamsBuilder builder = new StreamsBuilder();
>         KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME);
>         KStream<String, Integer> rightStream = builder.stream(RIGHT_TOPIC_NAME);
>         // return 1 if left join matched, otherwise 0
>         KStream<String, Integer> joined = leftStream.leftJoin(
>             rightStream,
>             (value1, value2) -> {
>                 if (value2 == null) {
>                     return 0;
>                 }
>                 return 1;
>             },
>             JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE)
>                 .before(BEFORE)
>         );
>         joined.to(OUTPUT_TOPIC_NAME);
>         return builder.build();
>     }
>
>     @Before
>     public void setup() {
>         Topology topology = createTopology();
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
>             Serdes.StringSerde.class);
>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
>             Serdes.IntegerSerde.class);
>         testDriver = new TopologyTestDriver(topology, props);
>         inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME,
>             Serdes.String().serializer(), Serdes.Integer().serializer());
>         inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME,
>             Serdes.String().serializer(), Serdes.Integer().serializer());
>         outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME,
>             Serdes.String().deserializer(), Serdes.Integer().deserializer());
>     }
>
>     @After
>     public void tearDown() {
>         testDriver.close();
>     }
>
>     @Test
>     public void shouldEmitOnlyOneMessageForKey1() {
>         Instant now = Instant.now();
>         inputTopicLeft.pipeInput("key1", 12, now);
>         inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION));
>         // send later record to increase stream time & close the window
>         inputTopicLeft.pipeInput("other_key", 1212122,
>             now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10));
>         while (!outputTopic.isEmpty()) {
>             System.out.println(outputTopic.readKeyValue());
>         }
>     }
> }
> {code}
> Stdout of previous code is
> {noformat}
> KeyValue(key1, 0)
> KeyValue(key1, 1)
> {noformat}
> However it should be
> {noformat}
> KeyValue(key1, 1)
> {noformat}





[jira] [Updated] (KAFKA-13722) Update internal interfaces that use ProcessorContext to use StateStoreContext instead

2022-03-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13722:

Component/s: streams

> Update internal interfaces that use ProcessorContext to use StateStoreContext 
> instead
> -
>
> Key: KAFKA-13722
> URL: https://issues.apache.org/jira/browse/KAFKA-13722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> This is a reminder that when we remove the deprecated public APIs that use 
> the ProcessorContext, like `StateStore.init`, we should also consider 
> updating the internal interfaces that use the ProcessorContext as well. That 
> includes:
> 1. Segments and related util classes which use ProcessorContext.
> 2. For state stores that rely on ProcessorContext.getXXXTime, their logic 
> should be moved out of the state store impl to the processor node level 
> that calls these state stores.





[jira] [Assigned] (KAFKA-8065) Forwarding modified timestamps does not reset timestamp correctly

2022-03-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8065:
--

Assignee: Matthias J. Sax

> Forwarding modified timestamps does not reset timestamp correctly
> -
>
> Key: KAFKA-8065
> URL: https://issues.apache.org/jira/browse/KAFKA-8065
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1, 2.2.0, 2.1.1
>        Reporter: Matthias J. Sax
>    Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 2.2.0, 2.0.2, 2.1.2
>
>
> Using Processor API, users can set a new output record timestamp via 
> `context.forward(..., To.all().withTimestamp(...))`. However, after the 
> forward()-call returns, the timestamp is not reset to the original input 
> record timestamp and thus a consecutive call to `context.forward(...)` 
> without `To` will use the newly set output record timestamp from before, too.
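
The behaviour described in this ticket can be sketched with a toy context. `ForwardTimestampSketch` and its methods are hypothetical and do not reflect the actual `ProcessorContext` implementation; a `null` override stands in for a `forward(...)` call without `To`.

```java
import java.util.ArrayList;
import java.util.List;

final class ForwardTimestampSketch {
    private long currentTimestamp; // timestamp of the input record being processed
    final List<Long> forwarded = new ArrayList<>();

    ForwardTimestampSketch(long inputRecordTimestamp) {
        this.currentTimestamp = inputRecordTimestamp;
    }

    // Buggy variant: the override leaks into every later forward.
    void forwardBuggy(Long overrideTimestamp) {
        if (overrideTimestamp != null) {
            currentTimestamp = overrideTimestamp;
        }
        forwarded.add(currentTimestamp);
    }

    // Fixed variant: the input record timestamp is restored after the call returns.
    void forwardFixed(Long overrideTimestamp) {
        long original = currentTimestamp;
        try {
            if (overrideTimestamp != null) {
                currentTimestamp = overrideTimestamp;
            }
            forwarded.add(currentTimestamp);
        } finally {
            currentTimestamp = original;
        }
    }
}
```

With an input timestamp of 10, forwarding once with an override of 99 and once without yields 99 and 99 in the buggy variant, but 99 and 10 in the fixed one.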





[jira] [Commented] (KAFKA-13683) Transactional Producer - Transaction with key xyz went wrong with exception: Timeout expired after 60000milliseconds while awaiting InitProducerId

2022-03-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503218#comment-17503218
 ] 

Matthias J. Sax commented on KAFKA-13683:
-

{quote}We are using kakfa streams 3.0.
{quote}
Should the ticket component field be set to `kafkaStreams` instead of 
`clients`?


For Kafka Streams this should actually be fixed in 2.8.0 via 
https://issues.apache.org/jira/browse/KAFKA-8803 / 
https://issues.apache.org/jira/browse/KAFKA-9274

Also, how do you set the `bootstrap.servers` config? Is it hard-coded IPs, or server 
URLs? For server URLs, what is your DNS caching setting inside the JVM? Cf. 
[https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-jvm-ttl.html]
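
For reference, the JVM-level DNS cache TTL is controlled by the `networkaddress.cache.ttl` security property. The sketch below sets it programmatically; the helper class name and the value of 60 seconds are arbitrary choices for illustration, and the property generally needs to be set before the JVM performs its first name lookup to take effect.

```java
import java.security.Security;

final class DnsCacheTtlSketch {
    // Sets the TTL (in seconds) for successful DNS lookups and returns the configured value.
    static String configureDnsTtl(int seconds) {
        Security.setProperty("networkaddress.cache.ttl", Integer.toString(seconds));
        return Security.getProperty("networkaddress.cache.ttl");
    }
}
```

The same property can also be set in the JVM's `java.security` file instead of in application code.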
 

> Transactional Producer - Transaction with key xyz went wrong with exception: 
> Timeout expired after 60000milliseconds while awaiting InitProducerId
> --
>
> Key: KAFKA-13683
> URL: https://issues.apache.org/jira/browse/KAFKA-13683
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.6.0, 2.7.0, 3.0.0
>Reporter: Michael Hornung
>Priority: Critical
>  Labels: new-txn-protocol-should-fix
> Attachments: AkkaHttpRestServer.scala, 
> image-2022-02-24-09-12-04-804.png, image-2022-02-24-09-13-01-383.png, 
> timeoutException.png
>
>
> We have an urgent issue with our customer using the Kafka transactional producer 
> with a Kafka cluster with 3 or more nodes. Our customer is using Confluent 
> Cloud on Azure.
> We get this exception regularly: "Transaction with key XYZ went wrong with 
> exception: Timeout expired after 60000milliseconds while awaiting 
> InitProducerId" (see attachment)
> We assume that the cause is a node which is down and the producer still sends 
> messages to the “down node”. 
> We are using Kafka Streams 3.0.
> *We expect that if a node is down, the Kafka producer is intelligent enough to not 
> send messages to this node any more.*
> *What’s the solution to this issue? Is there any config we have to set?*
> *This request is urgent because our customer will soon have production 
> issues.*
> *Additional information*
>  * send record --> see attachment “AkkaHttpRestServer.scala” – line 100
>  * producer config --> see attachment “AkkaHttpRestServer.scala” – line 126





[jira] [Commented] (KAFKA-13699) ProcessorContext does not expose Stream Time

2022-03-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503214#comment-17503214
 ] 

Matthias J. Sax commented on KAFKA-13699:
-

Seems the only question is whether we need to update the KIP, and whether we can only 
fix forward in 3.2.0 (or if we can back-port to `3.0.1` and `3.1.1`)?

\cc [~mimaison] [~guozhang] WDYT? `3.0.1` is already on its way...

> ProcessorContext does not expose Stream Time
> 
>
> Key: KAFKA-13699
> URL: https://issues.apache.org/jira/browse/KAFKA-13699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Shay Lin
>Priority: Major
>  Labels: newbie
>
> As a KS developer, I would like to leverage 
> [KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext]
>  and access stream time in Processor Context.
> _(Updated)_
> However, the methods currentStreamTimeMs and currentSystemTimeMs are missing 
> for KStreams 3.0+.
> Checked with [~mjsax] , the methods are absent from the Processor API , i.e.
>  * org.apache.kafka.streams.processor.api.ProcessorContext





[jira] [Updated] (KAFKA-13699) ProcessorContext does not expose Stream Time

2022-03-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13699:

Labels: newbie  (was: )

> ProcessorContext does not expose Stream Time
> 
>
> Key: KAFKA-13699
> URL: https://issues.apache.org/jira/browse/KAFKA-13699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Shay Lin
>Priority: Major
>  Labels: newbie
>
> As a KS developer, I would like to leverage 
> [KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext]
>  and access stream time in Processor Context.
> _(Updated)_
> However, the methods currentStreamTimeMs and currentSystemTimeMs are missing 
> for KStreams 3.0+.
> Checked with [~mjsax] , the methods are absent from the Processor API , i.e.
>  * org.apache.kafka.streams.processor.api.ProcessorContext



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic

2022-03-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503213#comment-17503213
 ] 

Matthias J. Sax commented on KAFKA-6035:


Even if this ticket is finished, if you call `withLoggingDisabled` you tell 
Kafka Streams that the store is ephemeral and does not need to be recovered 
after a failure.

This feature will only avoid creating a separate changelog topic and reuse the 
output topic if logging is enabled. We might do the same thing as for input 
tables and only do the merge if topology optimization is enabled, but I guess 
it's TBD.

> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> 
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>Priority: Major
>
> Today Streams makes all state stores backed by a changelog topic by 
> default unless users override it with {{disableLogging}} when creating the 
> state store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic would not be required as we can re-use an existing 
> topic for that. This ticket summarizes a specific issue that can be optimized:
> Consider the case when a KTable is materialized and then sent directly into a 
> sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.





Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-07 Thread Matthias J. Sax
tion, but I'm also pretty sure
this complexity is necessary.

Taking a step back, I do think this approach results in a
better API, even though the change is a little complicated.

Thanks,
-John

On Sun, 2022-03-06 at 10:51 +, Jorge Esteban Quilcate
Otoya wrote:

Matthias, thanks for your feedback.

I can see the following alternatives to deal with `processValues()`:

1. Runtime key validation (current proposal)
2. Using Void type. Guozhang already points out some important
considerations about allocating `Record` twice.
3. Adding a new ValueRecord, proposed by Matthias. This one would carry
some of the problems of the second alternative as ValueRecord will have to
be created from a Record. Also, either by having a public constructor or
creation from a Record, the key _can_ be changed without being captured by
the Topology.
4. Reducing the KIP scope to `process` only, and removing/postponing
`processValues` for a later DSL redesign.

A couple of additional comments:

About the Record API:

IIUC, the issue with allocating new objects is coming from the current
design of the Record API.
If a user does record.withKey(...).withValue(...), that already leads to a
couple of instantiations.
My impression is that if the cost/value of immutability has been weighed
already, then maybe the considerations for alternative 2 can be
disregarded?

Either way, if the cost of recreation of objects is something we want to
minimize, then maybe adding a Builder to the record should help to reduce
the allocations.

About the key validation:

So far, the only way I can see to _really_ validate at compile-time that a
key doesn't change is by not exposing it at all, as we are doing today with
Transform.
Otherwise, deal with it at runtime, as we have been dealing with Transform
without the ability to forward.
Processor API already, by definition, means lower-level abstraction,
therefore users should be aware of the potential runtime exceptions if the
key changes.
This is why I'm leaning towards alternative 1.
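
For illustration only, alternative 1 (runtime key validation via a reference check) could look roughly like the sketch below. This is plain Java with no Kafka dependency; `KeyCheckSketch`, `Rec` and this `processValues` helper are hypothetical names made up for the example and are not the KIP's actual API.

```java
import java.util.function.Function;

final class KeyCheckSketch {

    /** Hypothetical immutable record: every with* call allocates a new instance. */
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }

        <K2> Rec<K2, V> withKey(K2 newKey) {
            return new Rec<>(newKey, value);
        }

        <V2> Rec<K, V2> withValue(V2 newValue) {
            return new Rec<>(key, newValue);
        }
    }

    /**
     * Runs a value-only processor and rejects the output if the key
     * reference differs from the input key (assumed check: identity, not equals).
     */
    static <K, V, VOut> Rec<K, VOut> processValues(
            Rec<K, V> in, Function<Rec<K, V>, Rec<K, VOut>> processor) {
        Rec<K, VOut> out = processor.apply(in);
        if (out.key != in.key) {
            throw new IllegalStateException("Key was modified inside processValues");
        }
        return out;
    }

    public static void main(String[] args) {
        Rec<String, Integer> in = new Rec<>("user-1", 41);

        // Changing only the value keeps the same key reference, so this passes.
        Rec<String, Integer> ok = processValues(in, r -> r.withValue(r.value + 1));
        System.out.println(ok.key + " -> " + ok.value); // prints "user-1 -> 42"

        // Changing the key compiles fine but fails at runtime.
        try {
            processValues(in, r -> r.withKey("other").withValue(0));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is the trade-off discussed above: the key-changing lambda type-checks, so the mistake only shows up as an exception when the record is forwarded.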

Looking forward to your feedback.
As a reminder, the vote thread is still open. Feel free to add your vote or
amend if needed.

Cheers,


On Fri, 4 Mar 2022 at 06:51, Matthias J. Sax  wrote:


John, thanks for verifying source compatibility. My impression was that
it should be source compatible, I was just not 100% sure.

The question about `processValues()` is really a hard one. Guozhang's
point is a very good one. Maybe we need to be pragmatic and accept the
runtime check (even if I deeply hate this solution compared to a compile
time check).

Other possibilities to address this issue might just become too ugly? It
seems it would require adding a new `ValueProcessorContext` that offers
a `#forward(ValueRecord)` method (with `ValueRecord` being a `Record`
with immutable key)? Not sure if we would be willing to go down this
route? Personally, I would be ok with it, as I strongly prefer compile
time checks and I am happy to extend the API surface area to achieve it
-- however, I won't be surprised if others don't like this idea...



-Matthias

On 2/27/22 6:20 AM, Jorge Esteban Quilcate Otoya wrote:

Thanks, Guozhang.


Compared with reference checks and runtime exceptions for those who
mistakenly change the key, I think that enforcing everyone to `setValue`
may incur more costs..


This is a fair point. I agree that this may incur more costs than key
checking.

Will hold for more feedback, but if we agree I will update the KIP during
the week.

Cheers,
Jorge.


On Sun, 27 Feb 2022 at 00:50, Guozhang Wang 

wrote:



Hello folks,

Regarding the outstanding question, I'm actually a bit leaning towards the
second option since `withKey()` itself always creates a new Record
object. This has a few implications:

* That we would have to discard the previous Record object to be GC'ed with
the new object --- note in practice, processing value does not mean you'd
have to replace the whole value with `withValue`, but maybe you just need
to manipulate some fields of the value object if it is a JSON / etc.

* It may become an obstacle for further runtime optimizations e.g. skip
serdes and interpret processing as direct byte manipulations.

Compared with reference checks and runtime exceptions for those who
mistakenly change the key, I think that enforcing everyone to `setValue`
may incur more costs..

Guozhang

On Fri, Feb 25, 2022 at 12:54 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


Hi all,

Appreciate very much all the great feedback received so far.


After applying that interface change, I don't see any syntax
errors in our tests (which use those methods), and the
StreamBuilderTest still passes for me.

This is awesome John, thank you for your efforts here.


Jorge, do you mind clarifying these points in the Compatibility section
of your KIP?

+1. I have clarified the impact of changing the return type in the KIP.



I think t

Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-07 Thread Matthias J. Sax
ng with Transform without the ability to forward.
Processor API already, by definition, means lower-level abstraction,
therefore users should be aware of the potential runtime exceptions if the
key changes.
This is why I'm leaning towards alternative 1.

Looking forward to your feedback.
As a reminder, the vote thread is still open. Feel free to add your vote or
amend if needed.

Cheers,


On Fri, 4 Mar 2022 at 06:51, Matthias J. Sax  wrote:


John, thanks for verifying source compatibility. My impression was that
it should be source compatible, I was just not 100% sure.

The question about `processValues()` is really a hard one. Guozhang's
point is a very good one. Maybe we need to be pragmatic and accept the
runtime check (even if I deeply hate this solution compared to a compile
time check).

Other possibilities to address this issue might just become too ugly? It
seems it would require adding a new `ValueProcessorContext` that offers
a `#forward(ValueRecord)` method (with `ValueRecord` being a `Record`
with immutable key)? Not sure if we would be willing to go down this
route? Personally, I would be ok with it, as I strongly prefer compile
time checks and I am happy to extend the API surface area to achieve it
-- however, I won't be surprised if others don't like this idea...



-Matthias

On 2/27/22 6:20 AM, Jorge Esteban Quilcate Otoya wrote:

Thanks, Guozhang.


Compared with reference checks and runtime exceptions for those who
mistakenly change the key, I think that enforcing everyone to `setValue`
may incur more costs..


This is a fair point. I agree that this may incur more costs than key
checking.

Will hold for more feedback, but if we agree I will update the KIP during
the week.

Cheers,
Jorge.


On Sun, 27 Feb 2022 at 00:50, Guozhang Wang 

wrote:



Hello folks,

Regarding the outstanding question, I'm actually a bit leaning towards the
second option since `withKey()` itself always creates a new Record
object. This has a few implications:

* That we would have to discard the previous Record object to be GC'ed with
the new object --- note in practice, processing value does not mean you'd
have to replace the whole value with `withValue`, but maybe you just need
to manipulate some fields of the value object if it is a JSON / etc.

* It may become an obstacle for further runtime optimizations e.g. skip
serdes and interpret processing as direct byte manipulations.

Compared with reference checks and runtime exceptions for those who
mistakenly change the key, I think that enforcing everyone to `setValue`
may incur more costs..

Guozhang

On Fri, Feb 25, 2022 at 12:54 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


Hi all,

Appreciate very much all the great feedback received so far.


After applying that interface change, I don't see any syntax
errors in our tests (which use those methods), and the
StreamBuilderTest still passes for me.

This is awesome John, thank you for your efforts here.


Jorge, do you mind clarifying these points in the Compatibility section
of your KIP?

+1. I have clarified the impact of changing the return type in the KIP.



I think the other outstanding question for you is whether
the output key type for processValues should be K or Void.

One thing I realized belatedly was that if we do set it to
Void, then users will actually have to override the key when
forwarding, like `record.withKey(null)`, whereas if we keep
it as K, all users have to do is not touch the key at all.


This is a tricky one.
On one hand, with Void type for key output, we force the users to cast to
Void and change the key to null, though this can be documented on the API,
so the users are aware of the peculiarity of forwarding within
`processValues`.
On the other hand, keeping the key type as output doesn't _require_ any
change of keys, but this could lead to key-checking runtime exceptions.

I am slightly inclined towards the first option: changing the type to
`Void`.
This will impose a bit of pain on the users to gain some type-safety and
avoid runtime exceptions.
We can justify this requirement as a way to prove that the key hasn't
changed.

Btw, thanks for this idea Matthias!


On Fri, 25 Feb 2022 at 17:10, John Roesler 

wrote:



Oh, one more thing Jorge,

I think the other outstanding question for you is whether
the output key type for processValues should be K or Void. I
get the impression that all of us don't feel too strongly
about it, so I think the ball is in your court to consider
everyone's points and make a call (with justification).

One thing I realized belatedly was that if we do set it to
Void, then users will actually have to override the key when
forwarding, like `record.withKey(null)`, whereas if we keep
it as K, all users have to do is not touch the key at all.
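
A rough sketch of what the `Void` variant would mean for user code, in plain Java without any Kafka dependency. `VoidKeySketch`, `Rec` and this `forward` helper are hypothetical stand-ins for illustration, not the real Record or ProcessorContext types.

```java
final class VoidKeySketch {

    /** Hypothetical immutable record, loosely modelled on withKey/withValue. */
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }

        <K2> Rec<K2, V> withKey(K2 newKey) {
            return new Rec<>(newKey, value);
        }

        <V2> Rec<K, V2> withValue(V2 newValue) {
            return new Rec<>(key, newValue);
        }
    }

    /** Hypothetical forward of a value-only processor: accepts Void-keyed records only. */
    static <V> String forward(Rec<Void, V> record) {
        return "forwarded key=" + record.key + " value=" + record.value;
    }

    public static void main(String[] args) {
        Rec<String, String> in = new Rec<>("user-1", "hello");

        // forward(in.withValue("HELLO")); // would not compile: key type is String, not Void

        // With a Void output key the user has to null out (and cast) the key explicitly.
        Rec<Void, String> out = in.withKey((Void) null).withValue(in.value.toUpperCase());
        System.out.println(forward(out)); // prints "forwarded key=null value=HELLO"
    }
}
```

With `K` as the output key type the `withKey` call disappears entirely, which is the convenience being weighed against the compile-time guarantee.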

Thanks,
-John

On Fri, 2022-02-25 at 11:07 -0600, John Roesler wrote:

Hello all,

I'll chime in again in the interest of 

Re: [DISCUSS] KIP-813 Shared State Stores

2022-03-07 Thread Matthias J. Sax

Thanks for updating the KIP. LGTM.

I think we can start a vote.



 I think this might provide issues if your processor is doing a projection of 
the data.


This is correct. It's a known issue: 
https://issues.apache.org/jira/browse/KAFKA-7663


Global-stores/KTables are designed to put the data into the store 
_unmodified_.



-Matthias

On 2/28/22 5:05 AM, Daan Gertis wrote:

Updated the KIP to be more aligned with global state store function names.

If I remember correctly during restore the processor will not be used right? I 
think this might provide issues if your processor is doing a projection of the 
data. Either way, I would not add that into this KIP since it is a specific 
use-case pattern.

Unless there is anything more to add or change, I would propose moving to a 
vote?

Cheers!
D.

From: Matthias J. Sax 
Date: Friday, 18 February 2022 at 03:29
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for updating the KIP!

I am wondering if we would need two overloads of `addReadOnlyStateStore`
one w/ and one w/o `TimestampExtractor` argument to effectively make it
an "optional" parameter?

Also wondering if we need to pass in `String sourceName` and `String
processorName` parameters (similar to `addGlobalStore()`?) instead of
re-using the store name as currently proposed? -- In general I don't
have a strong opinion either way, but it seems to introduce some API
inconsistency if we don't follow the `addGlobalStore()` pattern?
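
To make the overload idea concrete, here is a minimal sketch in plain Java. Everything in it (`ReadOnlyStoreOverloadSketch`, `SketchTopology`, `TsExtractor`, the parameter list and its order) is a hypothetical stand-in chosen for the example; it is not the signature proposed in the KIP.

```java
import java.util.ArrayList;
import java.util.List;

final class ReadOnlyStoreOverloadSketch {

    /** Hypothetical stand-in for a timestamp extractor. */
    interface TsExtractor {
        long extract(Object value, long partitionTime);
    }

    /** Hypothetical stand-in for the topology builder. */
    static final class SketchTopology {
        final List<String> registered = new ArrayList<>();

        // Overload without an extractor: delegates with null,
        // which this sketch treats as "use the default extractor".
        SketchTopology addReadOnlyStateStore(
                String storeName, String sourceName, String topic, String processorName) {
            return addReadOnlyStateStore(storeName, sourceName, null, topic, processorName);
        }

        // Overload with an explicit extractor, plus separate source/processor names.
        SketchTopology addReadOnlyStateStore(
                String storeName, String sourceName, TsExtractor extractor,
                String topic, String processorName) {
            String ts = (extractor == null) ? "default" : "custom";
            registered.add(storeName + ":" + sourceName + ":" + processorName + ":" + topic + ":" + ts);
            return this;
        }
    }

    public static void main(String[] args) {
        SketchTopology topology = new SketchTopology()
            .addReadOnlyStateStore("store-a", "source-a", "topic-a", "processor-a")
            .addReadOnlyStateStore("store-b", "source-b", (v, t) -> t, "topic-b", "processor-b");
        topology.registered.forEach(System.out::println);
    }
}
```

Having the shorter overload delegate to the longer one keeps the extractor effectively optional without needing a separate code path.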



Another thing we were confronted with was the restoring of state when the 
actual local storage is gone. For example, we host on K8s with ephemeral pods, 
so there is no persisted storage between pod restarts. However, the consumer 
group will already be at the latest offset, preventing previous data from 
being restored within the new pod’s statestore.


We already have code in place in the runtime to do the right thing for
this case (ie, via DSL source-table changelog optimization). We can
re-use this part. It's nothing we need to discuss on the KIP, but we can
discuss on the PR later.


-Matthias


On 2/17/22 10:09 AM, Guozhang Wang wrote:

Hi Daan,

I think for the read-only state stores you'd need to slightly augment the
checkpointing logic so that it would still write the checkpointed offsets
while restoring from the changelogs.


Guozhang

On Thu, Feb 17, 2022 at 7:02 AM Daan Gertis 
wrote:


Could you add more details about the signature of
`addReadOnlyStateStore()` -- What parameters does it take? Are there any
overloads taking different parameters? The KIP only contains some verbal
description on the "Implementation Plan" section, that is hard to find
and hard to read.

The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?


About timestamp synchronization: why do you propose to disable timestamp
synchronization (similar to global state stores)? It seems to be an
unnecessary limitation? -- Given that we could re-use the new method for
source `KTables` (ie, `StreamsBuilder#table()` implementation), having
timestamp synchronization enabled seems to be important?


Yup, will do these updates. I’ll overload the addReadOnlyStateStore to
allow for timestamp synchronization.

Another thing we were confronted with was the restoring of state when the
actual local storage is gone. For example, we host on K8s with ephemeral
pods, so there is no persisted storage between pod restarts. However, the
consumer group will already be at the latest offset, preventing previous
data from being restored within the new pod’s statestore.

If I remember correctly, there was some checkpoint logic available when
restoring, but we are bypassing that since logging is disabled on the
statestore, no?

As always, thanks for your insights.

Cheers,
D.


From: Matthias J. Sax 
Date: Wednesday, 16 February 2022 at 02:09
To: dev@kafka.apache.org 
Subject: Re: [DISCUSS] KIP-813 Shared State Stores
Thanks for updating the KIP.

Could you add more details about the signature of
`addReadOnlyStateStore()` -- What parameters does it take? Are there any
overloads taking different parameters? The KIP only contains some verbal
description on the "Implementation Plan" section, that is hard to find
and hard to read.

The KIP mentions a `ProcessorProvider` -- do you mean `ProcessorSupplier`?

About timestamp synchronization: why do you propose to disable timestamp
synchronization (similar to global state stores)? It seems to be an
unnecessary limitation? -- Given that we could re-use the new method for
source `KTables` (ie, `StreamsBuilder#table()` implementation), having
timestamp synchronization enabled seems to be important?


-Matthias


On 2/8/22 11:01 PM, Guozhang Wang wrote:

Daan,

Thanks for the replies, those make sense to me.

On Tue, Feb 8, 2022 at 7:24 AM Daan Gertis 

wrote:



I just updated the KIP to reflect the things discussed in this thread.

As for your questions Guozhang:


1) How do w
