Re: KIP-675: Convert KTable to a KStream using the previous value
I see. That is great hidden functionality; all that is needed is to define a good API and expose it. I think the easiest approach would be to expose one operator first, and then add more operators if users ask for them. I assume it is kept internal so as not to complicate the API.

Thank you

On Fri, Oct 9, 2020 at 7:26, Matthias J. Sax () wrote:
> I agree that there are cases when it is useful to get the old and new
> value. In fact, the DSL internally often tracks old and new value via a
> `Change` value type.
>
> We did have some discussion that it might be useful to expose this
> currently internal `Change` type in the public API. But if we do this,
> it would not be limited to a single operator.
>
>
> -Matthias
Re: KIP-675: Convert KTable to a KStream using the previous value
You're right. The behavior is correct with the cache disabled.

Still, I think the operator I propose can be useful. Computing a value from the previous and current values of a record is a fairly common need. As far as I can tell, the only way to implement it today is through an aggregation that uses a helper class. It would be simpler and more natural to receive the previous and current values directly in a function.

Anyway, thank you very much. I have only been working with Kafka for a short time, but I find it an amazing tool. Congratulations.

On Thu, Oct 8, 2020 at 21:10, Matthias J. Sax () wrote:
> I guess I understand now.
>
> However, it seems to be an "issue" with record caching. Setting the
> commit interval to zero would flush the cache each time, but it is not
> the "right" config change. You should just disable the `KTable` cache
> instead.
>
> You can disable caching globally by setting `cache.max.bytes.buffering`
> configuration parameter to zero.
>
> Or you can disable caching for an individual KTable via
> `Materialized#withCachingDisabled()` that you can pass into your
> `aggregation()` operator.
>
> Thus, overall, I don't see the need for a new operator.
>
>
> -Matthias
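For reference, a minimal sketch of the global setting Matthias describes. It uses plain `java.util.Properties` string keys so it compiles without the kafka-streams jar (a real application would normally use the `StreamsConfig` constants); the application id and bootstrap address are placeholders. The per-table alternative, `Materialized#withCachingDisabled()`, needs the Kafka Streams DSL and is only noted in a comment. Newer Kafka releases deprecate `cache.max.bytes.buffering` in favor of `statestore.cache.max.bytes` (KIP-770).

```java
import java.util.Properties;

// Sketch: Kafka Streams settings with the record cache switched off globally.
// Plain string keys keep this compilable without the kafka-streams jar.
class DisableCacheConfig {

    static Properties streamsProps(String appId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", appId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A 0-byte cache means KTable updates are not deduplicated per key
        // between commits; each update is forwarded downstream.
        props.setProperty("cache.max.bytes.buffering", "0");
        // Per-table alternative (requires the DSL, not shown here):
        // pass Materialized#withCachingDisabled() to the aggregate() call.
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps("ktable-diff-demo", "localhost:9092");
        props.list(System.out);
    }
}
```

Disabling the cache globally affects every KTable in the topology, which is why the per-table option can be preferable when only one aggregation needs every update.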
Re: KIP-675: Convert KTable to a KStream using the previous value
I have put together a small demo. I hope it clarifies the problem.

https://github.com/javierfreire/KTableToKStreamTest

Thank you very much

On Wed, Oct 7, 2020 at 3:01, Matthias J. Sax () wrote:
> Thanks for the KIP.
>
> I am not sure if I understand the motivation. In particular the KIP says:
>
> > The main problem, apart from needing more code, is that if the same
> > event is received twice at the same time and the commit time is not 0, the
> > difference is deleted and nothing is emitted.
>
> Can you elaborate? Maybe you can provide a concrete example? I don't
> understand the relationship between "the same event is received twice"
> and a "non-zero commit time".
>
>
> -Matthias
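To make the interaction concrete, here is a toy Java model (not Kafka's actual cache implementation) of one plausible reading of the KIP's problem statement. The aggregation stores a hypothetical `PrevCurr` helper value, a downstream filter drops zero differences, and the cache keeps only the latest pending value per key until the commit. It assumes a single commit at the end of the interval; a real cache also flushes when it runs out of space.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a cached KTable aggregation; NOT Kafka's real cache.
class CacheCollapseDemo {

    // Hypothetical helper value stored by the aggregation.
    record PrevCurr(Integer previous, Integer current) {
        // Difference a downstream mapValues() would compute.
        int diff() { return previous == null ? current : current - previous; }
    }

    // Processes all updates of one commit interval and returns the diffs
    // that survive a downstream filter(diff != 0).
    static List<Integer> emittedDiffs(List<Map.Entry<String, Integer>> updates,
                                      boolean cacheEnabled) {
        Map<String, PrevCurr> store = new LinkedHashMap<>();   // aggregate state
        Map<String, PrevCurr> pending = new LinkedHashMap<>(); // cached, not yet forwarded
        List<PrevCurr> forwarded = new ArrayList<>();
        for (Map.Entry<String, Integer> update : updates) {
            PrevCurr old = store.get(update.getKey());
            PrevCurr next = new PrevCurr(old == null ? null : old.current(), update.getValue());
            store.put(update.getKey(), next);
            if (cacheEnabled) {
                pending.put(update.getKey(), next); // overwrites the earlier pending value
            } else {
                forwarded.add(next);                // forwarded immediately
            }
        }
        forwarded.addAll(pending.values());         // the commit flushes the cache
        List<Integer> diffs = new ArrayList<>();
        for (PrevCurr pc : forwarded) {
            if (pc.diff() != 0) {
                diffs.add(pc.diff());
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        // The same event received twice within one commit interval.
        List<Map.Entry<String, Integer>> sameEventTwice =
                List.of(Map.entry("a", 10), Map.entry("a", 10));
        System.out.println(emittedDiffs(sameEventTwice, false)); // [10]
        System.out.println(emittedDiffs(sameEventTwice, true));  // []
    }
}
```

Without the cache, the first update yields a difference of 10 and only the duplicate is filtered out. With the cache, both updates collapse into the single state `(10, 10)` before the commit, so the difference is 0 and nothing is emitted.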
KIP-675: Convert KTable to a KStream using the previous value
Hi all,

I'd like to propose these changes to the Kafka Streams API.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-675%3A+Convert+KTable+to+a+KStream+using+the+previous+value

This is a proposal to convert a KTable to a KStream knowing the previous value of the record.

I also opened a proof-of-concept PR:

PR#9321: https://github.com/apache/kafka/pull/9381

What do you think?

Cheers,
Javier Freire
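The proposed semantics can be modelled without Kafka at all. A minimal sketch, assuming the changelog is given as an ordered list of key/value updates: a hypothetical `toStreamWithPrevious` function (not the name or signature used in the KIP or the PR) remembers the last value per key and emits `mapper(previous, current)` for every update, with `previous == null` for a key's first update. Tombstones (null values) are not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of "KTable to KStream using the previous value".
// Hypothetical names; not the KIP's actual API.
class PreviousValueModel {

    static <K, V, R> List<R> toStreamWithPrevious(List<Map.Entry<K, V>> changelog,
                                                  BiFunction<V, V, R> mapper) {
        Map<K, V> lastSeen = new HashMap<>();
        List<R> out = new ArrayList<>();
        for (Map.Entry<K, V> update : changelog) {
            // Map.put returns the previous value, or null for a key's first update.
            V previous = lastSeen.put(update.getKey(), update.getValue());
            out.add(mapper.apply(previous, update.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> changelog = List.of(
                Map.entry("alice", 100), Map.entry("bob", 50), Map.entry("alice", 130));
        // Turn successive balances into deltas.
        List<Integer> deltas = toStreamWithPrevious(changelog,
                (prev, curr) -> prev == null ? curr : curr - prev);
        System.out.println(deltas); // [100, 50, 30]
    }
}
```

The point of the sketch is that the mapper receives both values directly, so no helper value type has to be stored in the table itself.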
Create KIP permission
Hi,

My UID is javier.freire. I would like to create a KIP that adds to Kafka Streams the ability to convert a changelog stream (KTable) into a record stream (KStream), computing each output value by comparing the old and new values of the record. These are the changes:

https://github.com/javierfreire/kafka/commit/d32169f06452388800ceb2a9e1ef86d1921d1345

Thank you