Re: KIP-675: Convert KTable to a KStream using the previous value

2020-10-09 Thread Javier Freire Riobo
I see. It's great hidden functionality, and all that's needed is to define a
good API and expose it. I think the easiest approach would be to expose one
operator first and add more operators if there is demand for them. I suspect
it is kept internal to avoid complicating the API.

Thank you

On Fri, Oct 9, 2020 at 7:26, Matthias J. Sax ()
wrote:

> I agree that there are cases when it is useful to get the old and new
> value. In fact, the DSL internally often tracks old and new value via a
> `Change` value type.
>
> We did have some discussion that it might be useful to expose this
> currently internal `Change` type in the public API. But if we do this,
> it would not be limited to a single operator.
>
>
> -Matthias
>
> On 10/8/20 1:41 PM, Javier Freire Riobo wrote:
> > You're right. The behavior is correct with the cache disabled.
> >
> > Anyway, I think the operator I propose can be useful. The need to generate a
> > value from the previous and current value of a record can be quite common.
> > I think the only way to implement it is through an aggregate using a helper
> > class. It is simpler and more natural to be able to receive the previous
> > and current values in a function.
> >
> > Anyway, thank you very much. I have been working with Kafka for a short
> > time, but I find it an amazing tool. Congratulations.
> >
> > On Thu, Oct 8, 2020 at 21:10, Matthias J. Sax ()
> > wrote:
> >
> >> I guess I understand now.
> >>
> >> However, it seems to be an "issue" with record caching. Setting the
> >> commit interval to zero would flush the cache each time, but it is not
> >> the "right" config change. You should just disable the `KTable` cache
> >> instead.
> >>
> >> You can disable caching globally by setting `cache.max.bytes.buffering`
> >> configuration parameter to zero.
> >>
> >> Or you can disable caching for an individual KTable via
> >> `Materialized#withCachingDisabled()` that you can pass into your
> >> `aggregate()` operator.
> >>
> >> Thus, overall, I don't see the need for a new operator.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 10/7/20 1:51 PM, Javier Freire Riobo wrote:
> >>> I have put together a small demo. I hope it clarifies the issue.
> >>>
> >>> https://github.com/javierfreire/KTableToKStreamTest
> >>>
> >>> Thank you very much
> >>>
> >>> On Wed, Oct 7, 2020 at 3:01, Matthias J. Sax ()
> >>> wrote:
> >>>
> >>>> Thanks for the KIP.
> >>>>
> >>>> I am not sure if I understand the motivation. In particular the KIP says:
> >>>>
> >>>>> The main problem, apart from needing more code, is that if the same
> >>>>> event is received twice at the same time and the commit time is not 0,
> >>>>> the difference is deleted and nothing is emitted.
> >>>>
> >>>> Can you elaborate? Maybe you can provide a concrete example? I don't
> >>>> understand the relationship between "the same event is received twice"
> >>>> and a "non-zero commit time".
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 10/6/20 6:25 AM, Javier Freire Riobo wrote:
> >>>>> Hi all,
> >>>>>
> >>>>> I'd like to propose these changes to the Kafka Streams API.
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value
> >>>>>
> >>>>> This is a proposal to convert a KTable to a KStream knowing the previous
> >>>>> value of the record.
> >>>>>
> >>>>> I also opened a proof-of-concept PR:
> >>>>>
> >>>>> PR #9381: https://github.com/apache/kafka/pull/9381
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Cheers,
> >>>>> Javier Freire
> >>>>>
> >>>>
> >>>
> >>
> >
>
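To make the caching interaction in this exchange concrete, here is a small, dependency-free Java simulation. It is not Kafka Streams code: the helper class `PrevCurr`, the `emitted` method, and the single flush at the end (standing in for one commit interval) are illustrative assumptions. It models an `aggregate()` that keeps the previous and current values per key, followed by a downstream step that emits only when the two differ. With a record cache, the duplicate event overwrites the pending result, so downstream only sees previous equal to current and emits nothing; with caching disabled, the first change is forwarded.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Illustrative simulation only, not Kafka Streams code: models an aggregate that
// keeps (previous, current) per key, optionally behind a record cache.
class CacheCoalescingDemo {

    // Hypothetical helper class used as the aggregate value.
    static final class PrevCurr {
        final String previous;
        final String current;

        PrevCurr(String previous, String current) {
            this.previous = previous;
            this.current = current;
        }
    }

    // Processes (key, value) updates and returns what reaches the downstream step.
    // With caching, pending results are overwritten per key and flushed only once
    // at the end, as if all updates arrived within a single commit interval.
    // Without caching, every result is forwarded immediately.
    static List<String> emitted(List<String[]> updates, boolean cachingEnabled) {
        Map<String, PrevCurr> store = new LinkedHashMap<>();
        Map<String, PrevCurr> cache = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] kv : updates) {
            String key = kv[0];
            PrevCurr old = store.get(key);
            PrevCurr agg = new PrevCurr(old == null ? null : old.current, kv[1]);
            store.put(key, agg);
            if (cachingEnabled) {
                cache.put(key, agg); // replaces any unflushed result for this key
            } else {
                forward(key, agg, out);
            }
        }
        // Simulated cache flush on commit (no-op when caching is disabled).
        for (Map.Entry<String, PrevCurr> e : cache.entrySet()) {
            forward(e.getKey(), e.getValue(), out);
        }
        return out;
    }

    // Downstream step: emit "key:previous->current" only if the value changed.
    static void forward(String key, PrevCurr agg, List<String> out) {
        if (!Objects.equals(agg.previous, agg.current)) {
            out.add(key + ":" + agg.previous + "->" + agg.current);
        }
    }

    public static void main(String[] args) {
        // The same event for key "k" received twice.
        List<String[]> updates = List.of(new String[] {"k", "A"}, new String[] {"k", "A"});
        System.out.println("caching enabled:  " + emitted(updates, true));
        System.out.println("caching disabled: " + emitted(updates, false));
    }
}
```

Running `main` prints an empty list for the cached case and `[k:null->A]` for the uncached case, which is consistent with the behavior being correct once the cache is disabled.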


Re: KIP-675: Convert KTable to a KStream using the previous value

2020-10-08 Thread Javier Freire Riobo
You're right. The behavior is correct with the cache disabled.

Anyway, I think the operator I propose can be useful. The need to generate a
value from the previous and current value of a record can be quite common.
I think the only way to implement it is through an aggregate using a helper
class. It is simpler and more natural to be able to receive the previous
and current values in a function.

Anyway, thank you very much. I have been working with Kafka for a short
time, but I find it an amazing tool. Congratulations.
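The helper-class workaround described in this message can be sketched without Kafka dependencies. `PreviousAndCurrent`, `fold`, and `run` are hypothetical names chosen for illustration; in a real topology the fold would roughly correspond to the `Aggregator` passed to `aggregate()` and the final step to a `mapValues()` after `toStream()`, with caching disabled so every update reaches it. The example derives per-account balance deltas from the previous and current values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Dependency-free sketch of the "aggregate with a helper class" workaround.
// All names are hypothetical; this is not Kafka Streams code.
class PreviousValueWorkaround {

    // Helper class stored as the aggregate value.
    static final class PreviousAndCurrent<V> {
        final V previous;
        final V current;

        PreviousAndCurrent(V previous, V current) {
            this.previous = previous;
            this.current = current;
        }
    }

    // Aggregator step: the old "current" becomes the new "previous".
    static <V> PreviousAndCurrent<V> fold(V newValue, PreviousAndCurrent<V> agg) {
        return new PreviousAndCurrent<>(agg == null ? null : agg.current, newValue);
    }

    // Folds each record into its key's aggregate, then maps the resulting pair
    // with a function of (previous, current). Every update produces one output,
    // as it would with caching disabled.
    static <K, V, R> List<R> run(List<Map.Entry<K, V>> records, BiFunction<V, V, R> mapper) {
        Map<K, PreviousAndCurrent<V>> table = new HashMap<>();
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            PreviousAndCurrent<V> agg = fold(r.getValue(), table.get(r.getKey()));
            table.put(r.getKey(), agg);
            out.add(mapper.apply(agg.previous, agg.current));
        }
        return out;
    }

    public static void main(String[] args) {
        // Balance updates per account; emit the change relative to the previous balance.
        List<Map.Entry<String, Integer>> balances = List.of(
            Map.entry("acc-1", 100),
            Map.entry("acc-2", 50),
            Map.entry("acc-1", 130));
        List<Integer> deltas = run(balances, (prev, curr) -> prev == null ? curr : curr - prev);
        System.out.println(deltas); // [100, 50, 30]
    }
}
```

The extra helper type and the two-step pipeline are the boilerplate this proposal aims to replace with a single function that receives both values.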



Re: KIP-675: Convert KTable to a KStream using the previous value

2020-10-07 Thread Javier Freire Riobo
I have put together a small demo. I hope it clarifies the issue.

https://github.com/javierfreire/KTableToKStreamTest

Thank you very much



KIP-675: Convert KTable to a KStream using the previous value

2020-10-06 Thread Javier Freire Riobo
Hi all,

I'd like to propose these changes to the Kafka Streams API.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value

This is a proposal to convert a KTable to a KStream knowing the previous
value of the record.

I also opened a proof-of-concept PR:

PR #9381: https://github.com/apache/kafka/pull/9381

What do you think?

Cheers,
Javier Freire
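As a rough illustration of the semantics proposed here (a table changelog turned into a stream whose values are computed from each key's old and new value), the following Java sketch runs against an in-memory table. `ValueChangeMapper`, `toStreamWithPrevious`, and `entry` are hypothetical names, not the API defined in KIP-675 or the linked PR. The old/new pairing is loosely analogous to the internal `Change` type mentioned later in the thread; treating a null old value as a new key and a null new value as a delete are assumptions of this sketch.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: the names below are hypothetical and are not the
// API from KIP-675 or its proof-of-concept PR.
class PreviousValueOperatorSketch {

    // Receives the key together with its old and new value.
    // Assumption of this sketch: oldValue is null for a new key and
    // newValue is null for a delete (tombstone).
    @FunctionalInterface
    interface ValueChangeMapper<K, V, VR> {
        VR apply(K key, V oldValue, V newValue);
    }

    // Like Map.entry, but allows a null value (used here for deletes).
    static <K, V> Map.Entry<K, V> entry(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Applies each (key, value) update to an in-memory table, where a null value
    // deletes the key, and emits mapper(key, old, new) for every update.
    static <K, V, VR> List<VR> toStreamWithPrevious(List<Map.Entry<K, V>> updates,
                                                    ValueChangeMapper<K, V, VR> mapper) {
        Map<K, V> table = new HashMap<>();
        List<VR> out = new ArrayList<>();
        for (Map.Entry<K, V> u : updates) {
            V newValue = u.getValue();
            // put() and remove() both return the value previously mapped to the key.
            V oldValue = newValue == null ? table.remove(u.getKey()) : table.put(u.getKey(), newValue);
            out.add(mapper.apply(u.getKey(), oldValue, newValue));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> updates = List.of(
            entry("user-1", "Madrid"),
            entry("user-1", "Vigo"),
            entry("user-1", (String) null));
        List<String> events = toStreamWithPrevious(updates, (key, oldV, newV) ->
            oldV == null ? key + " created in " + newV
                : newV == null ? key + " deleted (was " + oldV + ")"
                : key + " moved from " + oldV + " to " + newV);
        events.forEach(System.out::println);
    }
}
```

Running `main` prints `user-1 created in Madrid`, `user-1 moved from Madrid to Vigo`, and `user-1 deleted (was Vigo)`, one per line: a single function sees both values, with no helper aggregate.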


Create KIP permission

2020-10-03 Thread Javier Freire Riobo
Hi,

My UID is javier.freire. I would like to create a KIP to add to Kafka Streams
the ability to convert a changelog stream into a record stream, computing each
value from a comparison of the old and new values of the record.

These are the changes:

https://github.com/javierfreire/kafka/commit/d32169f06452388800ceb2a9e1ef86d1921d1345

Thank you