Re: [VOTE] KIP-834: Pause / Resume KafkaStreams Topologies

2022-05-11 Thread Leah Thomas
Thanks Jim, great discussion. +1 from me (non-binding)

Cheers,
Leah

On Wed, May 11, 2022 at 10:14 AM Bill Bejeck  wrote:

> Thanks for the KIP!
>
> +1 (binding)
>
> -Bill
>
> On Wed, May 11, 2022 at 9:36 AM Luke Chen  wrote:
>
> > Hi Jim,
> >
> > I'm +1. (Please add a note in the KIP that the stream reset tool can't
> > be used in the paused state.)
> > Thanks for the KIP!
> >
> > Luke
> >
> > On Wed, May 11, 2022 at 9:09 AM Guozhang Wang wrote:
> >
> > > Thanks Jim. +1 from me.
> > >
> > > On Tue, May 10, 2022 at 4:51 PM Matthias J. Sax wrote:
> > >
> > > > I had one minor question on the discuss thread. It's mainly about
> > > > clarifying and documenting the user contract. I am fine either way.
> > > >
> > > > +1 (binding)
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 5/10/22 12:32 PM, Sophie Blee-Goldman wrote:
> > > > > Thanks for the KIP! +1 (binding)
> > > > >
> > > > > On Tue, May 10, 2022, 12:24 PM Bruno Cadonna wrote:
> > > > >
> > > > >> Thanks Jim,
> > > > >>
> > > > >> +1 (binding)
> > > > >>
> > > > >> Best,
> > > > >> Bruno
> > > > >>
> > > > >> On 10.05.22 21:19, John Roesler wrote:
> > > > >>> Thanks Jim,
> > > > >>>
> > > > >>> I’m +1 (binding)
> > > > >>>
> > > > >>> -John
> > > > >>>
> > > > >>> On Tue, May 10, 2022, at 14:05, Jim Hughes wrote:
> > > > >>>> Hi all,
> > > > >>>>
> > > > >>>> I'm asking for a vote on KIP-834:
> > > > >>>>
> > > > >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
> > > > >>>>
> > > > >>>> Thanks in advance!
> > > > >>>>
> > > > >>>> Jim
> > > > >>>
> > > > >>
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


[jira] [Created] (KAFKA-13811) Investigate sliding windows performance

2022-04-08 Thread Leah Thomas (Jira)
Leah Thomas created KAFKA-13811:
---

 Summary: Investigate sliding windows performance
 Key: KAFKA-13811
 URL: https://issues.apache.org/jira/browse/KAFKA-13811
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Leah Thomas


We recently fixed a bug in sliding windows so that a grace period of 0ms is 
properly calculated, see https://issues.apache.org/jira/browse/KAFKA-13739. 
Before this patch, sliding windows with a grace period of 0ms would just skip 
all records so nothing would get put into the store.

When we ran benchmarks for the 3.2 release, we saw a significant drop in
performance for sliding windows on both the 3.2 and trunk branches; see the
`sliding windows` results
[here|http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/summaries/process-cumulative-rate/graph.html].
These benchmarks use a sliding window with a 0ms grace period, which means
until now we weren't sending any values to the state store when running these
benchmarks.

I ran benchmarks on the
[commit|https://github.com/apache/kafka/commit/430f9c99012d1585aa544d4dadf449963296c1fd]
before KAFKA-13739 and changed the grace period to 2 seconds to see if the bug
fix changed anything. The performance was still low for
[this run|http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/sliding1min-3-5-3-3-0-430f9c9901-leah-20220408084241-streamsbench/].

Given this, it seems like the performance for sliding windows has always been
low, but we didn't realize it because the bug fixed in KAFKA-13739 was present
in the benchmarks we were running. We should investigate why the current
algorithm is slow and see if improvements can be made.
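The ticket doesn't include the patched code. The sketch below shows, in simplified form, how a 0ms grace period can cause every record to be skipped. It is a hypothetical model of a "window still open?" check, not the Kafka Streams source or the KAFKA-13739 patch, and the class and method names are made up. It makes two assumptions. First, an in-order record creates a sliding window that ends at its own timestamp, so the window end equals the observed stream time. Second, a window counts as closed once its end is no longer strictly greater than `streamTime - grace`. Under those assumptions, an exclusive comparison rejects every such record when the grace period is 0ms, which matches the "skip all records" symptom. An inclusive comparison, or any positive grace period, keeps them.

```java
// Hypothetical, simplified model of a "window still open?" check.
// Not the Kafka Streams source and not the KAFKA-13739 patch.
final class GraceCheckSketch {

    private GraceCheckSketch() {
    }

    // Exclusive rule (assumed): the window accepts a record only while its
    // end is strictly greater than the close time (streamTime - grace).
    static boolean isOpenExclusive(long windowEndMs, long streamTimeMs, long graceMs) {
        long closeTimeMs = streamTimeMs - graceMs;
        return windowEndMs > closeTimeMs;
    }

    // Inclusive rule (assumed): a window whose inclusive end equals the
    // close time still accepts the record.
    static boolean isOpenInclusive(long windowEndMs, long streamTimeMs, long graceMs) {
        long closeTimeMs = streamTimeMs - graceMs;
        return windowEndMs >= closeTimeMs;
    }
}
```

With the 2-second grace period used in the rerun above, `isOpenExclusive(1000, 1000, 2000)` returns true. The modeled record is therefore kept under either rule.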



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] KIP-761: Add Total Blocked Time Metric to Streams

2021-07-26 Thread Leah Thomas
Hey Rohan,

Thanks for pushing this KIP through. I'm +1, non-binding.

Leah

On Wed, Jul 21, 2021 at 7:09 PM Rohan Desai  wrote:

> Now that the discussion thread's been open for a few days, I'm calling for
> a vote on
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
>


[jira] [Created] (KAFKA-13120) Flesh out `streams_static_membership_test` to be more robust

2021-07-21 Thread Leah Thomas (Jira)
Leah Thomas created KAFKA-13120:
---

 Summary: Flesh out `streams_static_membership_test` to be more 
robust
 Key: KAFKA-13120
 URL: https://issues.apache.org/jira/browse/KAFKA-13120
 Project: Kafka
  Issue Type: Task
  Components: streams, system tests
Reporter: Leah Thomas


When fixing `streams_static_membership_test.py`, we noticed that the test is
pretty bare-bones: it creates a Streams application but never sends any data
through it or does much else with it. We should flesh this out a
bit to be more realistic. The full Java test is in `StaticMembershipTestClient`.





Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2021-07-20 Thread Leah Thomas
Thanks for the KIP, Rohan.

I had a question: from the descriptions of `txn-commit-time-total` and
`offset-commit-time-total`, it seems like they measure similar processes for
EOS and ALOS, respectively, but only `txn-commit-time-total` is
included in `blocked-time-total`. Why isn't `offset-commit-time-total` also
included?

Aside from `flush-time-total`, `txn-commit-time-total` and
`offset-commit-time-total`, which will be producer/consumer client metrics,
the rest of the metrics will be thread-level Streams metrics, is that right?

Cheers,
Leah

On Tue, Jul 20, 2021 at 2:00 AM Rohan Desai  wrote:

> Thanks for the review Guozhang! responding to your feedback inline:
>
> > 1) I agree that the current ratio metrics is just "snapshot in point", and
> > more flexible metrics that would allow reporters to calculate based on
> > window intervals are better. However, the current mechanism of the proposed
> > metrics assumes the thread->clients mapping as of today, where each thread
> > would own exclusively one main consumer, restore consumer, producer and an
> > admin client. But this mapping may be subject to change in the future. Have
> > you thought about how this metric can be extended when, e.g. the embedded
> > clients and stream threads are de-coupled?
>
> Of course this depends on how exactly we refactor the runtime - assuming
> that we plan to factor out consumers into an "I/O" layer that is
> responsible for receiving records and enqueuing them to be processed by
> processing threads, then I think it should be reasonable to count the time
> we spend blocked on this internal queue(s) as blocked. The main concern
> there to me is that the I/O layer would be doing something expensive like
> decompression that shouldn't be counted as "blocked". But if that really is
> so expensive that it starts to throw off our ratios then it's probably
> indicative of a larger problem that the "I/O layer" is a bottleneck and it
> would be worth refactoring so that decompression (or insert other expensive
> thing here) can also be done on the processing threads.
>
> > 2) [This and all below are minor comments] The "flush-time-total" may
> > better be a producer client metric, as "flush-wait-time-total", than a
> > streams metric, though the streams-level "total-blocked" can still leverage
> > it. Similarly, I think "txn-commit-time-total" and
> > "offset-commit-time-total" may better be inside producer and consumer
> > clients respectively.
>
> Good call - I'll update the KIP
>
> > 3) The doc was not very clear on how "thread-start-time" would be needed
> > when calculating streams utilization along with total-blocked time, could
> > you elaborate a bit more in the KIP?
>
> Yes, will do.
>
> > For "txn-commit-time-total" specifically, besides producer.commitTxn,
> > other txn-related calls may also be blocking, including
> > producer.beginTxn/abortTxn. I saw you mentioned "txn-begin-time-total"
> > later in the doc, but did not include it as a separate metric, and
> > similarly, should we have a `txn-abort-time-total` as well? If yes, could
> > you update the KIP page accordingly?
>
> Ack.
>
> On Mon, Jul 12, 2021 at 11:29 PM Rohan Desai wrote:
>
> > Hello All,
> >
> > I'd like to start a discussion on the KIP linked above which proposes some
> > metrics that we would find useful to help measure whether a Kafka Streams
> > application is saturated. The motivation section in the KIP goes into some
> > more detail on why we think this is a useful addition to the metrics
> > already implemented. Thanks in advance for your feedback!
> >
> > Best Regards,
> >
> > Rohan
> >
> > On Mon, Jul 12, 2021 at 12:00 PM Rohan Desai wrote:
> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams
> >>
> >
>
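Point 3 in the quoted review asks how `thread-start-time` fits into a utilization calculation. One possible reading is sketched below. It is an assumption, not the KIP's definition. Because the blocked time is exported as a cumulative total, dividing it by the wall-clock time elapsed since the thread started gives a lifetime blocked ratio. Differencing two samples instead gives the ratio over a reporting window, which is the kind of reporter-side windowed calculation point 1 mentions. The helper class is hypothetical. It assumes all times are in milliseconds, and it treats utilization as one minus the blocked ratio, which is also an assumption.

```java
// Hypothetical helper, not part of Kafka: derives blocked ratios from a
// cumulative blocked-time counter plus a thread start time (all in ms).
final class BlockedRatioSketch {

    private BlockedRatioSketch() {
    }

    // Fraction of wall-clock time since thread start spent blocked.
    static double blockedRatioSinceStart(double blockedTotalMs, long threadStartTimeMs, long nowMs) {
        long elapsedMs = nowMs - threadStartTimeMs;
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("now must be after the thread start time");
        }
        return blockedTotalMs / elapsedMs;
    }

    // Fraction of the window [t0, t1] spent blocked, from two cumulative samples.
    static double blockedRatioOverWindow(double blockedAtT0Ms, double blockedAtT1Ms, long t0Ms, long t1Ms) {
        long windowMs = t1Ms - t0Ms;
        if (windowMs <= 0) {
            throw new IllegalArgumentException("t1 must be after t0");
        }
        return (blockedAtT1Ms - blockedAtT0Ms) / windowMs;
    }

    // Assumed definition: utilization is the complement of the blocked ratio.
    static double utilization(double blockedRatio) {
        return 1.0 - blockedRatio;
    }
}
```

For example, a thread that started at t=0 and has accumulated 250ms of blocked time by t=1000 has a blocked ratio of 0.25 and a utilization of 0.75.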


[jira] [Created] (KAFKA-13052) Replace uses of SerDes in the docs with Serdes

2021-07-08 Thread Leah Thomas (Jira)
Leah Thomas created KAFKA-13052:
---

 Summary: Replace uses of SerDes in the docs with Serdes
 Key: KAFKA-13052
 URL: https://issues.apache.org/jira/browse/KAFKA-13052
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Leah Thomas


Right now, we have scattered uses of `SerDes` throughout the docs. These should 
be updated to be `Serdes`, as that's what we commonly use now.





Re: [VOTE] KIP-741: Change default serde to be null

2021-05-27 Thread Leah Thomas
Thanks, all. I think we can go ahead and close this now.

KIP-741 passes with 4 binding votes (Guozhang, Sophie, John, Bruno) and 3
non-binding votes (Walker, Lee, and myself).

On Tue, May 25, 2021 at 8:32 AM Dongjin Lee  wrote:

> Hi Bruno,
>
> Oh, thanks for pointing out my mistake. Yes, the KIP has not passed yet and
> here is the updated status:
>
> - Binding: Guozhang Wang, Sophie Blee-Goldman, John Roesler, Bruno Cadonna
> (+4)
> - Non-binding: Walker Carlson, Lee Dongjin (+2)
>
> Thanks,
> Dongjin
>
> On Tue, May 25, 2021 at 9:12 PM Bruno Cadonna  wrote:
>
> > Hi Leah,
> >
> > Thanks for the KIP!
> >
> > +1 (binding)
> >
> > Best,
> > Bruno
> >
> > On 25.05.21 14:02, Bruno Cadonna wrote:
> > > Hi Dongjin,
> > >
> > > Voting for a KIP needs to remain open for at least 72 hours [1].
> > > According to the date and time of the first message in this thread, 72
> > > hours haven't passed yet. Theoretically, there could still be -1 votes
> > > coming in.
> > >
> > > Best,
> > > Bruno
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Process
> > >
> > >
> > >
> > > On 25.05.21 13:13, Dongjin Lee wrote:
> > >> +1 (non-binding).
> > >>
> > >> As of present:
> > >>
> > >> - Binding: Guozhang Wang, Sophie Blee-Goldman, John Roesler (+3)
> > >> - Non-binding: Walker Carlson, Lee Dongjin (+2)
> > >>
> > >> This KIP has now passed.
> > >>
> > >> Thanks,
> > >> Dongjin
> > >>
> > >> On Tue, May 25, 2021 at 10:12 AM John Roesler wrote:
> > >>
> > >>> +1 (binding) from me. Thanks for the KIP!
> > >>> -John
> > >>>
> > >>> On Mon, May 24, 2021, at 18:10, Sophie Blee-Goldman wrote:
> > >>>> +1 binding
> > >>>>
> > >>>> thanks for the KIP
> > >>>> -Sophie
> > >>>>
> > >>>> On Mon, May 24, 2021 at 2:02 PM Walker Carlson wrote:
> > >>>>
> > >>>>> +1 (non-binding) from me, Leah
> > >>>>>
> > >>>>> Walker
> > >>>>>
> > >>>>> On Mon, May 24, 2021 at 1:51 PM Leah Thomas wrote:
> > >>>>>
> > >>>>>> Hi,
> > >>>>>>
> > >>>>>> I'd like to kick off voting for KIP-741: Change default serde to be null
> > >>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-741%3A+Change+default+serde+to+be+null>.
> > >>>>>> The discussion is linked on the KIP for context.
> > >>>>>>
> > >>>>>> Cheers,
> > >>>>>> Leah
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >>
> >
> > --
> > *Dongjin Lee*
> >
> > *A hitchhiker in the mathematical world.*
> >
>


[VOTE] KIP-741: Change default serde to be null

2021-05-24 Thread Leah Thomas
Hi,

I'd like to kick off voting for KIP-741: Change default serde to be null.

The discussion is linked on the KIP for context.

Cheers,
Leah


Re: [DISCUSS] KIP-741: Change default serde to be null

2021-05-24 Thread Leah Thomas
If there are no further notes, I'll go ahead and start the voting thread
today.

Thanks,
Leah


Re: [DISCUSS] KIP-741: Change default serde to be null

2021-05-20 Thread Leah Thomas
Thanks for finding that, Guozhang. Consistency seems like the best option
to me as well, for the time being. I updated the KIP with that detail.

On Thu, May 20, 2021 at 11:53 AM Sophie Blee-Goldman wrote:

> Thanks Guozhang, I forgot about ConfigException. I actually just reviewed
> two KIPs related to config-based Serdes,
> both of which use the ConfigException in this way: the ListSerde KIP, and
> the KIP to clean up the windowed inner class
> serde configuration.
>
> For the sake of simplicity and keeping this KIP well-scoped, I would prefer
> to stick with the ConfigException for now,
> since this is consistent with how these very similar cases are handled at
> the moment. I would still stand by the idea
> of introducing a dedicated SerdeConfigurationException (or similar) but I
> think we can treat that as orthogonal, and
> maybe do a follow-up KIP at some point to convert all of these relevant
> cases over to a new Serde-specific exception.
>
> On Thu, May 20, 2021 at 3:33 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > I think using ConfigException makes sense. But I am also fine with
> > SerdeConfigurationException. I think both are meaningful in this
> > situation where the former has the advantage that we do not need to
> > introduce a new exception.
> >
> > Best,
> > Bruno
> >
> >
> >
> >
> > On 20.05.21 07:54, Guozhang Wang wrote:
> > > Thanks Sophie. I think not piggy-backing on TopologyException makes
> > > sense.
> > >
> > > It just occurs to me that today we already have similar situations even
> > > with this config defaulting to Bytes: the other
> > > `DEFAULT_WINDOWED_KEY/VALUE_SERDE_INNER_CLASS` configs, whose default is
> > > actually null. Quickly checking the code here, I thought we threw
> > > StreamsException when they are found to be undefined at runtime, but we
> > > actually throw `ConfigException`. So for consistency we could just
> > > use that exception as well.
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, May 19, 2021 at 3:24 PM Sophie Blee-Goldman wrote:
> > >
> > >> To be honest I'm not really a fan of reusing the TopologyException since
> > >> it feels like a bit of a stretch from a user point of view to classify
> > >> Serde misconfiguration as a topology issue.
> > >>
> > >> I personally think a StreamsException would be acceptable, but I would
> > >> also propose to introduce a new type of exception, something like
> > >> SerdeConfigurationException or so. We certainly don't want to end up
> > >> like the Producer API with its 500 different exceptions. Luckily Streams
> > >> is nowhere near that yet, in my opinion, and problems with Serde
> > >> configuration are so common and well-defined that a dedicated exception
> > >> feels very appropriate.
> > >>
> > >> If there are any other instances in the codebase where we throw a
> > >> StreamsException for a Serde-related issue, this could also be migrated
> > >> to the new exception type (not necessarily all at once, but gradually
> > >> after this KIP).
> > >>
> > >> Thoughts?
> > >>
> > >> On Wed, May 19, 2021 at 10:31 AM Guozhang Wang wrote:
> > >>
> > >>> Leah, thanks for the KIP.
> > >>>
> > >>> It looks good to me overall. Just following up on Bruno's question
> > >>> about the exception: what about using the `TopologyException` class? I
> > >>> know that currently it is only thrown during the topology parsing phase,
> > >>> not at the streams construction, but I feel we can extend its scope to
> > >>> cover both topology building and streams object (i.e. taking the
> > >>> topology and the config) construction time as well, since part of the
> > >>> construction is again to re-write / augment the topology.
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Wed, May 19, 2021 at 8:44 AM Leah Thomas wrote:
> > >>>
> > >>>> Hi Sophie,
> > >>>>
> > >>>> Thanks for catching that. These are existing 

Re: [DISCUSS] KIP-741: Change default serde to be null

2021-05-19 Thread Leah Thomas
Hi Sophie,

Thanks for catching that. These are existing methods inside of
`StreamsConfig` that will return null (the new default) instead of the byte
array serde (the old default). Both `StreamsConfig` and
`defaultKeySerde`/`defaultValueSerde` are public, so I assume these still
count as part of the public API. I updated the KIP to include this
information.

Bruno - I was planning on including a specific message with the
StreamsException to indicate that either a serde needs to be passed in or a
default needs to be set. I'm open to doing something more specific,
perhaps something like a serde exception? WDYT? I was hoping that, with the
message, the StreamsException would still give users enough information to
debug the problem.

Still hoping for a short discussion (; but thanks for the input so far!

Leah

On Wed, May 19, 2021 at 3:00 AM Bruno Cadonna  wrote:

> Hey Leah,
>
>  > what I think should be a small discussion
>
> Dangerous words, indeed! It seems like they trigger something in people ;-)
>
> Jokes apart!
>
> Did you consider throwing a more specific exception instead of a
> StreamsException? Something that better describes the issue at hand.
>
> Best,
> Bruno
>
>
> On 19.05.21 01:19, Sophie Blee-Goldman wrote:
> >>
> >> what I think should be a small discussion
> >
> >
> > Dangerous words :P
> >
> > I'm all for the proposal but I do have one question about something in
> > the KIP. You list two methods called defaultKeySerde() and
> > defaultValueSerde() but it's not clear to me where these are coming from.
> > Are they new APIs you propose to add in this KIP? Are they existing
> > methods in the public API which will now return null, whereas they used
> > to return the ByteArraySerde? If they're not a public API then you can
> > remove them from the KIP, otherwise can you just update this section to
> > clarify what class/file these belong to, etc?
> >
> > -Sophie
> >
> > On Tue, May 18, 2021 at 5:34 AM Leah Thomas wrote:
> >
> >> Hi all,
> >>
> >> I'd like to kick off what I think should be a small discussion for
> >> KIP-741: Change default serde to be null.
> >>
> >> The wiki is here:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-741%3A+Change+default+serde+to+be+null
> >>
> >>
> >> Thanks,
> >> Leah
> >>
> >
>
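With the serde default changed to null, the behaviour discussed in this thread comes down to a fallback rule. Use the serde passed to the operator if there is one. Otherwise, use the configured default. If neither exists, fail with an actionable message. The sketch below models that rule in plain Java under stated assumptions. The class and method are hypothetical. `IllegalStateException` stands in for the `ConfigException` the thread later settled on, so the example needs no Kafka dependency. The config names built into the message (`default.key.serde`, `default.value.serde`) are the existing Streams configs.

```java
// Hypothetical model of the KIP-741 serde fallback; not Kafka code.
// IllegalStateException stands in for Kafka's ConfigException here.
final class SerdeFallbackSketch {

    private SerdeFallbackSketch() {
    }

    // role is "key" or "value"; S is whatever type represents a serde.
    static <S> S resolve(S explicitSerde, S configuredDefault, String role) {
        if (explicitSerde != null) {
            return explicitSerde; // a serde passed to the operator always wins
        }
        if (configuredDefault != null) {
            return configuredDefault; // otherwise fall back to the configured default
        }
        // With a null default, neither source is guaranteed to exist, so tell
        // the user exactly what to set.
        throw new IllegalStateException(
            "No " + role + " serde available: pass one to the operator or set default."
                + role + ".serde in the Streams config");
    }
}
```

For example, `resolve(null, null, "key")` fails with a message naming `default.key.serde`, while `resolve(null, myDefault, "value")` silently uses the default.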


[DISCUSS] KIP-741: Change default serde to be null

2021-05-18 Thread Leah Thomas
Hi all,

I'd like to kick off what I think should be a small discussion for KIP-741:
Change default serde to be null.

The wiki is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-741%3A+Change+default+serde+to+be+null


Thanks,
Leah


Re: [VOTE] KIP-633: Drop 24 hour default of grace period in Streams

2021-04-06 Thread Leah Thomas
Thanks for picking this up, Sophie. +1 from me, non-binding.

Leah

On Mon, Apr 5, 2021 at 9:42 PM John Roesler  wrote:

> Thanks, Sophie,
>
> I’m +1 (binding)
>
> -John
>
> On Mon, Apr 5, 2021, at 21:34, Sophie Blee-Goldman wrote:
> > Hey all,
> >
> > I'd like to start the voting on KIP-633, to drop the awkward 24 hour grace
> > period and improve the API to raise visibility on an important concept in
> > Kafka Streams: grace period and out-of-order data handling.
> >
> > Here's the KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24hr+default+grace+period>
> >
> >
> > Cheers,
> > Sophie
> >
>


Re: [VOTE] KIP-725: Streamlining configurations for TimeWindowedDeserializer.

2021-04-06 Thread Leah Thomas
Hi Sagar, +1 non-binding. Thanks again for doing this.

Leah

On Mon, Apr 5, 2021 at 9:40 PM John Roesler  wrote:

> Thanks, Sagar!
>
> I’m +1 (binding)
>
> -John
>
> On Mon, Apr 5, 2021, at 21:35, Sophie Blee-Goldman wrote:
> > Thanks for the KIP! +1 (binding) from me
> >
> > Cheers,
> > Sophie
> >
> > On Mon, Apr 5, 2021 at 7:13 PM Sagar  wrote:
> >
> > > Hi All,
> > >
> > > I would like to start voting on the following KIP:
> > >
> > >
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177047930
> > >
> > > Thanks!
> > > Sagar.
> > >
> >
>


Re: [DISCUSS] KIP-725: Streamlining configurations for TimeWindowedDeserializer.

2021-04-02 Thread Leah Thomas
Hey Sagar,

Thanks for picking this up! The proposal looks good to me after Sophie and
John's changes.

Cheers,
Leah
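Sophie's review, quoted below, suggests describing the intended semantics in words. Use the constructor within Streams, the configs within the console consumer, or either for a plain consumer client provided they match. A minimal sketch of that rule follows. It assumes a window size can come from a constructor argument and/or a config value, both represented as nullable `Long`s. The class name is hypothetical, and this is not the `TimeWindowedDeserializer` implementation. It also assumes that supplying neither is an error, in the spirit of KAFKA-9649's complaint about a silent `Long.MAX_VALUE` default.

```java
// Hypothetical rule for reconciling a window size given to a windowed
// deserializer's constructor with one supplied via configs. Not Kafka code.
final class WindowSizeResolutionSketch {

    private WindowSizeResolutionSketch() {
    }

    static long resolve(Long constructorSizeMs, Long configSizeMs) {
        if (constructorSizeMs != null && configSizeMs != null) {
            // Plain consumer client: either source is fine, provided they match.
            if (!constructorSizeMs.equals(configSizeMs)) {
                throw new IllegalArgumentException("Window size from constructor ("
                    + constructorSizeMs + " ms) does not match configured size ("
                    + configSizeMs + " ms)");
            }
            return constructorSizeMs;
        }
        if (constructorSizeMs != null) {
            return constructorSizeMs; // e.g. Streams constructs the deserializer itself
        }
        if (configSizeMs != null) {
            return configSizeMs; // e.g. console consumer, configured only via properties
        }
        // Assumed: no silent fallback such as Long.MAX_VALUE.
        throw new IllegalArgumentException("Window size must be set via constructor or config");
    }
}
```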

On Fri, Apr 2, 2021 at 6:21 AM Sagar  wrote:

> Thanks John!
>
> Yeah I think window.inner.class.deserializer sounds good. Your thoughts
> @Sophie?
>
> Thanks!
> Sagar.
>
>
> On Fri, Apr 2, 2021 at 5:23 AM John Roesler  wrote:
>
> > Hi Sagar,
> >
> > I think this is a good proposal.
> >
> > The only change I might recommend is to change the config to more closely
> > align with the other one, for example: “window.inner.class.deserializer”
> >
> > But it’s very minor; I leave it to your judgement.
> >
> > Thanks,
> > John
> >
> > On Fri, Mar 26, 2021, at 03:36, Sagar wrote:
> > > Hi Sophie,
> > >
> > > Thanks for the feedback! I have updated the KIP in line with your
> > > suggestions.
> > >
> > > Regarding point 5, I have added the note, as it makes sense not to set
> > > the config via the KafkaStreams app.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Wed, Mar 24, 2021 at 7:52 AM Sophie Blee-Goldman wrote:
> > >
> > > > Hey Sagar,
> > > >
> > > > Thanks for the KIP! The overall proposal looks good to me, but I had
> > > > some miscellaneous notes:
> > > >
> > > > 1) Some general KIP-writing advice:
> > > > - You don't need to list any implementation details, only public
> > > >   interfaces that are being added, modified, or deprecated. It's
> > > >   better to describe your changes in words, or occasionally pseudo-code
> > > >   for more complicated changes or algorithms. The KIP is a public
> > > >   contract, so you generally want to agree upon the high-level proposal
> > > >   and avoid getting locked in to specific code which you may end up
> > > >   wanting to change once you start on the PR.
> > > >   In this KIP, you can remove the code block under
> > > >   *TimeWindowedDeserializer* and just describe the desired semantics:
> > > >   e.g. that you should only use the constructor within Streams, the
> > > >   configs within the console consumer, or either for a plain consumer
> > > >   client provided they match.
> > > >   The code block under *StreamsConfig*, however, is a good example of
> > > >   what should be in a KIP. Only one nit: in the doc string, avoid
> > > >   saying "windowed key" and just say "windowed record" or something
> > > >   like that.
> > > > - The *Motivation* section should be focused on any background or
> > > >   additional context that's necessary to understand the KIP, as well
> > > >   as the actual motivation for the change being proposed. So here,
> > > >   everything under the second bullet which begins with "The KIP also
> > > >   introduces changes..." should be cut from that section and discussed
> > > >   elsewhere.
> > > > - The *Proposed Changes* and *Public Interfaces* sections have a lot of
> > > >   overlap and repeated content. To be honest I personally have
> > > >   struggled with deciding which section to include something in, but a
> > > >   good rule of thumb is to describe the actual changes in the
> > > >   *Proposed Changes* section, and then use the *Public Interfaces*
> > > >   section to list any new or modified public APIs. In this case, I
> > > >   would move everything to the *Proposed Changes* section except for
> > > >   the code block with the deprecated config(s) and the new config +
> > > >   doc string.
> > > >
> > > > 2) Can you make it clear that both *default.windowed.key.serde.inner*
> > > > and *default.windowed.value.serde.inner* are being deprecated, and
> > > > we're adding a new config called *windowed.deserializer.inner.class*,
> > > > not literally renaming the existing *default.windowed.key.serde.inner*
> > > > config? I think you're hinting at this in the *Compatibility,
> > > > Deprecation, and Migration* section, but elsewhere in the KIP it's
> > > > implied that we'll be replacing the existing config, which would break
> > > > any applications that currently rely on it. Please update the *Public
> > > > Interfaces* and *Proposed Changes* sections to clarify that both of
> > > > these configs will be deprecated.
> > > >
> > > > 3) At the end of this section you raise a question that I don't think
> > > > I quite understand; can you elaborate on this:
> > > >
> > > > > We can maybe enforce the removal of the deprecated configs and then
> > > > > enforce users?
> > > >
> > > > Note: you only need to worry about deprecating these configs in the
> > > > current KIP. Once deprecated, we just need to give users enough time to
> > > > migrate over to the new API, and then we can remove them in the next
> > > > major version. The important thing for the KIP itself is just to make
> > > > sure the change is visible to users and provides a clear migration
> > > > path.
> > > >
> > > > 4) For the Console 

Re: [VOTE] KIP-715: Expose Committed offset in streams

2021-03-01 Thread Leah Thomas
Hey Walker,

Thanks for leading this discussion. +1 from me, non-binding

Leah

On Mon, Mar 1, 2021 at 12:37 AM Boyang Chen wrote:

> Thanks Walker for the proposal, +1 (binding) from me.
>
> On Fri, Feb 26, 2021 at 12:42 PM Walker Carlson wrote:
>
> > Hello all,
> >
> > I would like to bring KIP-715 to a vote. Here is the KIP:
> > https://cwiki.apache.org/confluence/x/aRRRCg.
> >
> > Walker
> >
>


[jira] [Created] (KAFKA-12344) Support SlidingWindows in the Scala API

2021-02-18 Thread Leah Thomas (Jira)
Leah Thomas created KAFKA-12344:
---

 Summary: Support SlidingWindows in the Scala API
 Key: KAFKA-12344
 URL: https://issues.apache.org/jira/browse/KAFKA-12344
 Project: Kafka
  Issue Type: Improvement
Reporter: Leah Thomas
Assignee: Leah Thomas


In KIP-450 we implemented sliding windows for the Java API but left out a few
methods that are needed for sliding windows to work through the Scala API. We
need to add those methods so that the Scala API can fully leverage sliding windows.





[jira] [Resolved] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size

2021-02-02 Thread Leah Thomas (Jira)



Leah Thomas resolved KAFKA-9649.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> -
>
> Key: KAFKA-9649
> URL: https://issues.apache.org/jira/browse/KAFKA-9649
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sören Henning
>    Assignee: Leah Thomas
>Priority: Major
>  Labels: kip
> Fix For: 2.8.0
>
>
> The API of the 
> [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java]
>  promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks absolutely clean, it leads to fatal 
> errors at runtime, which turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the 
> [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java],
>  which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>   this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} 
> constructor or at least warning when it is used (if it is required for backwards 
> compatibility). The ideal solution, of course, would be to get the window size 
> from some externally provided context. However, I expect this to be difficult 
> to realize. The same also applies to the {{TimeWindowedDeserializer(final 
> Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: as most Kafka Streams time 
> declarations now use {{Duration}}s instead of long-encoded milliseconds, I 
> suggest allowing window sizes to be specified with a {{Duration}}.
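
For illustration, the sketch below models why the deserializer cannot work without a size. It assumes, as a simplification, that a time-windowed key is written as the inner key bytes followed by an 8-byte window start, so the window end has to be recomputed as `start + windowSize`; with the `Long.MAX_VALUE` fallback shown above, that sum overflows and the end is capped at `Long.MAX_VALUE`. `WindowedKeyCodec` is a hypothetical, Kafka-free class, not the real `TimeWindowedSerde`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of a time-windowed key codec (not Kafka's classes).
final class WindowedKeyCodec {
    static final int TIMESTAMP_SIZE = Long.BYTES;

    // Decoded form: the inner key plus the window bounds in epoch milliseconds.
    static final class WindowedKey {
        final String key;
        final long startMs;
        final long endMs;

        WindowedKey(String key, long startMs, long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    // Layout: inner key bytes followed by the 8-byte window start.
    // The window end is not written, so it cannot be read back from the bytes.
    static byte[] serialize(String key, long startMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + TIMESTAMP_SIZE)
                .put(keyBytes)
                .putLong(startMs)
                .array();
    }

    // The caller must supply the window size to rebuild the end time.
    static WindowedKey deserialize(byte[] data, long windowSizeMs) {
        int keyLength = data.length - TIMESTAMP_SIZE;
        String key = new String(data, 0, keyLength, StandardCharsets.UTF_8);
        long startMs = ByteBuffer.wrap(data).getLong(keyLength);
        long endMs = startMs + windowSizeMs;
        if (endMs < 0) {
            // Overflow (e.g. a Long.MAX_VALUE "size"): cap the end instead of wrapping.
            endMs = Long.MAX_VALUE;
        }
        return new WindowedKey(key, startMs, endMs);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("user-42", 1_000L);
        System.out.println(deserialize(bytes, 5_000L).endMs);         // 6000
        System.out.println(deserialize(bytes, Long.MAX_VALUE).endMs); // 9223372036854775807
    }
}
```

With an explicit 5-second size the reconstructed end is 6000 ms; with the fallback size every window appears to end at `Long.MAX_VALUE`, and nothing fails loudly.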





[jira] [Resolved] (KAFKA-10366) TimeWindowedDeserializer doesn't allow users to set a custom window size

2021-02-02 Thread Leah Thomas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leah Thomas resolved KAFKA-10366.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

> TimeWindowedDeserializer doesn't allow users to set a custom window size
> 
>
> Key: KAFKA-10366
> URL: https://issues.apache.org/jira/browse/KAFKA-10366
> Project: Kafka
>  Issue Type: Bug
>        Reporter: Leah Thomas
>    Assignee: Leah Thomas
>Priority: Major
>  Labels: streams
> Fix For: 2.8.0
>
>
> Related to [KAFKA-4468|https://issues.apache.org/jira/browse/KAFKA-4468], in 
> TimeWindowedDeserializer, Long.MAX_VALUE is used as _windowSize_ for any 
> deserializer created with the default constructor. While Streams apps can pass 
> in a window size in serdes or while creating a TimeWindowedDeserializer, the 
> deserializer that is actually used to process the messages is created by 
> the Kafka consumer, without the configured windowSize being passed in. The 
> deserializer the consumer creates uses the configs, but as there is no config 
> for windowSize, the window size is always the default.
> See _KStreamAggregationIntegrationTest #ShouldReduceWindowed()_ as an example 
> of this issue. Despite passing in the windowSize to both the serdes and the 
> TimeWindowedDeserializer, the window size is set to Long.MAX_VALUE. 
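
A hypothetical, Kafka-free sketch of the lifecycle described above: a client that only knows a deserializer class and a config map creates its own instance through the no-arg constructor and then calls `configure()`, so a size passed to a different instance never reaches it. `SimpleDeserializer`, `SizedWindowDeserializer`, `ConsumerLifecycleDemo` and the `window.size.ms` key are illustrative assumptions, not the Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a configurable deserializer interface.
interface SimpleDeserializer {
    void configure(Map<String, ?> configs);
    long windowSizeMs();
}

// Hypothetical deserializer that falls back to Long.MAX_VALUE when built
// with the no-arg constructor, as described in the report.
class SizedWindowDeserializer implements SimpleDeserializer {
    private long windowSizeMs;

    public SizedWindowDeserializer() {
        this(Long.MAX_VALUE);
    }

    public SizedWindowDeserializer(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Without a config entry for the size, nothing overrides the fallback.
        Object size = configs.get("window.size.ms");
        if (size != null) {
            windowSizeMs = Long.parseLong(size.toString());
        }
    }

    @Override
    public long windowSizeMs() {
        return windowSizeMs;
    }
}

final class ConsumerLifecycleDemo {
    // Mimics a client that only knows the class and the config map.
    static SimpleDeserializer createFromConfig(Class<? extends SimpleDeserializer> type,
                                               Map<String, ?> configs) throws Exception {
        SimpleDeserializer d = type.getDeclaredConstructor().newInstance();
        d.configure(configs);
        return d;
    }

    public static void main(String[] args) throws Exception {
        // The app builds a correctly sized instance, but the client never sees it.
        SimpleDeserializer appInstance = new SizedWindowDeserializer(5_000L);
        SimpleDeserializer clientInstance =
                createFromConfig(SizedWindowDeserializer.class, new HashMap<String, Object>());
        System.out.println(appInstance.windowSizeMs());    // 5000
        System.out.println(clientInstance.windowSizeMs()); // 9223372036854775807

        Map<String, Object> withSize = new HashMap<>();
        withSize.put("window.size.ms", "5000");
        System.out.println(createFromConfig(SizedWindowDeserializer.class, withSize).windowSizeMs()); // 5000
    }
}
```

The only channel that survives reflective construction is the config map, which is why a window-size config is needed at all.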





Re: [VOTE] KIP-698: Add Explicit User Initialization of Broker-side State to Kafka Streams

2021-01-27 Thread Leah Thomas
Hi Bruno,
I'm still +1, non-binding. Thanks for the updates!

Leah

On Wed, Jan 27, 2021 at 1:56 PM Matthias J. Sax  wrote:

> Thanks for updating the KIP.
>
> +1 (binding)
>
>
> -Matthias
>
> On 1/27/21 10:19 AM, Bruno Cadonna wrote:
> > Hi all,
> >
> > Thanks for voting!
> >
> > I updated the KIP with some additional feedback I got.
> >
> > If I do not hear anything from folks that have already voted in the next
> > couple of days, I will assume their vote is still valid. You can also
> > confirm your vote if you want.
> >
> > KIP: https://cwiki.apache.org/confluence/x/7CnZCQ
> >
> > Best,
> > Bruno
> >
> > On 26.01.21 02:19, Sophie Blee-Goldman wrote:
> >> Thanks for the KIP Bruno, +1 (binding)
> >>
> >> Sophie
> >>
> >> On Mon, Jan 25, 2021 at 11:23 AM Guozhang Wang 
> >> wrote:
> >>
> >>> Hey Bruno,
> >>>
> >>> Thanks for your response!
> >>>
> >>> 1) Yup I'm good with option a) as well.
> >>> 2) Thanks!
> >>> 3) Sounds good to me. I think it would not change any StreamThread
> >>> implementation regarding capturing exceptions from consumer.poll()
> >>> since it captures StreamsException as fatal.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Wed, Dec 16, 2020 at 4:43 AM Bruno Cadonna 
> >>> wrote:
> >>>
> >>>> Hi Guozhang,
> >>>>
> >>>> Thanks for the feedback!
> >>>>
> >>>> Please find my answers inline.
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>>
> >>>> On 14.12.20 23:33, Guozhang Wang wrote:
> >>>>> Hello Bruno,
> >>>>>
> >>>>> Just a few more questions about the KIP:
> >>>>>
> >>>>> 1) If the internal topics exist but the calculated num.partitions do not
> >>>>> match the existing topics, what would Streams do;
> >>>>
> >>>> Good point! I did not explicitly consider misconfigurations in the
> >>>> KIP.
> >>>>
> >>>> I propose to throw a fatal error in this case during manual and
> >>>> automatic initialization. For the fatal error, we have two options:
> >>>> a) introduce a second exception besides MissingInternalTopicException,
> >>>> e.g. MisconfiguredInternalTopicException
> >>>> b) rename MissingInternalTopicException to
> >>>> MissingOrMisconfiguredInternalTopicException and throw that in both cases.
> >>>>
> >>>> Since the user-side reaction to such an exception should be similar,
> >>>> I am fine with option b). However, IMO option a) is a bit cleaner.
> >>>> WDYT?
> >>>>
> >>>>> 2) Since `init()` is a blocking call (we only return after all topics are
> >>>>> confirmed to be created), should we have a timeout for this call as well
> >>>>> or not;
> >>>>
> >>>> I will add an overload with a timeout to the KIP.
> >>>>
> >>>>> 3) If the config is set to `MANUAL_SETUP`, then during rebalance do we
> >>>>> still check whether the number of partitions of the existing topics match
> >>>>> or not; if not, do we throw the newly added exception or throw a fatal
> >>>>> StreamsException? Today we would throw the StreamsException from assign(),
> >>>>> which would then be thrown from consumer.poll() as a fatal error.
> >>>>>
> >>>>
> >>>> Yes, I think we should check if the number of partitions matches. I
> >>>> propose to throw the newly added exception in the same way we currently
> >>>> throw the MissingSourceTopicException, i.e., throw it from
> >>>> consumer.poll(). WDYT?
> >>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Mon, Dec 14, 2020 at 12:47 PM John Roesler 
> >>>> wrote:
> >>>>>
> >>>>>> Thanks, Bruno!
> >>>>>>
> >>>>>> I'm +1 (binding)
> >>>>>>
> >>>>>> -John
> >>>>>>
> >>>>>> On Mon, 2020-12-14 at 09:57 -0600, Leah Thomas wrote:
> >>>>>>> Thanks for the KIP Bruno, LGTM. +1 (non-binding)
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Leah
> >>>>>>>
> >>>>>>> On Mon, Dec 14, 2020 at 4:29 AM Bruno Cadonna 
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I'd like to start the voting on KIP-698 that proposes an explicit user
> >>>>>>>> initialization of broker-side state for Kafka Streams instead of letting
> >>>>>>>> Kafka Streams set up the broker-side state automatically during
> >>>>>>>> rebalance. Such an explicit initialization avoids possible data
> >>>>>>>> loss issues due to automatic initialization.
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/x/7CnZCQ
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
>
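
The partition check discussed in this thread, using option a) (separate exception types for missing and misconfigured internal topics), might look roughly like the following. This is a hypothetical sketch: `MisconfiguredInternalTopicException` is only the name proposed above, `InternalTopicValidator` is invented, and the final KIP-698 API may differ.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical exception types mirroring option a) from the thread.
class MissingInternalTopicException extends RuntimeException {
    MissingInternalTopicException(String message) {
        super(message);
    }
}

class MisconfiguredInternalTopicException extends RuntimeException {
    MisconfiguredInternalTopicException(String message) {
        super(message);
    }
}

final class InternalTopicValidator {
    // expected: topic -> partition count Streams computed;
    // existing: topic -> partition count found on the brokers.
    static void validate(Map<String, Integer> expected, Map<String, Integer> existing) {
        // Iterate in sorted order so the first reported problem is deterministic.
        for (Map.Entry<String, Integer> e : new TreeMap<>(expected).entrySet()) {
            Integer actual = existing.get(e.getKey());
            if (actual == null) {
                throw new MissingInternalTopicException("Missing internal topic " + e.getKey());
            }
            if (!actual.equals(e.getValue())) {
                throw new MisconfiguredInternalTopicException("Internal topic " + e.getKey()
                        + " has " + actual + " partitions, expected " + e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        try {
            validate(Map.of("app-store-changelog", 4), Map.of("app-store-changelog", 2));
        } catch (MisconfiguredInternalTopicException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Two distinct types let a caller tell "create the topics" apart from "fix the partition count" without parsing messages, which is one reading of why option a) is the cleaner choice.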


Re: [DISCUSS] Apache Kafka 2.8.0 release

2021-01-21 Thread Leah Thomas
Hi John,

KIP-659 was just accepted as well, can it be added to the release plan?
https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size

Thanks,
Leah

On Thu, Jan 14, 2021 at 9:36 AM John Roesler  wrote:

> Hi David,
>
> Thanks for the heads-up; it's added.
>
> -John
>
> On Thu, 2021-01-14 at 08:43 +0100, David Jacot wrote:
> > Hi John,
> >
> > KIP-700 just got accepted. Can we add it to the release plan?
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-700%3A+Add+Describe+Cluster+API
> >
> > Thanks,
> > David
> >
> > On Wed, Jan 13, 2021 at 11:22 PM John Roesler 
> wrote:
> >
> > > Thanks, Gary! Sorry for the oversight.
> > > -John
> > >
> > > On Wed, 2021-01-13 at 21:25 +, Gary Russell wrote:
> > > > Can you add a link to the summary page [1]?
> > > >
> > > > I always start there.
> > > >
> > > > Thanks
> > > >
> > > > [1]:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
> > > >
> > > > 
> > > > From: John Roesler 
> > > > Sent: Wednesday, January 13, 2021 4:11 PM
> > > > To: dev@kafka.apache.org 
> > > > Subject: Re: [DISCUSS] Apache Kafka 2.8.0 release
> > > >
> > > > Hello again, all,
> > > >
> > > > I have published a release plan at
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173081737
> > > >
> > > > I have included all the KIPs that are currently approved,
> > > > but I am happy to make adjustments as necessary.
> > > >
> > > > The KIP freeze is Jan 27th.
> > > >
> > > > Please let me know if you have any objections.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Wed, 2021-01-06 at 23:30 -0600, John Roesler wrote:
> > > > > Hello All,
> > > > >
> > > > > I'd like to volunteer to be the release manager for our next
> > > > > feature release, 2.8.0. If there are no objections, I'll
> > > > > send out the release plan soon.
> > > > >
> > > > > Thanks,
> > > > > John Roesler
> > > > >
> > > >
> > > >
> > >
> > >
> > >
>
>
>


Re: [VOTE] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-21 Thread Leah Thomas
Thanks everyone, with that I'll go ahead and close this KIP with 4 binding
votes (Sophie, Guozhang, John, Matthias). Thanks for the lively discussion!

Leah

On Wed, Jan 20, 2021 at 6:00 PM Matthias J. Sax  wrote:

> Thanks for reviving this KIP, Leah.
>
> I agree that we should not extend the scope of this KIP to potentially
> deprecate/rename the `default.windowed.[key|value].serde.inner` configs.
>
> @Sophie: if you feel strongly about it, let's do a separate KIP.
>
>
> +1 (binding)
>
>
> -Matthias
>
> On 1/19/21 3:00 PM, John Roesler wrote:
> > Hi all,
> >
> > I've just caught up on the thread, and FWIW, I'm still +1.
> >
> > Thanks,
> > -John
> >
> > On Mon, 2021-01-18 at 21:53 -0800, Guozhang Wang wrote:
> >> Read the above updates and the KIP's scope. Makes sense to me. +1 still
> >> counts :)
> >>
> >> On Wed, Jan 13, 2021 at 2:04 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> >> wrote:
> >>
> >>> That sounds good to me. Thanks for reviving this
> >>>
> >>> Sophie
> >>>
> >>> On Wed, Jan 13, 2021 at 7:47 AM Leah Thomas 
> wrote:
> >>>
> >>>> Hey all,
> >>>>
> >>>> Bringing this back up for discussion.
> >>>>
> >>>> It seems like the next steps are to:
> >>>> 1. rename the config "window.size.ms"
> >>>> 2. ensure that users set window size EITHER through the config OR through
> >>>> the constructor. On this note, it may make sense to remove the default for
> >>>> the `window.size.ms` config, so that there won't be a fallback if the
> >>>> window size isn't set in either spot. WDYT? This could also address the
> >>>> issue of multiple window sizes within a streams app.
> >>>>
> >>>> I see what Sophie is saying about the `default.windowed.key.serde.inner`
> >>>> config, but I do think deprecating and moving those configs would require a
> >>>> larger discussion. I'm open to looping them into this KIP if we feel like
> >>>> it's vital (or incredibly convenient with low cost to users), but my
> >>>> initial reaction is to leave that out for now and work within the current
> >>>> set-up for window size.
> >>>>
> >>>> Thanks for all the comments so far,
> >>>> Leah
> >>>>
> >>>> On Tue, Sep 29, 2020 at 10:44 PM Sophie Blee-Goldman <
> >>> sop...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> There are two cases where you need to specify the window size: directly
> >>>>> using a Consumer (e.g. the console consumer) or reading the topic as an
> >>>>> input topic within Streams. We need a config for the first case, since you
> >>>>> can't pass a Deserializer object to the console consumer. In the Streams
> >>>>> case, the reverse is true, and you have to pass in an actual Serde object.
> >>>>>
> >>>>> IMO we should keep these two cases separate and not use the config for the
> >>>>> Streams case at all. But that's hard to enforce (we'd have to strip the
> >>>>> config out of the user's StreamsConfig if they tried to use it within
> >>>>> Streams, for example) and it also puts us in an awkward position due to the
> >>>>> default.windowed.inner.serde.class configs. If they can specify the inner
> >>>>> serde class through their Streams app config, they should be able to
> >>>>> specify the window size through config as well. Otherwise we either force a
> >>>>> mix-and-match as Matthias described, or you just always have to specify
> >>>>> both the inner class and the window size in the constructor, at which
> >>>>> point, why even have the default.windowed.inner.serde.class config at all?
> >>>>>

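The next steps Leah proposes in the KIP-659 thread above (set the window size EITHER through the constructor OR through `window.size.ms`, with no fallback default) reduce to a small resolution rule. A hypothetical sketch; `WindowSizeResolver` and its error messages are invented for illustration and may not match what was finally implemented.

```java
import java.util.Map;

final class WindowSizeResolver {
    static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";

    // constructorSizeMs is null when the no-arg constructor was used.
    static long resolve(Long constructorSizeMs, Map<String, ?> configs) {
        Object configValue = configs.get(WINDOW_SIZE_MS_CONFIG);
        Long configSizeMs = configValue == null ? null : Long.valueOf(configValue.toString());
        if (constructorSizeMs != null && configSizeMs != null) {
            // Ambiguous: refuse rather than silently pick one.
            throw new IllegalArgumentException(
                    "Window size set both in the constructor and in " + WINDOW_SIZE_MS_CONFIG);
        }
        if (constructorSizeMs == null && configSizeMs == null) {
            // No silent Long.MAX_VALUE fallback.
            throw new IllegalArgumentException(
                    "Window size must be set in the constructor or in " + WINDOW_SIZE_MS_CONFIG);
        }
        return constructorSizeMs != null ? constructorSizeMs : configSizeMs;
    }

    public static void main(String[] args) {
        System.out.println(resolve(5_000L, Map.of()));                            // 5000
        System.out.println(resolve(null, Map.of(WINDOW_SIZE_MS_CONFIG, "60000"))); // 60000
    }
}
```

Failing on both "neither" and "both" removes the case Sophie worries about, where an application silently runs with the `Long.MAX_VALUE` default.
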
Re: [VOTE] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2021-01-13 Thread Leah Thomas
> > > built-in operators only need the serializer and users do not need to
> > > override it, plus standby / restore active tasks would just copy the bytes
> > > directly. So this KIP's motivation is not Streams' own code anyway.
> > >
> > > 2) It is only when a user-specified serde is missing the window size,
> > > which is either when a) one is trying to read a source topic as windowed
> > > records in Streams (this is a big blocker for KIP-300), or when b) one is
> > > trying to read a topic as windowed records in a Consumer, that we would have
> > > issues if users fail to use the appropriate serde constructors.
> > >
> > > I thought the main motivation of this KIP was 2.a), in which we would
> > > just encourage users to use the right constructor with the window size
> > > by deprecating the other constructors. But I'm not sure how this would help
> > > with 2.b) since the proposal is on adding to StreamsConfig. If that is the
> > > case, then I agree that we can probably skip adding an extra config and
> > > just deprecate the constructors.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Sep 8, 2020 at 10:50 AM Sophie Blee-Goldman <
> sop...@confluent.io
> > >
> > > wrote:
> > >
> > >> Hey Guozhang & Leah,
> > >>
> > >> I want to push back a bit on the assumption that we would fall back on this
> > >> config in the case of an unspecified window size in a Streams serde. I
> > >> don't think it should be a default at all, either in name or in effect. To
> > >> borrow the rhetorical question that John raised earlier: what is the
> > >> default window size of an application?
> > >>
> > >> Personally, I agree that that doesn't make much sense. Sure, if you only
> > >> have a single windowed operation in your app then you could just specify
> > >> the window size by config, but how is that any more ergonomic than
> > >> specifying the window size in the Serde's constructor? If anything, it
> > >> seems worse to put physical and mental distance between the specification
> > >> and the actual usage of such parameters. What if you add another windowed
> > >> operation later, with a different size, and forget to specify the new size
> > >> in the new Serde? Or what if you never specify a default window size config
> > >> at all and accidentally end up using the default config value of
> > >> Long.MAX_VALUE? Avoiding this possibility was/is one of the main
> > >> motivations of this KIP, and the whole point of deprecating the
> > >> TimeWindowedSerde(innerClass) constructor.
> > >>
> > >> I actually would have advocated removing this config entirely, but as John
> > >> pointed out, we still need it to configure things like the console consumer.
> > >>
> > >> On Tue, Sep 8, 2020 at 10:40 AM Leah Thomas 
> > wrote:
> > >>
> > >>> Hi Guozhang,
> > >>>
> > >>> Yes, the config would read them as a single window size. I think this
> > >>> relates to John's comments about having variably sized windows, which this
> > >>> config doesn't handle. I like the name change and updated the wiki to
> > >>> reflect that, and to clarify that the default value will still be
> > >>> Long.MAX_VALUE.
> > >>>
> > >>> Thanks for your feedback!
> > >>> Leah
> > >>>
> > >>> On Tue, Sep 8, 2020 at 11:54 AM Guozhang Wang 
> > >> wrote:
> > >>>
> > >>>> Hello Leah,
> > >>>>
> > >>>> Thanks for initiating this. I just have one minor clarification question
> > >>>> here: the config "window.size.ms" seems to be used as the default window
> > >>>> size when reading from a topic that represents windowed records, right?
> > >>>> I.e. if there are multiple topics that represent windowed r

[jira] [Resolved] (KAFKA-9126) Extend `StreamJoined` to allow more store configs

2020-12-17 Thread Leah Thomas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leah Thomas resolved KAFKA-9126.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Extend `StreamJoined` to allow more store configs
> -
>
> Key: KAFKA-9126
> URL: https://issues.apache.org/jira/browse/KAFKA-9126
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Matthias J. Sax
>Assignee: Leah Thomas
>Priority: Minor
>  Labels: needs-kip, newbie, newbie++
> Fix For: 2.8.0
>
>
> In the 2.4.0 release, we introduced the `StreamJoined` configuration object via 
> KIP-479 (KAFKA-8558). The idea of `StreamJoined` is to be an equivalent to 
> `Materialized` but customized for stream-stream joins, which have two stores 
> (in contrast to `Materialized`, which is used for single-store 
> operators).
> During the KIP discussion, the idea of allowing users to set the store retention 
> time and to enable/disable changelogging for the store was discussed. However, at 
> some point this idea was dropped for unknown reasons (it seems it slipped).
> We should consider extending `StreamJoined` with `withRetentionPeriod()` and 
> `loggingEnabled()`/`loggingDisabled()` methods to get feature parity with 
> `Materialized`.
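
As a rough, Kafka-free sketch of what the requested logging options could look like in the immutable builder style that `Materialized` uses (each `with...` call returns a new object), assuming a hypothetical `JoinStoreConfig` class: the `Map<String, String>` parameter mirrors the topic-config map that `Materialized#withLoggingEnabled` accepts, but nothing here is the actual `StreamJoined` source.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, immutable join-store configuration in the style of Materialized.
final class JoinStoreConfig {
    private final String storeName;
    private final boolean loggingEnabled;
    private final Map<String, String> topicConfig;

    private JoinStoreConfig(String storeName, boolean loggingEnabled, Map<String, String> topicConfig) {
        this.storeName = storeName;
        this.loggingEnabled = loggingEnabled;
        // Defensive copy so later changes to the caller's map have no effect.
        this.topicConfig = Collections.unmodifiableMap(new HashMap<>(topicConfig));
    }

    // Static entry point; logging is on by default with no extra topic configs.
    static JoinStoreConfig as(String storeName) {
        return new JoinStoreConfig(storeName, true, Collections.emptyMap());
    }

    // Enables changelogging and lets the caller override changelog topic configs.
    JoinStoreConfig withLoggingEnabled(Map<String, String> config) {
        return new JoinStoreConfig(storeName, true, config);
    }

    // Disables changelogging; previously supplied topic configs are dropped.
    JoinStoreConfig withLoggingDisabled() {
        return new JoinStoreConfig(storeName, false, Collections.emptyMap());
    }

    String storeName() { return storeName; }
    boolean loggingEnabled() { return loggingEnabled; }
    Map<String, String> topicConfig() { return topicConfig; }

    public static void main(String[] args) {
        Map<String, String> changelog = new HashMap<>();
        changelog.put("min.insync.replicas", "2");
        JoinStoreConfig base = JoinStoreConfig.as("orders-join");
        JoinStoreConfig tuned = base.withLoggingEnabled(changelog);
        System.out.println(base.topicConfig());                            // {}
        System.out.println(tuned.topicConfig());                           // {min.insync.replicas=2}
        System.out.println(tuned.withLoggingDisabled().loggingEnabled());  // false
    }
}
```

Because the new methods are ordinary instance methods, any wrapper that hands out the underlying object (for example a Scala API that only wraps the static entry points) would expose them without further changes.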





[jira] [Resolved] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-12-16 Thread Leah Thomas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leah Thomas resolved KAFKA-10417.
-
Resolution: Fixed

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Assignee: Leah Thomas
>Priority: Critical
>  Labels: kafka-streams
> Fix For: 2.8.0, 2.7.1
>
>
> The Streams operations `cogroup()` and `aggregate()` followed by `suppress()` 
> throw a `ClassCastException`.
> It works fine without the `suppress()`.
> Code block tested:
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.confirmationsSerde()))
> val cogrouped = 
> stream1.cogroup(notificationAggregator).cogroup(streams2, 
> confirmationsAggregator)
> 
> .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
> .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
> 
> .withValueSerde(serdesConfig.notificationMetricSerde()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()))
> .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]
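
The exception itself is an ordinary failed downcast: an upstream supplier object is cast to a more specific interface that its class does not implement. A minimal illustration with hypothetical, simplified stand-ins for the two internal types named in the stack trace (these are not the real Kafka Streams internals, and the defensive variant is not necessarily how the bug was fixed):

```java
// Hypothetical stand-ins for the internal types named in the stack trace.
interface ProcessorSupplier {
    String name();
}

// A more specific supplier that can also expose a value getter for a table.
interface KTableProcessorSupplier extends ProcessorSupplier {
    String valueGetterName();
}

// A generic forwarding supplier that does NOT implement the table-specific interface.
final class PassThrough implements ProcessorSupplier {
    @Override
    public String name() {
        return "pass-through";
    }
}

final class SuppressDemo {
    // Unchecked downcast: compiles, but fails at runtime for a PassThrough.
    static String unsafeValueGetter(ProcessorSupplier supplier) {
        return ((KTableProcessorSupplier) supplier).valueGetterName();
    }

    // Defensive variant that checks the runtime type before casting.
    static String safeValueGetter(ProcessorSupplier supplier) {
        if (supplier instanceof KTableProcessorSupplier) {
            return ((KTableProcessorSupplier) supplier).valueGetterName();
        }
        return "no-value-getter:" + supplier.name();
    }

    public static void main(String[] args) {
        ProcessorSupplier upstream = new PassThrough();
        System.out.println(safeValueGetter(upstream)); // no-value-getter:pass-through
        try {
            unsafeValueGetter(upstream);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the report");
        }
    }
}
```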





Re: [VOTE] KIP-698: Add Explicit User Initialization of Broker-side State to Kafka Streams

2020-12-14 Thread Leah Thomas
Thanks for the KIP Bruno, LGTM. +1 (non-binding)

Cheers,
Leah

On Mon, Dec 14, 2020 at 4:29 AM Bruno Cadonna  wrote:

> Hi,
>
> I'd like to start the voting on KIP-698 that proposes an explicit user
> initialization of broker-side state for Kafka Streams instead of letting
> Kafka Streams set up the broker-side state automatically during
> rebalance. Such an explicit initialization avoids possible data loss
> issues due to automatic initialization.
>
> https://cwiki.apache.org/confluence/x/7CnZCQ
>
> Best,
> Bruno
>


Re: [VOTE] KIP-696: Update Streams FSM to clarify ERROR state meaning

2020-12-09 Thread Leah Thomas
Looks good, thanks Walker! +1 (non-binding)

Leah

On Wed, Dec 9, 2020 at 1:04 PM John Roesler  wrote:

> Thanks, Walker!
>
> I'm also +1 (binding)
>
> -John
>
> On Wed, 2020-12-09 at 11:03 -0800, Guozhang Wang wrote:
> > +1. Thanks Walker.
> >
> > On Wed, Dec 9, 2020 at 10:58 AM Walker Carlson 
> > wrote:
> >
> > > Sorry I forgot to change the subject line to vote.
> > >
> > > Thanks for the comments. If there are no further concerns, I would like to
> > > call for a vote on KIP-696 to clarify and clean up the Streams State
> > > Machine.
> > >
> > > On Wed, Dec 9, 2020 at 10:04 AM Walker Carlson 
> > > wrote:
> > >
> > > > Thanks for the comments. If there are no further concerns, I would like to
> > > > call for a vote on KIP-696 to clarify and clean up the Streams State
> > > > Machine.
> > > >
> > > > walker
> > > >
> > > > On Wed, Dec 9, 2020 at 8:50 AM John Roesler 
> wrote:
> > > >
> > > > > Thanks, Walker!
> > > > >
> > > > > Your proposal looks good to me.
> > > > >
> > > > > -John
> > > > >
> > > > > On Tue, 2020-12-08 at 18:29 -0800, Walker Carlson wrote:
> > > > > > Thanks for the feedback Guozhang!
> > > > > >
> > > > > > I clarified some of the points in the Proposed Changes section, so
> > > > > > hopefully it will be clearer what is going on now. I also agree with
> > > > > > your suggestion about the possible call to close() on ERROR, so I added
> > > > > > this line:
> > > > > > "Close() called on ERROR will be idempotent and not throw an exception,
> > > > > > but we will log a warning."
> > > > > >
> > > > > > I have linked those tickets and I will leave a comment trying to
> > > > > > explain how these changes will affect their issues.
> > > > > >
> > > > > > walker
> > > > > >
> > > > > > On Tue, Dec 8, 2020 at 4:57 PM Guozhang Wang  >
> > > > > wrote:
> > > > > >
> > > > > > > Hello Walker,
> > > > > > >
> > > > > > > Thanks for the KIP! Overall it looks reasonable to me. Just a
> few
> > > > > minor
> > > > > > > comments for the wiki page itself:
> > > > > > >
> > > > > > > 1) Could you clarify the conditions when RUNNING / REBALANCING ->
> > > > > > > PENDING_ERROR will happen, and when PENDING_ERROR -> ERROR will happen?
> > > > > > > E.g. when I read "Streams will only reach ERROR state in the event of an
> > > > > > > exceptional failure in which the `StreamsUncaughtExceptionHandler` chose to
> > > > > > > either shutdown the application or the client." I thought the first
> > > > > > > transition would happen before the handler, and the second transition
> > > > > > > would happen immediately after the handler returns "shutdown client" or
> > > > > > > "shutdown application", until I read the last statement regarding
> > > > > > > "SHUTDOWN_CLIENT".
> > > > > > >
> > > > > > > 2) A compatibility issue: today it is possible that users would call
> > > > > > > Streams APIs like shutdown in the global state transition listener. And
> > > > > > > it's common to try shutting down the application automatically when
> > > > > > > transitioning to ERROR (assuming it was not a terminating state). I think
> > > > > > > we could consider making this call a no-op and logging a warning.
> > > > > > >
> > > > > > > 3) Could you link the following JIRAs in the "JIRA" field?
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-10555
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-9638
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-6520
> > > > > > >
> > > > > > > And maybe we can also leave a comment on those tickets explaining how
> > > > > > > the issues would be tackled after this KIP.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Dec 8, 2020 at 12:16 PM Walker Carlson <
> > > wcarl...@confluent.io
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hello all,
> > > > > > > >
> > > > > > > > I'd like to propose KIP-696 to clarify the meaning of ERROR state in the
> > > > > > > > KafkaStreams Client State Machine. This will update the States to be
> > > > > > > > consistent with changes in KIP-671 and KIP-663.
> > > > > > > >
> > > > > > > > Here are the details: https://cwiki.apache.org/confluence/x/lCvZCQ
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Walker
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > >
> > > > >
> > > > >
> > >
> >
> >
>
>
>
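
The transitions discussed in this thread can be pictured as a small state machine. This sketch assumes a transition table chosen for illustration (it is not copied from `KafkaStreams.State`) and applies Walker's rule that `close()` in ERROR is an idempotent no-op that only records a warning; treating PENDING_ERROR the same way is an additional assumption. `ClientState` and `StreamsClient` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical client states; the transition table is an illustrative assumption.
enum ClientState {
    CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR;

    Set<ClientState> allowedNext() {
        switch (this) {
            case CREATED:          return EnumSet.of(REBALANCING, PENDING_SHUTDOWN);
            case REBALANCING:      return EnumSet.of(RUNNING, PENDING_SHUTDOWN, PENDING_ERROR);
            case RUNNING:          return EnumSet.of(REBALANCING, PENDING_SHUTDOWN, PENDING_ERROR);
            case PENDING_SHUTDOWN: return EnumSet.of(NOT_RUNNING);
            case PENDING_ERROR:    return EnumSet.of(ERROR);
            default:               return EnumSet.noneOf(ClientState.class); // terminal states
        }
    }
}

final class StreamsClient {
    private ClientState state = ClientState.CREATED;
    private final List<String> warnings = new ArrayList<>();

    synchronized void transitionTo(ClientState target) {
        if (!state.allowedNext().contains(target)) {
            throw new IllegalStateException("Illegal transition " + state + " -> " + target);
        }
        state = target;
    }

    synchronized void close() {
        switch (state) {
            case PENDING_ERROR:
            case ERROR:
                // Already failing or failed: idempotent no-op that only records a warning.
                warnings.add("close() called in " + state + "; ignoring");
                return;
            case NOT_RUNNING:
                return; // already closed
            case PENDING_SHUTDOWN:
                break;  // shutdown already in progress
            default:
                transitionTo(ClientState.PENDING_SHUTDOWN);
        }
        transitionTo(ClientState.NOT_RUNNING);
    }

    synchronized ClientState state() { return state; }

    synchronized List<String> warnings() { return new ArrayList<>(warnings); }

    public static void main(String[] args) {
        StreamsClient client = new StreamsClient();
        client.transitionTo(ClientState.REBALANCING);
        client.transitionTo(ClientState.RUNNING);
        client.transitionTo(ClientState.PENDING_ERROR); // e.g. the handler chose to shut down
        client.transitionTo(ClientState.ERROR);
        client.close();
        client.close();
        System.out.println(client.state() + " " + client.warnings().size()); // ERROR 2
    }
}
```

Making ERROR a terminal state with a tolerant `close()` addresses Guozhang's compatibility point: a state listener that calls close on ERROR no longer fails.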


Re: [VOTE] KIP-689: Extend `StreamJoined` to allow more store configs

2020-12-07 Thread Leah Thomas
Thanks all! KIP-689 is accepted with 4 binding votes (Sophie, Guozhang,
Matthias, John) and 2 non-binding votes (Bruno, Walker).

Cheers,
Leah

On Fri, Dec 4, 2020 at 10:03 PM John Roesler  wrote:

> Sorry I missed the discussion, but I just read the KIP and
> the discussion. It all looks good to me.
>
> +1 (binding)
>
> -John
>
> On Thu, 2020-12-03 at 14:00 -0700, Matthias J. Sax wrote:
> > +1 (binding)
> >
> > On 12/3/20 11:21 AM, Guozhang Wang wrote:
> > > +1 (binding)
> > >
> > > Thanks Leah!
> > >
> > > On Wed, Dec 2, 2020 at 11:27 AM Sophie Blee-Goldman <
> sop...@confluent.io>
> > > wrote:
> > >
> > > > Thanks for the KIP! +1 (binding)
> > > >
> > > > Sophie
> > > >
> > > > On Wed, Dec 2, 2020 at 8:42 AM Walker Carlson  >
> > > > wrote:
> > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Thank you,
> > > > > walker
> > > > >
> > > > > On Wed, Dec 2, 2020 at 8:15 AM Bruno Cadonna 
> wrote:
> > > > >
> > > > > > +1 (non-binding)
> > > > > >
> > > > > > Thanks Leah!
> > > > > >
> > > > > > Best,
> > > > > > Bruno
> > > > > >
> > > > > > On 02.12.20 16:55, Leah Thomas wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I'd like to start the vote for KIP-689 for enabling/disabling logging
> > > > > > > for `StreamJoined`.
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Leah
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
>
>
>


Re: [DISCUSS] KIP-689: Extend `StreamJoined` to allow more store configs

2020-12-03 Thread Leah Thomas
Ahh, that makes sense. Thanks for clarifying!

On Thu, Dec 3, 2020 at 3:01 PM Matthias J. Sax  wrote:

> Thanks for looking into it. I also needed to refresh my memory.
>
> In Scala, it's sufficient to expose the `static` methods for a native
> Scala integration. If you look into the code, those static methods
> return the actual Java objects, and thus all non-static Java methods
> are available automatically.
>
> As we only add new non-static methods, we don't need to do anything for
> the Scala API, and the new methods will be available for Scala users
> automatically.
>
>
> -Matthias
>
> On 12/3/20 7:16 AM, Leah Thomas wrote:
> > Thanks for thinking of that Matthias. After looking at what we currently
> > have for `StreamJoined` in scala, we just have bare bones functionality,
> > allowing users to use `StreamJoined` through the two `with()` functions
> and
> > one `as()` function. This is the same for `Materialized` in scala, which
> > does not include implementation for `withLoggingEnabled()` etc. Neither
> of
> > these scala versions include `withKeySerde()` or `withValueSerde()`.
> >
> > I'm not sure if we consciously made the decision to not implement
> > additional configs for `Materialized` and `StreamJoined` in Scala, but I
> > don't think it makes a lot of sense to add functionality for
> > `withLoggingEnabled()` and `withLoggingDisabled()` if the same cannot be
> > said for Materialized or the other configs we allow in the Java API. My
> > thought is to leave it out of this KIP and, if we so choose, create a
> > follow-up ticket to implement all the additional functionality at once
> for
> > the Scala API. WDYT?
> >
> > On Wed, Dec 2, 2020 at 5:40 PM Matthias J. Sax  wrote:
> >
> >> One more follow up: do we need to update the Scala API, too?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/2/20 7:51 AM, Leah Thomas wrote:
> >>> Thanks for the feedback! I'll go ahead and move it to vote now.
> >>>
> >>> Best,
> >>> Leah
> >>>
> >>> On Wed, Dec 2, 2020 at 6:58 AM Bruno Cadonna 
> wrote:
> >>>
> >>>> Thanks for the KIP!
> >>>>
> >>>> LGTM, as well!
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 02.12.20 00:41, Walker Carlson wrote:
> >>>>> Thanks for making these changes. It makes more sense now to me. Overall
> >>>>> LGTM
> >>>>>
> >>>>> walker
> >>>>>
> >>>>> On Tue, Dec 1, 2020 at 3:39 PM Sophie Blee-Goldman <
> >> sop...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the KIP! I'm happy with the state of things after your
> >>>>>> latest update,
> >>>>>> LGTM
> >>>>>>
> >>>>>> Sophie
> >>>>>>
> >>>>>> On Tue, Dec 1, 2020 at 2:26 PM Leah Thomas 
> >>>> wrote:
> >>>>>>
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> Yeah I think it should, good catch. That should also answer Walker's
> >>>>>>> question about why we have an option for `withLoggingEnabled()` even
> >>>>>>> though that's the default. Passing in a new map of configs could allow
> >>>>>>> the user to configure the log differently than the default. I've updated
> >>>>>>> the KIP to reflect the added parameter and an added variable,
> >>>>>>> `topicConfig`, to store the map of configs.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Leah
> >>>>>>>
> >>>>>>> On Mon, Nov 30, 2020 at 5:35 PM Matthias J. Sax 
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the KIP Leah.
> >>>>>>>>
> >>>>>>>> Should `withLoggingEnabled()` take a `Map config`
> >>>>>>>> similar to the one from `Materialized`?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>

Re: [DISCUSS] KIP-689: Extend `StreamJoined` to allow more store configs

2020-12-03 Thread Leah Thomas
Thanks for thinking of that, Matthias. After looking at what we currently
have for `StreamJoined` in Scala, we just have bare-bones functionality,
allowing users to use `StreamJoined` through the two `with()` functions and
one `as()` function. The same is true for `Materialized` in Scala, which
does not include an implementation of `withLoggingEnabled()` etc. Neither of
these Scala versions includes `withKeySerde()` or `withValueSerde()`.

I'm not sure if we consciously decided not to implement additional configs
for `Materialized` and `StreamJoined` in Scala, but I don't think it makes
a lot of sense to add `withLoggingEnabled()` and `withLoggingDisabled()` if
the Scala `Materialized` and the other configs we allow in the Java API
don't get the same treatment. My thought is to leave it out of this KIP
and, if we so choose, create a follow-up ticket to implement all the
additional functionality at once for the Scala API. WDYT?

On Wed, Dec 2, 2020 at 5:40 PM Matthias J. Sax  wrote:

> One more follow up: do we need to update the Scala API, too?
>
>
> -Matthias
>
> On 12/2/20 7:51 AM, Leah Thomas wrote:
> > Thanks for the feedback! I'll go ahead and move it to vote now.
> >
> > Best,
> > Leah
> >
> > On Wed, Dec 2, 2020 at 6:58 AM Bruno Cadonna  wrote:
> >
> >> Thanks for the KIP!
> >>
> >> LGTM, as well!
> >>
> >> Best,
> >> Bruno
> >>
> >> On 02.12.20 00:41, Walker Carlson wrote:
> >>> Thanks for making these changes. It makes more sense now to me.
> >>> Overall LGTM
> >>>
> >>> walker
> >>>
> >>> On Tue, Dec 1, 2020 at 3:39 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> >>> wrote:
> >>>
> >>>> Thanks for the KIP! I'm happy with the state of things after your
> latest
> >>>> update,
> >>>> LGTM
> >>>>
> >>>> Sophie
> >>>>
> >>>> On Tue, Dec 1, 2020 at 2:26 PM Leah Thomas 
> >> wrote:
> >>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Yeah I think it should, good catch. That should also answer Walker's
> >>>>> question about why we have an option for `withLoggingEnabled()` even
> >>>>> though that's the default. Passing in a new map of configs could allow
> >>>>> the user to configure the log differently than the default. I've
> >>>>> updated the KIP to reflect the added parameter and an added variable,
> >>>>> `topicConfig`, to store the map of configs.
> >>>>>
> >>>>> Best,
> >>>>> Leah
> >>>>>
> >>>>> On Mon, Nov 30, 2020 at 5:35 PM Matthias J. Sax 
> >>>> wrote:
> >>>>>
> >>>>>> Thanks for the KIP Leah.
> >>>>>>
> >>>>>> Should `withLoggingEnabled()` take a `Map config`
> >>>>>> similar to the one from `Materialized`?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 11/30/20 12:22 PM, Walker Carlson wrote:
> >>>>>>> Ah. That makes sense. Thank you for fixing that.
> >>>>>>>
> >>>>>>> One minor question. If the default is enabled, is there any case
> >>>>>>> where a user would turn logging off and then back on again? I can
> >>>>>>> see having `enableLogging` for completeness, so it's probably not
> >>>>>>> that important.
> >>>>>>>
> >>>>>>> Anyways other than that it looks good!
> >>>>>>>
> >>>>>>> Walker
> >>>>>>>
> >>>>>>> On Mon, Nov 30, 2020 at 12:06 PM Leah Thomas  >
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hey Walker,
> >>>>>>>>
> >>>>>>>> Thanks for your response.
> >>>>>>>>
> >>>>>>>> 1. Ah yeah thanks for the catch, that was held over from
> copy/paste.
> >>>>>> Both
> >>>>>>>> functions should take no parameters, as they just `loggingEnabled`
> >>>> to
> >>>>>> true
> >>>>>>>> or false. I've removed the `WindowBytesStoreSuppli

[VOTE] KIP-689: Extend `StreamJoined` to allow more store configs

2020-12-02 Thread Leah Thomas
Hi all,

I'd like to start the vote for KIP-689 for enabling/disabling logging for
`StreamJoined`.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs

Thanks,
Leah


Re: [DISCUSS] KIP-689: Extend `StreamJoined` to allow more store configs

2020-12-02 Thread Leah Thomas
Thanks for the feedback! I'll go ahead and move it to vote now.

Best,
Leah

On Wed, Dec 2, 2020 at 6:58 AM Bruno Cadonna  wrote:

> Thanks for the KIP!
>
> LGTM, as well!
>
> Best,
> Bruno
>
> On 02.12.20 00:41, Walker Carlson wrote:
> > Thanks for making these changes. It makes more sense now to me.
> > Overall LGTM
> >
> > walker
> >
> > On Tue, Dec 1, 2020 at 3:39 PM Sophie Blee-Goldman 
> > wrote:
> >
> >> Thanks for the KIP! I'm happy with the state of things after your latest
> >> update,
> >> LGTM
> >>
> >> Sophie
> >>
> >> On Tue, Dec 1, 2020 at 2:26 PM Leah Thomas 
> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> Yeah I think it should, good catch. That should also answer Walker's
> >>> question about why we have an option for `withLoggingEnabled()` even
> >>> though that's the default. Passing in a new map of configs could allow
> >>> the user to configure the log differently than the default. I've
> >>> updated the KIP to reflect the added parameter and an added variable,
> >>> `topicConfig`, to store the map of configs.
> >>>
> >>> Best,
> >>> Leah
> >>>
> >>> On Mon, Nov 30, 2020 at 5:35 PM Matthias J. Sax 
> >> wrote:
> >>>
> >>>> Thanks for the KIP Leah.
> >>>>
> >>>> Should `withLoggingEnabled()` take a `Map config`
> >>>> similar to the one from `Materialized`?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 11/30/20 12:22 PM, Walker Carlson wrote:
> >>>>> Ah. That makes sense. Thank you for fixing that.
> >>>>>
> >>>>> One minor question. If the default is enabled, is there any case
> >>>>> where a user would turn logging off and then back on again? I can see
> >>>>> having `enableLogging` for completeness, so it's probably not that
> >>>>> important.
> >>>>>
> >>>>> Anyways other than that it looks good!
> >>>>>
> >>>>> Walker
> >>>>>
> >>>>> On Mon, Nov 30, 2020 at 12:06 PM Leah Thomas 
> >>>> wrote:
> >>>>>
> >>>>>> Hey Walker,
> >>>>>>
> >>>>>> Thanks for your response.
> >>>>>>
> >>>>>> 1. Ah yeah thanks for the catch, that was held over from copy/paste.
> >>>> Both
> >>>>>> functions should take no parameters, as they just `loggingEnabled`
> >> to
> >>>> true
> >>>>>> or false. I've removed the `WindowBytesStoreSupplier
> >>> otherStoreSupplier`
> >>>>>> from the methods in the KIP
> >>>>>> 2. I think the fix to 1 answers this question, otherwise, I'm not
> >>> quite
> >>>>>> sure what you're asking. With the updated method calls, there
> >>> shouldn't
> >>>> be
> >>>>>> any duplication.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Leah
> >>>>>>
> >>>>>> On Mon, Nov 30, 2020 at 12:14 PM Walker Carlson <
> >>> wcarl...@confluent.io>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hello Leah,
> >>>>>>>
> >>>>>>> Thank you for the KIP.
> >>>>>>>
> >>>>>>> I had a couple questions that maybe you can expand on from what is
> >> on
> >>>> the
> >>>>>>> KIP.
> >>>>>>>
> >>>>>>> 1) Why are we enabling/disabling the logging by passing in a
> >>>>>>> `WindowBytesStoreSupplier`?
> >>>>>>> It seems to me that these two things should be separate.
> >>>>>>>
> >>>>>>> 2) There is already `withThisStoreSupplier(final
> >>>> WindowBytesStoreSupplier
> >>>>>>> otherStoreSupplier)` and `withOtherStoreSupplier(final
> >>>>>>> WindowBytesStoreSupplier otherStoreSupplier)`. Why do we need to
> >>>>>> duplicate
> >>>>>>> them when the `retentionPeriod` can be set through them?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Walker
> >>>>>>>
> >>>>>>> On Mon, Nov 30, 2020 at 8:53 AM Leah Thomas 
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> After reading through
> >>>> https://issues.apache.org/jira/browse/KAFKA-9921
> >>>>>> I
> >>>>>>>> removed the option to enable/disable caching for `StreamJoined`,
> >> as
> >>>>>>> caching
> >>>>>>>> will always be disabled because we retain duplicates.
> >>>>>>>>
> >>>>>>>> I updated the KIP accordingly, it now adds only `enableLogging`
> >> as a
> >>>>>>>> config.
> >>>>>>>>
> >>>>>>>> On Mon, Nov 30, 2020 at 9:54 AM Leah Thomas  >>>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I'd like to kick-off the discussion for KIP-689: Extend
> >>>>>> `StreamJoined`
> >>>>>>> to
> >>>>>>>>> allow more store configs. This builds off the work of KIP-479
> >>>>>>>>> <
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
> >>>>>>>>
> >>>>>>>> to
> >>>>>>>>> add options to enable/disable both logging and caching for stream
> >>>>>> join
> >>>>>>>>> stores.
> >>>>>>>>>
> >>>>>>>>> KIP is here:
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Looking forward to hearing your thoughts,
> >>>>>>>>> Leah
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>


Re: [DISCUSS] KIP-689: Extend `StreamJoined` to allow more store configs

2020-12-01 Thread Leah Thomas
Hi Matthias,

Yeah, I think it should, good catch. That should also answer Walker's
question about why we have an option for `withLoggingEnabled()` even though
that's the default: passing in a new map of configs allows the user to
configure the log differently than the default. I've updated the KIP to
reflect the added parameter and an added variable, `topicConfig`, to store
the map of configs.
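A minimal sketch of the API shape described above, assuming a simplified, hypothetical `JoinLoggingOptions` class rather than the real `StreamJoined` (later Kafka Streams releases expose `StreamJoined#withLoggingEnabled(Map<String, String>)` and `StreamJoined#withLoggingDisabled()`). It shows why `withLoggingEnabled(...)` is useful even though logging is on by default: it is the only way to pass changelog topic overrides, kept here in a `topicConfig` map. Each call returns a new instance; dropping the overrides on disable is a choice made for this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the logging options discussed for
// StreamJoined in KIP-689. This is NOT org.apache.kafka.streams.kstream.StreamJoined.
final class JoinLoggingOptions {
    private final boolean loggingEnabled;
    private final Map<String, String> topicConfig;

    private JoinLoggingOptions(boolean loggingEnabled, Map<String, String> topicConfig) {
        this.loggingEnabled = loggingEnabled;
        // Defensive, read-only copy so later changes by the caller have no effect.
        this.topicConfig = Collections.unmodifiableMap(new HashMap<>(topicConfig));
    }

    // Logging is enabled by default, with no changelog topic overrides.
    static JoinLoggingOptions defaults() {
        return new JoinLoggingOptions(true, Collections.emptyMap());
    }

    // Enables logging and records changelog topic overrides; returns a new
    // instance instead of mutating this one.
    JoinLoggingOptions withLoggingEnabled(Map<String, String> config) {
        return new JoinLoggingOptions(true, config);
    }

    // Disables logging; in this sketch any previously supplied overrides are dropped.
    JoinLoggingOptions withLoggingDisabled() {
        return new JoinLoggingOptions(false, Collections.emptyMap());
    }

    boolean loggingEnabled() {
        return loggingEnabled;
    }

    Map<String, String> topicConfig() {
        return topicConfig;
    }
}
```

For example, `JoinLoggingOptions.defaults().withLoggingEnabled(Map.of("retention.ms", "86400000"))` keeps logging on and records one topic override; `retention.ms` is only an example topic config.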

Best,
Leah

On Mon, Nov 30, 2020 at 5:35 PM Matthias J. Sax  wrote:

> Thanks for the KIP Leah.
>
> Should `withLoggingEnabled()` take a `Map config`
> similar to the one from `Materialized`?
>
>
> -Matthias
>
> On 11/30/20 12:22 PM, Walker Carlson wrote:
> > Ah. That makes sense. Thank you for fixing that.
> >
> > One minor question. If the default is enabled, is there any case where a
> > user would turn logging off and then back on again? I can see having
> > `enableLogging` for completeness, so it's probably not that important.
> >
> > Anyways other than that it looks good!
> >
> > Walker
> >
> > On Mon, Nov 30, 2020 at 12:06 PM Leah Thomas 
> wrote:
> >
> >> Hey Walker,
> >>
> >> Thanks for your response.
> >>
> >> 1. Ah yeah thanks for the catch, that was held over from copy/paste.
> Both
> >> functions should take no parameters, as they just `loggingEnabled` to
> true
> >> or false. I've removed the `WindowBytesStoreSupplier otherStoreSupplier`
> >> from the methods in the KIP
> >> 2. I think the fix to 1 answers this question, otherwise, I'm not quite
> >> sure what you're asking. With the updated method calls, there shouldn't
> be
> >> any duplication.
> >>
> >> Cheers,
> >> Leah
> >>
> >> On Mon, Nov 30, 2020 at 12:14 PM Walker Carlson 
> >> wrote:
> >>
> >>> Hello Leah,
> >>>
> >>> Thank you for the KIP.
> >>>
> >>> I had a couple questions that maybe you can expand on from what is on
> the
> >>> KIP.
> >>>
> >>> 1) Why are we enabling/disabling the logging by passing in a
> >>> `WindowBytesStoreSupplier`?
> >>> It seems to me that these two things should be separate.
> >>>
> >>> 2) There is already `withThisStoreSupplier(final
> WindowBytesStoreSupplier
> >>> otherStoreSupplier)` and `withOtherStoreSupplier(final
> >>> WindowBytesStoreSupplier otherStoreSupplier)`. Why do we need to
> >> duplicate
> >>> them when the `retentionPeriod` can be set through them?
> >>>
> >>> Thanks,
> >>> Walker
> >>>
> >>> On Mon, Nov 30, 2020 at 8:53 AM Leah Thomas 
> >> wrote:
> >>>
> >>>> After reading through
> https://issues.apache.org/jira/browse/KAFKA-9921
> >> I
> >>>> removed the option to enable/disable caching for `StreamJoined`, as
> >>> caching
> >>>> will always be disabled because we retain duplicates.
> >>>>
> >>>> I updated the KIP accordingly, it now adds only `enableLogging` as a
> >>>> config.
> >>>>
> >>>> On Mon, Nov 30, 2020 at 9:54 AM Leah Thomas 
> >>> wrote:
> >>>>
> >>>>> Hi all,
> >>>>>
> >>>>> I'd like to kick-off the discussion for KIP-689: Extend
> >> `StreamJoined`
> >>> to
> >>>>> allow more store configs. This builds off the work of KIP-479
> >>>>> <
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
> >>>>
> >>>> to
> >>>>> add options to enable/disable both logging and caching for stream
> >> join
> >>>>> stores.
> >>>>>
> >>>>> KIP is here:
> >>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs
> >>>>>
> >>>>>
> >>>>> Looking forward to hearing your thoughts,
> >>>>> Leah
> >>>>>
> >>>>
> >>>
> >>
> >
>


Re: [DISCUSS] KIP-689: Extend `StreamJoined` to allow more store configs

2020-11-30 Thread Leah Thomas
Hey Walker,

Thanks for your response.

1. Ah yeah, thanks for the catch; that was held over from copy/paste. Both
functions should take no parameters, as they just set `loggingEnabled` to
true or false. I've removed the `WindowBytesStoreSupplier otherStoreSupplier`
parameter from the methods in the KIP.
2. I think the fix to 1 answers this question; otherwise, I'm not quite
sure what you're asking. With the updated method calls, there shouldn't be
any duplication.

Cheers,
Leah

On Mon, Nov 30, 2020 at 12:14 PM Walker Carlson 
wrote:

> Hello Leah,
>
> Thank you for the KIP.
>
> I had a couple questions that maybe you can expand on from what is on the
> KIP.
>
> 1) Why are we enabling/disabling the logging by passing in a
> `WindowBytesStoreSupplier`?
> It seems to me that these two things should be separate.
>
> 2) There is already `withThisStoreSupplier(final WindowBytesStoreSupplier
> otherStoreSupplier)` and `withOtherStoreSupplier(final
> WindowBytesStoreSupplier otherStoreSupplier)`. Why do we need to duplicate
> them when the `retentionPeriod` can be set through them?
>
> Thanks,
> Walker
>
> On Mon, Nov 30, 2020 at 8:53 AM Leah Thomas  wrote:
>
> > After reading through https://issues.apache.org/jira/browse/KAFKA-9921 I
> > removed the option to enable/disable caching for `StreamJoined`, as
> caching
> > will always be disabled because we retain duplicates.
> >
> > I updated the KIP accordingly, it now adds only `enableLogging` as a
> > config.
> >
> > On Mon, Nov 30, 2020 at 9:54 AM Leah Thomas 
> wrote:
> >
> > > Hi all,
> > >
> > > I'd like to kick-off the discussion for KIP-689: Extend `StreamJoined`
> to
> > > allow more store configs. This builds off the work of KIP-479
> > > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join
> >
> > to
> > > add options to enable/disable both logging and caching for stream join
> > > stores.
> > >
> > > KIP is here:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs
> > >
> > >
> > > Looking forward to hearing your thoughts,
> > > Leah
> > >
> >
>


Re: [DISCUSS] KIP-689: Extend `StreamJoined` to allow more store configs

2020-11-30 Thread Leah Thomas
After reading through https://issues.apache.org/jira/browse/KAFKA-9921 I
removed the option to enable/disable caching for `StreamJoined`, as caching
will always be disabled because we retain duplicates.

I updated the KIP accordingly; it now adds only `enableLogging` as a config.
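The thread only states the conclusion (caching stays off because the join stores retain duplicates); KAFKA-9921 has the details. As a simplified illustration, assuming a cache that keeps only the latest value per key (Kafka's record cache is more involved than this hypothetical `DuplicateRetentionDemo`), two writes with the same key and timestamp survive in a duplicate-retaining store but collapse into one in the cache:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration, not Kafka code: each write is a {key, value} pair,
// where the key already includes the record timestamp (e.g. "a@100").
final class DuplicateRetentionDemo {
    // A join store that retains duplicates keeps every write, so two records
    // with the same key and timestamp can both be joined later.
    static List<String> retainAll(List<String[]> writes) {
        List<String> stored = new ArrayList<>();
        for (String[] kv : writes) {
            stored.add(kv[0] + "=" + kv[1]);
        }
        return stored;
    }

    // A cache keyed only by that key keeps the latest value per key, so an
    // earlier duplicate is overwritten before it is ever flushed to the store.
    static List<String> cacheThenFlush(List<String[]> writes) {
        Map<String, String> cache = new LinkedHashMap<>();
        for (String[] kv : writes) {
            cache.put(kv[0], kv[1]);
        }
        List<String> stored = new ArrayList<>();
        for (Map.Entry<String, String> e : cache.entrySet()) {
            stored.add(e.getKey() + "=" + e.getValue());
        }
        return stored;
    }
}
```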

On Mon, Nov 30, 2020 at 9:54 AM Leah Thomas  wrote:

> Hi all,
>
> I'd like to kick-off the discussion for KIP-689: Extend `StreamJoined` to
> allow more store configs. This builds off the work of KIP-479
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join>
> to add options to enable/disable both logging and caching for stream join
> stores.
>
> KIP is here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs
>
>
> Looking forward to hearing your thoughts,
> Leah
>


[DISCUSS] KIP-689: Extend `StreamJoined` to allow more store configs

2020-11-30 Thread Leah Thomas
Hi all,

I'd like to kick off the discussion for KIP-689: Extend `StreamJoined` to
allow more store configs. This builds off the work of KIP-479
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join>
to add options to enable/disable both logging and caching for stream join
stores.

KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs


Looking forward to hearing your thoughts,
Leah


Re: [ANNOUNCE] New committer: A. Sophie Blee-Goldman

2020-10-19 Thread Leah Thomas
Congrats Sophie!

On Mon, Oct 19, 2020 at 11:41 AM Matthias J. Sax  wrote:

> Hi all,
>
> I am excited to announce that A. Sophie Blee-Goldman has accepted her
> invitation to become an Apache Kafka committer.
>
> Sophie has been actively contributing to Kafka since Feb 2019 and has
> accumulated 140 commits. She authored 4 KIPs as the lead:
>
>  - KIP-453: Add close() method to RocksDBConfigSetter
>  - KIP-445: In-memory Session Store
>  - KIP-428: Add in-memory window store
>  - KIP-613: Add end-to-end latency metrics to Streams
>
> and helped to implement two critical KIPs, 429 (incremental rebalancing)
> and 441 (smooth auto-scaling; not just implementation but also design).
>
> In addition, she participates in basically every Kafka Streams related
> KIP discussion, reviewed 142 PRs, and is active on the user mailing list.
>
> Thanks for all the contributions, Sophie!
>
>
> Please join me to congratulate her!
>  -Matthias
>
>


Re: [VOTE] KIP-671: Add method to Shutdown entire Streams Application

2020-09-29 Thread Leah Thomas
Hey Walker,

Thanks for the KIP! I'm +1, non-binding.

Cheers,
Leah

On Tue, Sep 29, 2020 at 1:56 PM Walker Carlson 
wrote:

> Hello all,
>
> I made some changes to the KIP; the descriptions are on the discussion
> thread. If you have already voted, I would ask you to confirm your vote.
>
> Otherwise please vote so we can get this feature in.
>
> Thanks,
> Walker
>
> On Thu, Sep 24, 2020 at 4:36 PM John Roesler  wrote:
>
> > Thanks for the KIP, Walker!
> >
> > I’m +1 (binding)
> >
> > -John
> >
> > On Mon, Sep 21, 2020, at 17:04, Guozhang Wang wrote:
> > > Thanks for finalizing the KIP. +1 (binding)
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Sep 21, 2020 at 1:38 PM Walker Carlson 
> > > wrote:
> > >
> > > > Hello all,
> > > >
> > > > I would like to start a thread to vote for KIP-671 to add a method to
> > > > close all clients in a Kafka Streams application.
> > > >
> > > > KIP:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown
> > > >
> > > > Discussion thread:
> > > > https://mail-archives.apache.org/mod_mbox/kafka-dev/202009.mbox/%3CCAC55fuh3HAGCxz-PzxTJraczy6T-os2oiCV328PBeuJQSVYASg%40mail.gmail.com%3E
> > > >
> > > > Thanks,
> > > > -Walker
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Re: [DISCUSS] Apache Kafka 2.7.0 release

2020-09-08 Thread Leah Thomas
Hi Bill,

Could you also add KIP-450 to the release plan? It's been merged.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL

Cheers,
Leah

On Tue, Sep 8, 2020 at 9:32 AM Bill Bejeck  wrote:

> Hi Bruno,
>
> Thanks for letting me know, I've added KIP-662 to the release plan.
>
> -Bill
>
> On Mon, Sep 7, 2020 at 11:33 AM Bruno Cadonna  wrote:
>
> > Hi Bill,
> >
> > Could you add KIP-662 [1] to the release plan? The KIP has already been
> > implemented.
> >
> > Best,
> > Bruno
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted
> >
> > On 26.08.20 16:54, Bill Bejeck wrote:
> > > Greetings All!
> > >
> > > I've published a release plan at
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158872629.
> > > I have included all of the KIPs that are currently approved, but I'm happy
> > > to make any adjustments as necessary.
> > >
> > > The KIP freeze is on September 30 with a target release date of November 6.
> > >
> > > Let me know if there are any objections.
> > >
> > > Thanks,
> > > Bill Bejeck
> > >
> > > On Fri, Aug 14, 2020 at 4:01 PM John Roesler 
> > wrote:
> > >
> > >> Thanks, Bill!
> > >> -John
> > >>
> > >> On Thu, 2020-08-13 at 15:19 -0700, Ismael Juma wrote:
> > >>> Thanks for volunteering Bill. :)
> > >>>
> > >>> Ismael
> > >>>
> > >>> On Thu, Aug 13, 2020 at 3:13 PM Bill Bejeck 
> > wrote:
> > >>>
> >  Hi All,
> > 
> >  I'd like to volunteer to be the release manager for our next feature
> >  release, 2.7. If there are no objections, I'll send out the release
> > >> plan
> >  soon.
> > 
> >  Thanks,
> >  Bill Bejeck
> > 
> > >>
> > >>
> > >
> >
>


[jira] [Resolved] (KAFKA-5636) Add Sliding-Window support for Aggregations

2020-09-08 Thread Leah Thomas (Jira)



Leah Thomas resolved KAFKA-5636.

Resolution: Fixed

> Add Sliding-Window support for Aggregations
> ---
>
> Key: KAFKA-5636
> URL: https://issues.apache.org/jira/browse/KAFKA-5636
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Michael G. Noll
>Assignee: Leah Thomas
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.7.0
>
>
> We support three windowing types for aggregations in the DSL right now:
>  * Tumbling windows
>  * Hopping windows (note: some stream processing tools call these "sliding 
> windows")
>  * Session windows
> Some users have expressed the need for sliding windows. While we do use 
> sliding windows for joins, we do not yet support sliding window aggregations 
> in the DSL
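To make the distinction concrete, here is a simplified sketch (hypothetical `WindowAssignment` class, not Kafka's implementation, with windows assumed to be aligned to the epoch) of which hopping or tumbling windows contain a record, and of the membership test for a sliding window as introduced by KIP-450, assumed here to be inclusive at both ends:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch, not Kafka code, of which windows contain a record with
// timestamp t (all times in ms, windows aligned to the epoch).
final class WindowAssignment {
    // Hopping windows: length sizeMs, a new window every advanceMs, each
    // covering [start, start + sizeMs). Tumbling windows are the special case
    // advanceMs == sizeMs. Returns window starts in ascending order.
    static List<Long> hoppingWindowStarts(long t, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest aligned start greater than t - sizeMs (never below 0).
        long start = (Math.max(0, t - sizeMs + advanceMs) / advanceMs) * advanceMs;
        for (; start <= t; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // Sliding window (assumption: both ends inclusive): the window ending at
    // endMs covers [endMs - timeDifferenceMs, endMs].
    static boolean inSlidingWindow(long endMs, long timeDifferenceMs, long t) {
        return t >= endMs - timeDifferenceMs && t <= endMs;
    }
}
```

With a 10 ms window advancing every 5 ms, a record at t = 7 falls into the windows starting at 0 and 5; with the advance equal to the size (tumbling), it falls into exactly one window.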



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-09-08 Thread Leah Thomas
Hey Sophie,

Are you advocating that we change the name back to *window.size.ms*?

I wasn't thinking that by including the config we would steer people away
from using the constructors that pass in a window size, so in that sense I
suppose the config isn't a "default." I think that by deprecating the
TimeWindowedSerde(innerClass) constructor, as you mentioned, we'll steer
clear of some of the issues you brought up. In my mind, the first tier of
usage is to set the window size directly; after that comes the config; and
if neither of those happens, we fall back on Long.MAX_VALUE. I suppose in
this sense "default" is an imprecise term, but it indicates that if the
proper constructor/setup is not used, there will be a "default" window size
that the user still sets. It seems like the console consumer has us
somewhat stuck with a new config, but I agree that the new constructors
should be relied upon before the config is. Along those lines, what about a
name like *backup.window.size.ms* instead of default?
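A minimal sketch of the precedence described above (constructor argument, then config, then Long.MAX_VALUE). `WindowSizeResolution` is a hypothetical helper, and `window.size.ms` is the name used in this thread while the final name was still under discussion; this is not the code that shipped:

```java
import java.util.Map;

// Hypothetical sketch of the fallback order described in this message; it is
// not the code that shipped in TimeWindowedDeserializer.
final class WindowSizeResolution {
    // Config name as used in this thread; the name itself was still being debated.
    static final String WINDOW_SIZE_CONFIG = "window.size.ms";

    static long resolve(Long constructorWindowSizeMs, Map<String, ?> configs) {
        // 1. A window size passed to the constructor takes precedence.
        if (constructorWindowSizeMs != null) {
            return constructorWindowSizeMs;
        }
        // 2. Otherwise use the config, e.g. when a tool such as the console
        //    consumer can only pass properties.
        Object configured = configs.get(WINDOW_SIZE_CONFIG);
        if (configured != null) {
            return Long.parseLong(configured.toString());
        }
        // 3. Neither is set: keep the old behavior for compatibility.
        return Long.MAX_VALUE;
    }
}
```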

Cheers,
Leah

On Tue, Sep 8, 2020 at 12:50 PM Sophie Blee-Goldman 
wrote:

> Hey Guozhang & Leah,
>
> I want to push back a bit on the assumption that we would fall back on this
> config in the case of an unspecified window size in a Streams serde. I
> don't think it should be a default at all, either in name or in effect. To
> borrow the rhetorical question that John raised earlier: what is the
> default window size of an application?
>
> Personally, I agree that that doesn't make much sense. Sure, if you only
> have a single windowed operation in your app then you could just specify
> the window size by config, but how is that any more ergonomic than
> specifying the window size in the Serde's constructor? If anything, it
> seems worse to put physical and mental distance between the specification
> and the actual usage of such parameters. What if you add another windowed
> operation later, with a different size, and forget to specify the new size
> in the new Serde? Or what if you never specify a default window size config
> at all and accidentally end up using the default config value of
> Long.MAX_VALUE? Avoiding this possibility was/is one of the main
> motivations of this KIP, and the whole point of deprecating the
> TimeWindowedSerde(innerClass) constructor.
>
> I actually would have advocated to remove this config entirely, but as John
> pointed out, we still need it to configure things like the console consumer
>
> On Tue, Sep 8, 2020 at 10:40 AM Leah Thomas  wrote:
>
> > Hi Guozhang,
> >
> > Yes, the config would read them as a single window size. I think this
> > relates to John's comments about having variably sized windows, which
> this
> > config doesn't handle. I like the name change and updated the wiki to
> > reflect that, and to clarify that the default value will still be
> > Long.MAX_VALUE.
> >
> > Thanks for your feedback!
> > Leah
> >
> > On Tue, Sep 8, 2020 at 11:54 AM Guozhang Wang 
> wrote:
> >
> > > Hello Leah,
> > >
> > > Thanks for initiating this. I just have one minor clarification
> question
> > > here: the config "window.size.ms" seems to be used as the default
> window
> > > size when reading from a topic that represents windowed records right?
> > I.e.
> > > if there are multiple topics that represent windowed records but their
> > > window sizes are different, with this config we can only read them
> with a
> > > single window size? If yes, could we rename the config as "
> > > default.window.size.ms" and make that clear in the description as
> well?
> > > Also we'd better also include its default value which I think would
> still
> > > be MAX_VALUE for compatibility.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Sep 8, 2020 at 9:38 AM Leah Thomas 
> wrote:
> > >
> > > > Hey all,
> > > >
> > > > We should be good to wrap up voting now that the discussion has been
> > > > resolved.
> > > >
> > > > Cheers,
> > > > Leah
> > > >
> > > > On Wed, Sep 2, 2020 at 7:23 PM Matthias J. Sax 
> > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > On 8/26/20 8:02 AM, John Roesler wrote:
> > > > > > Hi all,
> > > > > >
> > > > > > I've just sent a new message to the DISCUSS thread. We
> > > > > > for

Re: [VOTE] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-09-08 Thread Leah Thomas
Hi Guozhang,

Yes, the config would read them as a single window size. I think this
relates to John's comments about having variably sized windows, which this
config doesn't handle. I like the name change and updated the wiki to
reflect that, and to clarify that the default value will still be
Long.MAX_VALUE.
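To illustrate why a single config value cannot serve topics with different window sizes, assume (as a simplification) that a time-windowed key carries only its window start, so the reader rebuilds the end as start plus the configured size. `WindowEndReconstruction` is a hypothetical helper, not Kafka code:

```java
// Simplified sketch, not Kafka code: assume a time-windowed key is stored with
// only its window start, so the reader rebuilds the end as start + windowSize.
final class WindowEndReconstruction {
    static long windowEnd(long windowStartMs, long windowSizeMs) {
        long end = windowStartMs + windowSizeMs;
        // Clamp on overflow, which happens with the Long.MAX_VALUE default.
        return end < windowStartMs ? Long.MAX_VALUE : end;
    }
}
```

A record written with a 5-minute window starting at 600000 ms is read back with an end of 660000 ms if the single config value says 1 minute, and with Long.MAX_VALUE under the compatibility default.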

Thanks for your feedback!
Leah

On Tue, Sep 8, 2020 at 11:54 AM Guozhang Wang  wrote:

> Hello Leah,
>
> Thanks for initiating this. I just have one minor clarification question
> here: the config "window.size.ms" seems to be used as the default window
> size when reading from a topic that represents windowed records right? I.e.
> if there are multiple topics that represent windowed records but their
> window sizes are different, with this config we can only read them with a
> single window size? If yes, could we rename the config as
> "default.window.size.ms" and make that clear in the description as well?
> We'd also better include its default value, which I think would still
> be MAX_VALUE for compatibility.
>
>
> Guozhang
>
>
> On Tue, Sep 8, 2020 at 9:38 AM Leah Thomas  wrote:
>
> > Hey all,
> >
> > We should be good to wrap up voting now that the discussion has been
> > resolved.
> >
> > Cheers,
> > Leah
> >
> > On Wed, Sep 2, 2020 at 7:23 PM Matthias J. Sax  wrote:
> >
> > > +1 (binding)
> > >
> > > On 8/26/20 8:02 AM, John Roesler wrote:
> > > > Hi all,
> > > >
> > > > I've just sent a new message to the DISCUSS thread. We
> > > > forgot to include the Scala API in the proposal.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Mon, 2020-08-24 at 18:00 -0700, Sophie Blee-Goldman
> > > > wrote:
> > > >> Thanks for the KIP! +1 (non-binding)
> > > >>
> > > >> Sophie
> > > >>
> > > >> On Mon, Aug 24, 2020 at 5:06 PM John Roesler 
> > > wrote:
> > > >>
> > > >>> Thanks Leah,
> > > >>> I’m +1 (binding)
> > > >>>
> > > >>> -John
> > > >>>
> > > >>> On Mon, Aug 24, 2020, at 16:54, Leah Thomas wrote:
> > > >>>> Hi everyone,
> > > >>>>
> > > >>>> I'd like to kick-off the vote for KIP-659: Improve
> > > >>>> TimeWindowedDeserializer
> > > >>>> and TimeWindowedSerde to handle window size.
> > > >>>>
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > >>>> Thanks,
> > > >>>> Leah
> > > >>>>
> > > >
> > >
> > >
> >
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-09-08 Thread Leah Thomas
Hey all,

We should be good to wrap up voting now that the discussion has been
resolved.

Cheers,
Leah

On Wed, Sep 2, 2020 at 7:23 PM Matthias J. Sax  wrote:

> +1 (binding)
>
> On 8/26/20 8:02 AM, John Roesler wrote:
> > Hi all,
> >
> > I've just sent a new message to the DISCUSS thread. We
> > forgot to include the Scala API in the proposal.
> >
> > Thanks,
> > -John
> >
> > On Mon, 2020-08-24 at 18:00 -0700, Sophie Blee-Goldman
> > wrote:
> >> Thanks for the KIP! +1 (non-binding)
> >>
> >> Sophie
> >>
> >> On Mon, Aug 24, 2020 at 5:06 PM John Roesler 
> wrote:
> >>
> >>> Thanks Leah,
> >>> I’m +1 (binding)
> >>>
> >>> -John
> >>>
> >>> On Mon, Aug 24, 2020, at 16:54, Leah Thomas wrote:
> >>>> Hi everyone,
> >>>>
> >>>> I'd like to kick-off the vote for KIP-659: Improve
> >>>> TimeWindowedDeserializer
> >>>> and TimeWindowedSerde to handle window size.
> >>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> >>>> Thanks,
> >>>> Leah
> >>>>
> >
>
>


Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-09-02 Thread Leah Thomas
Hey John,

I see what you mean about the console consumer in particular. I don't think
that adding the extra config would *hurt* at all, so I'm good with keeping
that in the KIP. I've updated the KIP proposal again to include the configs.
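John's argument for keeping the config is that a tool like the console consumer builds its deserializers from configuration rather than through a Java constructor (Kafka clients instantiate the classes named in `key.deserializer`/`value.deserializer` and then call `configure`). A minimal sketch of that path, using a hypothetical `ConfigOnlyDeserializer` with a no-arg constructor, shows that `configure(...)` is then the only way to supply a window size:

```java
import java.util.Map;

// Hypothetical stand-in for a windowed deserializer that a tool instantiates
// from a class name via its no-arg constructor, so configure(...) is the only
// way it can learn the window size. Not Kafka's real Deserializer interface.
final class ConfigOnlyDeserializer {
    private Long windowSizeMs; // stays null until configured

    public ConfigOnlyDeserializer() {
    }

    void configure(Map<String, ?> configs) {
        Object value = configs.get("window.size.ms"); // config name from this thread
        if (value != null) {
            windowSizeMs = Long.parseLong(value.toString());
        }
    }

    Long windowSizeMs() {
        return windowSizeMs;
    }

    // Roughly what happens when the class name comes from a properties file.
    static ConfigOnlyDeserializer fromClassName(String className, Map<String, ?> configs) {
        try {
            ConfigOnlyDeserializer d = (ConfigOnlyDeserializer)
                    Class.forName(className).getDeclaredConstructor().newInstance();
            d.configure(configs);
            return d;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }
}
```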

The serde resolution sounds good to me as well. I added a few lines in the
KIP about logging an error when the *timeWindowedSerde* implicit is called.
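A rough Java analogue of that plan, since the real change is to a Scala implicit: keep the size-less entry point so existing code still compiles, log an error, and delegate to the explicit-size path with the old Long.MAX_VALUE fallback. `WindowedSerdeFactory`, its methods, and the log message are hypothetical:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical Java analogue of the plan for the existing Scala implicit.
// Not Kafka code.
final class WindowedSerdeFactory {
    private static final Logger LOG = Logger.getLogger(WindowedSerdeFactory.class.getName());

    // Placeholder for "a windowed serde bound to a window size".
    static final class SizedSerde {
        final String innerType;
        final long windowSizeMs;

        SizedSerde(String innerType, long windowSizeMs) {
            this.innerType = innerType;
            this.windowSizeMs = windowSizeMs;
        }
    }

    // Preferred path: the caller states the window size explicitly.
    static SizedSerde timeWindowed(String innerType, long windowSizeMs) {
        return new SizedSerde(innerType, windowSizeMs);
    }

    // Legacy path kept for compatibility. It cannot know the real window size,
    // so it logs at SEVERE (java.util.logging's closest level to ERROR) and
    // falls back to Long.MAX_VALUE, the old behavior.
    @Deprecated
    static SizedSerde timeWindowedWithoutSize(String innerType) {
        LOG.log(Level.SEVERE,
                "No window size given for windowed serde of {0}; falling back to Long.MAX_VALUE. Pass the window size explicitly.",
                innerType);
        return timeWindowed(innerType, Long.MAX_VALUE);
    }
}
```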

Let me know if there are any other concerns, else I'll resume voting.

Cheers,
Leah

On Tue, Sep 1, 2020 at 11:17 AM John Roesler  wrote:

> Hi Leah and Sophie,
>
> Sorry for the delayed response.
>
> You can pass in pre-instantiated (and therefore arbitrarily
> constructed) deserializers to the KafkaConsumer. However,
> this doesn't mean we should drop the configs. The same
> argument for dropping the configs implies that the consumer
> shouldn't have configs for setting the deserializers at all.
> This doesn't sound right, and I'm asking myself why. The
> most likely answer seems to me to be that you sometimes
> create a Consumer without invoking the Java constructor at
> all. For example, when you use the console-consumer. In that
> case, it would be indispensable to be able to fully
> configure the deserializers via a properties file.
>
> Therefore, I think we should go ahead and propose the new
> config. (Sorry for the flip-flop, Leah)
>
> Regarding the implicits, Leah's conclusion sounds good to
> me. Yuriy is not adding any implicit for this serde to the
> new class, and we'll just add an ERROR log to the existing
> implicit. Once KIP-616 is merged, the existing implicit will
> be deprecated along with all the other implicits in that
> class, so there will be two "forces" pushing people to the
> new interface, where they will discover the lack of an
> implicit, which then forces them to call the non-deprecated
> constructors directly.
>
> To answer Sophie's question, "implicit" is a feature of
> Scala that allows the type system to automatically resolve
> method arguments when there is just one possible argument in
> scope. There's a bunch of docs for it, so I won't waste a
> ton of e-ink on the details; the docs will be crystal clear
> just assuming you know all about monads and monoids and
> type-level programming ;)
>
> The punch line for us is that we provide implicits for the
> basic serdes, and also for turning pairs of
> serializers/deserializers into serdes, so you can avoid
> explicitly passing any serdes into Streams DSL operations,
> but also not have to fall back on the default key/value
> serde configs. Instead, the type system will plug in the
> right serde for the K/V types at each operation.
>
> We would _not_ add an implicit for a serde that we can't
> construct in a context-free way using just type information,
> as in this case. That's why Yuriy dropped the new implicit
> and why we're going to add an error to the existing
> implicit. On the other hand, removing the existing implicit
> will cause compiler errors when the type system is no longer
> able to find a suitable argument for an implicit parameter,
> so we don't want to just remove the existing implicit.
>
> Thanks,
> -John
>
> On Mon, 2020-08-31 at 16:28 -0500, Leah Thomas wrote:
> > Hey Sophie,
> >
> > Thanks for the catch! It makes sense that the consumer would accept a
> > deserializer somewhere, so we can definitely skip the additional configs. I
> > updated the KIP to reflect that.
> >
> > John seems to know Scala better than I do as well, but I think we need to
> > keep the current implicit that allows users to just pass in a serde and no
> > window size for backwards compatibility. It seems to me that based on the
> > discussion around KIP-616 <https://github.com/apache/kafka/pull/8955>, we
> > can pretty easily do John's third suggestion for handling this implicit:
> > logging an error message and passing to a non-deprecated constructor using
> > some default value. It seems from KIP-616 that most scala users will use
> > the new Serdes class anyways, and Yuriy is just removing these implicits so
> > it seems like whatever fix we decide for this class won't get used too
> > heavily.
> >
> > Cheers,
> > Leah
> >
> > On Thu, Aug 27, 2020 at 8:49 PM Sophie Blee-Goldman
> > wrote:
> >
> > > Ok I'm definitely feeling pretty dumb now, but I was just thinking how
> > > ridiculous it is that the Consumer forces you to configure your
> > > Deserializer through actual config maps instead of just taking the ones
> > > you pass in directly. So I thought "why not just fix the Cons

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-08-31 Thread Leah Thomas
Hey Sophie,

Thanks for the catch! It makes sense that the consumer would accept a
deserializer somewhere, so we can definitely skip the additional configs. I
updated the KIP to reflect that.

John seems to know Scala better than I do as well, but I think we need to
keep the current implicit that allows users to just pass in a serde and no
window size for backwards compatibility. It seems to me that based on the
discussion around KIP-616 <https://github.com/apache/kafka/pull/8955>, we
can pretty easily do John's third suggestion for handling this implicit:
logging an error message and passing to a non-deprecated constructor using
some default value. It seems from KIP-616 that most Scala users will use
the new Serdes class anyway, and Yuriy is just removing these implicits so
it seems like whatever fix we decide for this class won't get used too
heavily.

Cheers,
Leah

On Thu, Aug 27, 2020 at 8:49 PM Sophie Blee-Goldman 
wrote:

> Ok I'm definitely feeling pretty dumb now, but I was just thinking how
> ridiculous it is that the Consumer forces you to configure your
> Deserializer through actual config maps instead of just taking the ones
> you pass in directly. So I thought "why not just fix the Consumer to allow
> passing in an actual Deserializer object" and went to go through the code
> in case there's some legitimate reason why not, and what do you know. You
> actually can pass in an actual Deserializer object! There is a
> KafkaConsumer constructor that accepts a key and value Deserializer, and
> doesn't instantiate or configure a new one if provided in this way. Duh.
>
> Sorry for misleading everyone on that front. I'm just happy to find out
> that a reasonable way of configuring a deserializer actually *is* possible
> after all. In that case, maybe we can remove the extra configs from this
> KIP and just proceed with the deprecation?
>
> Obviously that doesn't help anything with regards to the remaining
> question that John/Leah have posed. Now I probably don't have anything
> valuable to offer there since I know next to nothing about Scala, but I do
> want to better understand: why would we add an "implicit" (what exactly
> does this mean?) that relies on allowing users to not set the windowSize,
> if we are explicitly taking away that option from the Java users? Or if we
> have already added something, can't we just deprecate it like we are
> deprecating the Java constructor? I may need some remedial lessons in Scala
> just to understand the problem that we apparently have, because I don't
> get it.
>
> By the way, I'm a little tempted to say that we should go one step further
> and deprecate the DEFAULT_WINDOWED_INNER_CLASS configs, but maybe that's a
> bit too radical for the moment. It just seems like default serde configs
> have been a lot more trouble than they're worth overall. That said, these
> particular configs don't appear to have hurt anyone thus far, at least not
> that we know of (possibly because no one is using them anyway), so there's
> no strong motivation to do so.
>
> On Wed, Aug 26, 2020 at 9:19 AM Leah Thomas  wrote:
>
> > Hey John,
> >
> > Thanks for pointing this out, I wasn't sure how to handle the Scala changes.
> >
> > I'm not fully versed in the Scala version of Streams, so feel free to
> > correct me if any of my assumptions are wrong. I think logging an error
> > message and then calling the constructor that requires a windowSize seems
> > like the simplest fix from my point of view. So instead of
> > calling `TimeWindowedSerde(final Serde<T> inner)`, we could
> > call `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
> > Long.MAX_VALUE as the window size.
> >
> > I do feel like we would want to add an implicit to `Serdes.scala` that
> > takes a serde and a window size so that users can access the constructor
> > that initializes with the correct window size. I agree with your comment on
> > the KIP-616 PR that the serde needs to be pre-configured when it's passed,
> > but I'm not sure we would need a windowSize config. I think if the
> > constructor is passed the serde and the window size, then window size
> > should be set within the deserializer. The only catch is if the Scala
> > version of the consumer creates a new deserializer, and at that point we'd
> > need a window size config, but I'm not sure if that's the case.
> >
> > WDYT - is it possible to alter the existing implicit and add a new one?
> >
> > On Wed, Aug 26, 2020 at 10:00 AM John Roesler
> > wrote:
> >
> > > Hi Leah,
> > >
> > > I was just reviewing th

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-08-26 Thread Leah Thomas
Hey John,

Thanks for pointing this out, I wasn't sure how to handle the Scala changes.

I'm not fully versed in the Scala version of Streams, so feel free to
correct me if any of my assumptions are wrong. I think logging an error
message and then calling the constructor that requires a windowSize seems
like the simplest fix from my point of view. So instead of
calling `TimeWindowedSerde(final Serde<T> inner)`, we could
call `TimeWindowedSerde(final Serde<T> inner, final long windowSize)` with
Long.MAX_VALUE as the window size.

I do feel like we would want to add an implicit to `Serdes.scala` that
takes a serde and a window size so that users can access the constructor
that initializes with the correct window size. I agree with your comment on
the KIP-616 PR that the serde needs to be pre-configured when it's passed,
but I'm not sure we would need a windowSize config. I think if the
constructor is passed the serde and the window size, then window size
should be set within the deserializer. The only catch is if the Scala
version of the consumer creates a new deserializer, and at that point we'd
need a window size config, but I'm not sure if that's the case.

WDYT - is it possible to alter the existing implicit and add a new one?
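A minimal Java sketch of the fallback described above: the one-argument entry point stays source compatible, logs an error, and delegates to the two-argument form with Long.MAX_VALUE. The real change would live in the Scala `Serdes` object and use Kafka's `TimeWindowedSerde`; ToySerdes and ToyTimeWindowedSerde are hypothetical stand-ins so the sketch runs on its own, and java.util.logging stands in for whatever logger Kafka actually uses.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for TimeWindowedSerde: records only the inner serde's
// name and the window size it was built with.
final class ToyTimeWindowedSerde {
    final String innerSerdeName;
    final long windowSize;

    ToyTimeWindowedSerde(String innerSerdeName, long windowSize) {
        this.innerSerdeName = innerSerdeName;
        this.windowSize = windowSize;
    }
}

// Hypothetical factory playing the role of the Scala implicit.
final class ToySerdes {
    private static final Logger LOG = Logger.getLogger(ToySerdes.class.getName());

    private ToySerdes() { }

    // Old entry point: keeps compiling for existing callers, but logs an error
    // (SEVERE is java.util.logging's closest level to ERROR) and falls back to
    // Long.MAX_VALUE as the window size.
    static ToyTimeWindowedSerde timeWindowedSerde(String innerSerdeName) {
        LOG.severe("timeWindowedSerde called without a window size; using Long.MAX_VALUE. "
                + "Pass the window size explicitly.");
        return timeWindowedSerde(innerSerdeName, Long.MAX_VALUE);
    }

    // New entry point that takes the window size explicitly.
    static ToyTimeWindowedSerde timeWindowedSerde(String innerSerdeName, long windowSize) {
        return new ToyTimeWindowedSerde(innerSerdeName, windowSize);
    }
}
```

The design choice is that existing call sites neither fail to compile nor throw; they only get a loud log line pointing at the explicit overload.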

On Wed, Aug 26, 2020 at 10:00 AM John Roesler  wrote:

> Hi Leah,
>
> I was just reviewing the PR for KIP-616 and realized that we
> forgot to mention the Scala API in your KIP. We should
> consider it because `scala.Serdes.timeWindowedSerde` is
> implicitly using the exact constructor you're deprecating.
>
> I had some ideas in the code review:
> https://github.com/apache/kafka/pull/8955#discussion_r477358755
>
> What do you think is the best approach?
>
> Concretely, I think Yuriy can make the call for KIP-616 (for
> the new implicit that he's adding). But I think your KIP-659
> should mention how we modify the existing implicit.
>
> Typically, we'd try to avoid throwing new exceptions or
> causing compile errors, so
> * dropping the implicit is probably off the table (compile
> error).
> * throwing an exception in the deserializer may not be ok,
> although it might still actually be ok since it's adding a
> corruption check.
> * logging an ERROR message and then passing through to the
> underlying deserializer would be more conservative.
>
> What do you think we should do?
>
> Thanks,
> -John
>
> On Fri, 2020-08-21 at 16:05 -0500, Leah Thomas wrote:
> > Thanks for the typo catch, John.
> >
> > Let me know if anyone else has thoughts or ideas.
> >
> > Cheers,
> > Leah
> >
> > On Fri, Aug 21, 2020 at 2:50 PM John Roesler
> > wrote:
> >
> > > Thanks, all,
> > >
> > > Based on my reading of the conversation, it sounds like I
> > > have some legwork to do in KIP-645, but our collective
> > > instinct is that Leah's proposal doesn't need to change to
> > > account for whatever we might decide to do in KIP-645.
> > >
> > > I have no further concerns about KIP-645, and I think it's a
> > > good proposal.
> > >
> > > Thanks,
> > > -John
> > >
> > > P.s., there's still a typo on the wiki that says
> > > "ConsumerConfig" on the code block, even though the text now
> > > says "StreamsConfig".
> > >
> > >
> > > On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman
> > > wrote:
> > > > Just want to make a quick comment on the question that John raised
> > > > about whether we should introduce a separate config for "key" and
> > > > "value" window sizes:
> > > >
> > > > My short answer is No, I don't think that's necessary. First of all,
> > > > as you said, there is no first-class concept of a "Windowed value" in
> > > > the DSL. Second, to engage in your rhetorical question, if there's no
> > > > default window size for a Streams program then how can there be a
> > > > sensible default for the key AND a separate sensible default for a
> > > > value?
> > > >
> > > > I don't think we need to follow the existing pattern if it doesn't
> > > > make sense, and to be honest I'm a bit skeptical that anyone was even
> > > > using these default windowed inner classes since the config wasn't
> > > > even defined/documented until quite recently. I'd actually be in favor
> > > > of deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > > but I don't want t

[VOTE] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-08-24 Thread Leah Thomas
Hi everyone,

I'd like to kick off the vote for KIP-659: Improve TimeWindowedDeserializer
and TimeWindowedSerde to handle window size.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size

Thanks,
Leah


Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-08-21 Thread Leah Thomas
Thanks for the typo catch, John.

Let me know if anyone else has thoughts or ideas.

Cheers,
Leah

On Fri, Aug 21, 2020 at 2:50 PM John Roesler  wrote:

> Thanks, all,
>
> Based on my reading of the conversation, it sounds like I
> have some legwork to do in KIP-645, but our collective
> instinct is that Leah's proposal doesn't need to change to
> account for whatever we might decide to do in KIP-645.
>
> I have no further concerns about KIP-645, and I think it's a
> good proposal.
>
> Thanks,
> -John
>
> P.s., there's still a typo on the wiki that says
> "ConsumerConfig" on the code block, even though the text now
> says "StreamsConfig".
>
>
> On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman
> wrote:
> > Just want to make a quick comment on the question that John raised about
> > whether we should introduce a separate config for "key" and "value" window
> > sizes:
> >
> > My short answer is No, I don't think that's necessary. First of all, as
> > you said, there is no first-class concept of a "Windowed value" in the
> > DSL. Second, to engage in your rhetorical question, if there's no default
> > window size for a Streams program then how can there be a sensible default
> > for the key AND a separate sensible default for a value?
> >
> > I don't think we need to follow the existing pattern if it doesn't make
> > sense, and to be honest I'm a bit skeptical that anyone was even using
> > these default windowed inner classes since the config wasn't even
> > defined/documented until quite recently. I'd actually be in favor of
> > deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS but I
> > don't want to drag that into this discussion as well.
> >
> > My understanding is that these were meant to mirror the default key/value
> > serde configs, but the real use of the DEFAULT_WINDOWED_SERDE_INNER_CLASS
> > config is actually that you can at least use it to configure the inner
> > class for a Consumer, thus making the TimeWindowed serdes functional at a
> > basic level. With the window size configs, the point is not really to set
> > a default but to make it actually work with a Consumer which instantiates
> > the deserializer by reflection. So I don't think we should position this
> > new config as a "default" (although it may technically behave as one) --
> > within Streams users can and should always supply the window size through
> > the constructor. I don't think that's such an inconvenience, vs the amount
> > of confusion that will be (and has been) caused by default serde-related
> > configs in streams.
> >
> > Regarding the fixed vs variable sized config, one idea I had was to just
> > keep the fixed-size config and constructor and let users of enumerable
> > windows override the TimeWindowedSerde class(es) to do whatever it is they
> > need. IIUC you already have to override some other windows-related classes
> > to get variable-sized windows so doing the same for the serdes sounds
> > reasonable to me. Just my take on the "simple things should be easy,
> > difficult things should be possible" mantra.
> >
> > One last quick side note: the reason we don't really need to discuss
> > SessionWindows here is that they already encode both the start and end
> > time for the window. This is probably the best way to go for TimeWindows
> > as well, but making this change in a backwards compatible way is a much
> > larger scope of work. And even then, we might want to consider making it
> > possible to still just encode the start time to save space, thus requiring
> > this config either way.
> >
> > On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas
> > wrote:
> >
> > > Thanks John and Walker for your thoughts.
> > >
> > > I agree with your two scenarios John, that you configure fully in the
> > > constructor, or you don't need to call `init()`. IIUC, if we pass the
> > > deserializer to the consumer, we want to make sure it has the window
> > > size set using the newly required constructor. If we don't pass in the
> > > deserializer, the window size will be set through the configs. To
> > > answer Walker's question directly, because the configs aren't passed to
> > > the constructor, we can't set the window size unless we pass it to the
> > > constructor or configure the constructor

Re: [DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-08-21 Thread Leah Thomas
one that takes a window function, mapping a window start
> time to a full Window(start, end).
>
> In that context, it seems incongruous to introduce a
> configuration that specifies a window size. Of course, my
> KIP is also under discussion, so my proposal may not
> eventually be accepted. But it is necessary to consider both
> of these concerns together.
>
> One option seems to be to accept both. Namely, we keep the
> "fixed size" constructor AND add my new constructor (for
> variably sized windows). Likewise, we accept your proposal,
> and KIP-659 would propose to add a new config specifying a
> windowing function, such as:
>
> > StreamsConfig.WINDOW_FUNCTION_CONFIG
>
> which would be an instance of:
>
> > public interface WindowFunction extends Function<Long, Window> {}
>
> I'm not bringing these up for discussion in your KIP right
> now, just demonstrating the feasibility of merging both
> proposals.
>
> My question for you: do you think the general strategy of
> having two constructors and two configs, one for fixed and
> one for variable windows, makes sense? Is it too
> complicated? Do you have a better idea?
>
> Thanks!
> -John
>
> On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > Hi Leah,
> >
> > Could you explain a bit more why we do not wish to
> > let TimeWindowedDeserializer and WindowedSerdes be created without a
> > specified time as a parameter?
> >
> > I understand the Long.MAX_VALUE could cause problems but would it not be a
> > good idea to have a usable default or fetch from the config if available?
> > After all you are proposing to add "window.size.ms".
> >
> > We definitely need a fix to this problem and adding "window.size.ms" makes
> > sense to me.
> >
> > Thanks for the KIP,
> > Walker
> >
> > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion for KIP-659:
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > >
> > >
> > > The goal of the KIP is to ensure that window size is passed to the
> > > consumer when needed, which will generally be for testing purposes, and
> > > to avoid runtime errors when the *TimeWindowedSerde* is created without
> > > a window size.
> > >
> > > Looking forward to hearing your feedback.
> > >
> > > Cheers,
> > > Leah
> > >
>
>
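A hedged Java sketch of how the window-function idea quoted above could subsume the fixed-size case: a fixed size becomes one particular function from start time to window. ToyWindow and WindowFunction here are hypothetical; the interface is written with `extends`, since a Java interface extends rather than implements another interface, and the half-open [start, end) convention is an assumption modeled on TimeWindow.

```java
import java.util.function.Function;

// Hypothetical window with an inclusive start and exclusive end, in ms.
final class ToyWindow {
    final long start;
    final long end;

    ToyWindow(long start, long end) {
        if (end < start) {
            throw new IllegalArgumentException("end must not precede start");
        }
        this.start = start;
        this.end = end;
    }
}

// Hypothetical: maps a window start time to the full window.
interface WindowFunction extends Function<Long, ToyWindow> {

    // The fixed-size case expressed as one particular window function.
    static WindowFunction fixedSize(long sizeMs) {
        return start -> new ToyWindow(start, start + sizeMs);
    }
}
```

A variably sized scheme is then just another lambda, for example one whose length depends on the start time, which is the flexibility a single window-size config cannot express.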


[DISCUSS] KIP-659: Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-08-20 Thread Leah Thomas
Hi all,

I'd like to start a discussion for KIP-659:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size


The goal of the KIP is to ensure that window size is passed to the consumer
when needed, which will generally be for testing purposes, and to avoid
runtime errors when the *TimeWindowedSerde* is created without a window
size.

Looking forward to hearing your feedback.

Cheers,
Leah


[jira] [Created] (KAFKA-10366) TimeWindowedDeserializer doesn't allow users to set a custom window size

2020-08-05 Thread Leah Thomas (Jira)
Leah Thomas created KAFKA-10366:
---

 Summary: TimeWindowedDeserializer doesn't allow users to set a custom window size
 Key: KAFKA-10366
 URL: https://issues.apache.org/jira/browse/KAFKA-10366
 Project: Kafka
  Issue Type: Bug
Reporter: Leah Thomas
Assignee: Leah Thomas


Related to [KAFKA-4468|https://issues.apache.org/jira/browse/KAFKA-4468], in
TimeWindowedDeserializer, Long.MAX_VALUE is used as _windowSize_ for any
deserializer that uses the default constructor. While Streams apps can pass in
a window size in serdes or while creating a TimeWindowedDeserializer, the
deserializer that is actually used to process the messages is created by the
Kafka consumer without the configured windowSize. The deserializer the
consumer creates is configured from the consumer configs, but as there is no
config for windowSize, the window size is always the default.

See _KStreamAggregationIntegrationTest#shouldReduceWindowed()_ for an example
of this issue. Despite passing in the windowSize to both the serdes and the
TimeWindowedDeserializer, the window size is set to Long.MAX_VALUE.
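To see why the deserializer needs the window size at all, here is a simplified Java model (not Kafka's code) of a time-windowed key as we understand it: the serialized form carries the key bytes plus only an 8-byte window start, so the end must be computed from whatever window size the deserializer holds. As Sophie notes elsewhere in this thread, session windows avoid this by encoding both start and end. ToyWindowedKeyCodec is hypothetical; the overflow clamp models what an unset Long.MAX_VALUE size produces, assuming non-negative timestamps.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical model of a time-windowed key on the wire:
// [raw key bytes][8-byte window start]. The window end is not stored.
final class ToyWindowedKeyCodec {

    private ToyWindowedKeyCodec() { }

    static byte[] serialize(String key, long windowStartMs) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    static String deserializeKey(byte[] data) {
        return new String(data, 0, data.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    // Returns {start, end}. The end is derived from the deserializer's own
    // window size, so a wrong size yields a wrong end with no error.
    static long[] deserializeWindow(byte[] data, long windowSizeMs) {
        long start = ByteBuffer.wrap(data).getLong(data.length - Long.BYTES);
        long end = start + windowSizeMs;
        if (end < 0) {
            // Overflow (e.g. with the Long.MAX_VALUE default): clamp the end.
            end = Long.MAX_VALUE;
        }
        return new long[] {start, end};
    }
}
```

With a window size of 10 ms, a key written at start 100 comes back as [100, 110); with the Long.MAX_VALUE default, the same bytes come back with an end of Long.MAX_VALUE.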



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-450: Sliding Window Aggregations in the DSL

2020-07-31 Thread Leah Thomas
Thanks for the votes and conversation! The KIP has been accepted with 3
binding (Matthias, Guozhang, John) and 2 non-binding votes (Sophie and
Jorge).

Cheers,
Leah

On Wed, Jul 29, 2020 at 8:57 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks Leah!
> This will be a great addition.
>
> +1 (non-binding)
>
> Very happy that KIP-617 is being used already :D
>
> Cheers,
> Jorge.
>
> On Wed, Jul 29, 2020 at 2:28 PM John Roesler  wrote:
>
> > Thanks for the awesome KIP, Leah,
> >
> > I’m +1 (binding)
> >
> > Thanks,
> > John
> >
> > On Tue, Jul 28, 2020, at 19:10, Guozhang Wang wrote:
> > > +1 (binding)
> > >
> > > On Tue, Jul 28, 2020 at 4:44 PM Matthias J. Sax
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > On 7/28/20 4:35 PM, Sophie Blee-Goldman wrote:
> > > > > Thanks for the KIP! It's been an enlightening discussion
> > > > >
> > > > > +1 (non-binding)
> > > > >
> > > > > Sophie
> > > > >
> > > > > On Tue, Jul 28, 2020 at 8:03 AM Leah Thomas
> > > > > wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> I'd like to kick-off the vote for KIP-450
> > > > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>,
> > > > >> adding sliding window aggregations to the DSL. The discussion thread
> > > > >> is here
> > > > >> <http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/%3ccabug4nfjkrroe_rf4ht2p1wu1pt7o-qd74h_0l7a4bnsmmg...@mail.gmail.com%3e>.
> > > > >>
> > > > >> Cheers,
> > > > >> Leah
> > > > >>
> > > > >
> > > >
> > > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-29 Thread Leah Thomas
Thanks for the nits, Matthias. I've updated the examples and language
accordingly.

Leah

On Tue, Jul 28, 2020 at 6:43 PM Matthias J. Sax  wrote:

> Thanks Leah. Overall LGTM.
>
> A few nits:
>
> - the first figure shows window [9,19] but the window is not aligned
> properly (it should be 1ms to the right; right now, it's aligned to
> window [8,18])
>
> - in the second figure, a hopping window would create more windows, i.e.,
> the first window would be [-6,14) and the last window would be [19,29),
> thus it's not just 10 windows but 26 windows (if I did not miscount)
>
> - "Two new windows will be created by the late record"
>
> late -> out-of-order
>
>
> -Matthias
>
>
>
> On 7/28/20 4:34 PM, Sophie Blee-Goldman wrote:
> > Thanks for the update Leah -- I think that all makes sense.
> >
> > Cheers,
> > Sophie
> >
> > On Tue, Jul 28, 2020 at 3:55 PM Leah Thomas
> > wrote:
> >
> >> Another minor tweak, instead of defining the window by the *size*, it
> >> will be defined by *timeDifference*, which is the maximum time difference
> >> between two events. This is a more precise way to define a window due to
> >> its inclusive ends, while allowing the user to create the window they
> >> expect. This definition fits with the current examples, where a record at
> >> *10* would fall into a window of time difference *5* from *[5,10]*. This
> >> window contains any records at 5, 6, 7, 8, 9, and 10, which is 6
> >> instances instead of 5. This semantic difference is why I've shifted
> >> *size* to *timeDifference*.
> >>
> >> The new builder will be *withTimeDifferenceAndGrace*, keeping with other
> >> conventions.
> >>
> >> Let me know if there are any concerns! The vote thread is open as well
> >> here:
> >> http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser
> >>
> >> Best,
> >> Leah
> >>
> >> On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas
> >> wrote:
> >>
> >>> A small tweak - to make it more clear to users that grace is required, as
> >>> well as cleaning up some of the confusing grammar semantics of windows,
> >>> the main builder method for *slidingWindows* will be *withSizeAndGrace*
> >>> instead of *of*.  This looks like it'll be the last change (for now) on
> >>> the public API. If anyone has any comments or thoughts let me know.
> >>> Otherwise, I'll take this to vote shortly.
> >>>
> >>> Best,
> >>> Leah
> >>>
> >>> On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas
> >>> wrote:
> >>>
> >>>> To accommodate the change to a final class, I've added another
> >>>> *windowedBy()* function in *CogroupedKStream.java* to handle
> >>>> SlidingWindows.
> >>>>
> >>>> As far as the discussion goes, I think this is the last change we've
> >>>> talked about. If anyone has other comments or concerns, please let me
> >>>> know!
> >>>>
> >>>> Cheers,
> >>>> Leah
> >>>>
> >>>> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas
> >>>> wrote:
> >>>>
> >>>>> Thanks for the discussion about extending TimeWindows. I agree that
> >>>>> making it future proof is important, and the implementation of
> >>>>> SlidingWindows is unique enough that it seems logical to make it its
> >>>>> own final class.
> >>>>>
> >>>>> On that note, I've updated the KIP to make SlidingWindows a stand
> >>>>> alone final class, and added the *windowedBy()* API in
> >>>>> *KGroupedStream* to handle SlidingWindows. It seems that
> >>>>> SlidingWindows would still be able to leverage *TimeWindowedKStream*
> >>>>> by creating a SlidingWindows version of *TimeWindowedKStreamImpl*
> >>>>> that implements *TimeWindowedKStream*. If anyone sees issues with this
> >>>>> implementation, please let me know.
> >>>>>
> >>>>> Best,
> >>>>> Leah
> >>>>>
> >>>>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler 
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the reply, Sophie.
> >>>>>>
> >>>>>> That all sounds about right to me.
> >>>&g

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-28 Thread Leah Thomas
Another minor tweak: instead of defining the window by the *size*, it will
be defined by *timeDifference*, which is the maximum time difference
between two events. This is a more precise way to define a window due to
its inclusive ends, while allowing the user to create the window they
expect. This definition fits with the current examples, where a record at
*10* would fall into a window of time difference *5* from *[5,10]*. This
window contains any records at 5, 6, 7, 8, 9, and 10, which is 6 instances
instead of 5. This semantic difference is why I've shifted *size* to
*timeDifference*.

The new builder will be *withTimeDifferenceAndGrace*, keeping with other
conventions.

Let me know if there are any concerns! The vote thread is open as well
here: http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/browser

Best,
Leah
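The inclusive-bounds arithmetic above can be checked with a few lines of Java. ToySlidingWindow is a hypothetical helper, not the SlidingWindows class proposed in the KIP; it only models the window that ends at a record's timestamp, [ts - timeDifference, ts], with both ends inclusive.

```java
// Hypothetical helper illustrating inclusive [start, end] sliding-window
// bounds; not Kafka's SlidingWindows implementation.
final class ToySlidingWindow {
    final long start;
    final long end;

    private ToySlidingWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // The window that ends at a record's timestamp: [ts - timeDifference, ts].
    static ToySlidingWindow endingAt(long recordTimestampMs, long timeDifferenceMs) {
        return new ToySlidingWindow(recordTimestampMs - timeDifferenceMs, recordTimestampMs);
    }

    // Both ends are inclusive.
    boolean contains(long timestampMs) {
        return timestampMs >= start && timestampMs <= end;
    }

    // Number of distinct millisecond timestamps the window covers.
    long instants() {
        return end - start + 1;
    }
}
```

For a record at 10 with a time difference of 5 this gives [5,10], covering the 6 instants 5 through 10, which is why calling 5 the window's *size* would have been misleading.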

On Mon, Jul 27, 2020 at 3:58 PM Leah Thomas  wrote:

> A small tweak - to make it more clear to users that grace is required, as
> well as cleaning up some of the confusing grammar semantics of windows, the
> main builder method for *slidingWindows* will be *withSizeAndGrace* instead
> of *of*.  This looks like it'll be the last change (for now) on the
> public API. If anyone has any comments or thoughts let me know. Otherwise,
> I'll take this to vote shortly.
>
> Best,
> Leah
>
> On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas  wrote:
>
>> To accommodate the change to a final class, I've added another
>> *windowedBy()* function in *CogroupedKStream.java* to handle
>> SlidingWindows.
>>
>> As far as the discussion goes, I think this is the last change we've
>> talked about. If anyone has other comments or concerns, please let me know!
>>
>> Cheers,
>> Leah
>>
>> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas  wrote:
>>
>>> Thanks for the discussion about extending TimeWindows. I agree that
>>> making it future proof is important, and the implementation of
>>> SlidingWindows is unique enough that it seems logical to make it its own
>>> final class.
>>>
>>> On that note, I've updated the KIP to make SlidingWindows a stand alone
>>> final class, and added the *windowedBy()* API in *KGroupedStream* to
>>> handle SlidingWindows. It seems that SlidingWindows would still be able to
>>> leverage *TimeWindowedKStream* by creating a SlidingWindows version of
>>> *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
>>> anyone sees issues with this implementation, please let me know.
>>>
>>> Best,
>>> Leah
>>>
>>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler 
>>> wrote:
>>>
>>>> Thanks for the reply, Sophie.
>>>>
>>>> That all sounds about right to me.
>>>>
>>>> The Windows “interface”/algorithm is quite flexible, so it makes sense
>>>> for it to be extensible. Different implementations can (and do) enumerate
>>>> different windows to suit different use cases.
>>>>
>>>> On the other hand, I can’t think of any way to extend SessionWindows to
>>>> do something different using the same algorithm, so it makes sense for it
>>>> to stay final.
>>>>
>>>> If we think SlidingWindows is similarly not usefully extensible, then
>>>> we can make it final. It’s easy to remove final later, but adding it is not
>>>> possible. Or we could go the other route and just make it an interface, on
>>>> general principle. Both of these choices are safe API design.
>>>>
>>>> Thanks again,
>>>> John
>>>>
>>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
>>>> > >
>>>> > > Users could pass in a custom `SessionWindows` as
>>>> > > long as the session algorithm works correctly for it.
>>>> >
>>>> >
>>>> > Well not really, SessionWindows is a final class. TimeWindows is also a
>>>> > final class, so neither of these can be extended/customized. For a given
>>>> > Windows then there would only be three (non-overlapping) possibilities:
>>>> > either it's TimeWindows, SlidingWindows, or a custom Windows. I don't
>>>> > think there's any problem with determining what the user wants in this
>>>> > case -- we would just check if it's a SlidingWindows and connect the new
>>>> > processor, or else connect the existing hopping/tumbling window
>>>> > processor.
>>>> >
>>>> > I'll a

[VOTE] KIP-450: Sliding Window Aggregations in the DSL

2020-07-28 Thread Leah Thomas
Hi all,

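I'd like to kick off the vote for KIP-450
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>,
adding sliding window aggregations to the DSL. The discussion thread is here
<http://mail-archives.apache.org/mod_mbox/kafka-dev/202007.mbox/%3ccabug4nfjkrroe_rf4ht2p1wu1pt7o-qd74h_0l7a4bnsmmg...@mail.gmail.com%3e>.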

Cheers,
Leah


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-27 Thread Leah Thomas
A small tweak: to make it clearer to users that grace is required, and to
clean up some of the confusing grammar semantics of windows, the main builder
method for *SlidingWindows* will be *withSizeAndGrace* instead of *of*. This
looks like it'll be the last change (for now) to the public API. If anyone
has any comments or thoughts, let me know. Otherwise, I'll take this to a
vote shortly.

Best,
Leah
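A hedged Java sketch of the API shape under discussion: a final class with a private constructor, so the only entry point is a factory that requires both the size and the grace period, plus a dedicated windowedBy() overload on the grouped stream, as the 7/23 and 7/24 updates quoted below describe. Every Toy* name is hypothetical; the factory uses this message's name, withSizeAndGrace, which Leah's 7/28 message later renames to withTimeDifferenceAndGrace.

```java
import java.time.Duration;

// Hypothetical final class: the constructor is private, so callers must go
// through the factory, which demands both a size and a grace period.
final class ToySlidingWindows {
    final Duration size;
    final Duration grace;

    private ToySlidingWindows(Duration size, Duration grace) {
        this.size = size;
        this.grace = grace;
    }

    static ToySlidingWindows withSizeAndGrace(Duration size, Duration grace) {
        if (size.isZero() || size.isNegative()) {
            throw new IllegalArgumentException("size must be positive");
        }
        if (grace.isNegative()) {
            throw new IllegalArgumentException("grace must not be negative");
        }
        return new ToySlidingWindows(size, grace);
    }
}

// Hypothetical stand-in for the existing Windows base class used by
// hopping/tumbling windows.
abstract class ToyWindows { }

// Hypothetical grouped stream. ToySlidingWindows does not extend ToyWindows,
// so it gets its own overload and the compiler routes each call at compile time.
final class ToyGroupedStream {
    String windowedBy(ToyWindows windows) {
        return "hopping/tumbling processor";
    }

    String windowedBy(ToySlidingWindows windows) {
        return "sliding-window processor";
    }
}
```

Keeping the class final, as John points out below, is the safe default: final can be removed later, but it cannot be added without breaking subclasses.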

On Fri, Jul 24, 2020 at 3:45 PM Leah Thomas  wrote:

> To accommodate the change to a final class, I've added another
> *windowedBy()* function in *CogroupedKStream.java* to handle
> SlidingWindows.
>
> As far as the discussion goes, I think this is the last change we've
> talked about. If anyone has other comments or concerns, please let me know!
>
> Cheers,
> Leah
>
> On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas  wrote:
>
>> Thanks for the discussion about extending TimeWindows. I agree that
>> making it future-proof is important, and the implementation of
>> SlidingWindows is unique enough that it seems logical to make it its own
>> final class.
>>
>> On that note, I've updated the KIP to make SlidingWindows a standalone
>> final class, and added the *windowedBy()* API in *KGroupedStream* to
>> handle SlidingWindows. It seems that SlidingWindows would still be able to
>> leverage *TimeWindowedKStream* by creating a SlidingWindows version of
>> *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
>> anyone sees issues with this implementation, please let me know.
>>
>> Best,
>> Leah
>>
>> On Wed, Jul 22, 2020 at 10:47 PM John Roesler wrote:
>>
>>> Thanks for the reply, Sophie.
>>>
>>> That all sounds about right to me.
>>>
>>> The Windows “interface”/algorithm is quite flexible, so it makes sense
>>> for it to be extensible. Different implementations can (and do) enumerate
>>> different windows to suit different use cases.
>>>
>>> On the other hand, I can’t think of any way to extend SessionWindows to
>>> do something different using the same algorithm, so it makes sense for it
>>> to stay final.
>>>
>>> If we think SlidingWindows is similarly not usefully extensible, then we
>>> can make it final. It’s easy to remove final later, but adding it is not
>>> possible. Or we could go the other route and just make it an interface, on
>>> general principle. Both of these choices are safe API design.
>>>
>>> Thanks again,
>>> John
>>>
>>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
>>> > >
>>> > > Users could pass in a custom `SessionWindows` as
>>> > > long as the session algorithm works correctly for it.
>>> >
>>> >
>>> > Well not really, SessionWindows is a final class. TimeWindows is also a
>>> > final class, so neither of these can be extended/customized. For a given
>>> > Windows then there would only be three (non-overlapping) possibilities:
>>> > either it's TimeWindows, SlidingWindows, or a custom Windows. I don't
>>> > think there's any problem with determining what the user wants in this
>>> > case -- we would just check if it's a SlidingWindows and connect the new
>>> > processor, or else connect the existing hopping/tumbling window processor.
>>> >
>>> > I'll admit that last sentence does leave a bad taste in my mouth. Part of
>>> > that is probably the "leaking" API Matthias pointed out; we just assume
>>> > the hopping/tumbling window implementation fits all custom windows. But I
>>> > guess if you really needed to customize the algorithm any further you may
>>> > as well stick in a transformer and do it all yourself.
>>> >
>>> > Anyways, given what we have, it does seem weird to apply one algorithm
>>> > for most Windows types and then swap in a different one for one specific
>>> > extension of Windows. So adding a new #windowedBy(SlidingWindows)
>>> > sounds reasonable to me.
>>> >
>>> > I'm still not convinced that we need a whole new TimeWindowedKStream
>>> > equivalent class for sliding windows though. It seems like the
>>> > hopping/tumbling window implementation could benefit just as much from a
>>> > subtractor as the sliding windows, so the only future-proofing we get is
>>> > the ability to be
>>> >

Re: [VOTE] KIP-617: Allow Kafka Streams State Stores to be iterated backwards

2020-07-27 Thread Leah Thomas
Hi Jorge,

Looks great. +1 (non-binding)

Best,
Leah

On Thu, Jul 16, 2020 at 6:39 PM Sophie Blee-Goldman 
wrote:

> Hey Jorge,
>
> Thanks for the reminder -- +1 (non-binding)
>
> Cheers,
> Sophie
>
> On Thu, Jul 16, 2020 at 4:06 PM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > Bumping this vote thread to check if there's any feedback.
> >
> > Cheers,
> > Jorge.
> >
> > On Sat, Jul 4, 2020 at 6:20 PM John Roesler  wrote:
> >
> > > Thanks Jorge,
> > >
> > > I’m +1 (binding)
> > >
> > > -John
> > >
> > > On Fri, Jul 3, 2020, at 10:26, Jorge Esteban Quilcate Otoya wrote:
> > > > Hola everyone,
> > > >
> > > > I'd like to start a new thread to vote for KIP-617 as there have been
> > > > significant changes since the previous vote started.
> > > >
> > > > KIP wiki page:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-617%3A+Allow+Kafka+Streams+State+Stores+to+be+iterated+backwards
> > > >
> > > > Many thanks!
> > > >
> > > > Jorge.
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-24 Thread Leah Thomas
To accommodate the change to a final class, I've added another
*windowedBy()* function in *CogroupedKStream.java* to handle SlidingWindows.
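A rough Java sketch of the overload approach, using hypothetical stand-in classes (HoppingSpec, SlidingSpec, GroupedStreamSketch) rather than the real KGroupedStream or CogroupedKStream types. With a dedicated windowedBy overload per final window class, the compiler selects the sliding-window path, instead of a runtime instanceof check on a shared Windows base type.

```java
// Hypothetical stand-in for a hopping/tumbling window definition.
final class HoppingSpec {
    final long sizeMs;
    final long advanceMs;

    HoppingSpec(final long sizeMs, final long advanceMs) {
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
    }
}

// Hypothetical stand-in for a sliding window definition (a final class, as proposed).
final class SlidingSpec {
    final long sizeMs;
    final long graceMs;

    SlidingSpec(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }
}

// Reports which aggregation processor a windowed stream would be wired to.
interface WindowedStreamSketch {
    String processorName();
}

// Hypothetical grouped stream with one windowedBy overload per window type.
final class GroupedStreamSketch {
    // Existing path: hopping/tumbling windows keep their current processor.
    WindowedStreamSketch windowedBy(final HoppingSpec windows) {
        return () -> "hopping/tumbling aggregate (size=" + windows.sizeMs
                + ", advance=" + windows.advanceMs + ")";
    }

    // New overload: overload resolution picks the sliding processor at compile
    // time, so no instanceof check on a shared base type is needed.
    WindowedStreamSketch windowedBy(final SlidingSpec windows) {
        return () -> "sliding aggregate (size=" + windows.sizeMs
                + ", grace=" + windows.graceMs + ")";
    }
}
```

The cost of this design, as raised in the thread, is that each overload needs its own downstream type or implementation, which tends to duplicate code.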

As far as the discussion goes, I think this is the last change we've talked
about. If anyone has other comments or concerns, please let me know!

Cheers,
Leah

On Thu, Jul 23, 2020 at 7:34 PM Leah Thomas  wrote:

> Thanks for the discussion about extending TimeWindows. I agree that making
> it future-proof is important, and the implementation of SlidingWindows is
> unique enough that it seems logical to make it its own final class.
>
> On that note, I've updated the KIP to make SlidingWindows a standalone
> final class, and added the *windowedBy()* API in *KGroupedStream* to
> handle SlidingWindows. It seems that SlidingWindows would still be able to
> leverage *TimeWindowedKStream* by creating a SlidingWindows version of
> *TimeWindowedKStreamImpl* that implements *TimeWindowedKStream*. If
> anyone sees issues with this implementation, please let me know.
>
> Best,
> Leah
>
> On Wed, Jul 22, 2020 at 10:47 PM John Roesler  wrote:
>
>> Thanks for the reply, Sophie.
>>
>> That all sounds about right to me.
>>
>> The Windows “interface”/algorithm is quite flexible, so it makes sense
>> for it to be extensible. Different implementations can (and do) enumerate
>> different windows to suit different use cases.
>>
>> On the other hand, I can’t think of any way to extend SessionWindows to
>> do something different using the same algorithm, so it makes sense for it
>> to stay final.
>>
>> If we think SlidingWindows is similarly not usefully extensible, then we
>> can make it final. It’s easy to remove final later, but adding it is not
>> possible. Or we could go the other route and just make it an interface, on
>> general principle. Both of these choices are safe API design.
>>
>> Thanks again,
>> John
>>
>> On Wed, Jul 22, 2020, at 21:54, Sophie Blee-Goldman wrote:
>> > >
>> > > Users could pass in a custom `SessionWindows` as
>> > > long as the session algorithm works correctly for it.
>> >
>> >
>> > Well not really, SessionWindows is a final class. TimeWindows is also a
>> > final class, so neither of these can be extended/customized. For a given
>> > Windows then there would only be three (non-overlapping) possibilities:
>> > either it's TimeWindows, SlidingWindows, or a custom Windows. I don't
>> > think there's any problem with determining what the user wants in this
>> > case -- we would just check if it's a SlidingWindows and connect the new
>> > processor, or else connect the existing hopping/tumbling window processor.
>> >
>> > I'll admit that last sentence does leave a bad taste in my mouth. Part of
>> > that is probably the "leaking" API Matthias pointed out; we just assume
>> > the hopping/tumbling window implementation fits all custom windows. But I
>> > guess if you really needed to customize the algorithm any further you may
>> > as well stick in a transformer and do it all yourself.
>> >
>> > Anyways, given what we have, it does seem weird to apply one algorithm
>> > for most Windows types and then swap in a different one for one specific
>> > extension of Windows. So adding a new #windowedBy(SlidingWindows)
>> > sounds reasonable to me.
>> >
>> > I'm still not convinced that we need a whole new TimeWindowedKStream
>> > equivalent class for sliding windows though. It seems like the
>> > hopping/tumbling window implementation could benefit just as much from a
>> > subtractor as the sliding windows, so the only future-proofing we get is
>> > the ability to be lazy and add the subtractor to one but not the other.
>> > Of course it would only be an optimization, so we could just not apply it
>> > to one type and nothing would break. It does make me nervous to go
>> > against the "future-proof" direction, though. Are there any other
>> > examples of things we might want to add to one window type but not to
>> > another?
>> >
>> > On another note, this actually brings up a new question: should
>> > SlidingWindows also be final? My take is "yes" since there's no
>> > reasonable customization of sliding windows, at least not that I can
>> > think of. Thoughts?
>> >
>> >
>> > On Wed, Jul 22, 2020 at 7:15 PM John Roesler wrote:
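To make the subtractor idea from the quoted discussion concrete, here is a minimal Java sketch, assuming in-order records and a long-valued aggregate. SubtractableSlidingAggregate and its constructor are hypothetical; nothing here is a Kafka Streams API. The adder folds each new value into the running result, and the subtractor undoes a value once its record drops out of the window [t - sizeMs, t], so the aggregate is never recomputed from scratch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongBinaryOperator;

// Hypothetical sketch: a sliding aggregate maintained with an adder and a subtractor.
// Assumes records arrive in timestamp order (no late records).
final class SubtractableSlidingAggregate {
    private final long sizeMs;
    private final LongBinaryOperator adder;
    private final LongBinaryOperator subtractor;
    // Records currently inside the window, oldest first: {timestamp, value}.
    private final Deque<long[]> inWindow = new ArrayDeque<>();
    private long aggregate;

    SubtractableSlidingAggregate(final long sizeMs, final long initial,
                                 final LongBinaryOperator adder,
                                 final LongBinaryOperator subtractor) {
        this.sizeMs = sizeMs;
        this.aggregate = initial;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    // Adds a record and returns the aggregate of the window ending at its timestamp.
    long add(final long timestamp, final long value) {
        // Undo the contribution of every record older than timestamp - sizeMs.
        while (!inWindow.isEmpty() && inWindow.peekFirst()[0] < timestamp - sizeMs) {
            aggregate = subtractor.applyAsLong(aggregate, inWindow.pollFirst()[1]);
        }
        inWindow.addLast(new long[] {timestamp, value});
        aggregate = adder.applyAsLong(aggregate, value);
        return aggregate;
    }
}
```

With new SubtractableSlidingAggregate(10, 0, Long::sum, (a, b) -> a - b) this maintains a sliding sum. An aggregate such as max has no subtractor, which is why the thread treats this as an optional optimization rather than a requirement.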

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-23 Thread Leah Thomas
> > > > >> As a consequence, they can all rely on an aggregation maintenance
> > > > >> algorithm that involves enumerating each of the windows and updating
> > > > >> it. That also means that their DSL object (TimeWindowedKStream) doesn't
> > > > >> need "subtractors" or "mergers", but only "adders"; again, this is a
> > > > >> consequence of the fact that the windows are enumerable.
> > > > >
> > > > >
> > > > > Given that, I'm a bit confused why you conclude that sliding windows
> > > > > are fundamentally different from the "statically enumerable" windows:
> > > > > sliding windows require only an adder too. I'm not sure it's a
> > > > > consequence of being enumerable, or that being enumerable is the
> > > > > fundamental property that unites all Windows (ignoring JoinWindows
> > > > > here). Yes, it currently does apply to all Windows implementations,
> > > > > but we shouldn't assume that it *has* to be that way on the basis that
> > > > > it currently happens to be.
> > > > >
> > > > > Also, the fact that they can all rely on the same aggregation
> > > > > algorithm seems like an implementation detail, and it would be weird
> > > > > to force a separate/new DSL API just because under the covers we swap
> > > > > in a different processor.
> > > > >
> > > > > To be fair, I don't think there's a strong reason *against* not
> > > > > extending Windows -- in the end it will just mean adding a new
> > > > > #windowedBy method and copy/pasting everything from
> > > > > TimeWindowedKStream pretty much word for word. But anytime you find
> > > > > yourself copying over code almost exactly, there should probably be
> > > > > a good reason why :)
> > > > >
> > > > >
> > > > > On Wed, Jul 22, 2020 at 3:48 PM John Roesler wrote:
> > > > >
> > > > >> Thanks Leah!
> > > > >>
> > > > >> 5) Regarding the empty windows, I'm wondering if we should simply
> > > > >> propose that the windows should not be emitted downstream of the
> > > > >> operator or visible in IQ. Then, it'll be up to the implementation to
> > > > >> make it happen. I'm personally not concerned about it, since it seems
> > > > >> like there are multiple ways to accomplish this.
> > > > >>
> > > > >> Note, the discrepancy Matthias pointed out is actually a design bug.
> > > > >> The windowed aggregation (like every operation in Streams) produces a
> > > > >> "view", which then forms the basis of downstream operations. When we
> > > > >> pass the Materialized option to the operation, all we're doing is
> > > > >> saying to "materialize" the view (aka, actually store the computed
> > > > >> view) and also make it queryable. It would be illegal for the
> > > > >> "queryable, materialized view" to differ in any way from the "view".
> > > > >> So, it seems we must either propose to emit the empty windows AND
> > > > >> make them visible in IQ, or propose NOT to emit the empty windows AND
> > > > >> NOT make them visible in IQ.
> > > > >>
> > > > >> 7) Regarding whether we can extend TimeWindows (or Windows):
> > > > >> I've been mulling this over more. I think it's worth asking the
> > > > >> question of what these classes even mean. For example, why is
> > > > >> SessionWindows a different thing from TimeWindows and
> > > > >> UniversalWindows (which are both Windows)?
> > > > >>
> > > > >> This conversation is extra complicated because of the incomplete and
> > > > >> mismatched class hierarchy, but
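A short Java sketch of what "statically enumerable" means for hopping and tumbling windows: given only a record timestamp and the window definition, every window containing the record can be listed up front, so each one only needs an adder. This mirrors the idea behind TimeWindows#windowsFor but is written from scratch; HoppingWindowEnumerator is a hypothetical name, and windows are assumed to be epoch-aligned and half-open, [start, start + size).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch, not the Kafka Streams source.
final class HoppingWindowEnumerator {
    // Returns {start, end} for every window [start, end) that contains the timestamp.
    // Assumes sizeMs > 0 and 0 < advanceMs <= sizeMs.
    static List<long[]> windowsFor(final long timestamp, final long sizeMs, final long advanceMs) {
        final List<long[]> windows = new ArrayList<>();
        // Earliest aligned start whose window still contains the timestamp.
        long start = Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestamp) {
            windows.add(new long[] {start, start + sizeMs});
            start += advanceMs;
        }
        return windows;
    }
}
```

For a 10 ms window advancing by 5 ms, a record at time 7 falls into [0, 10) and [5, 15). Each record updates about size/advance windows, which is the write amplification ratio mentioned elsewhere in the thread; sliding window boundaries depend on the data, so no such closed-form enumeration exists for them.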

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-22 Thread Leah Thomas
16,26] should be created but the figure shows the green window as [15,20].
>
> About the blue window: maybe add a note that the blue window contains the
> aggregate we need for the green window, _before_ the new record `a` is
> added to the blue window.
>
>
>
> 7) I am not really happy to extend TimeWindows, and I think the argument
> about JoinWindows is not the best (IMHO, JoinWindows already does it wrong
> and we would just repeat the same mistake). However, it seems our window
> hierarchy is "broken" already and it might be out of scope for this KIP
> to fix it. Hence, I am ok that we bite the bullet for now and clean it
> up later.
>
>
>
> -Matthias
>
>
> On 7/20/20 5:18 PM, Guozhang Wang wrote:
> > Hi Leah,
> >
> > Thanks for the updated KIP. I agree that extending SlidingWindows from
> > Windows is fine for the sake of not introducing more public APIs (and
> > their internal xxxImpl classes), and its cons are small enough for me to
> > tolerate.
> >
> >
> > Guozhang
> >
> >
> > On Mon, Jul 20, 2020 at 1:49 PM Leah Thomas wrote:
> >
> >> Hi all,
> >>
> >> Thanks for the feedback on the KIP. I've updated the KIP page
> >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
> >> to address these points and have created a child page
> >> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>
> >> to go more in depth on certain implementation details.
> >>
> >> *Grace Period:*
> >> I think Sophie raises a good point that the default grace period of 24
> >> hours is often too long and was chosen when retention time and grace
> >> period were the same. For SlidingWindows, I propose we make the grace
> >> period mandatory. To keep formatting consistent with other types of
> >> windows, grace period won't be an additional parameter in the #of
> >> method, but will still look like it does in other use cases:
> >> .windowedBy(SlidingWindows.of(twentySeconds).grace(fiftySeconds)). If
> >> grace period isn't properly initialized, an error will be thrown through
> >> the process method.
> >>
> >> *Storage Layer + Aggregation:*
> >> SlidingWindows will use a WindowStore because computation can be done
> >> with the information stored in a WindowStore (window timestamp and
> >> value). Using the WindowStore also simplifies computation, as
> >> SlidingWindows can leverage existing processes. Because we are using a
> >> WindowStore, the aggregation process will be similar to that of a
> >> hopping window. As records come in, their value is added to the
> >> aggregation that already exists, following the same procedure as
> >> hopping windows. The aggregation difference between SlidingWindows and
> >> HoppingWindows comes in creating new windows for a SlidingWindow, where
> >> you need to find the existing records that belong to the new window.
> >> This computation is similar to the aggregation in SessionWindows and
> >> requires a scan of the WindowStore to find the window with the
> >> aggregation needed, which will always be pre-computed. The scan
> >> requires creating an iterator, but should have minimal performance
> >> effects, as this strategy is already implemented in SessionWindows.
> >> More details on finding the aggregation that needs to be put in a new
> >> window can be found on the implementation page
> >> <https://cwiki.apache.org/confluence/display/KAFKA/Aggregation+details+for+KIP-450>.
> >>
> >> *Extending Windows, Windows or nothing*
> >> Because SlidingWindows are still defined by a windowSize (whereas
> >> SessionWindows are defined purely by data), I think it makes sense to
> >> leverage the existing Window processes instead of creating a new store
> >> type that would be very similar to the WindowStore. While it's true that
> >> the #windowsFor method isn't necessary for SlidingWindows, JoinWindows
> >> also extends Windows and throws an UnsupportedOperationException in the
> >> #windowsFor method, which is what SlidingWindows can do. The difference
> >> between extending Windows or Windows is minimal, as both are ways to
> >> pass window parameters. Extending Windows will give us more leverage in
> >> utilizing existing processes.
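A simplified Java sketch of the storage and aggregation flow described in the quoted proposal, using an in-memory TreeMap in place of a WindowStore. Assumptions, all hypothetical: each record at time t defines one window [t - size, t] with both ends inclusive, there is at most one record per timestamp, and the new window's aggregate is computed by scanning raw records, whereas the KIP proposes reusing an already pre-computed window aggregate (see its implementation page). It shows the two steps: adding the value to existing windows that contain the record, and a range scan to build the new window, which also covers out-of-order records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch; SlidingSumStore is not a Kafka Streams class.
final class SlidingSumStore {
    private final long sizeMs;
    // Raw records by timestamp (assumes at most one record per timestamp).
    private final NavigableMap<Long, Long> records = new TreeMap<>();
    // Window sums keyed by window end; a window covers [end - sizeMs, end].
    private final NavigableMap<Long, Long> windows = new TreeMap<>();

    SlidingSumStore(final long sizeMs) {
        this.sizeMs = sizeMs;
    }

    void process(final long timestamp, final long value) {
        records.put(timestamp, value);

        // 1) Add the value to every existing window containing the record:
        //    [end - sizeMs, end] contains timestamp iff end is in [timestamp, timestamp + sizeMs].
        final List<Long> affected =
                new ArrayList<>(windows.subMap(timestamp, true, timestamp + sizeMs, true).keySet());
        for (final long end : affected) {
            windows.merge(end, value, Long::sum);
        }

        // 2) Create the record's own window with a range scan over [timestamp - sizeMs, timestamp].
        long sum = 0;
        for (final long v : records.subMap(timestamp - sizeMs, true, timestamp, true).values()) {
            sum += v;
        }
        windows.put(timestamp, sum);
    }

    Long windowSum(final long windowEnd) {
        return windows.get(windowEnd);
    }
}
```

Processing (0, 5), (4, 3), (12, 2) and then the out-of-order record (9, 1) with a size of 10 leaves the window ending at 12 with a sum of 6 (records at 4, 9, and 12) and creates a new window ending at 9 with a sum of 9.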

Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-20 Thread Leah Thomas
> > > > Leah,
> > > >
> > > > thanks for your update. However, it does not completely answer my
> > > > question.
> > > >
> > > > In our current window implementations, we emit a window result update
> > > > record (i.e., early/partial result) for each input record. When an
> > > > out-of-order record arrives, we just update the corresponding old
> > > > window and emit another update.
> > > >
> > > > It's unclear from the KIP if you propose the same emit strategy? -- For
> > > > sliding windows it might be worth considering a different emit
> > > > strategy that only supports emitting the final result (i.e., after the
> > > > grace period passed)?
> > > >
> > > >
> > > >
> > > > Boyang also raises a good point that relates to my point from above
> > > > about pre-aggregations and storage layout. Our current time windows
> > > > are all pre-aggregated and stored in parallel. We can also look up
> > > > windows efficiently, as we can compute the windowed-key given the
> > > > input record key and timestamp based on the window definition.
> > > >
> > > > However, for sliding windows, window boundaries are data dependent
> > > > and thus we cannot compute them upfront. Thus, how can we "find"
> > > > existing windows efficiently? Furthermore, out-of-order data would
> > > > create new windows in the past and we need to be able to handle this
> > > > case.
> > > >
> > > > Thus, to handle out-of-order data correctly, we need to store all raw
> > > > input events. Additionally, we could also store pre-aggregated
> > > > results if we think it's beneficial. -- If we apply the "emit only
> > > > final results" strategy, storing pre-aggregated results would not be
> > > > necessary though.
> > > >
> > > >
> > > > Btw: for sliding windows it might also be useful to consider allowing
> > > > users to supply a `Subtractor` -- this subtractor could be applied on
> > > > the current window result (in case we store it) if a record drops out
> > > > of the window. Of course, not all aggregation functions are
> > > > subtractable, and we can consider this as a follow-up task, too, and
> > > > not include it in this KIP for now. Thoughts?
> > > >
> > > >
> > > >
> > > > I was also thinking about the type hierarchy. I am not sure if
> > > > extending TimeWindow is the best approach? For TimeWindows, we can
> > > > pre-compute window boundaries (cf `windowsFor()`) while for a sliding
> > > > window the boundaries are data dependent. Session windows are also
> > > > data dependent and thus they don't inherit from TimeWindow (Maybe
> > > > check out the KIP that added session windows? It could provide some
> > > > good insights.) -- I believe the same rationale applies to sliding
> > > > windows?
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > >
> > > > On 7/10/20 12:47 PM, Boyang Chen wrote:
> > > > > Thanks Leah and Sophie for the KIP.
> > > > >
> > > > > 1. I'm a bit surprised that we don't have an advance time. Could we
> > > > > elaborate on how the storage layer is structured?
> > > > >
> > > > > 2. IIUC, there will be extra cost in terms of fetching aggregation
> > > > > results, since we couldn't pre-aggregate until the user asks for it.
> > > > > Would be good to also discuss it.
> > > > >
> > > > > 3. We haven't discussed the possibility of supporting sliding windows
> > > > > inherently. For a user who actually uses a hopping window, Streams
> > > > > could detect such an inefficiency by computing the
> > > > > window_size/advance_time ratio to decide whether the write
> > > > > amplification is too high compared with some configured threshold.
> > > > > The benefit of doing so is that existing Streams users don't need to
> > > > > change their code or learn a new API; they only need to upgrade the
> > > > > Streams library to get the benefit for their inefficient hopping
> > > > > window implementation. There might be some compatibility issues for
> > > > > sure, but it's worth listing them out for the trade-off.
> > > > >
> > > > > Boyang
> > > > >
> > > > > On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas wrote:
> > > > >
> > > > >> Hey Matthias,
> > > > >>
> > > > >> Thanks for pointing that out. I added the following to the Proposed
> > > > >> Changes section of the KIP:
> > > > >>
> > > > >> "Records that come out of order will be processed the same way as
> > > > >> in-order records, as long as they fall within the grace period. Any
> > > > >> new windows created by the late record will still be created, and the
> > > > >> existing windows that are changed by the late record will be updated.
> > > > >> Any record that falls outside of the grace period (either user
> > > > >> defined or default) will be discarded."
> > > > >>
> > > > >> All the best,
> > > > >> Leah
> > > > >>
> > > > >> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax wrote:
> > > > >>
> > > > >>> Leah,
> > > > >>>
> > > > >>> thanks a lot for the KIP. Very well written.
> > > > >>>
> > > > >>> The KIP does not talk about the handling of out-of-order data
> > > > >>> though. How do you propose to address this?
> > > > >>>
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>> On 7/8/20 5:33 PM, Leah Thomas wrote:
> > > > >>>> Hi all,
> > > > >>>> I'd like to kick off the discussion for KIP-450, adding sliding
> > > > >>>> window aggregation support to Kafka Streams.
> > > > >>>>
> > > > >>>>
> > > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> > > > >>>>
> > > > >>>> Let me know what you think,
> > > > >>>> Leah
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>
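One way to picture the "emit only final results" strategy raised in the quoted thread is a buffer that keeps the latest partial result per window and releases it only once the window closes. This Java sketch is hypothetical (FinalResultBuffer is not a Kafka Streams class); it assumes windows are identified by their end timestamp and that a window closes once stream time reaches its end plus the grace period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of emitting one final result per window.
final class FinalResultBuffer {
    private final long graceMs;
    // Latest partial result per window, keyed by window end.
    private final TreeMap<Long, Long> pendingByWindowEnd = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;

    FinalResultBuffer(final long graceMs) {
        this.graceMs = graceMs;
    }

    // Overwrites the buffered result for a window; nothing is emitted here.
    // Callers are assumed to drop updates for windows that already closed.
    void update(final long windowEnd, final long partialResult) {
        pendingByWindowEnd.put(windowEnd, partialResult);
    }

    // Advances stream time and returns {windowEnd, finalResult} for each window that closed.
    List<long[]> advanceTo(final long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        final List<long[]> emitted = new ArrayList<>();
        while (!pendingByWindowEnd.isEmpty()
                && pendingByWindowEnd.firstKey() + graceMs <= streamTime) {
            final Map.Entry<Long, Long> closed = pendingByWindowEnd.pollFirstEntry();
            emitted.add(new long[] {closed.getKey(), closed.getValue()});
        }
        return emitted;
    }
}
```

With a 5 ms grace period, two updates to the window ending at 10 produce a single emitted result (the second one) once stream time reaches 15. Results arrive later than with per-record updates, in exchange for exactly one output per window.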


Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-10 Thread Leah Thomas
Hey Matthias,

Thanks for pointing that out. I added the following to the Proposed Changes
section of the KIP:

"Records that come out of order will be processed the same way as in-order
records, as long as they fall within the grace period. Any new windows
created by the late record will still be created, and the existing windows
that are changed by the late record will be updated. Any record that falls
outside of the grace period (either user defined or default) will be
discarded."
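A minimal Java sketch of the rule above, under stated assumptions: stream time is the largest timestamp seen so far, the latest window a record at time t can belong to ends at t + size, and that window closes once stream time reaches its end plus the grace period. GracePeriodFilter is a hypothetical name, and the exact boundary handling in the KIP may differ.

```java
// Hypothetical sketch of dropping records that arrive after the grace period.
final class GracePeriodFilter {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE;

    GracePeriodFilter(final long sizeMs, final long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Returns true if the record should be processed, false if it should be discarded.
    boolean accept(final long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        // The latest window that can contain this record ends at timestamp + sizeMs;
        // it stays open while stream time has not reached end + grace.
        return timestamp + sizeMs + graceMs > streamTime;
    }
}
```

With a 10 ms window and a 5 ms grace period, after a record at 100 a late record at 90 is still processed, while one at 85 is discarded.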

All the best,
Leah

On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax  wrote:

> Leah,
>
> thanks a lot for the KIP. Very well written.
>
> The KIP does not talk about the handling of out-of-order data though.
> How do you propose to address this?
>
>
> -Matthias
>
> On 7/8/20 5:33 PM, Leah Thomas wrote:
> > Hi all,
> > I'd like to kick off the discussion for KIP-450, adding sliding window
> > aggregation support to Kafka Streams.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
> >
> > Let me know what you think,
> > Leah
> >
>
>


[DISCUSS] KIP-450: Sliding Windows

2020-07-08 Thread Leah Thomas
Hi all,
I'd like to kick off the discussion for KIP-450, adding sliding window
aggregation support to Kafka Streams.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL

Let me know what you think,
Leah


[jira] [Resolved] (KAFKA-4996) Fix findbugs multithreaded correctness warnings for streams

2020-06-30 Thread Leah Thomas (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-4996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leah Thomas resolved KAFKA-4996.

Resolution: Fixed

> Fix findbugs multithreaded correctness warnings for streams
> ---
>
>                 Key: KAFKA-4996
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4996
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Colin McCabe
>            Assignee: Leah Thomas
>            Priority: Major
>              Labels: newbie
>
> Fix findbugs multithreaded correctness warnings for streams
> {code}
> Multithreaded correctness Warnings
>
> Code  Warning
> AT    Sequence of calls to java.util.concurrent.ConcurrentHashMap may not be atomic in org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(long, ProcessorContext)
> IS    Inconsistent synchronization of org.apache.kafka.streams.KafkaStreams.stateListener; locked 66% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.processor.internals.StreamThread.stateListener; locked 66% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.processor.TopologyBuilder.applicationId; locked 50% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingKeyValueStore.context; locked 66% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.cache; locked 60% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.context; locked 66% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.name; locked 60% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.CachingWindowStore.serdes; locked 70% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.RocksDBStore.db; locked 63% of time
> IS    Inconsistent synchronization of org.apache.kafka.streams.state.internals.RocksDBStore.serdes; locked 76% of time
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)