Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2018-10-04 Thread Jeyhun Karimov
Hi Lei,

Please feel free to take over the KIP.

Cheers,
Jeyhun

On Fri, Sep 21, 2018, 22:27 Lei Chen  wrote:

> Hi,
>
> Just want to know is anyone actively working on this and also KAFKA-4835
> <https://issues.apache.org/jira/browse/KAFKA-4835>? Seems like the JIRA
> has been inactive for a couple of months. We want this feature and would like to
> move it forward if no one else is working on it.
>
>
> Lei
>
> On Wed, Jun 20, 2018 at 7:27 PM Matthias J. Sax 
> wrote:
>
>> No worries. It's just good to know. It seems that some other people are
>> interested in driving this further. So we will just "reassign" it to them.
>>
>> Thanks for letting us know.
>>
>>
>> -Matthias
>>
>> On 6/20/18 2:51 PM, Jeyhun Karimov wrote:
>> > Hi Matthias, all,
>> >
>> > Currently, I am not able to complete this KIP. Please accept my
>> > apologies for that.
>> >
>> >
>> > Cheers,
>> > Jeyhun
>> >
>> > On Mon, Jun 11, 2018 at 2:25 AM Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> > What is the status of this KIP?
>> >
>> > -Matthias
>> >
>> >
>> > On 2/13/18 1:43 PM, Matthias J. Sax wrote:
>> > > Is there any update for this KIP?
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 12/4/17 2:08 PM, Matthias J. Sax wrote:
>> > >> Jeyhun,
>> > >>
>> > >> thanks for updating the KIP.
>> > >>
>> > >> I am wondering if you intend to add a new class `Produced`?
>> There is
>> > >> already `org.apache.kafka.streams.kstream.Produced`. So if we
>> want to
>> > >> add a new class, it must have a different name -- or we might be
>> > able to
>> > >> merge both into one?
>> > >>
>> > >> Also, for the KStream overloads of `through()` and `to()`, can
>> > you add
>> > >> the different behavior using different overloads? It's not clear
>> from
>> > >> the KIP what the semantics are.
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
>> > >>> Hi,
>> > >>>
>> > >>> Thanks for your comments. I agree with Matthias partially.
>> > >>> I think we should relax some requirements related to the to() and
>> > through()
>> > >>> methods.
>> > >>> IMHO, the Produced class can cover (existing/to-be-created) topic
>> > information,
>> > >>> which will ease our effort:
>> > >>>
>> > >>> KStream.to(Produced topicInfo)
>> > >>> KStream.through(Produced topicInfo)
>> > >>>
>> > >>> This will decrease the number of overloads but we will need to
>> > deprecate
>> > >>> the existing to() and through() methods, perhaps.
>> > >>> I updated the KIP accordingly.
>> > >>>
>> > >>>
>> > >>> Cheers,
>> > >>> Jeyhun
>> > >>>
>> > >>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax
>> > <matth...@confluent.io>
>> > >>> wrote:
>> > >>>
>> > >>>> @Jan:
>> > >>>>
>> > >>>> The `Produced` class was introduced in 1.0 to specify key and
>> value
>> > >>>> Serdes (and partitioner) if data is written into a topic.
>> > >>>>
>> > >>>> Old API:
>> > >>>>
>> > >>>> KStream#to("topic", keySerde, valueSerde);
>> > >>>>
>> > >>>> New API:
>> > >>>>
>> > >>>> KStream#to("topic", Produced.with(keySerde, valueSerde));
>> > >>>>
>> > >>>>
>> > >>>> This allows us to reduce the number of overloads for `to()` (and
>> > >>>> `through()` that follows the same pattern) -- the second
>> > param

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2018-06-20 Thread Jeyhun Karimov
Hi Matthias, all,

Currently, I am not able to complete this KIP. Please accept my apologies
for that.


Cheers,
Jeyhun

On Mon, Jun 11, 2018 at 2:25 AM Matthias J. Sax 
wrote:

> What is the status of this KIP?
>
> -Matthias
>
>
> On 2/13/18 1:43 PM, Matthias J. Sax wrote:
> > Is there any update for this KIP?
> >
> >
> > -Matthias
> >
> > On 12/4/17 2:08 PM, Matthias J. Sax wrote:
> >> Jeyhun,
> >>
> >> thanks for updating the KIP.
> >>
> >> I am wondering if you intend to add a new class `Produced`? There is
> >> already `org.apache.kafka.streams.kstream.Produced`. So if we want to
> >> add a new class, it must have a different name -- or we might be able to
> >> merge both into one?
> >>
> >> Also, for the KStream overloads of `through()` and `to()`, can you add
> >> the different behavior using different overloads? It's not clear from
> >> the KIP what the semantics are.
> >>
> >>
> >> -Matthias
> >>
> >> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
> >>> Hi,
> >>>
> >>> Thanks for your comments. I agree with Matthias partially.
> >>> I think we should relax some requirements related to the to() and
> through()
> >>> methods.
> >>> IMHO, the Produced class can cover (existing/to-be-created) topic
> information,
> >>> which will ease our effort:
> >>>
> >>> KStream.to(Produced topicInfo)
> >>> KStream.through(Produced topicInfo)
> >>>
> >>> This will decrease the number of overloads but we will need to
> deprecate
> >>> the existing to() and through() methods, perhaps.
> >>> I updated the KIP accordingly.
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> @Jan:
> >>>>
> >>>> The `Produced` class was introduced in 1.0 to specify key and value
> >>>> Serdes (and partitioner) if data is written into a topic.
> >>>>
> >>>> Old API:
> >>>>
> >>>> KStream#to("topic", keySerde, valueSerde);
> >>>>
> >>>> New API:
> >>>>
> >>>> KStream#to("topic", Produced.with(keySerde, valueSerde));
> >>>>
> >>>>
> >>>> This allows us to reduce the number of overloads for `to()` (and
> >>>> `through()`, which follows the same pattern) -- the second parameter is
> >>>> used to cover all different variations of optional parameters users can
> >>>> specify, while we only have 2 overloads for `to()` itself.
> >>>>
> >>>> What is still unclear to me is what you mean by this topic prefix
> >>>> thing? Either a user cares about the topic name and thus must create
> >>>> and manage it manually, or the user does not care, and Streams creates
> >>>> it. How would this prefix idea fit in here?
> >>>>
> >>>>
> >>>>
> >>>> @Guozhang:
> >>>>
> >>>> My idea was to extend `Produced` with the hint we want to give for
> >>>> creating the internal topic and pass an optional `Produced` parameter.
> There
> >>>> are multiple things we can do here:
> >>>>
> >>>> 1) stream.through(null, Produced...).groupBy().aggregate()
> >>>> -> just allow for `null` topic name indicating that Streams should
> >>>> create an internal topic
> >>>>
> >>>> 2) stream.through(Produced...).groupBy().aggregate()
> >>>> -> add one overload taking a mandatory `Produced`
> >>>>
> >>>> We use `Serialized` to piggyback the information
> >>>>
> >>>> 3) stream.groupBy(Serialized...).aggregate()
> >>>> and stream.groupByKey(Serialized...).aggregate()
> >>>> -> we don't need new top level overloads
> >>>>
> >>>>
> >>>> There are different trade-offs for those alternatives and maybe there
> >>>> are other ways to change the API. It's just to push the discussion
> further.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 11/12/17 1:22 PM, Jan Filipiak wrote:
> >>>>> Hi Gou

Re: [VOTE] KIP-159: Introducing Rich functions to Streams

2017-11-18 Thread Jeyhun Karimov
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 10.11.2017 01:06, Guozhang Wang wrote:
> >>>>>>> Hello Jan,
> >>>>>>>
> >>>>>>> Regarding your question about caching: today we keep the record
> >>> context
> >>>>>>> with the cached entry already so when we flush the cache which may
> >>>>>>> generate
> >>>>>>> new records forwarding we will set the record context
> appropriately;
> >>>>>>> and
> >>>>>>> then after the flush is completed we will reset the context to the
> >>>>>>> record
> >>>>>>> before the flush happens. But I think when Jeyhun did the PR it is
> a
> >>>>>>> good
> >>>>>>> time to double check on such stages to make sure we are not
> >>>>>>> introducing any
> >>>>>>> regressions.
> >>>>>>>
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Nov 6, 2017 at 8:54 PM, Jan Filipiak <
> >>> jan.filip...@trivago.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I agree completely.
> >>>>>>>>
> >>>>>>>> Exposing this information in a place where it has no _natural_
> >>>>>>>> belonging
> >>>>>>>> might really be a bad blocker in the long run.
> >>>>>>>>
> >>>>>>>> Concerning your first point. I would argue it's not too hard to
> have a
> >>>>>>>> user
> >>>>>>>> keep track of these. If we still don't want the user
> >>>>>>>> to keep track of these I would argue that all > projection only <
> >>>>>>>> transformations on a Source-backed KTable/KStream
> >>>>>>>> could also return a KTable/KStream instance of the type we return
> >>>>>>>> from the
> >>>>>>>> topology builder.
> >>>>>>>> Only after any operation that exceeds projection or filter one
> would
> >>>>>>>> return a KTable not granting access to this any longer.
> >>>>>>>>
> >>>>>>>> Even then its difficult already: I never ran a topology with
> caching
> >>>>>>>> but I
> >>>>>>>> am not even 100% sure what the record Context means behind
> >>>>>>>> a materialized KTable with Caching? Topic and Partition are
> probably
> >>>>>>>> with
> >>>>>>>> some reasoning but offset is probably only the offset causing the
> >>>>>>>> flush?
> >>>>>>>> So one might as well think to drop offsets from this RecordContext.
> >>>>>>>>
> >>>>>>>> Best Jan
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 07.11.2017 03:18, Guozhang Wang wrote:
> >>>>>>>>
> >>>>>>>>> Regarding the API design (the proposed set of overloads v.s. one
> >>>>>>>>> overload
> >>>>>>>>> on #map to enrich the record), I think what we have represents a
> >>> good
> >>>>>>>>> trade-off between API succinctness and user convenience: on one
> >>>>>>>>> hand we
> >>>>>>>>> definitely want to keep as few overloaded functions as
> possible.
> >>>>>>>>> But on
> >>>>>>>>> the other hand if we only do that in, say, the #map() function
> then
> >>>>>>>>> this
> >>>>>>>>> enrichment could be an overkill: think of a topology that has 7
> >>>>>>>>> operators
> >>>>>>>>> in a chain, where users want to access the record context on
> >>>>>>>>> operator #2
> &

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2017-11-17 Thread Jeyhun Karimov
Hi,

Thanks for your comments. I agree with Matthias partially.
I think we should relax some requirements related to the to() and through()
methods.
IMHO, the Produced class can cover (existing/to-be-created) topic information,
which will ease our effort:

KStream.to(Produced topicInfo)
KStream.through(Produced topicInfo)

This will decrease the number of overloads but we will need to deprecate
the existing to() and through() methods, perhaps.
I updated the KIP accordingly.
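
For illustration, a hypothetical sketch of what such overloads could look
like -- `withTopicName()` is an invented method here, not existing API, and
`stream` is assumed to be a KStream<String, String>:

// Hypothetical sketch only: Produced carries the topic information itself.
stream.to(Produced.with(Serdes.String(), Serdes.String())
                  .withTopicName("output-topic"));               // user-managed topic
stream.through(Produced.with(Serdes.String(), Serdes.String())); // no name: Streams-managed internal topic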


Cheers,
Jeyhun

On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> @Jan:
>
> The `Produced` class was introduced in 1.0 to specify key and value
> Serdes (and partitioner) if data is written into a topic.
>
> Old API:
>
> KStream#to("topic", keySerde, valueSerde);
>
> New API:
>
> KStream#to("topic", Produced.with(keySerde, valueSerde));
>
>
> This allows us to reduce the number of overloads for `to()` (and
> `through()`, which follows the same pattern) -- the second parameter is
> used to cover all different variations of optional parameters users can
> specify, while we only have 2 overloads for `to()` itself.
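
For reference, a minimal self-contained sketch of the 1.0-style API described
above (topic names are placeholders):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ProducedExample {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream = builder.stream("input-topic");
        // Produced bundles the optional parameters (serdes and, if needed,
        // a custom partitioner), so to() needs only two overloads.
        stream.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
    }
}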
>
> What is still unclear to me is what you mean by this topic prefix
> thing? Either a user cares about the topic name and thus must create
> and manage it manually, or the user does not care, and Streams creates
> it. How would this prefix idea fit in here?
>
>
>
> @Guozhang:
>
> My idea was to extend `Produced` with the hint we want to give for
> creating the internal topic and pass an optional `Produced` parameter. There
> are multiple things we can do here:
>
> 1) stream.through(null, Produced...).groupBy().aggregate()
> -> just allow for `null` topic name indicating that Streams should
> create an internal topic
>
> 2) stream.through(Produced...).groupBy().aggregate()
> -> add one overload taking a mandatory `Produced`
>
> We use `Serialized` to piggyback the information
>
> 3) stream.groupBy(Serialized...).aggregate()
> and stream.groupByKey(Serialized...).aggregate()
> -> we don't need new top level overloads
>
>
> There are different trade-offs for those alternatives and maybe there
> are other ways to change the API. It's just to push the discussion further.
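
To make alternative 2 concrete, a hypothetical sketch -- this
through(Produced) overload does not exist; it is only the shape under
discussion, and `builder` is an assumed StreamsBuilder:

// Hypothetical: through() without a topic name, so Streams would create
// the internal repartition topic itself; not existing API.
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.Long()))
       .through(Produced.with(Serdes.String(), Serdes.Long()))
       .groupByKey()
       .count();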
>
>
> -Matthias
>
> On 11/12/17 1:22 PM, Jan Filipiak wrote:
> > Hi Guozhang,
> >
> > this felt like these questions are supposed to be answered by me.
> > I do not understand the first one. I don't understand why the user
> > shouldn't be able to specify a suffix for the topic name.
> >
> >  For the third question, I am not 100% sure if the Produced class
> > came into existence
> > at all. I remember proposing it somewhere in our redo DSL discussion that
> > I dropped out of later. Finally any call that does:
> >
> > 1. create the internal topic
> > 2. register sink
> > 3. register source
> >
> > will always get the work done. If we have a Produced-like class, putting
> > all the parameters
> > in there makes sense. (Partitioner, serde, PartitionHint, internal, name
> > ... )
> >
> > Hope this helps?
> >
> >
> > On 10.11.2017 07:54, Guozhang Wang wrote:
> >> A few clarification questions on the proposal details.
> >>
> >> 1. API: although the repartition only happens at the final stateful
> >> operations like agg / join, the repartition flag info was actually
> passed
> >> from an earlier operator like map / groupBy. So what should the new
> >> API
> >> look like? For example, if we do
> >>
> >> stream.groupBy().through("topic-name", Produced..).aggregate
> >>
> >> This would add a bunch of APIs to KGroupedStream/KTable
> >>
> >> 2. Semantics: as Matthias mentioned, today any topic defined in a
> >> "through()" call is considered a user topic, and hence users are
> >> responsible for managing it, including the topic name. For this KIP's
> >> purpose, though, users would not care about the topic name. I.e. as a
> >> user
> >> I still want to make it an internal topic so that I do not need to
> >> worry
> >> about it at all, but only specify num.partitions.
> >>
> >> 3. Details: in Produced we do not have specs for specifying
> >> num.partitions or whether we should repartition or not. So it is still not
> >> clear to
> >> me how we would make use of that to achieve what's in the old
> >> proposal's RepartitionHint class.
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Nov 6, 2017 a

Re: [VOTE] KIP-159: Introducing Rich functions to Streams

2017-11-07 Thread Jeyhun Karimov
nner they need to do the
> >>>> enrichment on
> >>>> operator #2 and keep it that way until #6. In addition, the
> >>>> RecordContext
> >>>> fields (topic, offset, etc) are really orthogonal to the key-value
> >>>> payloads
> >>>> themselves, so I think separating them into this object is a
> >>>> cleaner way.
> >>>>
> >>>> Regarding the RecordContext inheritance, this is actually a good point
> >>>> that
> >>>> has not been discussed thoroughly before. Here are my two
> >>>> cents: one
> >>>> natural way would be to inherit the record context from the
> >>>> "triggering"
> >>>> record, for example in a join operator, if the record from stream A
> >>>> triggers the join, then the record context is inherited from that
> >>>> record. This is also aligned with the lower-level PAPI interface. A
> >>>> counter
> >>>> argument, though, would be that this is sort of leaking the internal
> >>>> implementations of the DSL, so that moving forward if we did some
> >>>> refactoring to our join implementations so that the triggering
> >>>> record can
> >>>> change, the RecordContext would also be different. I do not know
> >>>> how much
> >>>> it would really affect end users, but would like to hear your
> >>>> opinions.
> >>> Agreed to 100% exposing this information
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Mon, Nov 6, 2017 at 1:00 PM, Jeyhun Karimov <je.kari...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Jan,
> >>>>>
> >>>>> Sorry for late reply.
> >>>>>
> >>>>>
> >>>>> The API Design doesn't look appealing
> >>>>>
> >>>>>
> >>>>> In terms of API design we tried to preserve the java functional
> >>>>> interfaces.
> >>>>> We applied the same set of rich methods for KTable to make it
> >>>>> compatible
> >>>>> with the rest of overloaded APIs.
> >>>>>
> >>>>> It should be 100% sufficient to offer a KTable + KStream that is
> >>>>> directly
> >>>>>> feed from a topic with 1 additional overload for the #map()
> >>>>>> methods to
> >>>>>> cover every usecase while keeping the API in a way better state.
> >>>>> - IMO this seems a workaround, rather than a direct solution.
> >>>>>
> >>>>> Perhaps we should continue this discussion in DISCUSS thread.
> >>>>>
> >>>>>
> >>>>> Cheers,
> >>>>> Jeyhun
> >>>>>
> >>>>>
> >>>>> On Mon, Nov 6, 2017 at 9:14 PM Jan Filipiak
> >>>>> <jan.filip...@trivago.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi.
> >>>>>>
> >>>>>> I do understand that it might come in Handy.
> >>>>>> From my POV, in any relational algebra this is only a projection.
> >>>>>> Currently we hide these "fields" that come with the input record.
> >>>>>> It should be 100% sufficient to offer a KTable + KStream that is
> >>>>>> directly
> >>>>>> feed from a topic with 1 additional overload for the #map()
> >>>>>> methods to
> >>>>>> cover every usecase while keeping the API in a way better state.
> >>>>>>
> >>>>>> best Jan
> >>>>>>
> >>>>>> On 06.11.2017 17:52, Matthias J. Sax wrote:
> >>>>>>> Jan,
> >>>>>>>
> >>>>>>> I understand what you are saying. However, having a
> >>>>>>> RecordContext is
> >>>>>>> super useful for operations that are applied to an input topic. Many
> >>>>>>> users
> >>>>>>> requested this feature -- it's much more convenient than falling
> >>>>>>> back
> >>>>> to
> >>>>>>> transform() to implement a filter(), for example, that wants to
> >>>>>>> access
> >>>>>>> some metadata.
> >>>>>>>
> >>>>>>> Because we cannot distinguish different "origins" of a
> >>>>>>> KStream/KTable,
> >>>>> I
> >>>>>>> am not sure if there would be a better way to do this. The only
> >>>>>>> "workaround" I see, is to have two KStream/KTable interfaces
> >>>>>>> each and
> >>>>> we
> >>>>>>> would use the first one for KStream/KTable with "proper"
> >>>>>>> RecordContext.
> >>>>>>> But this does not seem to be a good solution either.
> >>>>>>>
> >>>>>>> Note, a KTable can also be read directly from a topic, I agree that
> >>>>>>> using RecordContext on a KTable that is the result of an
> >>>>>>> aggregation is
> >>>>>>> questionable. But I don't see a reason to down vote the KIP for
> >>>>>>> this
> >>>>>> reason.
> >>>>>>> WDYT about this?
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 11/1/17 10:19 PM, Jan Filipiak wrote:
> >>>>>>>> -1 non binding
> >>>>>>>>
> >>>>>>>> I don't get the motivation.
> >>>>>>>> In 80% of my DSL processors there is no such thing as a reasonable
> >>>>>>>> RecordContext.
> >>>>>>>> After a join  the record I am processing belongs to at least 2
> >>>>>>>> topics.
> >>>>>>>> After a Group by the record I am processing was created from
> >>>>>>>> multiple
> >>>>>>>> offsets.
> >>>>>>>>
> >>>>>>>> The API Design doesn't look appealing
> >>>>>>>>
> >>>>>>>> Best Jan
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 01.11.2017 22:02, Jeyhun Karimov wrote:
> >>>>>>>>> Dear community,
> >>>>>>>>>
> >>>>>>>>> It seems the discussion for KIP-159 [1] converged finally. I
> >>>>>>>>> would
> >>>>>>>>> like to
> >>>>>>>>> initiate voting for the particular KIP.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>> 159%3A+Introducing+Rich+functions+to+Streams
> >>>>>>>>> Cheers,
> >>>>>>>>> Jeyhun
> >>>>>>>>>
> >>>>
> >
>
>


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-11-06 Thread Jeyhun Karimov
Hi Matthias,

Thanks a lot for correcting. It is a leftover from the past designs when
punctuate() was not deprecated.
I corrected.

Cheers,
Jeyhun

On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I just re-read the KIP.
>
> One minor comment: we don't need to introduce any deprecated methods.
> Thus, RichValueTransformer#punctuate can be removed completely instead
> of introducing it as deprecated.
>
> Otherwise looks good to me.
>
> Thanks for being so patient!
>
>
> -Matthias
>
> On 11/1/17 9:16 PM, Guozhang Wang wrote:
> > Jeyhun,
> >
> > I think I'm convinced to not do KAFKA-3907 in this KIP. We should think
> > carefully if we should add this functionality to the DSL layer moving
> > forward since from what we discovered working on it the conclusion is
> that
> > it would require revamping the public APIs quite a lot, and it's not
> clear
> > if it is a better trade-off than asking users to call process() instead.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy <damian@gmail.com> wrote:
> >
> >> Hi Jeyhun, thanks, looks good.
> >> Do we need to remove the line that says:
> >>
> >>- on-demand commit() feature
> >>
> >> Cheers,
> >> Damian
> >>
> >> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
> >>
> >>> Hi,
> >>>
> >>> I removed the 'commit()' feature, as we discussed. It simplified the
> >>> overall design of KIP a lot.
> >>> If it is ok, I would like to start a VOTE thread.
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax <matth...@confluent.io
> >
> >>> wrote:
> >>>
> >>>> Thanks. I understand what you are saying, but I don't agree that
> >>>>
> >>>>> but also we need a commit() method
> >>>>
> >>>> I would just not provide `commit()` at DSL level and close the
> >>>> corresponding Jira as "not a problem" or similar.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Thanks for your comments. I agree that this is not the best way to
> >> do it.
> >>> A
> >>>>> bit of history behind this design.
> >>>>>
> >>>>> Prior to doing this, I tried to provide ProcessorContext itself as an
> >>>> argument
> >>>>> in Rich interfaces. However, we don't want to give users that
> >>> flexibility
> >>>>> and “power”. Moreover, ProcessorContext contains processor-level
> >>>>> information and not record-level info. The only thing we need in
> >>>>> ProcessorContext is the commit() method.
> >>>>>
> >>>>> So, as far as I understood, we need record context (offset, timestamp,
> >>>>> etc.) but also we need a commit() method (we don't want to provide
> >>>>> ProcessorContext as a parameter so users can use
> >>>> ProcessorContext.commit()
> >>>>> ).
> >>>>>
> >>>>> As a result, I thought to “propagate” the commit() call from
> >> RecordContext
> >>> to
> >>>>> ProcessorContext.
> >>>>>
> >>>>>
> >>>>> If there is a misunderstanding in motivation/discussion of
> >> KIP/included
> >>>>> jiras please let me know.
> >>>>>
> >>>>>
> >>>>> Cheers,
> >>>>> Jeyhun
> >>>>>
> >>>>>
> >>>>> On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <matth...@confluent.io
> >>>
> >>>> wrote:
> >>>>>
> >>>>>> I am personally still not convinced, that we should add `commit()`
> >> at
> >>>> all.
> >>>>>>
> >>>>>> @Guozhang: you created the original Jira. Can you elaborate a little
> >>>>>> bit? Isn't requesting commits a low level API that should not be
> >>> exposed
> >>>>>> in the DSL? Just want to understand the motivation better. Why would
> >>>>>> 

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2017-11-06 Thread Jeyhun Karimov
Hi,

Sorry for the late reply. I am convinced that we should enlarge the scope
of through() (add more overloads) instead of introducing a separate set of
overloads to other methods.
I will update the KIP soon based on the discussion and let you know.


Cheers,
Jeyhun

On Mon, Nov 6, 2017 at 9:18 PM Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Sorry for not being 100% up to date.
> Back then we had the discussion that when an operation puts a >Sink<
> into the topology, a >Produced<
> parameter is added. This Produced parameter could be internal or
> external. If internal, I think the name would still make
> a great suffix for the topic name.
>
> Is this plan still around? Otherwise, having the name as a suffix is
> probably always good: it can help the user identify more quickly the hot
> topics that need more
> partitions if there are many of these internal repartitions.
>
> Best Jan
>
>
> On 06.11.2017 20:13, Matthias J. Sax wrote:
> > I absolutely agree with what you say. It's not a requirement to specify a
> > topic name -- and this was the idea -- if a user does specify a name, we
> > treat it as is -- if a user does not specify a name, Streams creates an
> > internal topic.
> >
> > The goal of the Jira is to allow a simplified way to control
> > repartitioning (atm, user needs to manually create a topic and use via
> > through()).
> >
> > Thus, the idea is to make the topic name parameter of through optional.
> >
> > It's of course just an idea. Happy to have another API design. The goal
> > was to avoid too many new overloads.
> >
> >>> Could you clarify exactly what you mean by keeping the current
> distinction?
> > Current distinction is: user topics are created manually and user
> > specifies the name -- internal topics are created by Kafka Streams and
> > an name is generated automatically.
> >
> > -> through("user-topic")
> > -> through(TopicConfig.withNumberOfPartitions(5)) // Streams creates an
> > internal topic
> >
> >
> > -Matthias
> >
> >
> > On 11/6/17 6:56 PM, Thomas Becker wrote:
> >> Could you clarify exactly what you mean by keeping the current
> distinction?
> >>
> >> Actually, re-reading the KIP and JIRA, it's not clear that being able
> to specify a custom name is actually a requirement. If the goal is to
> control repartitioning and tune parallelism, maybe we can just sidestep
> this issue altogether by removing the ability to set a different name.
> >>
> >> On Mon, 2017-11-06 at 16:51 +0100, Matthias J. Sax wrote:
> >>
> >> That's a good point. In current design, we strictly distinguish both.
> >> For example, the reset tool deletes internal topics (starting with
> >> the prefix `<application.id>-` and ending with either `-repartition` or
> >> `-changelog`).
> >>
> >> Thus, from my point of view, it would make sense to keep the current
> >> distinction.
> >>
> >> -Matthias
> >>
> >> On 11/6/17 4:45 PM, Thomas Becker wrote:
> >>
> >>
> >> I think this sounds good as well. It's worth clarifying whether topics
> that are named by the user but created by streams are considered "internal"
> topics also.
> >>
> >> On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax wrote:
> >>
> >> My idea was to relax the requirement for through() that a topic must be
> >> created manually before startup.
> >>
> >> Thus, if no through() call is made, an (internal) topic is created the
> >> same way we do it currently.
> >>
> >> If one uses `through(String topicName)` we keep the current behavior and
> >> require users to create the topic manually.
> >>
> >> The reasoning is as follows: if a user creates a topic manually, a user
> >> can just use it for repartitioning. As the topic is already there, there
> >> is no need to specify any topic configs.
> >>
> >> We add a new `through()` overload (details TBD) that allows to specify
> >> topic configs and Streams create the topic with those configs.
> >>
> >> Reasoning: users don't want to manage topics manually; thus, it's still an
> >> internal topic and Streams creates the topic name automatically as for
> >> all other internal topics. However, users get some more control over
> >> topic parameters like the number of partitions (we should discuss what other
> >> configs would be useful).
> >>
> >>
> >> Does this make sense?
> >>
> >>
> >> -Matthias
> >>
> >>
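
A hypothetical sketch contrasting the two cases described above; `TopicConfig`
and this through() overload are only the shape under discussion, not existing
API, and `stream` is an assumed KStream:

// Existing behavior: the user creates "user-topic" manually before startup.
stream.through("user-topic");

// Proposed (hypothetical): Streams creates an internal topic with the
// requested configuration and generates its name automatically.
stream.through(TopicConfig.withNumberOfPartitions(5));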

Re: [VOTE] KIP-159: Introducing Rich functions to Streams

2017-11-06 Thread Jeyhun Karimov
Hi Jan,

Sorry for late reply.


The API Design doesn't look appealing


In terms of API design, we tried to preserve the Java functional interfaces.
We applied the same set of rich methods for KTable to make it compatible
with the rest of overloaded APIs.

It should be 100% sufficient to offer a KTable + KStream that is directly
> feed from a topic with 1 additional overload for the #map() methods to
> cover every usecase while keeping the API in a way better state.


- IMO this seems a workaround, rather than a direct solution.

Perhaps we should continue this discussion in DISCUSS thread.


Cheers,
Jeyhun


On Mon, Nov 6, 2017 at 9:14 PM Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hi.
>
> I do understand that it might come in Handy.
>  From my POV in any relational algebra this is only a projection.
> Currently we hide these "fields" that come with the input record.
> It should be 100% sufficient to offer a KTable + KStream that is directly
> feed from a topic with 1 additional overload for the #map() methods to
> cover every usecase while keeping the API in a way better state.
>
> best Jan
>
> On 06.11.2017 17:52, Matthias J. Sax wrote:
> > Jan,
> >
> > I understand what you are saying. However, having a RecordContext is
> > super useful for operations that are applied to an input topic. Many users
> > requested this feature -- it's much more convenient than falling back to
> > transform() to implement a filter(), for example, that wants to access
> > some metadata.
> >
> > Because we cannot distinguish different "origins" of a KStream/KTable, I
> > am not sure if there would be a better way to do this. The only
> > "workaround" I see, is to have two KStream/KTable interfaces each and we
> > would use the first one for KStream/KTable with "proper" RecordContext.
> > But this does not seem to be a good solution either.
> >
> > Note, a KTable can also be read directly from a topic, I agree that
> > using RecordContext on a KTable that is the result of an aggregation is
> > questionable. But I don't see a reason to down vote the KIP for this
> reason.
> >
> > WDYT about this?
> >
> >
> > -Matthias
> >
> > On 11/1/17 10:19 PM, Jan Filipiak wrote:
> >> -1 non binding
> >>
> >> I don't get the motivation.
> >> In 80% of my DSL processors there is no such thing as a reasonable
> >> RecordContext.
> >> After a join  the record I am processing belongs to at least 2 topics.
> >> After a Group by the record I am processing was created from multiple
> >> offsets.
> >>
> >> The API Design doesn't look appealing
> >>
> >> Best Jan
> >>
> >>
> >>
> >> On 01.11.2017 22:02, Jeyhun Karimov wrote:
> >>> Dear community,
> >>>
> >>> It seems the discussion for KIP-159 [1] converged finally. I would
> >>> like to
> >>> initiate voting for the particular KIP.
> >>>
> >>>
> >>>
> >>> [1]
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 159%3A+Introducing+Rich+functions+to+Streams
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
>
>


Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2017-11-04 Thread Jeyhun Karimov
Hi,


Thanks for your comments.

@Ted

API is given without much javadoc on the role / meaning of method
> parameters.


- I thought they were self-explanatory, but I will add some more comments in
the document.

@Matthias

 - how does this relate to `KStream#through()` ?


- The main difference between `KStream#through()` and this KIP is that
the repartition
topic must be created manually (when using `KStream#through()`). However,
it is better if we can handle some part of the manual work transparently for
the user.
I thought to add this functionality to `Produced`. However, there were some
conceptual issues.

In all `KStream#through()` method overloads, we state that topics should be
created manually. If we overload another `KStream#through()` method (say,
with different parameters) saying that the topic name/partitions are up to the
user, this might seem weird.
Either we can enlarge the scope of `KStream#through()`, or we create new
overloads and keep both of them (`KStream#through()` and RepartitionHint).
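
For context, a minimal sketch of the manual workflow the KIP wants to
simplify, assuming a KStream<String, Long> named `stream` and a placeholder
topic name that must be created before startup (e.g. via kafka-topics.sh):

// Today: repartitioning goes through a user-managed topic.
KStream<String, Long> repartitioned = stream
        .selectKey((key, value) -> value.toString())  // new key marks the stream for repartitioning
        .through("my-repartition-topic");             // topic must already exist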



Cheers,
Jeyhun

On Sat, Nov 4, 2017 at 7:22 PM Ted Yu <yuzhih...@gmail.com> wrote:

> w.r.t. KIP-182, KAFKA-5651 has been resolved.
> But the KIP is still labeled Under Discussion.
>
> Should KIP-182 be moved to Adopted state ?
>
> On Sat, Nov 4, 2017 at 10:06 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Thanks for the KIP.
> >
> > Two comments/questions:
> >
> >  - how does this relate to `KStream#through()` ?
> >  - do we really need to add new overloads or can we just extend existing
> > options (cf. KIP-182); it would be great if we could just extend
> > existing classes like `Produced` instead of adding new methods to "main"
> > classes
> >
> > Maybe we could use `through()` to create topics in the background
> > and piggyback all to `Produced`.
> >
> >
> > -Matthias
> >
> > On 11/4/17 4:16 PM, Ted Yu wrote:
> > > API is given without much javadoc on the role / meaning of method
> > > parameters.
> > >
> > > Can you enrich with descriptive javadoc ?
> > >
> > > On Sat, Nov 4, 2017 at 8:01 AM, Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> > >
> > >> Dear community,
> > >>
> > >> I would like to initiate discussion on KIP-221 [1] based on issue [2].
> > >> Please feel free to comment.
> > >>
> > >> [1]
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> 221%3A+Repartition+Topic+Hints+in+Streams
> > >> [2] https://issues.apache.org/jira/browse/KAFKA-6037
> > >>
> > >>
> > >>
> > >> Cheers,
> > >> Jeyhun
> > >>
> > >
> >
> >
>


[DISCUSS] KIP-221: Repartition Topic Hints in Streams

2017-11-04 Thread Jeyhun Karimov
Dear community,

I would like to initiate discussion on KIP-221 [1] based on issue [2].
Please feel free to comment.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Repartition+Topic+Hints+in+Streams
[2] https://issues.apache.org/jira/browse/KAFKA-6037



Cheers,
Jeyhun


[VOTE] KIP-159: Introducing Rich functions to Streams

2017-11-01 Thread Jeyhun Karimov
Dear community,

It seems the discussion for KIP-159 [1] converged finally. I would like to
initiate voting for the particular KIP.



[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams

Cheers,
Jeyhun


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-10-31 Thread Jeyhun Karimov
Hi,

I removed the 'commit()' feature, as we discussed. It simplified the
overall design of KIP a lot.
If it is ok, I would like to start a VOTE thread.

Cheers,
Jeyhun

On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks. I understand what you are saying, but I don't agree that
>
> > but also we need a commit() method
>
> I would just not provide `commit()` at DSL level and close the
> corresponding Jira as "not a problem" or similar.
>
>
> -Matthias
>
> On 10/27/17 3:42 PM, Jeyhun Karimov wrote:
> > Hi Matthias,
> >
> > Thanks for your comments. I agree that this is not the best way to do it. A
> > bit of history behind this design.
> >
> > Prior to doing this, I tried to provide ProcessorContext itself as an
> argument
> > in Rich interfaces. However, we don't want to give users that flexibility
> > and “power”. Moreover, ProcessorContext contains processor-level
> > information and not record-level info. The only thing we need in
> > ProcessorContext is the commit() method.
> >
> > So, as far as I understood, we need record context (offset, timestamp,
> > etc.) but also we need a commit() method (we don't want to provide
> > ProcessorContext as a parameter so users can use
> ProcessorContext.commit()
> > ).
> >
> > As a result, I thought to “propagate” the commit() call from RecordContext to
> > ProcessorContext.
> >
> >
> > If there is a misunderstanding in motivation/discussion of KIP/included
> > jiras please let me know.
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> > On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <matth...@confluent.io>
> wrote:
> >
> >> I am personally still not convinced, that we should add `commit()` at
> all.
> >>
> >> @Guozhang: you created the original Jira. Can you elaborate a little
> >> bit? Isn't requesting commits a low level API that should not be exposed
> >> in the DSL? Just want to understand the motivation better. Why would
> >> anybody that uses the DSL ever want to request a commit? To me,
> >> requesting commits is useful if you manipulated state explicitly, ie,
> >> via Processor API.
> >>
> >> Also, for the solution: it seems rather unnatural to me that we add
> >> `commit()` to `RecordContext` -- from my understanding, `RecordContext`
> >> is a helper object that provides access to record metadata. Requesting
> >> a commit is something quite different. Additionally, a commit does not
> >> commit a specific record, but a `RecordContext` is for a specific record.
> >>
> >> To me, this does not seem to be a sound API design if we follow this
> path.
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 10/26/17 10:41 PM, Jeyhun Karimov wrote:
> >>> Hi,
> >>>
> >>> Thanks for your suggestions.
> >>>
> >>> I have some comments, to make sure that there is no misunderstanding.
> >>>
> >>>
> >>> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to
> enforce
> >>>> user to consolidate this call as
> >>>> "processorContext.recordContext().commit()". And internal
> implementation
> >>>> of
> >>>> `ProcessorContext.commit()` in `ProcessorContextImpl` is also changed
> to
> >>>> this call.
> >>>
> >>>
> >>> - I think we should not deprecate `ProcessorContext.commit()`. The main
> >>> intuition behind introducing `commit()` in `RecordContext` is that
> >>> `RecordContext` is the one which is provided in Rich interfaces. So if
> >> user
> >>> wants to commit, then there should be some method inside
> `RecordContext`
> >> to
> >>> do so. Internally, `RecordContext.commit()` calls
> >>> `ProcessorContext.commit()`  (see the last code snippet in KIP-159):
> >>>
> >>> @Override
> >>> public void process(final K1 key, final V1 value) {
> >>>
> >>> recordContext = new RecordContext() {   //
> >>> recordContext initialization is added in this KIP
> >>> @Override
> >>> public void commit() {
> >>> context().commit();
> >>> }
> >>>
> >>> @Override
> >>> public long offset() {
> >>> return contex

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-10-27 Thread Jeyhun Karimov
Hi Matthias,

Thanks for your comments. I agree that this is not the best way to do it. A
bit of history behind this design.

Prior to doing this, I tried to provide ProcessorContext itself as an argument
in Rich interfaces. However, we don't want to give users that flexibility
and “power”. Moreover, ProcessorContext contains processor-level
information and not record-level info. The only thing we need in
ProcessorContext is the commit() method.

So, as far as I understood, we need record context (offset, timestamp,
etc.) but also we need a commit() method (we don't want to provide
ProcessorContext as a parameter so users can use ProcessorContext.commit()
).

As a result, I thought to “propagate” the commit() call from RecordContext to
ProcessorContext.


If there is a misunderstanding in motivation/discussion of KIP/included
jiras please let me know.


Cheers,
Jeyhun


On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <matth...@confluent.io> wrote:

> I am personally still not convinced, that we should add `commit()` at all.
>
> @Guozhang: you created the original Jira. Can you elaborate a little
> bit? Isn't requesting commits a low level API that should not be exposed
> in the DSL? Just want to understand the motivation better. Why would
> anybody that uses the DSL ever want to request a commit? To me,
> requesting commits is useful if you manipulated state explicitly, ie,
> via Processor API.
>
> Also, for the solution: it seems rather unnatural to me that we add
> `commit()` to `RecordContext` -- from my understanding, `RecordContext`
> is a helper object that provides access to record metadata. Requesting
> a commit is something quite different. Additionally, a commit does not
> commit a specific record, but a `RecordContext` is for a specific record.
>
> To me, this does not seem to be a sound API design if we follow this path.
>
>
> -Matthias
>
>
>
> On 10/26/17 10:41 PM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for your suggestions.
> >
> > I have some comments, to make sure that there is no misunderstanding.
> >
> >
> > 1. Maybe we can deprecate the `commit()` in ProcessorContext, to enforce
> >> user to consolidate this call as
> >> "processorContext.recordContext().commit()". And internal implementation
> >> of
> >> `ProcessorContext.commit()` in `ProcessorContextImpl` is also changed to
> >> this call.
> >
> >
> > - I think we should not deprecate `ProcessorContext.commit()`. The main
> > intuition behind introducing `commit()` in `RecordContext` is that
> > `RecordContext` is the one which is provided in Rich interfaces. So if
> user
> > wants to commit, then there should be some method inside `RecordContext`
> to
> > do so. Internally, `RecordContext.commit()` calls
> > `ProcessorContext.commit()`  (see the last code snippet in KIP-159):
> >
> > @Override
> > public void process(final K1 key, final V1 value) {
> >
> >     recordContext = new RecordContext() {  // recordContext initialization is added in this KIP
> >         @Override
> >         public void commit() {
> >             context().commit();
> >         }
> >
> >         @Override
> >         public long offset() {
> >             return context().recordContext().offset();
> >         }
> >
> >         @Override
> >         public long timestamp() {
> >             return context().recordContext().timestamp();
> >         }
> >
> >         @Override
> >         public String topic() {
> >             return context().recordContext().topic();
> >         }
> >
> >         @Override
> >         public int partition() {
> >             return context().recordContext().partition();
> >         }
> >     };
> >
> >
> > So, we cannot deprecate `ProcessorContext.commit()` in this case IMO.
> >
> >
> > 2. Add the `task` reference to the impl class, `ProcessorRecordContext`,
> so
> >> that it can implement the commit call itself.
> >
> >
> > - Actually, I don't think that we need `commit()` in
> > `ProcessorRecordContext`. The main intuition is to "transfer"
> > `ProcessorContext.commit()` call to Rich interfaces, to support
> > user-specific committing.
> >  To do so, we introduce `commit()` method in `RecordContext()` just only
> to
> > call ProcessorContext.commit() inside. (see the above code snippet)
> > So, in Rich interfaces, we are not dealing with  `ProcessorRecordContext`
> > at all, and we leave all its 

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-10-26 Thread Jeyhun Karimov
Hi,

Thanks for your suggestions.

I have some comments, to make sure that there is no misunderstanding.


1. Maybe we can deprecate the `commit()` in ProcessorContext, to enforce
> user to consolidate this call as
> "processorContext.recordContext().commit()". And internal implementation
> of
> `ProcessorContext.commit()` in `ProcessorContextImpl` is also changed to
> this call.


- I think we should not deprecate `ProcessorContext.commit()`. The main
intuition behind introducing `commit()` in `RecordContext` is that
`RecordContext` is the one which is provided in Rich interfaces. So if a user
wants to commit, then there should be some method inside `RecordContext` to
do so. Internally, `RecordContext.commit()` calls
`ProcessorContext.commit()`  (see the last code snippet in KIP-159):

@Override
public void process(final K1 key, final V1 value) {

    recordContext = new RecordContext() {  // recordContext initialization is added in this KIP
        @Override
        public void commit() {
            context().commit();
        }

        @Override
        public long offset() {
            return context().recordContext().offset();
        }

        @Override
        public long timestamp() {
            return context().recordContext().timestamp();
        }

        @Override
        public String topic() {
            return context().recordContext().topic();
        }

        @Override
        public int partition() {
            return context().recordContext().partition();
        }
    };


So, we cannot deprecate `ProcessorContext.commit()` in this case IMO.


2. Add the `task` reference to the impl class, `ProcessorRecordContext`, so
> that it can implement the commit call itself.


- Actually, I don't think that we need `commit()` in
`ProcessorRecordContext`. The main intuition is to "transfer" the
`ProcessorContext.commit()` call to Rich interfaces, to support
user-specific committing.
To do so, we introduce the `commit()` method in `RecordContext` just to
call ProcessorContext.commit() inside (see the above code snippet).
So, in Rich interfaces, we are not dealing with `ProcessorRecordContext`
at all, and we leave all its methods as they are.
In this KIP, we made `RecordContext` the parent class of
`ProcessorRecordContext`, just because they share quite an amount of
methods and it is logical to enable inheritance between those two.
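
To illustrate, a hedged sketch of how a rich function would use this;
RichValueMapper and RecordContext follow the KIP-159 snippets above and are
proposed, not released, API:

// Sketch only: RichValueMapper<V, VR, K> as proposed in KIP-159.
RichValueMapper<String, String, String> mapper = (value, key, recordContext) -> {
    // Record metadata comes from the RecordContext ...
    System.out.println("record from " + recordContext.topic()
            + "/" + recordContext.partition()
            + " at offset " + recordContext.offset());
    // ... and commit() delegates to ProcessorContext.commit() internally.
    recordContext.commit();
    return value.toUpperCase();
};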

3. In the wiki page, the statement that "However, call to a commit() method,
> is valid only within RecordContext interface (at least for now), we throw
> an exception in ProcessorRecordContext.commit()." and the code snippet
> below would need to be updated as well.


- I think above explanation covers this as well.


I want this KIP to gain some speed, as it has gone through many changes
based on user/developer needs, both documentation- and implementation-wise.


Cheers,
Jeyhun



On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks for the information, Jeyhun. I had also forgotten about KAFKA-3907 with
> this KIP.
>
> Thinking a bit more, I'm now inclined to go with what we agreed before, to
> add the commit() call to `RecordContext`. A few minor tweaks on its
> implementation:
>
> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to enforce
> user to consolidate this call as
> "processorContext.recordContext().commit()". And internal implementation
> of
> `ProcessorContext.commit()` in `ProcessorContextImpl` is also changed to
> this call.
>
> 2. Add the `task` reference to the impl class, `ProcessorRecordContext`, so
> that it can implement the commit call itself.
>
> 3. In the wiki page, the statement that "However, call to a commit()
> method,
> is valid only within RecordContext interface (at least for now), we throw
> an exception in ProcessorRecordContext.commit()." and the code snippet
> below would need to be updated as well.
>
>
> Guozhang
>
>
>
> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Fair point. This is a long discussion and I totally forgot that we
> > discussed this.
> >
> > Seems I changed my opinion about including KAFKA-3907...
> >
> > Happy to hear what others think.
> >
> >
> > -Matthias
> >
> > On 10/23/17 1:20 PM, Jeyhun Karimov wrote:
> > > Hi Matthias,
> > >
> > > It is probably my bad, the discussion was a bit long in this thread. I
> > > proposed the related issue in the related KIP discuss thread [1] and
> got
> > an
> > > approval [2,3].
> > > Maybe I misunderstood.
> > >
> > > [1]
> > > http://sear

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-10-23 Thread Jeyhun Karimov
Hi Matthias,

It is probably my bad, the discussion was a bit long in this thread. I
proposed the related issue in the related KIP discuss thread [1] and got an
approval [2,3].
Maybe I misunderstood.

[1]
http://search-hadoop.com/m/Kafka/uyzND19Asmg1GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
[2]
http://search-hadoop.com/m/Kafka/uyzND1kpct22GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
[3]
http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams


On Mon, Oct 23, 2017 at 8:44 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Interesting.
>
> I thought that https://issues.apache.org/jira/browse/KAFKA-4125 is the
> main motivation for this KIP :)
>
> I also think, that we should not expose the full ProcessorContext at DSL
> level.
>
> Thus, overall I am not even sure if we should fix KAFKA-3907 at all.
> Manual commits are something DSL users should not worry about -- and if
> one really needs this, an advanced user can still insert a dummy
> `transform` to request a commit from there.
>
> -Matthias
>
>
> On 10/18/17 5:39 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > The main intuition is to solve [1], which is part of this KIP.
> > I agree with you that this might not seem semantically correct as we are
> > not committing record state.
> > Alternatively, we can remove commit() from RecordContext and add
> > ProcessorContext (which has commit() method) as an extra argument to Rich
> > methods:
> >
> > instead of
> > public interface RichValueMapper<V, VR, K> {
> > VR apply(final V value,
> >  final K key,
> >  final RecordContext recordContext);
> > }
> >
> > we can adopt
> >
> > public interface RichValueMapper<V, VR, K> {
> > VR apply(final V value,
> >  final K key,
> >  final RecordContext recordContext,
> >  final ProcessorContext processorContext);
> > }
> >
> >
> > However, in this case, a user can get confused as ProcessorContext and
> > RecordContext share some methods with the same name.
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> > [1] https://issues.apache.org/jira/browse/KAFKA-3907
> >
> >
> > On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Regarding #6 above, I'm still not clear why we would need `commit()` in
> >> both ProcessorContext and RecordContext, could you elaborate a bit more?
> >>
> >> To me `commit()` is really a processor context not a record context
> >> logically: when you call that function, it means we would commit the
> state
> >> of the whole task up to this processed record, not only that single
> record
> >> itself.
> >>
> >>
> >> Guozhang
> >>
> >> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <je.kari...@gmail.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks for the feedback.
> >>>
> >>>
> >>> 0. RichInitializer definition seems missing.
> >>>
> >>>
> >>>
> >>> - Fixed.
> >>>
> >>>
> >>>  I'd suggest moving the key parameter in the RichValueXX and
> RichReducer
> >>>> after the value parameters, as well as in the templates; e.g.
> >>>> public interface RichValueJoiner<V1, V2, VR, K> {
> >>>> VR apply(final V1 value1, final V2 value2, final K key, final
> >>>> RecordContext
> >>>> recordContext);
> >>>> }
> >>>
> >>>
> >>>
> >>> - Fixed.
> >>>
> >>>
> >>> 2. Some of the listed functions are not necessary since their pairing
> >> APIs
> >>>> are being deprecated in 1.0 already:
> >>>> <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
> >>>>                                    final Serde<KR> keySerde,
> >>>>                                    final Serde<V> valSerde);
> >>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
> >>>>                                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
> >>>>                                  final Serde<K> keySerde,
> >>>>                                  final Serde<V> valSerde);

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-10-18 Thread Jeyhun Karimov
Hi,

The main intuition is to solve [1], which is part of this KIP.
I agree with you that this might not seem semantically correct as we are
not committing record state.
Alternatively, we can remove commit() from RecordContext and add
ProcessorContext (which has commit() method) as an extra argument to Rich
methods:

instead of
public interface RichValueMapper<V, VR, K> {
VR apply(final V value,
 final K key,
 final RecordContext recordContext);
}

we can adopt

public interface RichValueMapper<V, VR, K> {
VR apply(final V value,
 final K key,
 final RecordContext recordContext,
 final ProcessorContext processorContext);
}


However, in this case, a user can get confused as ProcessorContext and
RecordContext share some methods with the same name.


Cheers,
Jeyhun


[1] https://issues.apache.org/jira/browse/KAFKA-3907


On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Regarding #6 above, I'm still not clear why we would need `commit()` in
> both ProcessorContext and RecordContext, could you elaborate a bit more?
>
> To me `commit()` is really a processor context not a record context
> logically: when you call that function, it means we would commit the state
> of the whole task up to this processed record, not only that single record
> itself.
>
>
> Guozhang
>
> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Thanks for the feedback.
> >
> >
> > 0. RichInitializer definition seems missing.
> >
> >
> >
> > - Fixed.
> >
> >
> >  I'd suggest moving the key parameter in the RichValueXX and RichReducer
> > > after the value parameters, as well as in the templates; e.g.
> > > public interface RichValueJoiner<V1, V2, VR, K> {
> > > VR apply(final V1 value1, final V2 value2, final K key, final
> > > RecordContext
> > > recordContext);
> > > }
> >
> >
> >
> > - Fixed.
> >
> >
> > 2. Some of the listed functions are not necessary since their pairing
> APIs
> > > are being deprecated in 1.0 already:
> > > <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
> > >                                    final Serde<KR> keySerde,
> > >                                    final Serde<V> valSerde);
> > > <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
> > >                                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
> > >                                  final Serde<K> keySerde,
> > >                                  final Serde<V> valSerde);
> >
> >
> > -Fixed
> >
> > 3. For a few functions where we are adding three APIs for a combo of both
> > > mapper / joiner, or both initializer / aggregator, or adder /
> subtractor,
> > > I'm wondering if we can just keep one that use "rich" functions for
> both;
> > > so that we can have less overloads and let users who only want to
> access
> > > one of them to just use dummy parameter declarations. For example:
> > >
> > > <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
> > >                                  final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
> > >                                  final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner);
> >
> >
> >
> > -Agreed. Fixed.
> >
> >
> > 4. For TimeWindowedKStream, I'm wondering why we do not make its
> > > Initializer also "rich" functions? I.e.
> >
> >
> > - It was a typo. Fixed.
> >
> >
> > 5. We need to move "RecordContext" from o.a.k.processor.internals to
> > > o.a.k.processor.
> > >
> > > 6. I'm not clear why we want to move `commit()` from ProcessorContext
> to
> > > RecordContext?
> > >
> >
> > -
> > Because it makes sense logically and reduces code maintenance (both
> > interfaces have offset(), timestamp(), topic(), and partition() methods), I
> > inherit ProcessorContext from RecordContext.
> > Since we need the commit() method both in ProcessorContext and in
> RecordContext,
> > I move commit() to the parent class (RecordContext).
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> >
> > On Wed, Oct 11, 2017 at 12:59 AM, Guozhang Wa

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-10-16 Thread Jeyhun Karimov
tor
> aggregator,
>  final Materialized<K, VR, KeyValueStore<Bytes,
> byte[]>> materialized);
>
> Similarly for KGroupedTable, a bunch of aggregate() overloads are deprecated,
> so we do not need to add their rich functions anymore.
>
>
> 4. For TimeWindowedKStream, I'm wondering why we do not make its
> Initializer also "rich" functions? I.e.
>
> <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
>                                        final RichAggregator<? super K, ? super V, VR> aggregator);
> <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
>                                        final RichAggregator<? super K, ? super V, VR> aggregator,
>                                        final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
>
>
> 5. We need to move "RecordContext" from o.a.k.processor.internals to
> o.a.k.processor.
>
> 6. I'm not clear why we want to move `commit()` from ProcessorContext to
> RecordContext? Conceptually I think it would be better staying in the
> ProcessorContext. Do you find this not doable in the internal
> implementations?
>
>
> Guozhang
>
>
>
> On Fri, Sep 22, 2017 at 1:09 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> > recordContext = new RecordContext() {  // recordContext initialization is added in this KIP
> >
> > This code snippet seems to be standard - would it make sense to pull it
> > into a (sample) RecordContext implementation ?
> >
> > Cheers
> >
> > On Fri, Sep 22, 2017 at 12:14 PM, Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> >
> > > Hi Ted,
> > >
> > > Thanks for your comments. I added a couple of comments in KIP to
> clarify
> > > some points.
> > >
> > >
> > > bq. provides a hybrd solution
> > > > Typo in hybrid.
> > >
> > >
> > > - My bad. Thanks for the correction.
> > >
> > > It would be nice if you can name some Value operator as examples.
> > >
> > >
> > > >
> > > - I added the corresponding interface names to KIP.
> > >
> > >
> > > > <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
> > > >                              final Aggregator<? super K, ? super V, VR> adder,
> > > > The adder doesn't need to be RichAggregator ?
> > >
> > >
> > >
> > > - Exactly. However, there are 2 Aggregator-type arguments in the related
> > > method. So, I had to overload all of their possible Rich counterparts:
> > >
> > > // adder is non-rich, subtractor is rich
> > > <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
> > >     final Aggregator<? super K, ? super V, VR> adder,
> > >     final RichAggregator<? super K, ? super V, VR> subtractor,
> > >     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
> > >
> > > // adder is rich, subtractor is non-rich
> > > <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
> > >     final RichAggregator<? super K, ? super V, VR> adder,
> > >     final Aggregator<? super K, ? super V, VR> subtractor,
> > >     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
> > >
> > > // both adder and subtractor are rich
> > > <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
> > >     final RichAggregator<? super K, ? super V, VR> adder,
> > >     final RichAggregator<? super K, ? super V, VR> subtractor,
> > >     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
> > >
> > >
> > > Can you explain a bit about the above implementation ?
> > > >void commit () {
> > > >  throw new UnsupportedOperationException("commit() is not
> > supported
> > > in
> > > > this context");
> > > > Is the exception going to be replaced with real code in the PR ?
> > >
> > >
> > >
> > > - I added some comments both inside and outside the code snippets in the
> > > KIP. Specifically, for the code snippet above, we add the *commit()*
> > > method to the *RecordContext* interface.
>

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-09-22 Thread Jeyhun Karimov
Hi Ted,

Thanks for your comments. I added a couple of comments in KIP to clarify
some points.


bq. provides a hybrd solution
> Typo in hybrid.


- My bad. Thanks for the correction.

It would be nice if you could name some Value operators as examples.


>
- I added the corresponding interface names to KIP.


>  <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>      final Aggregator<? super K, ? super V, VR> adder,
> The adder doesn't need to be RichAggregator ?



- Exactly. However, there are 2 Aggregator-type arguments in the related
method. So, I had to overload all of their possible Rich counterparts:

// adder is non-rich, subtractor is rich
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
    final Aggregator<? super K, ? super V, VR> adder,
    final RichAggregator<? super K, ? super V, VR> subtractor,
    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

// adder is rich, subtractor is non-rich
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
    final RichAggregator<? super K, ? super V, VR> adder,
    final Aggregator<? super K, ? super V, VR> subtractor,
    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

// both adder and subtractor are rich
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
    final RichAggregator<? super K, ? super V, VR> adder,
    final RichAggregator<? super K, ? super V, VR> subtractor,
    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);


Can you explain a bit about the above implementation ?
>void commit () {
>  throw new UnsupportedOperationException("commit() is not supported in
> this context");
> Is the exception going to be replaced with real code in the PR ?



- I added some comments both inside and outside the code snippets in the KIP.
Specifically, for the code snippet above, we add the *commit()* method to
the *RecordContext* interface.
However, we want the *commit()* method to be used only on *RecordContext*
instances (at least for now), so we add an UnsupportedOperationException to
all classes/interfaces that extend/implement *RecordContext*.
In general, 1) we make RecordContext publicly available within
ProcessorContext,  2) initialize its instance within all required
Processors and 3) pass it as an argument to the related Rich interfaces
inside Processors.
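
A hedged sketch of steps 2) and 3): the processor below is an illustrative
assumption, not the actual Streams internals, but it follows the
RichValueMapper shape discussed in this thread:

class KStreamMapValuesProcessor<K, V, VR> extends AbstractProcessor<K, V> {
    private final RichValueMapper<V, VR> richMapper;

    KStreamMapValuesProcessor(final RichValueMapper<V, VR> richMapper) {
        this.richMapper = richMapper;
    }

    @Override
    public void process(final K key, final V value) {
        // step 2: the RecordContext instance comes from the ProcessorContext;
        // step 3: it is passed to the rich function together with the value
        final VR newValue = richMapper.apply(value, context().recordContext());
        context().forward(key, newValue);
    }
}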




Cheers,
Jeyhun

On Fri, Sep 22, 2017 at 6:44 PM Ted Yu <yuzhih...@gmail.com> wrote:

> bq. provides a hybrd solution
>
> Typo in hybrid.
>
> bq. accessing read-only keys within XXXValues operators
>
> It would be nice if you could name some Value operators as examples.
>
>  <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
>      final Aggregator<? super K, ? super V, VR> adder,
>
> The adder doesn't need to be RichAggregator ?
>
>   public RecordContext recordContext() {
> return this.recordContext();
>
> Can you explain a bit about the above implementation ?
>
>void commit () {
>  throw new UnsupportedOperationException("commit() is not supported in
> this context");
>
> Is the exception going to be replaced with real code in the PR ?
>
> Cheers
>
>
> On Fri, Sep 22, 2017 at 9:28 AM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Dear community,
> >
> > I updated the related KIP [1]. Please feel free to comment.
> >
> > Cheers,
> > Jeyhun
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 159%3A+Introducing+Rich+functions+to+Streams
> >
> >
> >
> >
> > On Fri, Sep 22, 2017 at 12:20 AM Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> >
> > > Hi Damian,
> > >
> > > Thanks for the update. I am working on it and will provide an update soon.
> > >
> > > Cheers,
> > > Jeyhun
> > >
> > > On Thu, Sep 21, 2017 at 4:50 PM Damian Guy <damian@gmail.com>
> wrote:
> > >
> > >> Hi Jeyhun,
> > >>
> > >> All KIP-182 API PRs have now been merged. So you can consider it as
> > >> stable.
> > >> Thanks,
> > >> Damian
> > >>
> > >> On Thu, 21 Sep 2017 at 15:23 Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > Thanks a lot for your comments. For the single-interface (RichXXX and
> > >> > XXXWithKey) solution, I have already submitted a PR, but it is probably
> > >> > outdated (from when the KIP was first proposed), so I need to revisit it.
> > >> >
> > >> 

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-09-22 Thread Jeyhun Karimov
Dear community,

I updated the related KIP [1]. Please feel free to comment.

Cheers,
Jeyhun

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams




On Fri, Sep 22, 2017 at 12:20 AM Jeyhun Karimov <je.kari...@gmail.com>
wrote:

> Hi Damian,
>
> Thanks for the update. I am working on it and will provide an update soon.
>
> Cheers,
> Jeyhun
>
> On Thu, Sep 21, 2017 at 4:50 PM Damian Guy <damian@gmail.com> wrote:
>
>> Hi Jeyhun,
>>
>> All KIP-182 API PRs have now been merged. So you can consider it as
>> stable.
>> Thanks,
>> Damian
>>
>> On Thu, 21 Sep 2017 at 15:23 Jeyhun Karimov <je.kari...@gmail.com> wrote:
>>
>> > Hi all,
>> >
>> > Thanks a lot for your comments. For the single-interface (RichXXX and
>> > XXXWithKey) solution, I have already submitted a PR, but it is probably
>> > outdated (from when the KIP was first proposed), so I need to revisit it.
>> >
>> > @Guozhang, from our (offline) discussion, I understood that we may not be
>> > able to merge this KIP into the upcoming release, as KIP-159 is not voted
>> > on yet (because we want KIP-149 and KIP-159 to be merged as one "atomic"
>> > unit). So I decided to wait until KIP-182 gets stable (there are some
>> > minor updates AFAIK) and then update the KIP accordingly. Please correct
>> > me if I am wrong or misunderstood.
>> >
>> > Cheers,
>> > Jeyhun
>> >
>> >
>> > On Thu, Sep 21, 2017 at 4:11 PM Damian Guy <damian@gmail.com>
>> wrote:
>> >
>> > > +1
>> > >
>> > > On Thu, 21 Sep 2017 at 13:46 Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> > >
>> > > > +1 for me as well for collapsing.
>> > > >
>> > > > Jeyhun, could you update the wiki accordingly to show what's the
>> final
>> > > > updates post KIP-182 that needs to be done in KIP-159 including
>> > KIP-149?
>> > > > The child page I made is just a suggestion, but you would still
>> need to
>> > > > update your proposal for people to comment and vote on.
>> > > >
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > > On Thu, Sep 14, 2017 at 10:37 PM, Ted Yu <yuzhih...@gmail.com>
>> wrote:
>> > > >
>> > > > > +1
>> > > > >
>> > > > > One interface is cleaner.
>> > > > >
>> > > > > On Thu, Sep 14, 2017 at 7:26 AM, Bill Bejeck <bbej...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > +1 for me on collapsing the Rich and ValueWithKey
>> > interfaces
>> > > > > into 1
>> > > > > > interface.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Bill
>> > > > > >
>> > > > > > On Wed, Sep 13, 2017 at 11:31 AM, Jeyhun Karimov <
>> > > je.kari...@gmail.com
>> > > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi Damian,
>> > > > > > >
>> > > > > > > Thanks for your feedback. Actually, this (what you propose)
>> was
>> > the
>> > > > > first
>> > > > > > > idea of KIP-149. Then we decided to divide it into two KIPs. I
>> > also
>> > > > > > > expressed my opinion that keeping the two interfaces (Rich and
>> > > > withKey)
>> > > > > > > separate would add more overloads. So, the email discussion
>> > > > > > > concluded that this would not be a problem.
>> > > > > > >
>> > > > > > > Our initial idea was similar to :
>> > > > > > >
>> > > > > > > public abstract class RichValueMapper<K, V, VR>  implements
>> > > > > > > ValueMapperWithKey<K, V, VR>, RichFunction {
>> > > > > > > ..
>> > > > > > > }
>> > > > > > >
>> > > > > > >
>> > > > > > > So, we check the type of object, whether it is RichXXX or
>> > > XXXWithKey
>> > > &

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-09-21 Thread Jeyhun Karimov
Hi Damian,

Thanks for the update. I am working on it and will provide an update soon.

Cheers,
Jeyhun

On Thu, Sep 21, 2017 at 4:50 PM Damian Guy <damian@gmail.com> wrote:

> Hi Jeyhun,
>
> All KIP-182 API PRs have now been merged. So you can consider it as stable.
> Thanks,
> Damian
>
> On Thu, 21 Sep 2017 at 15:23 Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Hi all,
> >
> > Thanks a lot for your comments. For the single-interface (RichXXX and
> > XXXWithKey) solution, I have already submitted a PR, but it is probably
> > outdated (from when the KIP was first proposed), so I need to revisit it.
> >
> > @Guozhang, from our (offline) discussion, I understood that we may not be
> > able to merge this KIP into the upcoming release, as KIP-159 is not voted
> > on yet (because we want KIP-149 and KIP-159 to be merged as one "atomic"
> > unit). So I decided to wait until KIP-182 gets stable (there are some
> > minor updates AFAIK) and then update the KIP accordingly. Please correct
> > me if I am wrong or misunderstood.
> >
> > Cheers,
> > Jeyhun
> >
> >
> > On Thu, Sep 21, 2017 at 4:11 PM Damian Guy <damian@gmail.com> wrote:
> >
> > > +1
> > >
> > > On Thu, 21 Sep 2017 at 13:46 Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > +1 for me as well for collapsing.
> > > >
> > > > Jeyhun, could you update the wiki accordingly to show what's the
> final
> > > > updates post KIP-182 that needs to be done in KIP-159 including
> > KIP-149?
> > > > The child page I made is just a suggestion, but you would still need
> to
> > > > update your proposal for people to comment and vote on.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Sep 14, 2017 at 10:37 PM, Ted Yu <yuzhih...@gmail.com>
> wrote:
> > > >
> > > > > +1
> > > > >
> > > > > One interface is cleaner.
> > > > >
> > > > > On Thu, Sep 14, 2017 at 7:26 AM, Bill Bejeck <bbej...@gmail.com>
> > > wrote:
> > > > >
> > > > > > +1 for me on collapsing the Rich and ValueWithKey
> > interfaces
> > > > > into 1
> > > > > > interface.
> > > > > >
> > > > > > Thanks,
> > > > > > Bill
> > > > > >
> > > > > > On Wed, Sep 13, 2017 at 11:31 AM, Jeyhun Karimov <
> > > je.kari...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Damian,
> > > > > > >
> > > > > > > Thanks for your feedback. Actually, this (what you propose) was
> > the
> > > > > first
> > > > > > > idea of KIP-149. Then we decided to divide it into two KIPs. I
> > also
> > > > > > > expressed my opinion that keeping the two interfaces (Rich and
> > > > withKey)
> > > > > > > separate would add more overloads. So, the email discussion
> > > > > > > concluded that this would not be a problem.
> > > > > > >
> > > > > > > Our initial idea was similar to :
> > > > > > >
> > > > > > > public abstract class RichValueMapper<K, V, VR>  implements
> > > > > > > ValueMapperWithKey<K, V, VR>, RichFunction {
> > > > > > > ..
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > So, we check the type of object, whether it is RichXXX or
> > > XXXWithKey
> > > > > > inside
> > > > > > > the called method and continue accordingly.
> > > > > > >
> > > > > > > If this is ok with the community, I would like to revert the
> > > current
> > > > > > design
> > > > > > > to this again.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Jeyhun
> > > > > > >
> > > > > > > On Wed, Sep 13, 2017 at 3:02 PM Damian Guy <
> damian@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jeyhun,
> > > > >

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-09-21 Thread Jeyhun Karimov
Hi all,

Thanks a lot for your comments. For the single-interface (RichXXX and
XXXWithKey) solution, I have already submitted a PR, but it is probably
outdated (from when the KIP was first proposed), so I need to revisit it.

@Guozhang, from our (offline) discussion, I understood that we may not be
able to merge this KIP into the upcoming release, as KIP-159 is not voted on
yet (because we want KIP-149 and KIP-159 to be merged as one "atomic" unit).
So I decided to wait until KIP-182 gets stable (there are some minor updates
AFAIK) and then update the KIP accordingly. Please correct me if I am wrong
or misunderstood.

Cheers,
Jeyhun


On Thu, Sep 21, 2017 at 4:11 PM Damian Guy <damian@gmail.com> wrote:

> +1
>
> On Thu, 21 Sep 2017 at 13:46 Guozhang Wang <wangg...@gmail.com> wrote:
>
> > +1 for me as well for collapsing.
> >
> > Jeyhun, could you update the wiki accordingly to show what's the final
> > updates post KIP-182 that needs to be done in KIP-159 including KIP-149?
> > The child page I made is just a suggestion, but you would still need to
> > update your proposal for people to comment and vote on.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Sep 14, 2017 at 10:37 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > > +1
> > >
> > > One interface is cleaner.
> > >
> > > On Thu, Sep 14, 2017 at 7:26 AM, Bill Bejeck <bbej...@gmail.com>
> wrote:
> > >
> > > > +1 for me on collapsing the Rich and ValueWithKey interfaces
> > > into 1
> > > > interface.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Wed, Sep 13, 2017 at 11:31 AM, Jeyhun Karimov <
> je.kari...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Hi Damian,
> > > > >
> > > > > Thanks for your feedback. Actually, this (what you propose) was the
> > > first
> > > > > idea of KIP-149. Then we decided to divide it into two KIPs. I also
> > > > > expressed my opinion that keeping the two interfaces (Rich and
> > withKey)
> > > > > separate would add more overloads. So, the email discussion concluded
> > > > > that this would not be a problem.
> > > > >
> > > > > Our initial idea was similar to :
> > > > >
> > > > > public abstract class RichValueMapper<K, V, VR>  implements
> > > > > ValueMapperWithKey<K, V, VR>, RichFunction {
> > > > > ..
> > > > > }
> > > > >
> > > > >
> > > > > So, we check the type of object, whether it is RichXXX or
> XXXWithKey
> > > > inside
> > > > > the called method and continue accordingly.
> > > > >
> > > > > If this is ok with the community, I would like to revert the
> current
> > > > design
> > > > > to this again.
> > > > >
> > > > > Cheers,
> > > > > Jeyhun
> > > > >
> > > > > On Wed, Sep 13, 2017 at 3:02 PM Damian Guy <damian@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Jeyhun,
> > > > > >
> > > > > > Thanks for sending out the update. I guess i was thinking more
> > along
> > > > the
> > > > > > lines of option 2 where we collapse the Rich and
> > ValueWithKey
> > > > etc
> > > > > > interfaces into 1 interface that has all of the arguments. I
> think
> > we
> > > > > then
> > > > > > only need to add one additional overload for each operator?
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Wed, 13 Sep 2017 at 10:59 Jeyhun Karimov <
> je.kari...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Dear all,
> > > > > > >
> > > > > > > I would like to resume the discussion on KIP-159. I (and
> > Guozhang)
> > > > > think
> > > > > > > that releasing KIP-149 and KIP-159 in the same release would
> make
> > > > sense
> > > > > > to
> > > > > > > avoid a release with "partial" public APIs. There is a KIP [1]
> > > > proposed
> > > > > > by

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-09-13 Thread Jeyhun Karimov
Hi Damian,

Thanks for your feedback. Actually, this (what you propose) was the first
idea of KIP-149. Then we decided to divide it into two KIPs. I also
expressed my opinion that keeping the two interfaces (Rich and withKey)
separate would add more overloads. So, the email discussion concluded that
this would not be a problem.

Our initial idea was similar to :

public abstract class RichValueMapper<K, V, VR>  implements
ValueMapperWithKey<K, V, VR>, RichFunction {
..
}


So, we check the type of object, whether it is RichXXX or XXXWithKey inside
the called method and continue accordingly.
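
A small sketch of that type check, reusing the RichFunction shape quoted
elsewhere in this discussion (the helper itself is hypothetical):

static void initIfRich(final Object function, final RecordContext recordContext) {
    // RichValueMapper implements both ValueMapperWithKey and RichFunction, so
    // a single instanceof check selects the extra lifecycle handling
    if (function instanceof RichFunction) {
        ((RichFunction) function).init(recordContext);
    }
}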

If this is ok with the community, I would like to revert the current design
to this again.

Cheers,
Jeyhun

On Wed, Sep 13, 2017 at 3:02 PM Damian Guy <damian@gmail.com> wrote:

> Hi Jeyhun,
>
> Thanks for sending out the update. I guess i was thinking more along the
> lines of option 2 where we collapse the Rich and ValueWithKey etc
> interfaces into 1 interface that has all of the arguments. I think we then
> only need to add one additional overload for each operator?
>
> Thanks,
> Damian
>
> On Wed, 13 Sep 2017 at 10:59 Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Dear all,
> >
> > I would like to resume the discussion on KIP-159. I (and Guozhang) think
> > that releasing KIP-149 and KIP-159 in the same release would make sense
> to
> > avoid a release with "partial" public APIs. There is a KIP [1] proposed
> by
> > Guozhang (and approved by me) to unify both KIPs.
> > Please feel free to comment on this.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73637757
> >
> > Cheers,
> > Jeyhun
> >
> > On Fri, Jul 21, 2017 at 2:00 AM Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> >
> > > Hi Matthias, Damian, all,
> > >
> > > Thanks for your comments and sorry for super-late update.
> > >
> > > Sure, the DSL refactoring is not blocking for this KIP.
> > > I made some changes to KIP document based on my prototype.
> > >
> > > Please feel free to comment.
> > >
> > > Cheers,
> > > Jeyhun
> > >
> > > On Fri, Jul 7, 2017 at 9:35 PM Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > >> I would not block this KIP with regard to DSL refactoring. IMHO, we
> can
> > >> just finish this one and the DSL refactoring will help later on to
> > >> reduce the number of overloads.
> > >>
> > >> -Matthias
> > >>
> > >> On 7/7/17 5:28 AM, Jeyhun Karimov wrote:
> > >> > I am following the related thread in the mailing list and looking
> > >> > forward to a one-shot solution for the overloads issue.
> > >> >
> > >> > Cheers,
> > >> > Jeyhun
> > >> >
> > >> > On Fri, Jul 7, 2017 at 10:32 AM Damian Guy <damian@gmail.com>
> > >> wrote:
> > >> >
> > >> >> Hi Jeyhun,
> > >> >>
> > >> >> About overrides, what other alternatives do we have? For
> > >> >>> backwards-compatibility we have to add extra methods to the
> existing
> > >> >> ones.
> > >> >>>
> > >> >>>
> > >> >> It wasn't clear to me in the KIP if these are new methods or
> > replacing
> > >> >> existing ones.
> > >> >> Also, we are currently discussing options for replacing the
> > overrides.
> > >> >>
> > >> >> Thanks,
> > >> >> Damian
> > >> >>
> > >> >>
> > >> >>> About ProcessorContext vs RecordContext, you are right. I think I
> > >> need to
> > >> >>> implement a prototype to understand the full picture as some parts
> > of
> > >> the
> > >> >>> KIP might not be as straightforward as I thought.
> > >> >>>
> > >> >>>
> > >> >>> Cheers,
> > >> >>> Jeyhun
> > >> >>>
> > >> >>> On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian@gmail.com>
> > >> wrote:
> > >> >>>
> > >> >>>> HI Jeyhun,
> > >> >>>>
> > >> >>>> Is the intention that these methods are new overloads on the
> > KStream,
> > >> >>>> KTable, etc?
> >

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-09-13 Thread Jeyhun Karimov
Dear all,

I would like to resume the discussion on KIP-159. I (and Guozhang) think
that releasing KIP-149 and KIP-159 in the same release would make sense to
avoid a release with "partial" public APIs. There is a KIP [1] proposed by
Guozhang (and approved by me) to unify both KIPs.
Please feel free to comment on this.

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73637757

Cheers,
Jeyhun

On Fri, Jul 21, 2017 at 2:00 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi Matthias, Damian, all,
>
> Thanks for your comments and sorry for super-late update.
>
> Sure, the DSL refactoring is not blocking for this KIP.
> I made some changes to KIP document based on my prototype.
>
> Please feel free to comment.
>
> Cheers,
> Jeyhun
>
> On Fri, Jul 7, 2017 at 9:35 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> I would not block this KIP with regard to DSL refactoring. IMHO, we can
>> just finish this one and the DSL refactoring will help later on to
>> reduce the number of overloads.
>>
>> -Matthias
>>
>> On 7/7/17 5:28 AM, Jeyhun Karimov wrote:
>> > > I am following the related thread in the mailing list and looking
>> > > forward to a one-shot solution for the overloads issue.
>> >
>> > Cheers,
>> > Jeyhun
>> >
>> > On Fri, Jul 7, 2017 at 10:32 AM Damian Guy <damian@gmail.com>
>> wrote:
>> >
>> >> Hi Jeyhun,
>> >>
>> >> About overrides, what other alternatives do we have? For
>> >>> backwards-compatibility we have to add extra methods to the existing
>> >> ones.
>> >>>
>> >>>
>> >> It wasn't clear to me in the KIP if these are new methods or replacing
>> >> existing ones.
>> >> Also, we are currently discussing options for replacing the overrides.
>> >>
>> >> Thanks,
>> >> Damian
>> >>
>> >>
>> >>> About ProcessorContext vs RecordContext, you are right. I think I
>> need to
>> >>> implement a prototype to understand the full picture as some parts of
>> the
>> >>> KIP might not be as straightforward as I thought.
>> >>>
>> >>>
>> >>> Cheers,
>> >>> Jeyhun
>> >>>
>> >>> On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian@gmail.com>
>> wrote:
>> >>>
>> >>>> HI Jeyhun,
>> >>>>
>> >>>> Is the intention that these methods are new overloads on the KStream,
>> >>>> KTable, etc?
>> >>>>
>> >>>> It is worth noting that a ProcessorContext is not a RecordContext. A
>> >>>> RecordContext, as it stands, only exists during the processing of a
>> >>> single
>> >>>> record. Whereas the ProcessorContext exists for the lifetime of the
>> >>>> Processor. So it doesn't make sense to cast a ProcessorContext to a
>> >>>> RecordContext.
>> >>>> You mentioned above passing the InternalProcessorContext to the
>> init()
>> >>>> calls. It is internal for a reason and i think it should remain that
>> >> way.
>> >>>> It might be better to move the recordContext() method from
>> >>>> InternalProcessorContext to ProcessorContext.
>> >>>>
>> >>>> In the KIP you have an example showing:
>> >>>> richMapper.init((RecordContext) processorContext);
>> >>>> But the interface is:
>> >>>> public interface RichValueMapper<V, VR> {
>> >>>> VR apply(final V value, final RecordContext recordContext);
>> >>>> }
>> >>>> i.e., there is no init(...), besides as above this wouldn't make
>> sense.
>> >>>>
>> >>>> Thanks,
>> >>>> Damian
>> >>>>
>> >>>> On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.kari...@gmail.com>
>> >> wrote:
>> >>>>
>> >>>>> Hi Matthias,
>> >>>>>
>> >>>>> Actually my intent was to provide the RichInitializer, and later on we
>> >>>>> could provide the context of the record, as you also mentioned.
>> >>>>> I removed it so as not to confuse the users.
>> >>>>> Regarding the RecordContext and ProcessorContext interfaces, I just
>> >>>

Re: [VOTE]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-07-21 Thread Jeyhun Karimov
Thanks Damian.

This vote is now closed and KIP-149 is
accepted with +3 binding +2 non-binding votes.

Cheers,
Jeyhun

On Fri, Jul 21, 2017 at 3:52 PM Damian Guy <damian@gmail.com> wrote:

> Hi Jeyhun,
>
> Feel free to close the vote. It has been accepted.
>
> Thanks,
> Damian
>
> On Mon, 17 Jul 2017 at 06:18 Guozhang Wang <wangg...@gmail.com> wrote:
>
> > +1. Thanks!
> >
> > On Sat, Jul 8, 2017 at 1:35 AM, Damian Guy <damian@gmail.com> wrote:
> >
> > > +1
> > > On Fri, 7 Jul 2017 at 16:08, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> > >
> > > > +1 (non-binding) Thanks.
> > > >
> > > > Eno
> > > > > On 6 Jul 2017, at 21:49, Gwen Shapira <g...@confluent.io> wrote:
> > > > >
> > > > > +1
> > > > >
> > > > > On Wed, Jul 5, 2017 at 9:25 AM Matthias J. Sax <
> > matth...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> +1
> > > > >>
> > > > >> On 6/27/17 1:41 PM, Jeyhun Karimov wrote:
> > > > >>> Dear all,
> > > > >>>
> > > > >>> I would like to start the vote on KIP-149 [1].
> > > > >>>
> > > > >>>
> > > > >>> Cheers,
> > > > >>> Jeyhun
> > > > >>>
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > > >>
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 149%3A+Enabling+key+access+in+ValueTransformer%2C+
> > > ValueMapper%2C+and+ValueJoiner
> > > > >>>
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
-- 
-Cheers

Jeyhun


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-07-20 Thread Jeyhun Karimov
Hi Matthias, Damian, all,

Thanks for your comments and sorry for super-late update.

Sure, the DSL refactoring is not blocking for this KIP.
I made some changes to KIP document based on my prototype.

Please feel free to comment.

Cheers,
Jeyhun

On Fri, Jul 7, 2017 at 9:35 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I would not block this KIP with regard to DSL refactoring. IMHO, we can
> just finish this one and the DSL refactoring will help later on to
> reduce the number of overloads.
>
> -Matthias
>
> On 7/7/17 5:28 AM, Jeyhun Karimov wrote:
> > I am following the related thread in the mailing list and looking forward
> > to a one-shot solution for the overloads issue.
> >
> > Cheers,
> > Jeyhun
> >
> > On Fri, Jul 7, 2017 at 10:32 AM Damian Guy <damian@gmail.com> wrote:
> >
> >> Hi Jeyhun,
> >>
> >> About overrides, what other alternatives do we have? For
> >>> backwards-compatibility we have to add extra methods to the existing
> >> ones.
> >>>
> >>>
> >> It wasn't clear to me in the KIP if these are new methods or replacing
> >> existing ones.
> >> Also, we are currently discussing options for replacing the overrides.
> >>
> >> Thanks,
> >> Damian
> >>
> >>
> >>> About ProcessorContext vs RecordContext, you are right. I think I need
> to
> >>> implement a prototype to understand the full picture as some parts of
> the
> >>> KIP might not be as straightforward as I thought.
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>> On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian@gmail.com>
> wrote:
> >>>
> >>>> HI Jeyhun,
> >>>>
> >>>> Is the intention that these methods are new overloads on the KStream,
> >>>> KTable, etc?
> >>>>
> >>>> It is worth noting that a ProcessorContext is not a RecordContext. A
> >>>> RecordContext, as it stands, only exists during the processing of a
> >>> single
> >>>> record. Whereas the ProcessorContext exists for the lifetime of the
> >>>> Processor. So it doesn't make sense to cast a ProcessorContext to a
> >>>> RecordContext.
> >>>> You mentioned above passing the InternalProcessorContext to the init()
> >>>> calls. It is internal for a reason and i think it should remain that
> >> way.
> >>>> It might be better to move the recordContext() method from
> >>>> InternalProcessorContext to ProcessorContext.
> >>>>
> >>>> In the KIP you have an example showing:
> >>>> richMapper.init((RecordContext) processorContext);
> >>>> But the interface is:
> >>>> public interface RichValueMapper<V, VR> {
> >>>> VR apply(final V value, final RecordContext recordContext);
> >>>> }
> >>>> i.e., there is no init(...), besides as above this wouldn't make
> sense.
> >>>>
> >>>> Thanks,
> >>>> Damian
> >>>>
> >>>> On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.kari...@gmail.com>
> >> wrote:
> >>>>
> >>>>> Hi Matthias,
> >>>>>
> >>>>> Actually my intent was to provide the RichInitializer, and later on we
> >>>>> could provide the context of the record, as you also mentioned.
> >>>>> I removed it so as not to confuse the users.
> >>>>> Regarding the RecordContext and ProcessorContext interfaces, I just
> >>>>> realized the InternalProcessorContext class. Can't we pass this as a
> >>>>> parameter to init() method of processors? Then we would be able to
> >> get
> >>>>> RecordContext easily with just a method call.
> >>>>>
> >>>>>
> >>>>> Cheers,
> >>>>> Jeyhun
> >>>>>
> >>>>> On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <
> >>> matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> One more thing:
> >>>>>>
> >>>>>> I don't think `RichInitializer` does make sense. As we don't have
> >> any
> >>>>>> input record, there is also no context. We could of course provide
> >>> the
> >>>>>> context of t

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-07-07 Thread Jeyhun Karimov
I am following the related thread in the mailing list and looking forward
to a one-shot solution for the overloads issue.

Cheers,
Jeyhun

On Fri, Jul 7, 2017 at 10:32 AM Damian Guy <damian@gmail.com> wrote:

> Hi Jeyhun,
>
> About overrides, what other alternatives do we have? For
> > backwards-compatibility we have to add extra methods to the existing
> ones.
> >
> >
> It wasn't clear to me in the KIP if these are new methods or replacing
> existing ones.
> Also, we are currently discussing options for replacing the overrides.
>
> Thanks,
> Damian
>
>
> > About ProcessorContext vs RecordContext, you are right. I think I need to
> > implement a prototype to understand the full picture as some parts of the
> > KIP might not be as straightforward as I thought.
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian@gmail.com> wrote:
> >
> > > HI Jeyhun,
> > >
> > > Is the intention that these methods are new overloads on the KStream,
> > > KTable, etc?
> > >
> > > It is worth noting that a ProcessorContext is not a RecordContext. A
> > > RecordContext, as it stands, only exists during the processing of a
> > single
> > > record. Whereas the ProcessorContext exists for the lifetime of the
> > > Processor. So it doesn't make sense to cast a ProcessorContext to a
> > > RecordContext.
> > > You mentioned above passing the InternalProcessorContext to the init()
> > > calls. It is internal for a reason and i think it should remain that
> way.
> > > It might be better to move the recordContext() method from
> > > InternalProcessorContext to ProcessorContext.
> > >
> > > In the KIP you have an example showing:
> > > richMapper.init((RecordContext) processorContext);
> > > But the interface is:
> > > public interface RichValueMapper<V, VR> {
> > > VR apply(final V value, final RecordContext recordContext);
> > > }
> > > i.e., there is no init(...), besides as above this wouldn't make sense.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
> > >
> > > > Hi Matthias,
> > > >
> > > > Actually my intend was to provide to RichInitializer and later on we
> > > could
> > > > provide the context of the record as you also mentioned.
> > > > I remove that not to confuse the users.
> > > > Regarding the RecordContext and ProcessorContext interfaces, I just
> > > > realized the InternalProcessorContext class. Can't we pass this as a
> > > > parameter to init() method of processors? Then we would be able to
> get
> > > > RecordContext easily with just a method call.
> > > >
> > > >
> > > > Cheers,
> > > > Jeyhun
> > > >
> > > > On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <
> > matth...@confluent.io>
> > > > wrote:
> > > >
> > > > > One more thing:
> > > > >
> > > > > I don't think `RichInitializer` does make sense. As we don't have
> any
> > > > > input record, there is also no context. We could of course provide
> > the
> > > > > context of the record that triggers the init call, but this seems
> to
> > be
> > > > > semantically questionable. Also, the context for this first record
> > will
> > > > > be provided by the consecutive call to aggregate anyways.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > > > > > Thanks for updating the KIP.
> > > > > >
> > > > > > I have one concern with regard to backward compatibility. You
> > suggest
> > > > to
> > > > > > use RecrodContext as base interface for ProcessorContext. This
> will
> > > > > > break compatibility.
> > > > > >
> > > > > > I think, we should just have two independent interfaces. Our own
> > > > > > ProcessorContextImpl class would implement both. This allows us
> to
> > > cast
> > > > > > it to `RecordContext` and thus limit the visible scope.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-07-06 Thread Jeyhun Karimov
Hi Damian,

Thanks for the comments.
About overrides, what other alternatives do we have? For
backwards-compatibility we have to add extra methods to the existing ones.

About ProcessorContext vs RecordContext, you are right. I think I need to
implement a prototype to understand the full picture as some parts of the
KIP might not be as straightforward as I thought.


Cheers,
Jeyhun

On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian@gmail.com> wrote:

> HI Jeyhun,
>
> Is the intention that these methods are new overloads on the KStream,
> KTable, etc?
>
> It is worth noting that a ProcessorContext is not a RecordContext. A
> RecordContext, as it stands, only exists during the processing of a single
> record. Whereas the ProcessorContext exists for the lifetime of the
> Processor. So it doesn't make sense to cast a ProcessorContext to a
> RecordContext.
> You mentioned above passing the InternalProcessorContext to the init()
> calls. It is internal for a reason and i think it should remain that way.
> It might be better to move the recordContext() method from
> InternalProcessorContext to ProcessorContext.
>
> In the KIP you have an example showing:
> richMapper.init((RecordContext) processorContext);
> But the interface is:
> public interface RichValueMapper<V, VR> {
> VR apply(final V value, final RecordContext recordContext);
> }
> i.e., there is no init(...), besides as above this wouldn't make sense.
>
> Thanks,
> Damian
>
> On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Hi Matthias,
> >
> > Actually my intent was to provide the RichInitializer, and later on we
> > could provide the context of the record, as you also mentioned.
> > I removed it so as not to confuse the users.
> > Regarding the RecordContext and ProcessorContext interfaces, I just
> > realized the InternalProcessorContext class. Can't we pass this as a
> > parameter to init() method of processors? Then we would be able to get
> > RecordContext easily with just a method call.
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > One more thing:
> > >
> > > I don't think `RichInitializer` does make sense. As we don't have any
> > > input record, there is also no context. We could of course provide the
> > > context of the record that triggers the init call, but this seems to be
> > > semantically questionable. Also, the context for this first record will
> > > be provided by the consecutive call to aggregate anyways.
> > >
> > >
> > > -Matthias
> > >
> > > On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > > > Thanks for updating the KIP.
> > > >
> > > > I have one concern with regard to backward compatibility. You suggest
> > to
> > > > use RecrodContext as base interface for ProcessorContext. This will
> > > > break compatibility.
> > > >
> > > > I think, we should just have two independent interfaces. Our own
> > > > ProcessorContextImpl class would implement both. This allows us to
> cast
> > > > it to `RecordContext` and thus limit the visible scope.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> > > >> Hi all,
> > > >>
> > > >> I updated the KIP w.r.t. discussion and comments.
> > > >> Basically, I eliminated the overloads for a particular method if they
> > > >> were more than 3.
> > > >> As we can see there are a lot of overloads (and more will come with
> > > >> KIP-149 :) )
> > > >> So, is it wise to
> > > >> wait for the result of the constructive DSL thread, or
> > > >> extend the KIP to address this issue as well, or
> > > >> continue as it is?
> > > >>
> > > >> Cheers,
> > > >> Jeyhun
> > > >>
> > > >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >>
> > > >>> LGTM. Thanks!
> > > >>>
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <
> > je.kari...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Thanks for the

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-07-04 Thread Jeyhun Karimov
Hi Matthias,

Actually my intent was to provide the RichInitializer, and later on we could
provide the context of the record, as you also mentioned.
I removed it so as not to confuse the users.
Regarding the RecordContext and ProcessorContext interfaces, I just
realized the InternalProcessorContext class. Can't we pass this as a
parameter to init() method of processors? Then we would be able to get
RecordContext easily with just a method call.


Cheers,
Jeyhun

On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> One more thing:
>
> I don't think `RichInitializer` does make sense. As we don't have any
> input record, there is also no context. We could of course provide the
> context of the record that triggers the init call, but this seems to be
> semantically questionable. Also, the context for this first record will
> be provided by the consecutive call to aggregate anyways.
>
>
> -Matthias
>
> On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > Thanks for updating the KIP.
> >
> > I have one concern with regard to backward compatibility. You suggest to
> > use RecrodContext as base interface for ProcessorContext. This will
> > break compatibility.
> >
> > I think, we should just have two independent interfaces. Our own
> > ProcessorContextImpl class would implement both. This allows us to cast
> > it to `RecordContext` and thus limit the visible scope.
> >
> >
> > -Matthias
> >
> >
> >
> > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> >> Hi all,
> >>
> >> I updated the KIP w.r.t. discussion and comments.
> >> Basically, I eliminated the overloads for a particular method if they
> >> were more than 3.
> >> As we can see there are a lot of overloads (and more will come with
> >> KIP-149 :) )
> >> So, is it wise to
> >> wait for the result of the constructive DSL thread, or
> >> extend the KIP to address this issue as well, or
> >> continue as it is?
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >>> LGTM. Thanks!
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.kari...@gmail.com>
> >>> wrote:
> >>>
> >>>> Thanks for the comment Matthias. After all the discussion (thanks to
> all
> >>>> participants), I think this (single method that passes in a
> RecordContext
> >>>> object) is the best alternative.
> >>>> Just a side note: I think KAFKA-3907 [1] can also be integrated into
> the
> >>>> KIP by adding related method inside RecordContext interface.
> >>>>
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
> >>>>
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>>
> >>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <
> matth...@confluent.io>
> >>>> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I would like to push this discussion further. It seems we got nice
> >>>>> alternatives (thanks for the summary Jeyhun!).
> >>>>>
> >>>>> With respect to RichFunctions and allowing them to be stateful, I
> have
> >>>>> my doubt as expressed already. From my understanding, the idea was to
> >>>>> give access to record metadata information only. If you want to do a
> >>>>> stateful computation you should rather use #transform().
> >>>>>
> >>>>> Furthermore, as pointed out, we would need to switch to a
> >>>>> supplier-pattern introducing many more overloads.
> >>>>>
> >>>>> For those reason, I advocate for a simple interface with a single
> >>> method
> >>>>> that passes in a RecordContext object.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> >>>>>> Thanks for the comprehensive summary!
> >>>>>>
> >>>>>> Personally I'd prefer the option of passing RecordContext as an
> >>>>> additional
> >>>>>> parameter into he overloaded function. But I'm also open to other
> >>>>> arguments

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-07-04 Thread Jeyhun Karimov
Hi Matthias,

Sorry for the long delay. Thanks for the comment. Good to know that the
specified issue is not worth breaking backwards compatibility for.
I fixed the KIP.

Cheers,
Jeyhun

On Fri, Jun 30, 2017 at 1:38 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi Jeyhun,
>
> thanks for starting the VOTE thread. I did make one more pass over the
> KIP before casting my vote and I saw that the KIP still contains
> backward incompatible change introducing `ValueTransformerCommon`.
>
> I think, that for this case, it is not worth breaking compatibility. We
> should have two independent interface and duplicate init() and close()
> (note, with KIP-138 that got merged already, we don't need `punctuate()`
> for ValueTransformerWithKey)
>
>
> -Matthias
>
> On 6/14/17 3:03 PM, Matthias J. Sax wrote:
> > I have no strong opinion, but it seems that at least InitializerWithKey
> > with be helpful if you want to have different start values for different
> > keys (even if I cannot come up with a use case example why one wanted to
> > do this...). Otherwise, there is just the "completeness" argument, that
> > is not too strong either.
> >
> >
> > -Matthias
> >
> > On 6/14/17 2:03 PM, Guozhang Wang wrote:
> >> I'm not particularly concerning that we should NEVER break
> compatibility;
> >> in fact if we think that is worthwhile (i.e. small impact with large
> >> benefit) I think we can break compatibility as long as we have not
> removed
> >> the compatibility annotations from Streams. All I was saying is that if
> we
> >> decided to go this way we need to make sure this is mentioned in the
> >> upgrade guidance.
> >>
> >> Regarding the scope I'm still trying to solicit opinions regarding
> >> ReducerWithKey and InitializerWithKey; to me they are not necessarily
> to be
> >> included.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, Jun 14, 2017 at 5:22 AM, Jeyhun Karimov <je.kari...@gmail.com>
> >> wrote:
> >>
> >>> Hi,
> >>>
> >>> I introduced ValueTransformerCommon just to combine common methods of
> >>> ValueTransformer and ValueTransformerWithKey and avoid copy-paste.
> >>> I am aware of this issue and I agree that it requires users to recompile
> >>> their code and therefore is not backwards compatible. When I saw this
> >>> issue, I thought the degree of incompatibility was small (the public APIs
> >>> are the same, users just need to recompile their code), so we can trade
> >>> it for more maintainable code in this case. I have two comments/solutions:
> >>>
> >>> 1. Basically we can remove ValueTransformerCommon class and return
> >>> ValueTransformer to its original form, which means there will be no
> issues
> >>> with backwards-compatibility. We just copy and past the methods inside
> >>> ValueTransformerCommon to ValueTransformerWithKey and maybe in future
> >>> releases, we can introduce ValueTransformerCommon.
> >>>
> >>> 2.  I have some doubts about Matthias's proposal.
> >>> If we extend the withKey interface from the original one, as you
> >>> mentioned in the previous email, then we have to deal with the
> >>> ValueTransformer.transform(V value) method. As a result, the withKey
> >>> interface will have two transform() methods. Even if we make it an
> >>> abstract class, the user still has access to both transform() methods.
> >>> I think this should not be allowed, and it seems "hacky" to me. Actually,
> >>> this was one reason why I created the withKey classes independently from
> >>> the original interfaces.
> >>>
> >>> Of course, you can correct me if I am wrong.
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>>
> >>>
> >>> On Tue, Jun 13, 2017 at 7:42 PM Matthias J. Sax <matth...@confluent.io
> >
> >>> wrote:
> >>>
> >>>> I agree with Guozhang's second point. This change does not seem
> backward
> >>>> compatible.
> >>>>
> >>>> As we don't have to support lambdas, it might be the easiest thing to
> >>>> just extend the current interface:
> >>>>
> >>>>> public interface ValueTransformerWithKey<K, V, VR> extends
> >>>> ValueTransformer
> >>>>
> >>>> When plugging the topology together, we can 

Re: Re: [DISCUSS] KIP-165: Extend Interactive Queries for return latest update timestamp per key

2017-06-27 Thread Jeyhun Karimov
Hi Michal,


Thanks a lot for your comment. I fixed  the document.

Cheers,
Jeyhun

On Sat, Jun 24, 2017 at 6:49 PM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Hi Jeyhun,
>
> Could the proposed KeyContext.keyTs() be made more descriptive?
>
> e.g. lastUpdated() or similar? So that users don't have to read the docs
> to know it isn't the creation timestamp for instance.
> Cheers,
> Michał
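
A minimal sketch of the rename Michal suggests, assuming KeyContext stays a
small single-method interface as in the KIP draft:

public interface KeyContext {
    // timestamp of the most recent update to this key (not its creation time)
    long lastUpdated();
}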
>
>
> On 04/06/17 01:24, Jeyhun Karimov wrote:
>
> Hi Matthias,
>
> Thanks for comments.
>
>  - why do you only consider get() and not range() and all() ?
>
>
> The corresponding jira concentrates on single key lookups. Moreover, I
> could not find a use-case to include range queries to return records with
> timestamp. However, theoretically we can include range() and all() as well.
>
>  - we cannot have a second get() (this would be ambiguous) but need
>
> another name like getWithTs() (or something better)
>
>
>  - what use case do you have in mind for getKeyTs() ? Would a single new
>
> method returning KeyContext not be sufficient?
>
>
>
> Thanks for correction, this is my bad.
>
>  - for backward compatibility, we will also need a new interface and
>
> cannot just extend the existing one
>
>
>
>  I will correct the KIP accordingly.
>
> Thanks,
> Jeyhun
>
> On Fri, Jun 2, 2017 at 7:36 AM, Matthias J. Sax <matth...@confluent.io> 
> <matth...@confluent.io>
> wrote:
>
>
> Thanks for the KIP Jeyhun.
>
> Some comments:
>  - why do you only consider get() and not range() and all() ?
>  - we cannot have a second get() (this would be ambiguous) but need
> another name like getWithTs() (or something better)
>  - what use case do you have in mind for getKeyTs() ? Would a single new
> method returning KeyContext not be sufficient?
>  - for backward compatibility, we will also need a new interface and
> cannot just extend the existing one
>
>
>
> -Matthias
>
> On 5/29/17 4:55 PM, Jeyhun Karimov wrote:
>
> Dear community,
>
> I want to share KIP-165 [1] based on issue KAFKA-4304 [2].
> I would like to get your comments.
>
> [1]https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>
> 165%3A+Extend+Interactive+Queries+for+return+latest+
> update+timestamp+per+key
>
> [2] https://issues.apache.org/jira/browse/KAFKA-4304
>
> Cheers,
> Jeyhun
>
>
>
>
>
-- 
-Cheers

Jeyhun


[VOTE]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-06-27 Thread Jeyhun Karimov
Dear all,

I would like to start the vote on KIP-149 [1].


Cheers,
Jeyhun


[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner

-- 
-Cheers

Jeyhun


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-06-27 Thread Jeyhun Karimov
Hi all,

I updated the KIP w.r.t. discussion and comments.
Basically, I eliminated the overloads for a particular method if there were
more than 3.
As we can see there are a lot of overloads (and more will come with KIP-149
:) )
So, is it wise to
wait for the result of the constructive DSL thread, or
extend the KIP to address this issue as well, or
continue as it is?

Cheers,
Jeyhun

On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangg...@gmail.com> wrote:

> LGTM. Thanks!
>
>
> Guozhang
>
> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Thanks for the comment Matthias. After all the discussion (thanks to all
> > participants), I think this (single method that passes in a RecordContext
> > object) is the best alternative.
> > Just a side note: I think KAFKA-3907 [1] can also be integrated into the
> > KIP by adding related method inside RecordContext interface.
> >
> >
> > [1] https://issues.apache.org/jira/browse/KAFKA-3907
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Hi,
> > >
> > > I would like to push this discussion further. It seems we got nice
> > > alternatives (thanks for the summary Jeyhun!).
> > >
> > > With respect to RichFunctions and allowing them to be stateful, I have
> > > my doubt as expressed already. From my understanding, the idea was to
> > > give access to record metadata information only. If you want to do a
> > > stateful computation you should rather use #transform().
> > >
> > > Furthermore, as pointed out, we would need to switch to a
> > > supplier-pattern introducing many more overloads.
> > >
> > > For those reason, I advocate for a simple interface with a single
> method
> > > that passes in a RecordContext object.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > > > Thanks for the comprehensive summary!
> > > >
> > > > Personally I'd prefer the option of passing RecordContext as an
> > > additional
> > > > parameter into he overloaded function. But I'm also open to other
> > > arguments
> > > > if there are sth. that I have overlooked.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com
> >
> > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> Thanks for your comments Matthias and Guozhang.
> > > >>
> > > >> Below I mention the quick summary of the main alternatives we looked
> > at
> > > to
> > > >> introduce the Rich functions (I will refer to it as Rich functions
> > > until we
> > > >> find better/another name). Initially the proposed alternatives was
> not
> > > >> backwards-compatible, so I will not mention them.
> > > >> The related discussions are spread in KIP-149 and in this KIP
> > (KIP-159)
> > > >> discussion threads.
> > > >>
> > > >>
> > > >>
> > > >> 1. The idea of rich functions came into the stage with KIP-149, in
> > > >> discussion thread. As a result we extended KIP-149 to support Rich
> > > >> functions as well.
> > > >>
> > > >> 2.  To as part of the Rich functions, we provided init
> > > (ProcessorContext)
> > > >> method. Afterwards, Dammian suggested that we should not provide
> > > >> ProcessorContext to users. As a result, we separated the two
> problems
> > > into
> > > >> two separate KIPs, as it seems they can be solved in parallel.
> > > >>
> > > >> - One approach we considered was :
> > > >>
> > > >> public interface ValueMapperWithKey<K, V, VR> {
> > > >> VR apply(final K key, final V value);
> > > >> }
> > > >>
> > > >> public interface RichValueMapper<K, V, VR> extends RichFunction{
> > > >> }
> > > >>
> > > >> public interface RichFunction {
> > > >> void init(RecordContext recordContext);
> > > >> void close();
> > > >> }
> > > >>
> > > >> public interface RecordContext {
> > > >> String application

Re: Handling 2 to 3 Million Events before Kafka

2017-06-21 Thread Jeyhun Karimov
Hi,

With Kafka you can increase overall throughput by increasing the number of
nodes in the cluster.
I had a similar issue, where we needed to ingest vast amounts of data into a
streaming system.
In our case, Kafka was the bottleneck because of disk I/O. To solve it, we
implemented a (simple) distributed pub-sub system in C which keeps data in
memory. You should also take into account your network bandwidth and the
(upper-bound) capacity of your processing engine or HTTP server.


Cheers,
Jeyhun


On Wed, Jun 21, 2017 at 2:58 PM SenthilKumar K 
wrote:

> Hi Team ,   Sorry if this question is irrelevant to Kafka Group ...
>
> I have been trying to solve problem of handling 5 GB/sec ingestion. Kafka
> is really good candidate for us to handle this ingestion rate ..
>
>
> 100K machines > { Http Server (Jetty/Netty) } --> Kafka Cluster..
>
> I see the problem in Http Server where it can't handle beyond 50K events
> per instance ..  I'm thinking some other solution would be right choice
> before Kafka ..
>
> Anyone worked on similar use case and similar load ? Suggestions/Thoughts ?
>
> --Senthil
>
-- 
-Cheers

Jeyhun


[jira] [Commented] (KAFKA-4829) Improve logging of StreamTask commits

2017-06-14 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16049834#comment-16049834
 ] 

Jeyhun Karimov commented on KAFKA-4829:
---

[~guozhang] I would suggest a model similar to SQL Server's logging configs 
(https://docs.microsoft.com/en-us/sql/relational-databases/backup-restore/recovery-models-sql-server).
I think the current logging can be too verbose under high workloads, and we need a config 
parameter for it. I believe this will require a KIP. WDYT?
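
A sketch of what summary-style commit logging (per the proposal in the issue
below) could look like; this is hypothetical code, not the actual
StreamThread internals, and time/log/activeTasks are assumed names:

final long startMs = time.milliseconds();
for (final StreamTask task : activeTasks) {
    log.trace("Committing task {}", task.id());   // per-task detail only at TRACE
    task.commit();
}
log.info("{} stream tasks committed in {}ms",
        activeTasks.size(), time.milliseconds() - startMs);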

> Improve logging of StreamTask commits
> -
>
> Key: KAFKA-4829
> URL: https://issues.apache.org/jira/browse/KAFKA-4829
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Steven Schlansker
>Priority: Minor
>  Labels: user-experience
>
> Currently I see this every commit interval:
> {code}
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 1_31
> 2017-02-28T21:27:16.659Z INFO <> [StreamThread-1] 
> o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Committing 
> task StreamTask 2_31
> {code}
> We have ~10 tasks in our topology, 4 topics, and 32 partitions per topic.
> This means every commit interval we log a few hundred lines of the above
> which is an order of magnitude chattier than anything else in the log
> during normal operations.
> To improve visibility of important messages, we should reduce the chattiness 
> of normal commits and highlight abnormal commits.  An example proposal:
> existing message is fine at TRACE level for diagnostics
> {{TRACE o.a.k.s.p.i.StreamThread - Committing task StreamTask 1_31}}
> normal fast case, wrap them all up into one summary line
> {{INFO o.a.k.s.p.i.StreamThreads - 64 stream tasks committed in 25ms}}
> some kind of threshold / messaging in case it doesn't complete quickly or 
> logs an exception
> {{ERROR o.a.k.s.p.i.StreamThread - StreamTask 1_32 did not commit in 100ms}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-3907) Better support for user-specific committing in the Streams DSL

2017-06-14 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-3907:
-

Assignee: Jeyhun Karimov

> Better support for user-specific committing in the Streams DSL
> --
>
> Key: KAFKA-3907
> URL: https://issues.apache.org/jira/browse/KAFKA-3907
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>    Assignee: Jeyhun Karimov
>  Labels: api
>
> Currently for user-specifically committing the current processing state, 
> users can make use of the {{ProcessorContext}} object, which is exposed in 
> the {{Processor}} API. Other than that, the application will also 
> automatically committing the processing state based on the configured 
> interval.
> Hence in the Streams DSL, if a user wants to explicitly call {{commit}}, she 
> needs to use a {{process(ProcessorSupplier)}} API to get a customized 
> processor instance in order to access the {{ProcessorContext}}. We should 
> think of a better way to support user-specific committing interfaces inside 
> the high-level Streams DSL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-06-14 Thread Jeyhun Karimov
Hi,

I introduced ValueTransformerCommon just to combine the common methods of
ValueTransformer and ValueTransformerWithKey and avoid copy-paste.
I am aware of this issue and I agree that it requires users to recompile their
code and therefore is not binary compatible. When I saw this issue, I thought
the degree of incompatibility was small (the public APIs are the same, users
just need to recompile their code), so we could trade that for more
maintainable code here. I have two comments/solutions:

1. Basically, we can remove the ValueTransformerCommon class and return
ValueTransformer to its original form, which means there will be no issues
with backwards compatibility. We just copy and paste the methods inside
ValueTransformerCommon into ValueTransformerWithKey, and maybe in future
releases we can introduce ValueTransformerCommon.

2. I have some doubts about Matthias's proposal.
If we extend the withKey interface from the original one, as you mentioned in
the previous email, then we have to deal with the
ValueTransformer.transform(V value) method. As a result, inside the withKey
interface we will have two transform() methods. Even if we make it an abstract
class, users still have access to both transform() methods. I think this
should not be allowed and seems "hacky" to me. Actually, this was one reason
why I created the withKey classes independently from the original interfaces.

Of course, you can correct me if I am wrong.
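To illustrate the concern, a simplified sketch (signatures reduced, not the
real Kafka Streams declarations):

// Simplified shapes, for illustration only.
interface ValueTransformer<V, VR> {
    VR transform(V value);
}

// If the withKey variant extends the original, both transform() methods
// are visible on the type, even when one is sealed as a no-op:
abstract class ValueTransformerWithKey<K, V, VR> implements ValueTransformer<V, VR> {
    public abstract VR transform(K key, V value);

    @Override
    public final VR transform(V value) {
        // never invoked by the runtime; making it final blocks overrides,
        // but the method still shows up on the interface surface
        throw new UnsupportedOperationException();
    }
}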


Cheers,
Jeyhun



On Tue, Jun 13, 2017 at 7:42 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I agree with Guozhang's second point. This change does not seem backward
> compatible.
>
> As we don't have to support lambdas, it might be the easiest thing to
> just extend the current interface:
>
> > public interface ValueTransformerWithKey<K, V, VR> extends
> ValueTransformer
>
> When plugging the topology together, we can check if we get the
> `withKey` variant and use a corresponding runtime class for execution,
> so we get only a single time check. Thus, for the `withKey` variant, the
> will be a `transfrom(V value)` method, but we will never call it.
>
> Maybe we could make `ValueTransformerWithKey` an abstract class with a
> `final` no-op implemenation of `transform(V value)` ?
>
>
> -Matthias
>
>
> On 6/6/17 4:58 PM, Guozhang Wang wrote:
> > Jeyhun, Matthias:
> >
> > Thanks for the explanation, I overlooked the repartition argument
> > previously.
> >
> > 1) Based on that argument I'm convinced of having ValueMapperWithKey /
> > ValueJoinerWithKey / ValueTransformerWithKey; though I'm still not
> > convinced with ReducerWithKey and InitializerWithKey since for the former
> > it can be covered with `aggregate` completely and with latter I have seen
> > little use cases with it.
> >
> > 2) Another comment is on public interface ValueTransformer<V, VR> extends
> > ValueTransformerCommon:
> >
> > I think changing the interface to extend from a new interface is not
> binary
> > compatible though source compatible, i.e. users still need to recompile
> > their code though no need to make code changes. We may need to mention
> that
> > in the upgrade path if we want to keep it that way.
> >
> > Guozhang
> >
> > On Mon, Jun 5, 2017 at 2:28 PM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >>
> >> Sorry for late reply. Just to make everybody in sync, the current
> version
> >> of KIP supports lambdas. "withKey" (ValueMapperWithKey) interfaces are
> >> independent, meaning they do not extend from "withoutKey" (ValueMapper)
> >> interfaces.
> >>
> >>
> >> I agree with Guozhang, and I am personally a bit reluctant to increase
> >> overloaded methods in public APIs but it seems this is only way to solve
> >> all related jira issues.
> >> However, the most overloaded methods will be with ValueJoiner type,
> which
> >> will be with ValueJoinerWithKey with new overloaded methods. Other
> >> interfaces require mostly 1 extra overload.
> >>
> >>
> >>>> I would suggest not doing it if user pop it up, but rather suggesting
> >> them
> >>>> to use `map`
> >>
> >> I agree with Matthias as the core idea of this KIP was to collect all
> >> related jira issues and propose one-shot solution for all. Afterwards,
> we
> >> broke its scope into 2 KIPs (149 and 159).
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >>
> >>
> >> On Mon, Jun 5, 2017 at 7:55 AM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> I guess I 

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-06-13 Thread Jeyhun Karimov
Thanks for the comment, Matthias. After all the discussion (thanks to all
participants), I think this (a single method that passes in a RecordContext
object) is the best alternative.
Just a side note: I think KAFKA-3907 [1] could also be integrated into the
KIP by adding a related method inside the RecordContext interface.
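A rough sketch of what that integration could look like (the commit() method
is a hypothetical addition; the accessors are those already discussed for
RecordContext in this thread):

interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
    // ...other accessors from the KIP-159 discussion...

    void commit(); // hypothetical: request a commit of the current processing state
}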


[1] https://issues.apache.org/jira/browse/KAFKA-3907


Cheers,
Jeyhun

On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi,
>
> I would like to push this discussion further. It seems we got nice
> alternatives (thanks for the summary Jeyhun!).
>
> With respect to RichFunctions and allowing them to be stateful, I have
> my doubt as expressed already. From my understanding, the idea was to
> give access to record metadata information only. If you want to do a
> stateful computation you should rather use #transform().
>
> Furthermore, as pointed out, we would need to switch to a
> supplier-pattern introducing many more overloads.
>
> For those reason, I advocate for a simple interface with a single method
> that passes in a RecordContext object.
>
>
> -Matthias
>
>
> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > Thanks for the comprehensive summary!
> >
> > Personally I'd prefer the option of passing RecordContext as an
> additional
> > parameter into he overloaded function. But I'm also open to other
> arguments
> > if there are sth. that I have overlooked.
> >
> > Guozhang
> >
> >
> > On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> Thanks for your comments Matthias and Guozhang.
> >>
> >> Below I mention the quick summary of the main alternatives we looked at
> to
> >> introduce the Rich functions (I will refer to it as Rich functions
> until we
> >> find better/another name). Initially the proposed alternatives were not
> >> backwards compatible, so I will not mention them.
> >> The related discussions are spread in KIP-149 and in this KIP (KIP-159)
> >> discussion threads.
> >>
> >>
> >>
> >> 1. The idea of rich functions came onto the stage with KIP-149, in its
> >> discussion thread. As a result we extended KIP-149 to support Rich
> >> functions as well.
> >>
> >> 2. As part of the Rich functions, we provided an init(ProcessorContext)
> >> method. Afterwards, Damian suggested that we should not provide
> >> ProcessorContext to users. As a result, we separated the two problems
> >> into two separate KIPs, as it seems they can be solved in parallel.
> >>
> >> - One approach we considered was :
> >>
> >> public interface ValueMapperWithKey<K, V, VR> {
> >> VR apply(final K key, final V value);
> >> }
> >>
> >> public interface RichValueMapper<K, V, VR> extends RichFunction{
> >> }
> >>
> >> public interface RichFunction {
> >> void init(RecordContext recordContext);
> >> void close();
> >> }
> >>
> >> public interface RecordContext {
> >> String applicationId();
> >> TaskId taskId();
> >> StreamsMetrics metrics();
> >> String topic();
> >> int partition();
> >> long offset();
> >> long timestamp();
> >> Map<String, Object> appConfigs();
> >> Map<String, Object> appConfigsWithPrefix(String prefix);
> >> }
> >>
> >>
> >> public interface ProcessorContext extends RecordContext {
> >>// all methods but the ones in RecordContext
> >> }
> >>
> >> As a result:
> >> * All "withKey" and "withoutKey" interfaces can be converted to their
> >> Rich counterparts (with empty init() and close() methods).
> >> * All related Processors will accept Rich interfaces in their
> >> constructors.
> >> * So, we convert the related "withKey" or "withoutKey" interfaces to Rich
> >> interfaces while building the topology and initialize the related
> >> processors with Rich interfaces only.
> >> * We will not need overloaded methods for rich functions, as Rich
> >> interfaces extend withKey interfaces. We will just check the object type
> >> and act accordingly.
> >>
> >>
> >>
> >>
> >> 3. There were some thoughts that the above approach does not support
> >> lambdas, so we should support only one method, only init(RecordConte

[jira] [Comment Edited] (KAFKA-3826) Sampling on throughput / latency metrics recording in Streams

2017-06-12 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16046497#comment-16046497
 ] 

Jeyhun Karimov edited comment on KAFKA-3826 at 6/12/17 12:32 PM:
-

[~guozhang] I think KAFKA-4829 can also be related to this JIRA, and all the 
related issues need a one-shot solution. 
I think providing a user-defined sampling function for metrics like latency 
and throughput is feasible. However, we have many log4j log statements in the 
library (we write a log entry for each record received), which can clearly be 
a bottleneck in some use-cases.
So, we can stick to user-defined sampling for latency and throughput, and for 
all logs we should provide a config that specifies the frequency of logging. 
For example, if the frequency is 1.0, the library functions will log 
everything; 0.0 will log nothing.
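A minimal sketch of such frequency-based logging (all names hypothetical):

import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;

// Hypothetical wrapper: forwards only a configurable fraction of log calls.
class SampledLogger {
    private final Logger delegate;
    private final double frequency; // 1.0 = log everything, 0.0 = log nothing

    SampledLogger(final Logger delegate, final double frequency) {
        this.delegate = delegate;
        this.frequency = frequency;
    }

    void info(final String msg, final Object... args) {
        if (ThreadLocalRandom.current().nextDouble() < frequency) {
            delegate.info(msg, args);
        }
    }
}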

WDYT? cc: [~mjsax]


was (Author: jeyhunkarimov):
[~guozhang] I think KAFKA-4829 can also be related to this JIRA, and all the 
related issues need a one-shot solution. 
I think providing a sampling function for metrics like latency and throughput 
is feasible. However, we have many log4j log statements in the library (we 
write a log entry for each record received), which can clearly be a bottleneck 
in some use-cases.
So, we can stick to sampling for latency and throughput, and for all logs we 
should provide a config that specifies the frequency of logging. For example, 
if the frequency is 1.0, the library functions will log everything; 0.0 will 
log nothing.

WDYT? cc: [~mjsax]

> Sampling on throughput / latency metrics recording in Streams
> -
>
> Key: KAFKA-3826
> URL: https://issues.apache.org/jira/browse/KAFKA-3826
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, performance
>
> In Kafka Streams we record throughput / latency metrics on EACH processing 
> record, causing a lot of recording overhead. Instead, we should consider 
> statistically sampling messages flowing through to measures latency and 
> throughput.
> This is based on our observations from KAFKA-3769 and KAFKA-3811.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3826) Sampling on throughput / latency metrics recording in Streams

2017-06-12 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16046497#comment-16046497
 ] 

Jeyhun Karimov commented on KAFKA-3826:
---

[~guozhang] I think KAFKA-4829 can also be related to this JIRA, and all the 
related issues need a one-shot solution. 
I think providing a sampling function for metrics like latency and throughput 
is feasible. However, we have many log4j log statements in the library (we 
write a log entry for each record received), which can clearly be a bottleneck 
in some use-cases.
So, we can stick to sampling for latency and throughput, and for all logs we 
should provide a config that specifies the frequency of logging. For example, 
if the frequency is 1.0, the library functions will log everything; 0.0 will 
log nothing.

WDYT? cc: [~mjsax]

> Sampling on throughput / latency metrics recording in Streams
> -
>
> Key: KAFKA-3826
> URL: https://issues.apache.org/jira/browse/KAFKA-3826
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture, performance
>
> In Kafka Streams we record throughput / latency metrics on EACH processing 
> record, causing a lot of recording overhead. Instead, we should consider 
> statistically sampling messages flowing through to measures latency and 
> throughput.
> This is based on our observations from KAFKA-3769 and KAFKA-3811.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-4653) Improve test coverage of RocksDBStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4653:
-

Assignee: Jeyhun Karimov

> Improve test coverage of RocksDBStore
> -
>
> Key: KAFKA-4653
> URL: https://issues.apache.org/jira/browse/KAFKA-4653
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>    Assignee: Jeyhun Karimov
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> {{putAll}} - not covered
> {{putInternal}} - exceptions



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4658) Improve test coverage InMemoryKeyValueLoggedStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4658:
-

Assignee: Jeyhun Karimov

> Improve test coverage InMemoryKeyValueLoggedStore
> -
>
> Key: KAFKA-4658
> URL: https://issues.apache.org/jira/browse/KAFKA-4658
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>    Assignee: Jeyhun Karimov
> Fix For: 0.11.1.0
>
>
> {{putAll}} not covered



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4659) Improve test coverage of CachingKeyValueStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4659:
-

Assignee: Jeyhun Karimov

> Improve test coverage of CachingKeyValueStore
> -
>
> Key: KAFKA-4659
> URL: https://issues.apache.org/jira/browse/KAFKA-4659
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>    Assignee: Jeyhun Karimov
>Priority: Minor
> Fix For: 0.11.1.0
>
>
> {{putIfAbsent}} mostly not covered



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4655) Improve test coverage of CompositeReadOnlySessionStore

2017-06-10 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4655:
-

Assignee: Jeyhun Karimov

> Improve test coverage of CompositeReadOnlySessionStore
> --
>
> Key: KAFKA-4655
> URL: https://issues.apache.org/jira/browse/KAFKA-4655
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>    Assignee: Jeyhun Karimov
> Fix For: 0.11.1.0
>
>
> exceptions in fetch and internal iterator



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4661) Improve test coverage UsePreviousTimeOnInvalidTimestamp

2017-06-09 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4661:
-

Assignee: Jeyhun Karimov

> Improve test coverage UsePreviousTimeOnInvalidTimestamp
> ---
>
> Key: KAFKA-4661
> URL: https://issues.apache.org/jira/browse/KAFKA-4661
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>    Assignee: Jeyhun Karimov
>Priority: Minor
>
> Exception branch not tested



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: KIP-160 - Augment KStream.print() to allow users pass in extra parameters in the printed string

2017-06-05 Thread Jeyhun Karimov
I agree with Matthias's comment. Constructing RecordContext with more
metadata seems more feasible to me.

Cheers,
Jeyhun

On Mon, Jun 5, 2017 at 7:47 AM Matthias J. Sax 
wrote:

> Not with the scope of the current discussion.
>
> So far, we discussed adding `RecordContext`, but the context object we use
> could also provide some more metadata. I see no reason why not to expose
> the node name there. We already expose TaskId via `ProcessorContext`. We
> could also add the thread name. IMHO, this would be better than dictating
> any prefix.
>
> Thoughts?
>
>
> -Matthias
>
> On 6/4/17 9:03 PM, Guozhang Wang wrote:
> > Matthias,
> >
> > I think even with KIP-159 users would not be able to access the processor
> > node name right?
> >
> > Guozhang
> >
> > On Thu, Jun 1, 2017 at 10:28 PM, Matthias J. Sax 
> > wrote:
> >
> >> Thanks for the KIP.
> >>
> >> Two comments:
> >>  - I think we should include #writeAsText()
> >>  - I am not sure if we should use
> >>
> >>> "[" + this.streamName + "]: " + mapper.apply(keyToPrint, valueToPrint)
> >>
> >> in case a mapper is provided. This still dictates a fixed prefix a user
> >> might not want to have (which contradicts, or at least limits, the scope
> >> of this new functionality). Considering the current discussion of KIP-159, a
> >> user would be able to access the stream name within the provided mapper
> >> and add it if they wish anyway, and thus, I don't think we should force
> >> this format.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 5/30/17 1:38 PM, Guozhang Wang wrote:
> >>> Overall +1. One comment about the wiki itself:
> >>>
> >>> Could you replace the general description of "Augment KStream.print(),
> >>> which is KStream.print(KeyValueMapper)" with the actual added
> >>> overloaded functions in the wiki page?
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Mon, May 22, 2017 at 12:21 AM, James Chain <
> james.chain1...@gmail.com
> >>>
> >>> wrote:
> >>>
>  Hi All,
> 
>  I want to start this KIP to augment KStream.print().
>  This vote is already started.
>  https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>  160+-+Augment+KStream.print%28%29+to+allow+users+pass+in+
>  extra+parameters+in+the+printed+string
> 
>  Thanks,
> 
>  James Chien
> 
> >>>
> >>>
> >>>
> >>
> >>
> >
> >
>
> --
-Cheers

Jeyhun


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-06-05 Thread Jeyhun Karimov
> rich functions)
> >
> >
> > Maybe this approach has already been discussed and I may have overlooked
> in
> > the email thread; anyways, lmk.
> >
> >
> > Guozhang
> >
> >
> >
> > On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I agree with Jeyhun. As already mention, the overall API improvement
> >> ideas are overlapping and/or contradicting each other. For this reason,
> >> not all ideas can be accomplished and some Jira might just be closed as
> >> "won't fix".
> >>
> >> For this reason, we try to do those KIP discussion with are large scope
> >> to get an overall picture to converge to an overall consisted API.
> >>
> >>
> >> @Jeyhun: about the overloads. Yes, we might get more overload. It might
> >> be sufficient though, to do a single xxxWithContext() overload that will
> >> provide key+value+context. Otherwise, if might get too messy having
> >> ValueMapper, ValueMapperWithKey, ValueMapperWithContext,
> >> ValueMapperWithKeyWithContext.
> >>
> >> On the other hand, we also have the "builder pattern" idea as an API
> >> change and this might mitigate the overload problem. Not for simple
> >> function like map/flatMap etc but for joins and aggregations.
> >>
> >>
> >> On the other hand, as I mentioned in an older email, I am personally
> >> fine to break the pure functional interface, and add
> >>
> >>   - interface WithRecordContext with method `open(RecordContext)` (or
> >> `init(...)`, or any better name) -- but not `close()`)
> >>
> >>   - interface ValueMapperWithRecordContext extends ValueMapper,
> >> WithRecordContext
> >>
> >> This would allow us to avoid any overload. Of course, we don't get a
> >> "pure function" interface and also sacrifices Lambdas.
> >>
> >>
> >>
> >> I am personally a little bit undecided what the better option might be.
> >> Curious to hear what other think about this trade off.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> >>> Hi Guozhang,
> >>>
> >>> It subsumes partially. Initially the idea was to support RichFunctions
> >>> as a separate interface. Throughout the discussion, however, we
> >>> considered that maybe overloading the related methods (with a
> >>> RecordContext param) is a better approach than providing a separate
> >>> RichFunction interface.
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>>
> >>>> Does this KIP subsume this ticket as well?
> >>>> https://issues.apache.org/jira/browse/KAFKA-4125
> >>>>
> >>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.kari...@gmail.com
> >
> >>>> wrote:
> >>>>
> >>>>> Dear community,
> >>>>>
> >>>>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to
> >> initiate
> >>>>> KIP for rich functions (interfaces) [2].
> >>>>> I would like to get your comments.
> >>>>>
> >>>>>
> >>>>> [1]
> >>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=
> >>>>> Re+DISCUSS+KIP+149+Enabling+key+access+in+
> >> ValueTransformer+ValueMapper+
> >>>>> and+ValueJoiner
> >>>>> [2]
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>> 159%3A+Introducing+Rich+functions+to+Streams
> >>>>>
> >>>>>
> >>>>> Cheers,
> >>>>> Jeyhun
> >>>>> --
> >>>>> -Cheers
> >>>>>
> >>>>> Jeyhun
> >>>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>>
> >>
> >>
> >
> >
>
> --
-Cheers

Jeyhun


Re: [DISCUSS] KIP-165: Extend Interactive Queries for return latest update timestamp per key

2017-06-03 Thread Jeyhun Karimov
Hi Matthias,

Thanks for comments.

 - why do you only consider get() and not range() and all() ?


The corresponding JIRA concentrates on single-key lookups. Moreover, I could
not find a use-case where range queries need to return records with their
timestamps. However, theoretically we can include range() and all() as well.

 - we cannot have a second get() (this would be ambiguous) but need
> another name like getWithTs() (or something better)

 - what use case do you have in mind for getKeyTs() ? Would a single new
> method returning KeyContext not be sufficient?


Thanks for the correction; this is my bad.

 - for backward compatibility, we will also need a new interface and
> cannot just extend the existing one


 I will correct the KIP accordingly.
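For concreteness, a rough sketch of the single-key shape under discussion
(names are hypothetical, not the final KIP-165 interface):

// Hypothetical shapes for illustration only.
interface ValueWithTimestamp<V> {
    V value();
    long timestamp(); // last update time of the key
}

interface ReadOnlyKeyValueStoreWithTs<K, V> {
    V get(K key);                            // existing single-key lookup
    ValueWithTimestamp<V> getWithTs(K key);  // value plus its last-update timestamp
}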

Thanks,
Jeyhun

On Fri, Jun 2, 2017 at 7:36 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for the KIP Jeyhun.
>
> Some comments:
>  - why do you only consider get() and not range() and all() ?
>  - we cannot have a second get() (this would be ambiguous) but need
> another name like getWithTs() (or something better)
>  - what use case do you have in mind for getKeyTs() ? Would a single new
> method returning KeyContext not be sufficient?
>  - for backward compatibility, we will also need a new interface and
> cannot just extend the existing one
>
>
>
> -Matthias
>
> On 5/29/17 4:55 PM, Jeyhun Karimov wrote:
> > Dear community,
> >
> > I want to share KIP-165 [1] based on issue KAFKA-4304 [2].
> > I would like to get your comments.
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 165%3A+Extend+Interactive+Queries+for+return+latest+
> update+timestamp+per+key
> > [2] https://issues.apache.org/jira/browse/KAFKA-4304
> >
> > Cheers,
> > Jeyhun
> >
>
>


[jira] [Commented] (KAFKA-4325) Improve processing of late records for window operations

2017-06-02 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035688#comment-16035688
 ] 

Jeyhun Karimov commented on KAFKA-4325:
---

[~mjsax], would this jira requre KIP? 

> Improve processing of late records for window operations
> 
>
> Key: KAFKA-4325
> URL: https://issues.apache.org/jira/browse/KAFKA-4325
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Windows are kept until their retention time has passed. If a late-arriving 
> record is processed that is older than any window kept, a new window is 
> created containing this single late-arriving record, the aggregation is 
> computed, and the window is immediately discarded afterward (as it is older 
> than the retention time).
> This behavior might cause problems for downstream applications, as the 
> original window aggregate might be overwritten with the late single-record 
> aggregate value. Thus, we should rather not process the late-arriving record 
> in this case.
> However, data loss might not be acceptable for all use cases. In order to 
> enable the user to not lose any data, window operators should allow 
> registering a handler function that is called instead of just dropping the 
> late-arriving record.
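A minimal sketch of such a handler (a hypothetical interface, not an existing
Kafka Streams API):

// Hypothetical: invoked instead of silently dropping a too-late record.
interface LateRecordHandler<K, V> {
    void onLateRecord(K key, V value, long recordTimestamp, long retentionMs);
}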



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-06-01 Thread Jeyhun Karimov
Hi Guozhang,

It subsumes partially. Initially the idea was to support RichFunctions as a
separate interface. Throughout the discussion, however, we considered that
maybe overloading the related methods (with a RecordContext param) is a
better approach than providing a separate RichFunction interface.

Cheers,
Jeyhun

On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Does this KIP subsume this ticket as well?
> https://issues.apache.org/jira/browse/KAFKA-4125
>
> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Dear community,
> >
> > As we discussed in KIP-149 [DISCUSS] thread [1], I would like to initiate
> > KIP for rich functions (interfaces) [2].
> > I would like to get your comments.
> >
> >
> > [1]
> > http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=
> > Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+
> > and+ValueJoiner
> > [2]
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 159%3A+Introducing+Rich+functions+to+Streams
> >
> >
> > Cheers,
> > Jeyhun
> > --
> > -Cheers
> >
> > Jeyhun
> >
>
>
>
> --
> -- Guozhang
>
-- 
-Cheers

Jeyhun


[DISCUSS] KIP-165: Extend Interactive Queries for return latest update timestamp per key

2017-05-29 Thread Jeyhun Karimov
Dear community,

I want to share KIP-165 [1] based on issue KAFKA-4304 [2].
I would like to get your comments.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-165%3A+Extend+Interactive+Queries+for+return+latest+update+timestamp+per+key
[2] https://issues.apache.org/jira/browse/KAFKA-4304

Cheers,
Jeyhun

-- 
-Cheers

Jeyhun


[jira] [Comment Edited] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027964#comment-16027964
 ] 

Jeyhun Karimov edited comment on KAFKA-4304 at 5/28/17 10:34 PM:
-

From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then query again with 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get the key's timestamp. As a result 
we search twice. This is only for specific use-cases like this one.

Maybe we can overload the {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}} and also keep {{ReadOnlyKeyValueStore#get(key)}} for the 
use-cases that need just the value. Anyway, I will initiate a KIP for this.


was (Author: jeyhunkarimov):
From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then query again with 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get the key's timestamp. As a result 
we search twice. This is only for specific use-cases like this one.

Maybe we can overload the {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}} and also keep {{ReadOnlyKeyValueStore#get(key)}} for the 
use-cases that need just the value.

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> updated last. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * if you want to consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4304:
-

Assignee: Jeyhun Karimov

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>    Assignee: Jeyhun Karimov
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> updated last. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * if you want to consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027964#comment-16027964
 ] 

Jeyhun Karimov edited comment on KAFKA-4304 at 5/28/17 10:25 PM:
-

From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then query again with 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get the key's timestamp. As a result 
we search twice. This is only for specific use-cases like this one.

Maybe we can overload the {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}} and also keep {{ReadOnlyKeyValueStore#get(key)}} for the 
use-cases that need just the value.


was (Author: jeyhunkarimov):
From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then query again with 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get the key's timestamp. As a result 
we search twice. This is only for specific use-cases like this one. 

Maybe we can overload the {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}}

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> updated last. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * if you want to consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027964#comment-16027964
 ] 

Jeyhun Karimov commented on KAFKA-4304:
---

From the user perspective, one would query {{ReadOnlyKeyValueStore#get(key)}} 
to get the corresponding value and then query again with 
{{ReadOnlyKeyValueStore#getKeyWithTs}} to get the key's timestamp. As a result 
we search twice. This is only for specific use-cases like this one. 

Maybe we can overload the {{ReadOnlyKeyValueStore#get}} method to return 
{{KeyWithTimestamp}}

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, when querying a state store, it is not clear when the key was 
> updated last. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * if you want to consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4304) Extend Interactive Queries for return latest update timestamp per key

2017-05-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16027958#comment-16027958
 ] 

Jeyhun Karimov commented on KAFKA-4304:
---

[~mjsax] I completely forgot this issue. Sorry for the super late response. I 
misunderstood the issue (the top-k stuff). 
My question is: should we add a new public API to access a tuple's update 
timestamp? If yes, I think this would be inefficient. 
Or (while querying with a key) should we return the value in a "package" 
containing its (the key's) update timestamp? If yes, this would cause issues 
with backwards compatibility. 
Please correct me if I am wrong.

> Extend Interactive Queries for return latest update timestamp per key
> -
>
> Key: KAFKA-4304
> URL: https://issues.apache.org/jira/browse/KAFKA-4304
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: newbie++
>
> Currently, when querying a state store, it is not clear when the key was 
> updated last. The idea of this JIRA is to make the latest update timestamp 
> for each key-value pair of the state store accessible.
> For example, this might be useful to
>  * check if a value was updated but did not change (just compare the update 
> TS)
>  * if you want to consider only recently updated keys



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-28 Thread Jeyhun Karimov
After your response on KIP-149 regarding ValueTransformerSupplier, everything
you mentioned now makes complete sense. Thanks for the clarification.

Just a note: we will have additional (relative to KIP-149) overloaded methods:
for each withKey and withoutKey method (ValueMapper and ValueMapperWithKey) we
will have overloaded methods with a RecordContext argument.
Other than this issue, I don't see any limitation.

Cheers,
Jeyhun


On Sun, May 28, 2017 at 6:34 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for you comments Jeyhun,
>
> I agree about the disadvantages. Only the punctuation part is something
> I don't buy. IMHO, RichFunctions should not allow to register and use
> punctuation. If you need punctuation, you should use #transform() or
> similar. Note, that we plan to provide `RecordContext` and not
> `ProcessorContext` and thus, it's not even possible to register
> punctuations.
>
> One more thought: if you go with `init()` and `close()` we basically
> allow users to have an in-memory state for a function. Thus, we cannot
> share a single instance of RichValueMapper (etc) over multiple tasks and
> we would need a supplier pattern similar to #transform(). And this would
> "break the flow" of the API, as (Rich)ValueMapperSupplier would not
> inherit from ValueMapper and thus we would need many new overloads for
> KStream/KTable classes.
>
> The overall goal of RichFunction (from my understanding) was to provide
> record metadata information (like offset, timestamp, etc) to the user.
> And we still have #transform() that provided the init and close
> functionality. So if we introduce those with RichFunction we are quite
> close to what #transform provides, and thus it feels as if we duplicate
> functionality.
>
> For this reason, it seems to be better to go with the
> `#valueMapper(ValueMapper mapper, RecordContext context)` approach.
>
> WDYT?
>
>
>
> -Matthias
>
> On 5/27/17 11:00 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for your comments. I will refer to the overall approach as rich
> > functions until we find a better name.
> >
> > I think there are some pros and cons to the approach you described.
> >
> > The pros are that it is simple, has clear boundaries, and avoids
> > misunderstanding of the term "function".
> > So you propose sth like:
> > KStream.valueMapper(ValueMapper vm, RecordContext rc)
> > or
> > having rich functions with only a single init(RecordContext rc) method.
> >
> > The cons are that:
> >  - This will bring another set of overloads (if we use RecordContext as a
> > separate parameter). We should consider that the rich functions will be
> > for all the main interfaces.
> >  - I don't think that we need lambdas in rich functions. It is by
> > definition "rich", so there is no single method in the interface and, as
> > a result, no lambdas.
> >  - I disagree that rich functions should only contain an init() method.
> > This depends on each interface. For example, for specific interfaces we
> > can add methods (like punctuate()) to their rich functions.
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> >
> > On Thu, May 25, 2017 at 1:02 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> I confess, the term is borrowed from Flink :)
> >>
> >> Personally, I never thought about it, but I tend to agree with Michal. I
> >> also want to clarify, that the main purpose is the ability to access
> >> record metadata. Thus, it might even be sufficient to only have "init".
> >>
> >> An alternative would of course be, to pass in the RecordContext as
> >> method parameter. This would allow us to drop "init()". This might even
> >> allow to use Lambdas and we could keep the name RichFunction as we
> >> preserve the nature of being a function.
> >>
> >>
> >> -Matthias
> >>
> >> On 5/24/17 12:13 PM, Jeyhun Karimov wrote:
> >>> Hi Michal,
> >>>
> >>> Thanks for your comments. I see your point and I agree with it. However,
> >>> I don't have a better idea for naming. I checked the MapReduce source
> >>> code. There, JobConfigurable and Closable, two different interfaces, are
> >>> used. Maybe we can rename RichFunction to Configurable?
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>> On Tue, May 23, 2017 at 2:58 PM Michal Borowiecki
> >>> <michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>>
> >>> wro

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-28 Thread Jeyhun Karimov
Thanks for the clarification, Matthias; now everything is clear.

On Sun, May 28, 2017 at 6:21 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> I don't think we can drop ValueTransformerSupplier. If you don't have an
> supplier, you only get a single instance of your function. But for a
> stateful transformation, we need multiple instances (one for each task)
> of ValueTransformer.
>
> We don't need suppliers for functions like "ValueMapper" etc because
> those are stateless and thus, we can reuse a single instance over
> multiple tasks -- but we can't do this for ValueTransformer (and similar).
>
> Btw: This reminds me about KIP-159: with regard to the RichFunction we
> might need a supplier pattern, too. (I'll comment on the other thread,
> too.)
>
>
> -Matthias
>
> On 5/28/17 5:45 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > I updated the KIP.
> > Just to avoid misunderstanding, I meant deprecating
> > ValueTransformerSupplier, and I am ok with ValueTransformer.
> > So instead of using ValueTransformerSupplier, can't we directly use
> > ValueTransformer or ValueTransformerWithKey?
> >
> > Btw, in the current design all features of ValueTransformer will be
> > available in the ValueTransformerWithKey interface.
> >
> > Cheers,
> > Jeyhun
> >
> > On Sun, May 28, 2017 at 6:15 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Thanks Jeyhun.
> >>
> >> About ValueTransformer: I don't think we can remove it. Note,
> >> ValueTransformer allows to attach a state and also allows to register
> >> punctuations. Both those features will not be available via withKey()
> >> interfaces.
> >>
> >> -Matthias
> >>
> >>
> >> On 5/27/17 1:25 PM, Jeyhun Karimov wrote:
> >>> Hi Matthias,
> >>>
> >>> Thanks for your comments.
> >>>
> >>> I tested the deep copy approach. It has significant overhead. Especially
> >>> for "light" and stateless operators it slows down the topology
> >>> significantly (> 20%). I think "warning" users about not changing the
> >>> key is better than warning them about a possible performance loss.
> >>>
> >>> About the interfaces: additionally, I considered adding
> >>> InitializerWithKey, AggregatorWithKey and ValueTransformerWithKey. I
> >>> think they are included in the PR but not in the KIP. I will also
> >>> include them in the KIP, sorry, my bad.
> >>> Including ReducerWithKey definitely makes sense. Thanks.
> >>>
> >>> One thing I want to mention is that maybe we should deprecate the
> >>> methods with argument type ValueTransformerSupplier
> >>> (KStream.transformValues(...)) and, as a whole, the
> >>> ValueTransformerSupplier interface.
> >>> We can use the ValueTransformer/ValueTransformerWithKey type instead,
> >>> without the additional supplier layer.
> >>>
> >>>
> >>> Cheers,
> >>> Jeyhun
> >>>
> >>>
> >>> On Thu, May 25, 2017 at 1:07 AM Matthias J. Sax <matth...@confluent.io
> >
> >>> wrote:
> >>>
> >>>> One more question:
> >>>>
> >>>> Should we add any of
> >>>>  - InitizialierWithKey
> >>>>  - ReducerWithKey
> >>>>  - ValueTransformerWithKey
> >>>>
> >>>> To get consistent/complete API, it might be a good idea. Any thoughts?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 5/24/17 3:47 PM, Matthias J. Sax wrote:
> >>>>> Jeyhun,
> >>>>>
> >>>>> I was just wondering if you did look into the key-deep-copy idea we
> >>>>> discussed. I am curious to see what the impact might be.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 5/20/17 2:03 AM, Jeyhun Karimov wrote:
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks for your comments. I rethink about including rich functions
> >> into
> >>>>>> this KIP.
> >>>>>> I think once we include rich functions in this KIP and then fix
> >>>>>> ProcessorContext in another KIP and incorporate with existing rich
> >>>>>> functions, the code will not be backward

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-28 Thread Jeyhun Karimov
Hi,

I updated the KIP.
Just to avoid misunderstanding, I meant deprecating ValueTransformerSupplier,
and I am ok with ValueTransformer.
So instead of using ValueTransformerSupplier, can't we directly use
ValueTransformer or ValueTransformerWithKey?

Btw, in the current design all features of ValueTransformer will be available
in the ValueTransformerWithKey interface.
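For context, a minimal sketch of the supplier pattern in question: each get()
call yields a fresh, per-task instance, which is what makes stateful
transformers safe (0.11-era interface shape; the counting transformer is made
up for illustration):

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

class CountingTransformerSupplier implements ValueTransformerSupplier<String, Long> {
    @Override
    public ValueTransformer<String, Long> get() {
        // a new instance per task -- so the count below is per-task state
        return new ValueTransformer<String, Long>() {
            private long count;

            @Override public void init(final ProcessorContext context) { count = 0; }
            @Override public Long transform(final String value) { return ++count; }
            @Override public Long punctuate(final long timestamp) { return null; } // 0.11 API
            @Override public void close() { }
        };
    }
}
// usage: stream.transformValues(new CountingTransformerSupplier());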

Cheers,
Jeyhun

On Sun, May 28, 2017 at 6:15 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks Jeyhun.
>
> About ValueTransformer: I don't think we can remove it. Note,
> ValueTransformer allows to attach a state and also allows to register
> punctuations. Both those features will not be available via withKey()
> interfaces.
>
> -Matthias
>
>
> On 5/27/17 1:25 PM, Jeyhun Karimov wrote:
> > Hi Matthias,
> >
> > Thanks for your comments.
> >
> > I tested the deep copy approach. It has significant overhead. Especially
> > for "light" and stateless operators it slows down the topology
> > significantly (> 20%). I think "warning" users about not changing the key
> > is better than warning them about a possible performance loss.
> >
> > About the interfaces: additionally, I considered adding
> > InitializerWithKey, AggregatorWithKey and ValueTransformerWithKey. I think
> > they are included in the PR but not in the KIP. I will also include them
> > in the KIP, sorry, my bad.
> > Including ReducerWithKey definitely makes sense. Thanks.
> >
> > One thing I want to mention is that maybe we should deprecate the methods
> > with argument type ValueTransformerSupplier (KStream.transformValues(...))
> > and, as a whole, the ValueTransformerSupplier interface.
> > We can use the ValueTransformer/ValueTransformerWithKey type instead,
> > without the additional supplier layer.
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> > On Thu, May 25, 2017 at 1:07 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> One more question:
> >>
> >> Should we add any of
> >>  - InitizialierWithKey
> >>  - ReducerWithKey
> >>  - ValueTransformerWithKey
> >>
> >> To get consistent/complete API, it might be a good idea. Any thoughts?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/24/17 3:47 PM, Matthias J. Sax wrote:
> >>> Jeyhun,
> >>>
> >>> I was just wondering if you did look into the key-deep-copy idea we
> >>> discussed. I am curious to see what the impact might be.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 5/20/17 2:03 AM, Jeyhun Karimov wrote:
> >>>> Hi,
> >>>>
> >>>> Thanks for your comments. I rethink about including rich functions
> into
> >>>> this KIP.
> >>>> I think once we include rich functions in this KIP and then fix
> >>>> ProcessorContext in another KIP and incorporate with existing rich
> >>>> functions, the code will not be backwards compatible.
> >>>>
> >>>> I see Damian's and your point more clearly now.
> >>>>
> >>>> Conclusion: we include only withKey interfaces in this KIP (I updated
> >> the
> >>>> KIP), I will try also initiate another KIP for rich functions.
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>>
> >>>> On Fri, May 19, 2017 at 10:50 PM Matthias J. Sax <
> matth...@confluent.io
> >>>
> >>>> wrote:
> >>>>
> >>>>> With the current KIP, using ValueMapper and ValueMapperWithKey
> >>>>> interfaces, RichFunction seems to be an independent add-on. To fix
> the
> >>>>> original issue to allow key access, RichFunctions are not required
> >> IMHO.
> >>>>>
> >>>>> I initially put the RichFunction idea on the table, because I was
> >> hoping
> >>>>> to get a uniform API. And I think it was good to consider them --
> >>>>> however, the discussion showed that they are not necessary for key
> >>>>> access. Thus, it seems to be better to handle RichFunctions in their
> >>>>> own KIP. The ProcessorContext/RecordContext issues seem to be a main
> >>>>> challenge for this. And introducing RichFunctions with a parameter-less
> >>>>> init() method seems not to help too much. We would get an
> >> "intermediate"
> >>>>> API that we want to change anyway l

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-27 Thread Jeyhun Karimov
Hi Matthias,

Thanks for your comments.

I tested the deep copy approach. It has significant overhead. Especially
for "light" and stateless operators it slows down the topology
significantly (> 20%). I think "warning" users about not changing the key
is better than warning them about a possible performance loss.
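For context, one plausible way such a per-record deep copy can be implemented,
and why it is costly: a full serde round-trip on every record (a sketch under
that assumption; the actual benchmark code is not shown here):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

class KeyDeepCopySketch {
    // One serialize + one deserialize per record -- cheap operators get
    // dominated by this cost.
    static <K> K deepCopy(final K key, final Serde<K> serde) {
        final byte[] bytes = serde.serializer().serialize(null, key);
        return serde.deserializer().deserialize(null, bytes);
    }

    public static void main(final String[] args) {
        final Serde<String> serde = Serdes.String();
        System.out.println(deepCopy("user-42", serde));
    }
}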

About the interfaces: additionally, I considered adding InitializerWithKey,
AggregatorWithKey and ValueTransformerWithKey. I think they are included in
the PR but not in the KIP. I will also include them in the KIP, sorry, my bad.
Including ReducerWithKey definitely makes sense. Thanks.

One thing I want to mention is that maybe we should deprecate the methods with
argument type ValueTransformerSupplier (KStream.transformValues(...)) and, as
a whole, the ValueTransformerSupplier interface.
We can use the ValueTransformer/ValueTransformerWithKey type instead, without
the additional supplier layer.


Cheers,
Jeyhun


On Thu, May 25, 2017 at 1:07 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> One more question:
>
> Should we add any of
>  - InitizialierWithKey
>  - ReducerWithKey
>  - ValueTransformerWithKey
>
> To get consistent/complete API, it might be a good idea. Any thoughts?
>
>
> -Matthias
>
>
> On 5/24/17 3:47 PM, Matthias J. Sax wrote:
> > Jeyhun,
> >
> > I was just wondering if you did look into the key-deep-copy idea we
> > discussed. I am curious to see what the impact might be.
> >
> >
> > -Matthias
> >
> > On 5/20/17 2:03 AM, Jeyhun Karimov wrote:
> >> Hi,
> >>
> >> Thanks for your comments. I rethink about including rich functions into
> >> this KIP.
> >> I think once we include rich functions in this KIP and then fix
> >> ProcessorContext in another KIP and incorporate with existing rich
> >> functions, the code will not be backwards compatible.
> >>
> >> I see Damian's and your point more clearly now.
> >>
> >> Conclusion: we include only withKey interfaces in this KIP (I updated
> the
> >> KIP), I will try also initiate another KIP for rich functions.
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Fri, May 19, 2017 at 10:50 PM Matthias J. Sax <matth...@confluent.io
> >
> >> wrote:
> >>
> >>> With the current KIP, using ValueMapper and ValueMapperWithKey
> >>> interfaces, RichFunction seems to be an independent add-on. To fix the
> >>> original issue to allow key access, RichFunctions are not required
> IMHO.
> >>>
> >>> I initially put the RichFunction idea on the table, because I was
> hoping
> >>> to get a uniform API. And I think it was good to consider them --
> >>> however, the discussion showed that they are not necessary for key
> >>> access. Thus, it seems to be better to handle RichFunctions in their own
> >>> KIP. The ProcessorContext/RecordContext issues seem to be a main
> >>> challenge for this. And introducing RichFunctions with a parameter-less
> >>> init() method seems not to help too much. We would get an
> "intermediate"
> >>> API that we want to change anyway later on...
> >>>
> >>> As you put already much effort into RichFunction, feel free do push
> this
> >>> further and start a new KIP (we can do this even in parallel) -- we
> >>> don't want to slow you down :) But it make the discussion and code
> >>> review easier, if we separate both IMHO.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 5/19/17 2:25 AM, Jeyhun Karimov wrote:
> >>>> Hi Damian,
> >>>>
> >>>> Thanks for your comments. I think providing to users *interface*
> rather
> >>>> than *abstract class* should be preferred (Matthias also raised this
> >>> issue
> >>>> ), anyway I changed the corresponding parts of KIP.
> >>>>
> >>>> Regarding with passing additional contextual information, I think it
> is a
> >>>> tradeoff,
> >>>> 1) first, we fix the context parameter for *init() *method in another
> PR
> >>>> and solve Rich functions afterwards
> >>>> 2) first, we fix the requested issues on jira ([1-3]) with providing
> >>>> (not-complete) Rich functions and integrate the context parameters to
> >>> this
> >>>> afterwards (like improvement)
> >>>>
> >>>> To me, the second approach seems more incremental. However you are
> right,
> >>>> the names might confuse th

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-27 Thread Jeyhun Karimov
Hi,

Thanks for your comments. I will refer to the overall approach as rich
functions until we find a better name.

I think there are some pros and cons to the approach you described.

The pros are that it is simple, has clear boundaries, and avoids
misunderstanding of the term "function".
So you propose sth like:
KStream.valueMapper(ValueMapper vm, RecordContext rc)
or
having rich functions with only a single init(RecordContext rc) method.

The cons are that:
 - This will bring another set of overloads (if we use RecordContext as a
separate parameter). We should consider that the rich functions will be for
all the main interfaces.
 - I don't think that we need lambdas in rich functions. It is by definition
"rich", so there is no single method in the interface and, as a result, no
lambdas.
 - I disagree that rich functions should only contain an init() method. This
depends on each interface. For example, for specific interfaces we can add
methods (like punctuate()) to their rich functions.
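To make the two shapes concrete, a simplified sketch (signatures reduced to
the essentials; RecordContext as discussed in this thread):

// (a) Extra-parameter shape: keeps lambdas, but doubles the overload count.
//     <VR> KStream<K, VR> valueMapper(ValueMapper<V, VR> vm, RecordContext rc);

// (b) Rich-interface shape: no per-context overloads, but no lambdas.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

interface RichValueMapper<V, VR> {
    void init(RecordContext recordContext); // record metadata made available here
    VR apply(V value);
    void close();
}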


Cheers,
Jeyhun



On Thu, May 25, 2017 at 1:02 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> I confess, the term is borrowed from Flink :)
>
> Personally, I never thought about it, but I tend to agree with Michal. I
> also want to clarify, that the main purpose is the ability to access
> record metadata. Thus, it might even be sufficient to only have "init".
>
> An alternative would of course be, to pass in the RecordContext as
> method parameter. This would allow us to drop "init()". This might even
> allow to use Lambdas and we could keep the name RichFunction as we
> preserve the nature of being a function.
>
>
> -Matthias
>
> On 5/24/17 12:13 PM, Jeyhun Karimov wrote:
> > Hi Michal,
> >
> > Thanks for your comments. I see your point and I agree with it. However,
> > I don't have a better idea for naming. I checked the MapReduce source
> > code. There, JobConfigurable and Closable, two different interfaces, are
> > used. Maybe we can rename RichFunction to Configurable?
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Tue, May 23, 2017 at 2:58 PM Michal Borowiecki
> > <michal.borowie...@openbet.com <mailto:michal.borowie...@openbet.com>>
> > wrote:
> >
> > Hi Jeyhun,
> >
> > I understand your argument about "Rich" in RichFunctions. Perhaps
> > I'm just being too puritan here, but let me ask this anyway:
> >
> > What is it that makes something a function? To me a function is
> > something that takes zero or more arguments and possibly returns a
> > value and while it may have side-effects (as opposed to "pure
> > functions" which can't), it doesn't have any life-cycle of its own.
> > This is what, in my mind, distinguishes the concept of a "function"
> > from that of more vaguely defined concepts.
> >
> > So if we add a life-cycle to a function, in that understanding, it
> > doesn't become a rich function but instead stops being a function
> > altogether.
> >
> > You could say it's "just semantics" but to me precise use of
> > language in the given context is an important foundation for good
> > engineering. And in the context of programming "function" has a
> > precise meaning. Of course we can say that in the context of Kafka
> > Streams "function" has a different, looser meaning but I'd argue
> > that won't do anyone any good.
> >
> > On the other hand other frameworks such as Flink use this
> > terminology, so it could be that consistency is the reason. I'm
> > guessing that's why the name was proposed in the first place. My
> > point is simply that it's a poor choice of wording and Kafka Streams
> > don't have to follow that to the letter.
> >
> > Cheers,
> >
> > Michal
> >
> >
> > On 23/05/17 13:26, Jeyhun Karimov wrote:
> >> Hi Michal,
> >>
> >> Thanks for your comments.
> >>
> >>
> >> To me at least it feels strange that something is called a
> >> function yet doesn't follow the functional interface
> >> definition of having just one abstract method. I suppose init
> >> and close could be made default methods with empty bodies once
> >> Java 7 support is dropped to mitigate that concern. Still, I
> >> feel some resistance to consider something that requires
> >> initialisation and closing (which implies holding state) as
> > being a function. Sounds more like the Processor/Transformer kind of
> > thing semantically, rather than a function.

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-24 Thread Jeyhun Karimov
Hi Michal,

Thanks for your comments. I see your point and I agree with it. However, I
don't have a better idea for naming. I checked the MapReduce source code;
there, two different interfaces are used: JobConfigurable and Closeable.
Maybe we can rename RichFunction to Configurable?
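
To illustrate the renaming idea, a hedged sketch (names are illustrative
only, mirroring Hadoop's JobConfigurable/Closeable split rather than any
agreed API):

interface Configurable {
    void init(final RecordContext context);  // context type is an assumption
}

interface Closeable {
    void close();
}

// The "rich" variant would then just compose the two:
interface RichValueMapper<V, VR> extends Configurable, Closeable {
    VR apply(final V value);
}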


Cheers,
Jeyhun

On Tue, May 23, 2017 at 2:58 PM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Hi Jeyhun,
>
> I understand your argument about "Rich" in RichFunctions. Perhaps I'm just
> being too puritan here, but let me ask this anyway:
>
> What is it that makes something a function? To me a function is something
> that takes zero or more arguments and possibly returns a value and while it
> may have side-effects (as opposed to "pure functions" which can't), it
> doesn't have any life-cycle of its own. This is what, in my mind,
> distinguishes the concept of a "function" from that of more vaguely defined
> concepts.
>
> So if we add a life-cycle to a function, in that understanding, it doesn't
> become a rich function but instead stops being a function altogether.
>
> You could say it's "just semantics" but to me precise use of language in
> the given context is an important foundation for good engineering. And in
> the context of programming "function" has a precise meaning. Of course we
> can say that in the context of Kafka Streams "function" has a different,
> looser meaning but I'd argue that won't do anyone any good.
>
> On the other hand other frameworks such as Flink use this terminology, so
> it could be that consistency is the reason. I'm guessing that's why the
> name was proposed in the first place. My point is simply that it's a poor
> choice of wording and Kafka Streams don't have to follow that to the letter.
>
> Cheers,
>
> Michal
>
> On 23/05/17 13:26, Jeyhun Karimov wrote:
>
> Hi Michal,
>
> Thanks for your comments.
>
>
> To me at least it feels strange that something is called a function yet
>> doesn't follow the functional interface definition of having just one
>> abstract method. I suppose init and close could be made default methods
>> with empty bodies once Java 7 support is dropped to mitigate that concern.
>> Still, I feel some resistance to consider something that requires
>> initialisation and closing (which implies holding state) as being a
>> function. Sounds more like the Processor/Transformer kind of thing
>> semantically, rather than a function.
>
>
>  - If we called the interface just Function, your assumptions would hold.
> However, the keyword Rich by definition implies that we have a function (as
> you described, with one abstract method etc.) but a rich one. So, there are
> multiple methods in it.
> Ideally it should be:
>
> public interface RichFunction extends Function {  // the Function that you described
>     void close();
>     void init(Some params);
>     ...
> }
>
>
> The KIP says there are multiple use-cases for this but doesn't enumerate
>> any - I think some examples would be useful, otherwise that section sounds
>> a little bit vague.
>
>
> I thought it was obvious by definition, but I will update it. Thanks.
>
>
> IMHO, it's the access to the RecordContext where the added value lies
>> but maybe I'm just lacking in imagination, so I'm asking all this to better
>> understand the rationale for init() and close().
>
>
> Maybe I should add some examples. Thanks.
>
>
> Cheers,
> Jeyhun
>
> On Mon, May 22, 2017 at 11:02 AM, Michal Borowiecki <
> michal.borowie...@openbet.com> wrote:
>
>> Hi Jeyhun,
>>
>> I'd like to understand better the premise of RichFunctions and why init(Some
>> params), close() are said to be needed.
>> To me at least it feels strange that something is called a function yet
>> doesn't follow the functional interface definition of having just one
>> abstract method. I suppose init and close could be made default methods
>> with empty bodies once Java 7 support is dropped to mitigate that concern.
>> Still, I feel some resistance to consider something that requires
>> initialisation and closing (which implies holding state) as being a
>> function. Sounds more like the Processor/Transformer kind of thing
>> semantically, rather than a function.
>>
>> The KIP says there are multiple use-cases for this but doesn't enumerate
>> any - I think some examples would be useful, otherwise that section sounds
>> a little bit vague.
>>
>> IMHO, it's the access to the RecordContext where the added value lies
>> but maybe I'm just lacking in imagination, so I'm asking all this to better
>> understand the rationale for init() and close().

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-23 Thread Jeyhun Karimov
Hi Michal,

Thanks for your comments.


To me at least it feels strange that something is called a function yet
> doesn't follow the functional interface definition of having just one
> abstract method. I suppose init and close could be made default methods
> with empty bodies once Java 7 support is dropped to mitigate that concern.
> Still, I feel some resistance to consider something that requires
> initialisation and closing (which implies holding state) as being a
> function. Sounds more like the Processor/Transformer kind of thing
> semantically, rather than a function.


 - If we called the interface just Function, your assumptions would hold.
However, the keyword Rich by definition implies that we have a function (as
you described, with one abstract method etc.) but a rich one. So, there are
multiple methods in it.
Ideally it should be:

public interface RichFunction extends Function {  // the Function that you described
    void close();
    void init(Some params);
    ...
}
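
Spelled out with types, a hypothetical complete version of the snippet
above (the generic Function shape and the java.util.Map parameter are
illustrative, not an existing Streams interface):

interface Function<V, VR> {
    VR apply(final V value);   // single abstract method -> lambda-friendly
}

interface RichFunction<V, VR> extends Function<V, VR> {
    void init(final java.util.Map<String, ?> params);
    void close();
}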


The KIP says there are multiple use-cases for this but doesn't enumerate
> any - I think some examples would be useful, otherwise that section sounds
> a little bit vague.


I thought it was obvious by definition, but I will update it. Thanks.


IMHO, it's the access to the RecordContext where the added value lies
> but maybe I'm just lacking in imagination, so I'm asking all this to better
> understand the rationale for init() and close().


Maybe I should add some examples. Thanks.


Cheers,
Jeyhun

On Mon, May 22, 2017 at 11:02 AM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Hi Jeyhun,
>
> I'd like to understand better the premise of RichFunctions and why init(Some
> params), close() are said to be needed.
> To me at least it feels strange that something is called a function yet
> doesn't follow the functional interface definition of having just one
> abstract method. I suppose init and close could be made default methods
> with empty bodies once Java 7 support is dropped to mitigate that concern.
> Still, I feel some resistance to consider something that requires
> initialisation and closing (which implies holding state) as being a
> function. Sounds more like the Processor/Transformer kind of thing
> semantically, rather than a function.
>
> The KIP says there are multiple use-cases for this but doesn't enumerate
> any - I think some examples would be useful, otherwise that section sounds
> a little bit vague.
>
> IMHO, it's the access to the RecordContext where the added value lies
> but maybe I'm just lacking in imagination, so I'm asking all this to better
> understand the rationale for init() and close().
>
> Thanks,
> Michał
>
> On 20/05/17 17:05, Jeyhun Karimov wrote:
>
> Dear community,
>
> As we discussed in the KIP-149 [DISCUSS] thread [1], I would like to
> initiate a KIP for rich functions (interfaces) [2].
> I would like to get your comments.
>
>
> [1] http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
> [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
>
>
> Cheers,
> Jeyhun
>
>
> --
> Michal Borowiecki
> Senior Software Engineer L4
> E: michal.borowie...@openbet.com
> W: www.openbet.com
> OpenBet Ltd
>


[DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-05-20 Thread Jeyhun Karimov
Dear community,

As we discussed in the KIP-149 [DISCUSS] thread [1], I would like to
initiate a KIP for rich functions (interfaces) [2].
I would like to get your comments.


[1]
http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
[2]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams


Cheers,
Jeyhun
-- 
-Cheers

Jeyhun


Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-05-20 Thread Jeyhun Karimov
Dear community,

I want to inform you that, because of the case in [1], we needed to add an
extra overloaded method to the KIP:
*KStreamBuilder.globalTable(final Serde<K> keySerde, final Serde<V>
valSerde, final String topic, final String storeName)*

[1] https://github.com/confluentinc/examples/commit/2cd0b87bc8a7eab0e7199fa0079db6417f0e6b63#commitcomment-22200864
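
For illustration, a hedged usage sketch of the added overload (topic and
store names are placeholders, pre-1.0 KStreamBuilder API assumed):

KStreamBuilder builder = new KStreamBuilder();
GlobalKTable<String, Long> counts = builder.globalTable(
        Serdes.String(),   // key serde
        Serdes.Long(),     // value serde
        "counts-topic",
        "counts-store");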


Cheers,
Jeyhun


On Sat, May 6, 2017 at 1:57 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Thank you, Jeyhun!
>
> Will make a final pass on the PR soon.
>
> Guozhang
>
> On Tue, Apr 25, 2017 at 11:13 PM, Michael Noll <mich...@confluent.io>
> wrote:
>
> > Thanks for your work and for driving this, Jeyhun! :-)
> >
> > -Michael
> >
> >
> > On Tue, Apr 25, 2017 at 11:11 PM, Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> >
> > > Dear all,
> > >
> > > I am closing this vote now. The KIP got accepted with
> > >
> > > +3 binding (Guozhang, Ewen, Gwen)
> > >
> > > Thanks all (especially Matthias) for guiding me throughout my first
> > KIP.
> > >
> > >
> > > Cheers,
> > > Jeyhun
> > >
> > > On Mon, Apr 24, 2017 at 9:32 PM Thomas Becker <tobec...@tivo.com>
> wrote:
> > >
> > > > +1 (non-binding)
> > > >
> > > > On Tue, 2017-02-28 at 08:59 +, Jeyhun Karimov wrote:
> > > > > Dear community,
> > > > >
> > > > > I'd like to start the vote for KIP-123:
> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> > > > >
> > > > >
> > > > > Cheers,
> > > > > Jeyhun
> > > > --
> > > >
> > > >
> > > > Tommy Becker
> > > >
> > > > Senior Software Engineer
> > > >
> > > > O +1 919.460.4747
> > > >
> > > > tivo.com
> > > >
> > > >
> > > >
> > > --
> > > -Cheers
> > >
> > > Jeyhun
> > >
> >
>
>
>
> --
> -- Guozhang
>
-- 
-Cheers

Jeyhun


Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-20 Thread Jeyhun Karimov
Hi,

Thanks for your comments. I have rethought including rich functions in
this KIP.
I think if we include rich functions in this KIP now, then fix
ProcessorContext in another KIP and incorporate it with the existing rich
functions, the code will not be backwards compatible.

I see Damian's and your point more clearly now.

Conclusion: we include only withKey interfaces in this KIP (I updated the
KIP), and I will also try to initiate another KIP for rich functions.

Cheers,
Jeyhun

On Fri, May 19, 2017 at 10:50 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> With the current KIP, using ValueMapper and ValueMapperWithKey
> interfaces, RichFunction seems to be an independent add-on. To fix the
> original issue to allow key access, RichFunctions are not required IMHO.
>
> I initially put the RichFunction idea on the table, because I was hoping
> to get a uniform API. And I think it was good to consider them --
> however, the discussion showed that they are not necessary for key
> access. Thus, it seems to be better to handle RichFunctions in an own
> KIP. The ProcessorContext/RecordContext issues seems to be a main
> challenge for this. And introducing RichFunctions with parameter-less
> init() method seems not to help too much. We would get an "intermediate"
> API that we want to change anyway later on...
>
> As you put already much effort into RichFunction, feel free to push this
> further and start a new KIP (we can do this even in parallel) -- we
> don't want to slow you down :) But it makes the discussion and code
> review easier, if we separate both IMHO.
>
>
> -Matthias
>
>
> On 5/19/17 2:25 AM, Jeyhun Karimov wrote:
> > Hi Damian,
> >
> > Thanks for your comments. I think providing users an *interface* rather
> > than an *abstract class* should be preferred (Matthias also raised this
> > issue); anyway, I changed the corresponding parts of the KIP.
> >
> > Regarding passing additional contextual information, I think it is a
> > tradeoff:
> > 1) first, we fix the context parameter for the *init()* method in another
> > PR and solve Rich functions afterwards
> > 2) first, we fix the requested issues on jira ([1-3]) by providing
> > (incomplete) Rich functions and integrate the context parameters into
> > this afterwards (as an improvement)
> >
> > To me, the second approach seems more incremental. However, you are
> > right, the names might confuse the users.
> >
> >
> >
> > [1] https://issues.apache.org/jira/browse/KAFKA-4218
> > [2] https://issues.apache.org/jira/browse/KAFKA-4726
> > [3] https://issues.apache.org/jira/browse/KAFKA-3745
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> > On Fri, May 19, 2017 at 10:42 AM Damian Guy <damian@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> I see you've removed the `ProcessorContext` from the RichFunction which
> is
> >> good, but why is it a `RichFunction`? I'd have expected it to pass some
> >> additional contextual information, i.e., the `RecordContext` that
> contains
> >> just the topic, partition, timestamp, offset.  I'm ok with it not
> passing
> >> this contextual information, but is the naming incorrect? I'm not sure,
> >> tbh. I'm wondering if we should drop `RichFunctions` until we can do it
> >> properly with the correct context?
> >>
> >> Also, I don't like the abstract classes: RichValueMapper,
> >> RichValueJoiner, RichInitializer etc. Why can't they just be interfaces?
> >> Generally we should provide users with interfaces to code against, not
> >> classes.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Fri, 19 May 2017 at 00:50 Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
> >>
> >>> Hi,
> >>>
> >>> Thanks. I initiated the PR as well, to get a better overview.
> >>>
> >>> The only reason that I used abstract class instead of interface for
> Rich
> >>> functions is that in future if we will have some AbstractRichFunction
> >>> abstract classes,
> >>> we can easily extend:
> >>>
> >>> public abstract class RichValueMapper<K, V, VR>
> >>>         extends AbstractRichFunction   // the possible future addition
> >>>         implements ValueMapperWithKey<K, V, VR>, RichFunction {
> >>> }
> >>>  With interfaces we are only limited to interfaces for inheritance.
> >>>
> >>> However, I think we can easily change it (from abstract class ->
> >> interface)
> >>> if you think interface is a better fit.
> >>>

[jira] [Commented] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-05-19 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16017575#comment-16017575
 ] 

Jeyhun Karimov commented on KAFKA-4785:
---

[~mjsax] Thanks for reminding. I am on it

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different breaking this assumption.
> Thus, for reading data from intermediate repartitioning topic, the configured 
> timestamp extractor should not be used, but the record's metadata timestamp 
> should be extracted as record timestamp.
> In order to leverage the same behavior for intermediate user topic (ie, used 
> in {{through()}})  we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}).
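
A hedged sketch of the behavior described above, assuming the two-argument
TimestampExtractor signature (the class name is illustrative only):

public class RepartitionTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        // ignore any user-configured extractor logic and trust the
        // metadata timestamp that the upstream task set explicitly
        return record.timestamp();
    }
}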



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-19 Thread Jeyhun Karimov
Hi Damian,

Thanks for your comments. I think providing users an *interface* rather
than an *abstract class* should be preferred (Matthias also raised this
issue); anyway, I changed the corresponding parts of the KIP.

Regarding passing additional contextual information, I think it is a
tradeoff:
1) first, we fix the context parameter for the *init()* method in another PR
and solve Rich functions afterwards
2) first, we fix the requested issues on jira ([1-3]) by providing
(incomplete) Rich functions and integrate the context parameters into this
afterwards (as an improvement)

To me, the second approach seems more incremental. However, you are right,
the names might confuse the users.



[1] https://issues.apache.org/jira/browse/KAFKA-4218
[2] https://issues.apache.org/jira/browse/KAFKA-4726
[3] https://issues.apache.org/jira/browse/KAFKA-3745


Cheers,
Jeyhun


On Fri, May 19, 2017 at 10:42 AM Damian Guy <damian@gmail.com> wrote:

> Hi,
>
> I see you've removed the `ProcessorContext` from the RichFunction which is
> good, but why is it a `RichFunction`? I'd have expected it to pass some
> additional contextual information, i.e., the `RecordContext` that contains
> just the topic, partition, timestamp, offset.  I'm ok with it not passing
> this contextual information, but is the naming incorrect? I'm not sure,
> tbh. I'm wondering if we should drop `RichFunctions` until we can do it
> properly with the correct context?
>
> Also, I don't like the abstract classes: RichValueMapper, RichValueJoiner,
> RichInitializer etc. Why can't they just be interfaces? Generally we
> should provide users with interfaces to code against, not classes.
>
> Thanks,
> Damian
>
> On Fri, 19 May 2017 at 00:50 Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Hi,
> >
> > Thanks. I initiated the PR as well, to get a better overview.
> >
> > The only reason that I used abstract class instead of interface for Rich
> > functions is that in future if we will have some AbstractRichFunction
> > abstract classes,
> > we can easily extend:
> >
> > public abstract class RichValueMapper<K, V, VR>
> >         extends AbstractRichFunction   // the possible future addition
> >         implements ValueMapperWithKey<K, V, VR>, RichFunction {
> > }
> >  With interfaces we are only limited to interfaces for inheritance.
> >
> > However, I think we can easily change it (from abstract class ->
> interface)
> > if you think interface is a better fit.
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> > On Fri, May 19, 2017 at 1:00 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for the update and explanations. The KIP is quite improved now
> --
> > > great job!
> > >
> > > One more question: Why are RichValueMapper etc abstract classes and not
> > > interfaces?
> > >
> > >
> > > Overall, I like the KIP a lot!
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 5/16/17 7:03 AM, Jeyhun Karimov wrote:
> > > > Hi,
> > > >
> > > > Thanks for your comments.
> > > >
> > > > I think supporting Lambdas for `withKey` and `AbstractRichFunction`
> > > >> don't go together, as Lambdas are only supported for interfaces
> AFAIK.
> > > >
> > > >
> > > > Maybe I misunderstood your comment.
> > > > *withKey* and *withOnlyValue* are interfaces, so they have no direct
> > > > relation with *AbstractRichFunction*.
> > > > The *withKey* and *withOnlyValue* interfaces have only one method, so
> > > > we can use lambdas.
> > > > Where does *AbstractRichFunction* come into play? Inside Rich
> > > > functions. And we use Rich functions in 2 places:
> > > >
> > > > 1. The user doesn't use rich functions, just the regular *withKey* and
> > > > *withOnlyValue* interfaces (both support lambdas). We get those
> > > > interfaces and wrap them into a Rich function while building the
> > > > topology, then send it to the Processor.
> > > > 2. The user does use rich functions (Rich functions implement the
> > > > *withKey* interface). As a result, no lambdas here by definition. In
> > > > this case, while building the topology we do a type check on whether
> > > > the object type is *withKey* or *RichFunction*.
> > > >
> > > > So *AbstractRichFunction* is just syntactic sugar for Rich functions
> > > > and does not affect using lambdas.
> > > >
> > 

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-18 Thread Jeyhun Karimov
Hi,

Thanks. I initiated the PR as well, to get a better overview.

The only reason that I used an abstract class instead of an interface for
Rich functions is that, in the future, if we have some AbstractRichFunction
abstract classes, we can easily extend:

public abstract class RichValueMapper<K, V, VR>
        extends AbstractRichFunction   // the possible future addition
        implements ValueMapperWithKey<K, V, VR>, RichFunction {
}

With interfaces we are only limited to interfaces for inheritance.

However, I think we can easily change it (from abstract class -> interface)
if you think interface is a better fit.
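
For comparison, a hedged sketch of the interface-only alternative (possible
once Java 7 support is dropped, since it relies on default methods;
RecordContext here is only a placeholder for whatever init parameter is
agreed on):

public interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR> {
    default void init(final RecordContext context) {}
    default void close() {}
}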


Cheers,
Jeyhun


On Fri, May 19, 2017 at 1:00 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for the update and explanations. The KIP is quite improved now --
> great job!
>
> One more question: Why are RichValueMapper etc abstract classes and not
> interfaces?
>
>
> Overall, I like the KIP a lot!
>
>
> -Matthias
>
>
> On 5/16/17 7:03 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for your comments.
> >
> > I think supporting Lambdas for `withKey` and `AbstractRichFunction`
> >> don't go together, as Lambdas are only supported for interfaces AFAIK.
> >
> >
> > Maybe I misunderstood your comment.
> > *withKey* and *withOnlyValue* are interfaces, so they have no direct
> > relation with *AbstractRichFunction*.
> > The *withKey* and *withOnlyValue* interfaces have only one method, so we
> > can use lambdas.
> > Where does *AbstractRichFunction* come into play? Inside Rich functions.
> > And we use Rich functions in 2 places:
> >
> > 1. The user doesn't use rich functions, just the regular *withKey* and
> > *withOnlyValue* interfaces (both support lambdas). We get those interfaces
> > and wrap them into a Rich function while building the topology, then send
> > it to the Processor.
> > 2. The user does use rich functions (Rich functions implement the
> > *withKey* interface). As a result, no lambdas here by definition. In this
> > case, while building the topology we do a type check on whether the object
> > type is *withKey* or *RichFunction*.
> >
> > So *AbstractRichFunction* is just syntactic sugar for Rich functions and
> > does not affect using lambdas.
> >
> > Thus, if we want to support Lambdas for `withKey`, we need to have a
> >> interface approach like this
> >>   - RichFunction -> only adding init() and close()
> >>   - ValueMapper
> >>   - ValueMapperWithKey
> >>   - RichValueMapper extends ValueMapperWithKey, RichFunction
> >
> >
> > As I said above, currently we support lambdas for *withKey* interfaces as
> > well.  However, I agree with your idea and I will remove the
> > AbstractRichFunction from the design.
> >
> > As an alternative, we could argue, that it is sufficient to support
> >> Lambdas for the "plain" API only, but not for any "extended API". For
> >> this, RichFunction could add key+init+close and AbstractRichFunction
> >> would allow to only care about getting the key.
> >> Not sure, which one is better. I don't like the idea of more overloaded
> >> methods to get Lambdas for `withKey` interfaces too much because we have
> >> already so many overloads. On the other hand, I do see value in
> >> supporting Lambdas for `withKey`.
> >
> >
> > Just to clarify, with the current design we have only one extra
> > overloaded method per *withOnlyValue* interface: the *withKey* version of
> > that particular interface.
> > We don't need an extra overload for Rich functions, as a Rich function
> > implements the *withKey* interface and as a result they have the same
> > type. We differentiate them while building the topology.
> > We supported lambdas for *withKey* APIs because of the comment:
> >
> > @Jeyhun: I did not put any thought into this, but can we have a design
> >> that allows for both? Also, with regard to lambdas, it might make sense
> >> to allow for both `V -> newV` and `(K, V) -> newV` ?
> >
> >
> > However, I don't think that this complicates the overall design
> > significantly.
> >
> >
> > Depending on what we want to support, it might make sense to
> >> include/exclude RichFunctions from this KIP -- and thus, this also
> >> determines if we should have a "ProcessorContext KIP" before driving
> >> this KIP further.
> >
> >
> > Based on our discussion I think we should keep Rich functions, as I
> > don't think that they bring an extra layer of overhead to the library.
> >
> > Any comments are appreciated.

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-16 Thread Jeyhun Karimov
Hi,

Thanks for your comments.

I think supporting Lambdas for `withKey` and `AbstractRichFunction`
> don't go together, as Lambdas are only supported for interfaces AFAIK.


Maybe I misunderstood your comment.
*withKey* and *withOnlyValue* are interfaces, so they have no direct
relation with *AbstractRichFunction*.
The *withKey* and *withOnlyValue* interfaces have only one method, so we
can use lambdas.
Where does *AbstractRichFunction* come into play? Inside Rich functions.
And we use Rich functions in 2 places:

1. The user doesn't use rich functions, just the regular *withKey* and
*withOnlyValue* interfaces (both support lambdas). We get those interfaces
and wrap them into a Rich function while building the topology, then send it
to the Processor.
2. The user does use rich functions (Rich functions implement the *withKey*
interface). As a result, no lambdas here by definition. In this case, while
building the topology we do a type check on whether the object type is
*withKey* or *RichFunction*.

So *AbstractRichFunction* is just syntactic sugar for Rich functions and
does not affect using lambdas.

Thus, if we want to support Lambdas for `withKey`, we need to have a
> interface approach like this
>   - RichFunction -> only adding init() and close()
>   - ValueMapper
>   - ValueMapperWithKey
>   - RichValueMapper extends ValueMapperWithKey, RichFunction


As I said above, currently we support lambdas for *withKey* interfaces as
well.  However, I agree with your idea and I will remove the
AbstractRichFunction from the design.
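
For reference, a minimal sketch of the interface-only hierarchy quoted
above (generic bounds simplified for readability):

interface RichFunction {
    void init();    // parameter-less, as discussed
    void close();
}

interface ValueMapper<V, VR> {
    VR apply(final V value);
}

interface ValueMapperWithKey<K, V, VR> {
    VR apply(final K readOnlyKey, final V value);
}

interface RichValueMapper<K, V, VR>
        extends ValueMapperWithKey<K, V, VR>, RichFunction {
}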

As an alternative, we could argue, that it is sufficient to support
> Lambdas for the "plain" API only, but not for any "extended API". For
> this, RichFunction could add key+init+close and AbstractRichFunction
> would allow to only care about getting the key.
> Not sure, which one is better. I don't like the idea of more overloaded
> methods to get Lambdas for `withKey` interfaces too much because we have
> already so many overloads. On the other hand, I do see value in
> supporting Lambdas for `withKey`.


Just to clarify, with the current design we have only one extra overloaded
method per *withOnlyValue* interface: the *withKey* version of that
particular interface.
We don't need an extra overload for Rich functions, as a Rich function
implements the *withKey* interface and as a result they have the same type.
We differentiate them while building the topology.
We supported lambdas for *withKey* APIs because of the comment:

@Jeyhun: I did not put any thought into this, but can we have a design
> that allows for both? Also, with regard to lambdas, it might make sense
> to allow for both `V -> newV` and `(K, V) -> newV` ?


However, I don't think that this complicates the overall design
significantly.


Depending on what we want to support, it might make sense to
> include/exclude RichFunctions from this KIP -- and thus, this also
> determines if we should have a "ProcessorContext KIP" before driving
> this KIP further.


Based on our discussion I think we should keep Rich functions, as I don't
think that they bring an extra layer of overhead to the library.

Any comments are appreciated.

Cheers,
Jeyhun


On Tue, May 16, 2017 at 12:10 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Jeyhun,
>
> thanks for the update.
>
> I think supporting Lambdas for `withKey` and `AbstractRichFunction`
> don't go together, as Lambdas are only supported for interfaces AFAIK.
>
> Thus, if we want to support Lambdas for `withKey`, we need to have a
> interface approach like this
>
>   - RichFunction -> only adding init() and close()
>
>   - ValueMapper
>   - ValueMapperWithKey
>
>   - RichValueMapper extends ValueMapperWithKey, RichFunction
>
> For this approach, AbstractRichFunction does not make sense anymore, as
> the only purpose of `RichFunction` is to allow the implementation of
> init() and close() -- if you don't want those, you would implement a
> different interface (ie, ValueMapperWithKey)
>
> As an alternative, we could argue, that it is sufficient to support
> Lambdas for the "plain" API only, but not for any "extended API". For
> this, RichFunction could add key+init+close and AbstractRichFunction
> would allow to only care about getting the key.
>
> Not sure, which one is better. I don't like the idea of more overloaded
> methods to get Lambdas for `withKey` interfaces too much because we have
> already so many overloads. On the other hand, I do see value in
> supporting Lambdas for `withKey`.
>
> Depending on what we want to support, it might make sense to
> include/exclude RichFunctions from this KIP -- and thus, this also
> determines if we should have a "ProcessorContext KIP" before driving
> this KIP further.
>
> Thoughts?
>
>
>
>
> -Matthias
>
>
> On 5/15/17 

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-15 Thread Jeyhun Karimov
r interface like the
> > > `RecordContext`  - remembering it is easier to add methods/interfaces
> > later
> > > than to remove them
> > >
> > > On Sat, 13 May 2017 at 22:26 Matthias J. Sax <matth...@confluent.io>
> > wrote:
> > >
> > >> Jeyhun,
> > >>
> > >> I am not an expert on Lambdas. Can you elaborate a little bit? I
> cannot
> > >> follow the explanation in the KIP to see what the problem is.
> > >>
> > >> For updating the KIP title I don't know -- guess it's up to you.
> Maybe a
> > >> committer can comment on this?
> > >>
> > >>
> > >> Further comments:
> > >>
> > >>  - The KIP get a little hard to read -- can you maybe reformat the
> wiki
> > >> page a little bit? I think using `CodeBlock` would help.
> > >>
> > >>  - What about KStream-KTable joins? You don't have overloads added for
> > >> them. Why? (Even if I still hope that we don't need to add any new
> > >> overloads)
> > >>
> > >>  - Why do we need `AbstractRichFunction`?
> > >>
> > >>  - What about interfaces Initializer, ForeachAction, Merger,
> Predicate,
> > >> Reducer? I don't want to say we should/need to add to all, but we
> should
> > >> discuss all of them and add where it does make sense (e.g.,
> > >> RichForeachAction does make sense IMHO)
> > >>
> > >>
> > >> Btw: I like the hierarchy `ValueXX` -- `ValueXXWithKey` --
> `RichValueXX`
> > >> in general -- but why can't we do all this with interfaces only?
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 5/11/17 7:00 AM, Jeyhun Karimov wrote:
> > >>> Hi,
> > >>>
> > >>> Thanks for your comments. I think we cannot extend the two interfaces
> > if
> > >> we
> > >>> want to keep lambdas. I updated the KIP [1]. Maybe I should change
> the
> > >>> title, because now we are not limiting the KIP to only ValueMapper,
> > >>> ValueTransformer and ValueJoiner.
> > >>> Please feel free to comment.
> > >>>
> > >>> [1]
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> > >>>
> > >>>
> > >>> Cheers,
> > >>> Jeyhun
> > >>>
> > >>> On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <
> matth...@confluent.io>
> > >>> wrote:
> > >>>
> > >>>> If `ValueMapperWithKey` extends `ValueMapper` we don't need the new
> > >>>> overload.
> > >>>>
> > >>>> And yes, we need to do one check -- but this happens when building
> the
> > >>>> topology. At runtime (I mean after KafkaStreams#start()) we don't need
> > any
> > >>>> check as we will always use `ValueMapperWithKey`)
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>
> > >>>> On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> > >>>>> Hi,
> > >>>>>
> > >>>>> Thanks for feedback.
> > >>>>> Then we need to overload method
> > >>>>>    <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > >>>>> with
> > >>>>>    <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
> > >>>>>
> > >>>>> and in runtime (inside processor) we still have to check it is
> > >>>> ValueMapper
> > >>>>> or ValueMapperWithKey before wrapping it into the rich function.
> > >>>>>
> > >>>>>
> > >>>>> Please correct me if I am wrong.
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Jeyhun
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
> > >>>>> michal.borowie...@openbet.com> wrote:
> >

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-11 Thread Jeyhun Karimov
Hi,

Thanks for your comments. I think we cannot extend the two interfaces if we
want to keep lambdas. I updated the KIP [1]. Maybe I should change the
title, because now we are not limiting the KIP to only ValueMapper,
ValueTransformer and ValueJoiner.
Please feel free to comment.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner


Cheers,
Jeyhun

On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> If `ValueMapperWithKey` extends `ValueMapper` we don't need the new
> overload.
>
> And yes, we need to do one check -- but this happens when building the
> topology. At runtime (I mean after KafkaStreams#start()) we don't need any
> check as we will always use `ValueMapperWithKey`)
>
>
> -Matthias
>
>
> On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for feedback.
> > Then we need to overload method
> >    <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > with
> >    <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
> >
> > and in runtime (inside processor) we still have to check it is
> ValueMapper
> > or ValueMapperWithKey before wrapping it into the rich function.
> >
> >
> > Please correct me if I am wrong.
> >
> > Cheers,
> > Jeyhun
> >
> >
> >
> >
> > On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
> > michal.borowie...@openbet.com> wrote:
> >
> >> +1 :)
> >>
> >>
> >> On 08/05/17 23:52, Matthias J. Sax wrote:
> >>> Hi,
> >>>
> >>> I was reading the updated KIP and I am wondering, if we should do the
> >>> design a little differently.
> >>>
> >>> Instead of distinguishing between a RichFunction and non-RichFunction
> at
> >>> runtime level, we would use RichFunctions all the time. Thus, on the
> DSL
> >>> entry level, if a user provides a non-RichFunction, we wrap it by a
> >>> RichFunction that is fully implemented by Streams. This RichFunction
> >>> would just forward the call omitting the key:
> >>>
> >>> Just to sketch the idea (incomplete code snippet):
> >>>
> >>>> public StreamsRichValueMapper implements RichValueMapper() {
> >>>>private ValueMapper userProvidedMapper; // set by constructor
> >>>>
> >>>>public VR apply(final K key, final V1 value1, final V2 value2) {
> >>>>return userProvidedMapper(value1, value2);
> >>>>}
> >>>> }
> >>>
> >>>  From a performance point of view, I am not sure if the
> >>> "if(isRichFunction)" including casts etc would have more overhead than
> >>> this approach (we would do more nested method calls for non-RichFunctions,
> >>> which should be more common than RichFunctions).
> >>>
> >>> This approach should not affect lambdas (or do I miss something?) and
> >>> might be cleaner, as we could have one more top level interface
> >>> `RichFunction` with methods `init()` and `close()` and also interfaces
> >>> for `RichValueMapper` etc. (thus, no abstract classes required).
> >>>
> >>>
> >>> Any thoughts?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> >>>> Hi,
> >>>>
> >>>> Thanks for comments. I extended PR and KIP to include rich functions.
> I
> >>>> will still have to evaluate the cost of deep copying of keys.
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>>
> >>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <
> >> mathieu.fenn...@replicon.com>
> >>>> wrote:
> >>>>
> >>>>> Hey Matthias,
> >>>>>
> >>>>> My opinion would be that documenting the immutability of the key is
> the
> >>>>> best approach available.  Better than requiring the key to be
> >> serializable
> >>>>> (as with Jeyhun's last pass at the PR), no performance risk.
> >>>>>
> >>>>> It'd be different if Java had immutable type constraints of some
> kind.
> >> :-)
> >>>>>
> >>>>> Mathieu
> >>>>>
> >>>>>
> >>>>> On Fri, May 5, 2017 at 11

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-09 Thread Jeyhun Karimov
One correction:


> and in runtime (inside processor) we still have to check it is ValueMapper
> or ValueMapperWithKey before wrapping it into the rich function.


this check will happen at compile time, at the API level.
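
A small hedged illustration of that compile-time resolution (stream is a
hypothetical KStream<String, String>): each lambda shape selects a
different overload, so no runtime type check is needed:

stream.mapValues(value -> value.length());            // ValueMapper
stream.mapValues((key, value) -> key + ":" + value);  // ValueMapperWithKey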



Cheers,
Jeyhun


On Tue, May 9, 2017 at 11:55 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi,
>
> Thanks for feedback.
> Then we need to overload method
>    <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> with
>    <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
>
> and in runtime (inside processor) we still have to check it is ValueMapper
> or ValueMapperWithKey before wrapping it into the rich function.
>
>
> Please correct me if I am wrong.
>
> Cheers,
> Jeyhun
>
>
>
>
> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
> michal.borowie...@openbet.com> wrote:
>
>> +1 :)
>>
>>
>> On 08/05/17 23:52, Matthias J. Sax wrote:
>> > Hi,
>> >
>> > I was reading the updated KIP and I am wondering, if we should do the
>> > design a little differently.
>> >
>> > Instead of distinguishing between a RichFunction and non-RichFunction at
>> > runtime level, we would use RichFunctions all the time. Thus, on the DSL
>> > entry level, if a user provides a non-RichFunction, we wrap it by a
>> > RichFunction that is fully implemented by Streams. This RichFunction
>> > would just forward the call omitting the key:
>> >
>> > Just to sketch the idea (incomplete code snippet):
>> >
>> >> public StreamsRichValueMapper implements RichValueMapper() {
>> >>private ValueMapper userProvidedMapper; // set by constructor
>> >>
>> >>public VR apply(final K key, final V1 value1, final V2 value2) {
>> >>return userProvidedMapper(value1, value2);
>> >>}
>> >> }
>> >
>> >  From a performance point of view, I am not sure if the
>> > "if(isRichFunction)" including casts etc would have more overhead than
>> > this approach (we would do more nested method calls for non-RichFunctions,
>> > which should be more common than RichFunctions).
>> >
>> > This approach should not affect lambdas (or do I miss something?) and
>> > might be cleaner, as we could have one more top level interface
>> > `RichFunction` with methods `init()` and `close()` and also interfaces
>> > for `RichValueMapper` etc. (thus, no abstract classes required).
>> >
>> >
>> > Any thoughts?
>> >
>> >
>> > -Matthias
>> >
>> >
>> > On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
>> >> Hi,
>> >>
>> >> Thanks for comments. I extended PR and KIP to include rich functions. I
>> >> will still have to evaluate the cost of deep copying of keys.
>> >>
>> >> Cheers,
>> >> Jeyhun
>> >>
>> >> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <
>> mathieu.fenn...@replicon.com>
>> >> wrote:
>> >>
>> >>> Hey Matthias,
>> >>>
>> >>> My opinion would be that documenting the immutability of the key is
>> the
>> >>> best approach available.  Better than requiring the key to be
>> serializable
>> >>> (as with Jeyhun's last pass at the PR), no performance risk.
>> >>>
>> >>> It'd be different if Java had immutable type constraints of some
>> kind. :-)
>> >>>
>> >>> Mathieu
>> >>>
>> >>>
>> >>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <
>> matth...@confluent.io>
>> >>> wrote:
>> >>>
>> >>>> Agreed about RichFunction. If we follow this path, it should cover
>> >>>> all(?) DSL interfaces.
>> >>>>
>> >>>> About guarding the key -- I am still not sure what to do about it...
>> >>>> Maybe it might be enough to document it (and name the key parameter
>> like
>> >>>> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
>> >>>> guard against any modification, but I have no good idea how to do
>> this.
>> >>>> Not sure what others think about the risk of corrupted partitioning
>> >>>> (what would be a user error and we could say, well, bad luck, you
>> got a
>> >>>> bug in your code, that's not our fault), vs deep copy with a
>> potential
>> >>>> 

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-09 Thread Jeyhun Karimov
Hi,

Thanks for feedback.
Then we need to overload method
   <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
with
   <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);

and in runtime (inside the processor) we still have to check whether it is a
ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
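
A hedged sketch of that wrapping step (helper name is illustrative): a
plain ValueMapper is adapted to the with-key shape once, while building
the topology, so the processor only ever deals with one type:

static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(
        final ValueMapper<? super V, ? extends VR> mapper) {
    return (readOnlyKey, value) -> mapper.apply(value);
}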


Please correct me if I am wrong.

Cheers,
Jeyhun




On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> +1 :)
>
>
> On 08/05/17 23:52, Matthias J. Sax wrote:
> > Hi,
> >
> > I was reading the updated KIP and I am wondering, if we should do the
> > design a little differently.
> >
> > Instead of distinguishing between a RichFunction and non-RichFunction at
> > runtime level, we would use RichFunctions all the time. Thus, on the DSL
> > entry level, if a user provides a non-RichFunction, we wrap it by a
> > RichFunction that is fully implemented by Streams. This RichFunction
> > would just forward the call omitting the key:
> >
> > Just to sketch the idea (incomplete code snippet):
> >
> >> public StreamsRichValueMapper implements RichValueMapper() {
> >>private ValueMapper userProvidedMapper; // set by constructor
> >>
> >>public VR apply(final K key, final V1 value1, final V2 value2) {
> >>return userProvidedMapper(value1, value2);
> >>}
> >> }
> >
> >  From a performance point of view, I am not sure if the
> > "if(isRichFunction)" including casts etc would have more overhead than
> > this approach (we would do more nested method calls for non-RichFunctions,
> > which should be more common than RichFunctions).
> >
> > This approach should not affect lambdas (or do I miss something?) and
> > might be cleaner, as we could have one more top level interface
> > `RichFunction` with methods `init()` and `close()` and also interfaces
> > for `RichValueMapper` etc. (thus, no abstract classes required).
> >
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> >
> > On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> >> Hi,
> >>
> >> Thanks for comments. I extended PR and KIP to include rich functions. I
> >> will still have to evaluate the cost of deep copying of keys.
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <
> mathieu.fenn...@replicon.com>
> >> wrote:
> >>
> >>> Hey Matthias,
> >>>
> >>> My opinion would be that documenting the immutability of the key is the
> >>> best approach available.  Better than requiring the key to be
> serializable
> >>> (as with Jeyhun's last pass at the PR), no performance risk.
> >>>
> >>> It'd be different if Java had immutable type constraints of some kind.
> :-)
> >>>
> >>> Mathieu
> >>>
> >>>
> >>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Agreed about RichFunction. If we follow this path, it should cover
> >>>> all(?) DSL interfaces.
> >>>>
> >>>> About guarding the key -- I am still not sure what to do about it...
> >>>> Maybe it might be enough to document it (and name the key parameter
> like
> >>>> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
> >>>> guard against any modification, but I have no good idea how to do
> this.
> >>>> Not sure what others think about the risk of corrupted partitioning
> >>>> (what would be a user error and we could say, well, bad luck, you got
> a
> >>>> bug in your code, that's not our fault), vs deep copy with a potential
> >>>> performance hit (that we can't quantify atm without any performance
> >>> test).
> >>>> We do have a performance system test. Maybe it's worth for you to
> apply
> >>>> the deep copy strategy and run the test. It's very basic performance
> >>>> test only, but might give some insight. If you want to do this, look
> >>>> into folder "tests" for general test setup, and into
> >>>> "tests/kafaktests/benchmarks/streams" to find find the perf test.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> >>>>> Hi Matthias,
> >>>>>
> >>>>>

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-07 Thread Jeyhun Karimov
Hi Michal,

Thanks for your comments. We proposed a solution similar to yours in the KIP
(please look at the rejected alternatives). However, after the discussion on
the mailing list I extended it to rich functions. Maybe we should keep both:
simple and rich versions.

Cheers,
Jeyhun

On Sun, May 7, 2017 at 11:46 AM Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Do I understand correctly that with the proposed pattern one could not
> pass a lambda expression and access the context from within it?
>
> TBH, I was hoping that for simple things this would be possible:
>
> myStream.map( (key, value, ctx) -> new KeyValue<>(ctx.partition(), value) )
>
> or (more to the original point of this KIP):
>
> myStream.mapValues( (key, value, ctx) -> new MyValueWrapper(value,
> ctx.partition(), ctx.offset()) )
>
> but it looks like that would require subclassing RichFunction? That's a
> bit of an inconvenience IMO.
>
> Cheers,
>
> Michal
>
> On 07/05/17 01:29, Jeyhun Karimov wrote:
>
> Hi,
>
> Thanks for comments. I extended PR and KIP to include rich functions. I
> will still have to evaluate the cost of deep copying of keys.
>
> Cheers,
> Jeyhun
>
> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
> wrote:
>
>
> Hey Matthias,
>
> My opinion would be that documenting the immutability of the key is the
> best approach available.  Better than requiring the key to be serializable
> (as with Jeyhun's last pass at the PR), no performance risk.
>
> It'd be different if Java had immutable type constraints of some kind. :-)
>
> Mathieu
>
>
> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>
> Agreed about RichFunction. If we follow this path, it should cover
> all(?) DSL interfaces.
>
> About guarding the key -- I am still not sure what to do about it...
> Maybe it might be enough to document it (and name the key parameter like
> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
> guard against any modification, but I have no good idea how to do this.
> Not sure what others think about the risk of corrupted partitioning
> (what would be a user error and we could say, well, bad luck, you got a
> bug in your code, that's not our fault), vs deep copy with a potential
> performance hit (that we can't quantify atm without any performance
>
> test).
>
>
> We do have a performance system test. Maybe it's worth for you to apply
> the deep copy strategy and run the test. It's very basic performance
> test only, but might give some insight. If you want to do this, look
> into folder "tests" for general test setup, and into
> "tests/kafaktests/benchmarks/streams" to find find the perf test.
>
>
> -Matthias
>
> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>
> Hi Matthias,
>
> I think extending the KIP to include RichFunctions totally makes sense. So
> we don't want to guard the keys because it is costly.
> If we introduce RichFunctions, I think they should not be limited to only
> the 3 interfaces proposed in the KIP but extended to a wide range of
> interfaces.
> Please correct me if I am wrong.
>
> Cheers,
> Jeyhun
>
> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io> wrote:
>
>
> One follow up. There was this email on the user list:
>
> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>
>
> It might make sense to include the Initializer, Adder, and Subtractor
> interfaces, too.
>
> And we should double check if there are other interfaces we might miss atm.
>
>
>
> -Matthias
>
>
> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>
> Thanks for updating the KIP.
>
> Deep copying the key will work for sure, but I am actually a little bit
> worried about performance impact... We might want to do some test to
> quantify this impact.
>
>
> Btw: this reminds me about the idea of the `RichFunction` interface that
> would allow users to access record metadata (like timestamp, offset,
> partition etc) within DSL. This would be a similar concept. Thus, I am
> wondering, if it would make sense to enlarge the scope of this KIP by
> that? WDYT?
>
>
>
> -Matthias
>
>
> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>
> Hi Mathieu,
>
> Thanks for feedback. I followed a similar approach and updated the PR and
> KIP accordingly. I tried to guard the key in Processors sending a copy of
> an actual key.

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-06 Thread Jeyhun Karimov
Hi,

Thanks for the comments. I extended the PR and KIP to include rich functions.
I will still have to evaluate the cost of deep copying of keys.

Cheers,
Jeyhun

On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
wrote:

> Hey Matthias,
>
> My opinion would be that documenting the immutability of the key is the
> best approach available.  Better than requiring the key to be serializable
> (as with Jeyhun's last pass at the PR), no performance risk.
>
> It'd be different if Java had immutable type constraints of some kind. :-)
>
> Mathieu
>
>
> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Agreed about RichFunction. If we follow this path, it should cover
> > all(?) DSL interfaces.
> >
> > About guarding the key -- I am still not sure what to do about it...
> > Maybe it might be enough to document it (and name the key parameter like
> > `readOnlyKey` to make it very clear). Ultimately, I would prefer to
> > guard against any modification, but I have no good idea how to do this.
> > Not sure what others think about the risk of corrupted partitioning
> > (what would be a user error and we could say, well, bad luck, you got a
> > bug in your code, that's not our fault), vs deep copy with a potential
> > performance hit (that we can't quantify atm without any performance
> test).
> >
> > We do have a performance system test. Maybe it's worth for you to apply
> > the deep copy strategy and run the test. It's very basic performance
> > test only, but might give some insight. If you want to do this, look
> > into folder "tests" for general test setup, and into
> > "tests/kafaktests/benchmarks/streams" to find find the perf test.
> >
> >
> > -Matthias
> >
> > On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> > > Hi Matthias,
> > >
> > > I think extending the KIP to include RichFunctions totally makes sense.
> > > So we don't want to guard the keys because it is costly.
> > > If we introduce RichFunctions, I think they should not be limited to
> > > only the 3 interfaces proposed in the KIP but extended to a wide range
> > > of interfaces.
> > > Please correct me if I am wrong.
> > >
> > > Cheers,
> > > Jeyhun
> > >
> > > On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io
> >
> > > wrote:
> > >
> > >> One follow up. There was this email on the user list:
> > >>
> > >>
> > >> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
> > >>
> > >> It might make sense to include the Initializer, Adder, and Subtractor
> > >> interfaces, too.
> > >>
> > >> And we should double check if there are other interfaces we might miss
> > >> atm.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> > >>> Thanks for updating the KIP.
> > >>>
> > >>> Deep copying the key will work for sure, but I am actually a little
> bit
> > >>> worried about performance impact... We might want to do some test to
> > >>> quantify this impact.
> > >>>
> > >>>
> > >>> Btw: this reminds me about the idea of the `RichFunction` interface that
> > >>> would allow users to access record metadata (like timestamp, offset,
> > >>> partition etc) within DSL. This would be a similar concept. Thus, I
> am
> > >>> wondering, if it would make sense to enlarge the scope of this KIP by
> > >>> that? WDYT?
> > >>>
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> > >>>> Hi Mathieu,
> > >>>>
> > >>>> Thanks for feedback. I followed a similar approach and updated the
> > >>>> PR and KIP accordingly. I tried to guard the key in Processors by
> > >>>> sending a copy of an actual key.
> > >>>> Because I am doing a deep copy of an object, I think memory can be a
> > >>>> bottleneck in some use-cases.
> > >>>>
> > >>>> Cheers,
> > >>>> Jeyhun
> > >>>>
> > >>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-05 Thread Jeyhun Karimov
Hi Matthias,

I think extending the KIP to include RichFunctions totally makes sense. So,
we don't want to guard the keys because it is costly.
If we introduce RichFunctions, I think they should not be limited to only the 3
interfaces proposed in the KIP but extended to a wide range of interfaces.
Please correct me if I am wrong.
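
For illustration, a RichFunction-style interface might look roughly like the
following (a minimal sketch with made-up names; neither the interface nor the
init() signature below is defined anywhere yet):

public interface RichFunction {
    // hypothetical lifecycle hook that would expose record metadata
    // (timestamp, offset, partition, topic) to DSL user code
    void init(String topic, int partition, long offset, long timestamp);
    void close();
}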

Cheers,
Jeyhun

On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> One follow up. There was this email on the user list:
>
>
> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>
> It might make sense to include the Initializer, Adder, and Subtractor
> interfaces, too.
>
> And we should double check if there are other interfaces we might miss atm.
>
>
> -Matthias
>
>
> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> > Thanks for updating the KIP.
> >
> > Deep copying the key will work for sure, but I am actually a little bit
> > worried about the performance impact... We might want to do some tests to
> > quantify this impact.
> >
> >
> > Btw: this reminds me of the idea of a `RichFunction` interface that
> > would allow users to access record metadata (like timestamp, offset,
> > partition etc) within the DSL. This would be a similar concept. Thus, I am
> > wondering if it would make sense to enlarge the scope of this KIP by
> > that? WDYT?
> >
> >
> >
> > -Matthias
> >
> >
> > On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> >> Hi Mathieu,
> >>
> >> Thanks for the feedback. I followed a similar approach and updated the PR
> >> and KIP accordingly. I tried to guard the key in Processors by sending a
> >> copy of the actual key.
> >> Because I am doing a deep copy of the object, I think memory can be a
> >> bottleneck in some use-cases.
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <
> mathieu.fenn...@replicon.com>
> >> wrote:
> >>
> >>> Hi Jeyhun,
> >>>
> >>> This approach would change ValueMapper (...etc) to be classes, rather than
> >>> interfaces, which is also a backwards incompatible change.  An alternative
> >>> approach that would be backwards compatible would be to define new
> >>> interfaces, and provide overrides where those interfaces are used.
> >>>
> >>> Unfortunately, making the key parameter "final" doesn't change much
> >>> about guarding against key changes.  It only prevents the parameter
> >>> variable from being reassigned.  If the key type is a mutable object
> >>> (eg. byte[]), it can still be mutated (eg. key[0] = 0).  But I'm not
> >>> really sure there's much that can be done about that.
> >>>
> >>> Mathieu
> >>>
> >>>
> >>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
> >>> wrote:
> >>>
> >>>> Thanks for the comments.
> >>>>
> >>>> The concerns make sense. Although we can guard for immutable keys in
> >>>> the current implementation (with a few changes), I didn't consider backward
> >>>> compatibility.
> >>>>
> >>>> In this case, 2 solutions come to my mind. In both cases, the user
> >>>> accesses the key as an Object, as passing an extra type parameter will
> >>>> break backwards-compatibility.  So the user has to cast to the actual
> >>>> key type.
> >>>>
> >>>> 1. Firstly, we can overload the apply method with 2 arguments (key and
> >>>> value) and force the key to be *final*. By doing this, I think we can
> >>>> address both backward-compatibility and guarding against key changes.
> >>>>
> >>>> 2. Secondly, we can create a class KeyAccess like:
> >>>>
> >>>> public class KeyAccess {
> >>>>     Object key;
> >>>>     public void beforeApply(final Object key) {
> >>>>         this.key = key;
> >>>>     }
> >>>>     public Object getKey() {
> >>>>         return key;
> >>>>     }
> >>>> }
> >>>>
> >>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
> >>>> *KeyAccess*. Inside the processor (for example *KTableMapValuesProcessor*),
> >>>> before calling *mapper.apply(value)* we can set the *key* by
> >>>> *mapper.beforeApply(key)*. 

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-03 Thread Jeyhun Karimov
Hi Mathieu,

Thanks for the feedback. I followed a similar approach and updated the PR and
KIP accordingly. I tried to guard the key in Processors by sending a copy of
the actual key.
Because I am doing a deep copy of the object, I think memory can be a
bottleneck in some use-cases.
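
For illustration, the guarding idea is roughly the following (a minimal sketch,
not the actual PR code; it assumes a keySerde and a topic variable are in
scope). A serde round-trip is one generic way to deep-copy, since even a final
parameter would not protect a mutable key such as byte[]:

// hand the user code a defensive copy of the key via a serde round-trip
private K copyKey(final K key) {
    final byte[] bytes = keySerde.serializer().serialize(topic, key);
    return keySerde.deserializer().deserialize(topic, bytes);
}
// the processor then calls e.g. mapper.apply(copyKey(key), value), so user
// mutations cannot corrupt the partitioning key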

Cheers,
Jeyhun

On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
wrote:

> Hi Jeyhun,
>
> This approach would change ValueMapper (...etc) to be classes, rather than
> interfaces, which is also a backwards incompatible change.  An alternative
> approach that would be backwards compatible would be to define new
> interfaces, and provide overrides where those interfaces are used.
>
> Unfortunately, making the key parameter "final" doesn't change much
> about guarding against key changes.  It only prevents the parameter variable
> from being reassigned.  If the key type is a mutable object (eg. byte[]),
> it can still be mutated. (eg. key[0] = 0).  But I'm not really sure there's
> much that can be done about that.
>
> Mathieu
>
>
> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Thanks for the comments.
> >
> > The concerns make sense. Although we can guard for immutable keys in the
> > current implementation (with a few changes), I didn't consider backward
> > compatibility.
> >
> > In this case, 2 solutions come to my mind. In both cases, the user accesses
> > the key as an Object, as passing an extra type parameter will break
> > backwards-compatibility.  So the user has to cast to the actual key type.
> >
> > 1. Firstly, we can overload the apply method with 2 arguments (key and value)
> > and force the key to be *final*. By doing this, I think we can address both
> > backward-compatibility and guarding against key changes.
> >
> > 2. Secondly, we can create a class KeyAccess like:
> >
> > public class KeyAccess {
> >     Object key;
> >     public void beforeApply(final Object key) {
> >         this.key = key;
> >     }
> >     public Object getKey() {
> >         return key;
> >     }
> > }
> >
> > We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
> > *KeyAccess*. Inside the processor (for example *KTableMapValuesProcessor*),
> > before calling *mapper.apply(value)* we can set the *key* by
> > *mapper.beforeApply(key)*. As a result, the user can use *getKey()* to access
> > the key inside the *apply(value)* method.
> >
> >
> > Cheers,
> > Jeyhun
> >
> >
> >
> >
> > On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Jeyhun,
> > >
> > > thanks a lot for the KIP!
> > >
> > > I think there are two issues we need to address:
> > >
> > > (1) The KIP does not consider backward compatibility. Users did complain
> > > about this in past releases already, and as the user base grows, we
> > > should not break backward compatibility in future releases anymore.
> > > Thus, we should think of a better way to allow key access.
> > >
> > > Mathieu's comment goes in the same direction:
> > >
> > > >> On the other hand, the number of compile failures that would need to
> > be
> > > >> fixed from this change is unfortunate. :-)
> > >
> > > (2) Another concern is that there is no guard to prevent user code from
> > > modifying the key. This might corrupt partitioning if users do alter the
> > > key (accidentally -- or users are just not aware that they are not
> > > allowed to modify the provided key object) and thus break the
> > > application. (This was the original motivation to not provide the key in
> > > the first place -- it guards against modification.)
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> > > > Hi Jeyhun,
> > > >
> > > > I just want to add my voice that, I too, have wished for access to
> the
> > > > record key during a mapValues or similar operation.
> > > >
> > > > On the other hand, the number of compile failures that would need to
> be
> > > > fixed from this change is unfortunate. :-)  But at least it would all
> > be
> > > a
> > > > pretty clear and easy change.
> > > >
> > > > Mathieu
> > > >
> > > >
> > > > On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com
> >

Re: [DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-01 Thread Jeyhun Karimov
Thanks for the comments.

The concerns make sense. Although we can guard for immutable keys in the
current implementation (with a few changes), I didn't consider backward
compatibility.

In this case, 2 solutions come to my mind. In both cases, the user accesses
the key as an Object, as passing an extra type parameter will break
backwards-compatibility.  So the user has to cast to the actual key type.

1. Firstly, we can overload the apply method with 2 arguments (key and value)
and force the key to be *final*. By doing this, I think we can address both
backward-compatibility and guarding against key changes.
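
For illustration, option 1 could look roughly like this (a hypothetical sketch
assuming Java 8 default methods, not the actual proposal code):

public interface ValueMapper<V, VR> {
    VR apply(V value);  // existing method, unchanged

    // hypothetical overload exposing the key read-only as a final Object
    default VR apply(final Object readOnlyKey, V value) {
        return apply(value);  // falls back to the existing behavior
    }
}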

2. Secondly, we can create a class KeyAccess like:

public class KeyAccess {
    Object key;
    public void beforeApply(final Object key) {
        this.key = key;
    }
    public Object getKey() {
        return key;
    }
}

We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
*KeyAccess*. Inside the processor (for example *KTableMapValuesProcessor*),
before calling *mapper.apply(value)* we can set the *key* by
*mapper.beforeApply(key)*. As a result, the user can use *getKey()* to access
the key inside the *apply(value)* method.
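
For illustration, usage could look roughly like this (a sketch with made-up
details; it assumes *ValueMapper* has become an abstract class extending
*KeyAccess*):

// user side: read the stashed key inside apply()
ValueMapper<String, String> mapper = new ValueMapper<String, String>() {
    @Override
    public String apply(final String value) {
        final String key = (String) getKey();  // set by the processor beforehand
        return key + ":" + value;
    }
};

// processor side (e.g. in a KTableMapValuesProcessor-like process()):
// mapper.beforeApply(key);
// final String newValue = mapper.apply(value);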


Cheers,
Jeyhun




On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> Jeyhun,
>
> thanks a lot for the KIP!
>
> I think there are two issues we need to address:
>
> (1) The KIP does not consider backward compatibility. Users did complain
> about this in past releases already, and as the user base grows, we
> should not break backward compatibility in future releases anymore.
> Thus, we should think of a better way to allow key access.
>
> Mathieu's comment goes in the same direction:
>
> >> On the other hand, the number of compile failures that would need to be
> >> fixed from this change is unfortunate. :-)
>
> (2) Another concern is that there is no guard to prevent user code from
> modifying the key. This might corrupt partitioning if users do alter the
> key (accidentally -- or users are just not aware that they are not
> allowed to modify the provided key object) and thus break the
> application. (This was the original motivation to not provide the key in
> the first place -- it guards against modification.)
>
>
> -Matthias
>
>
>
> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> > Hi Jeyhun,
> >
> > I just want to add my voice that, I too, have wished for access to the
> > record key during a mapValues or similar operation.
> >
> > On the other hand, the number of compile failures that would need to be
> > fixed from this change is unfortunate. :-)  But at least it would all be
> a
> > pretty clear and easy change.
> >
> > Mathieu
> >
> >
> > On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
> >
> >> Dear community,
> >>
> >> I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726
> [3],
> >> KAFKA-3745 [4]. The related PR can be found at [5].
> >> I would like to get your comments.
> >>
> >> [1]
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 149%3A+Enabling+key+access+in+ValueTransformer%2C+
> >> ValueMapper%2C+and+ValueJoiner
> >> [2] https://issues.apache.org/jira/browse/KAFKA-4218
> >> [3] https://issues.apache.org/jira/browse/KAFKA-4726
> >> [4] https://issues.apache.org/jira/browse/KAFKA-3745
> >> [5] https://github.com/apache/kafka/pull/2946
> >>
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >>
> >>
> >> --
> >> -Cheers
> >>
> >> Jeyhun
> >>
> >
>
> --
-Cheers

Jeyhun


[DISCUSS]: KIP-149: Enabling key access in ValueTransformer, ValueMapper, and ValueJoiner

2017-05-01 Thread Jeyhun Karimov
Dear community,

I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3],
KAFKA-3745 [4]. The related PR can be found at [5].
I would like to get your comments.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
[2] https://issues.apache.org/jira/browse/KAFKA-4218
[3] https://issues.apache.org/jira/browse/KAFKA-4726
[4] https://issues.apache.org/jira/browse/KAFKA-3745
[5] https://github.com/apache/kafka/pull/2946


Cheers,
Jeyhun



-- 
-Cheers

Jeyhun


[jira] [Comment Edited] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-04-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989838#comment-15989838
 ] 

Jeyhun Karimov edited comment on KAFKA-4785 at 4/29/17 6:08 PM:


[~mjsax], (as it is discussed in KAFKA-4218 ) I think we can group KAFKA-4219 
and this issue together.


was (Author: jeyhunkarimov):
[~mjsax], as it is discussed in KAFKA-4218 , we can group KAFKA-4219 and this 
issue together.

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different breaking this assumption.
> Thus, for reading data from intermediate repartitioning topic, the configured 
> timestamp extractor should not be used, but the record's metadata timestamp 
> should be extracted as record timestamp.
> In order to leverage the same behavior for intermediate user topic (ie, used 
> in {{through()}})  we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}).
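
For illustration, such an internally-set extractor could be as simple as the
following sketch (the class name is made up, and the two-argument extract()
signature of the TimestampExtractor interface of that era is assumed):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class MetadataTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        // return the metadata timestamp that Streams set when writing to the
        // repartitioning topic
        return record.timestamp();
    }
}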



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2017-04-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989988#comment-15989988
 ] 

Jeyhun Karimov commented on KAFKA-3745:
---

[~sree2k] is there an update on this issue?

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Sreepathi Prasanna
>Priority: Minor
>  Labels: api, needs-kip, newbie
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-04-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989838#comment-15989838
 ] 

Jeyhun Karimov commented on KAFKA-4785:
---

[~mjsax], as it is discussed in KAFKA-4218 , we can group KAFKA-4219 and this 
issue together.

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different breaking this assumption.
> Thus, for reading data from intermediate repartitioning topic, the configured 
> timestamp extractor should not be used, but the record's metadata timestamp 
> should be extracted as record timestamp.
> In order to leverage the same behavior for intermediate user topic (ie, used 
> in {{through()}})  we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4218) Enable access to key in ValueTransformer

2017-04-29 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989837#comment-15989837
 ] 

Jeyhun Karimov commented on KAFKA-4218:
---

[~mjsax], thanks for your comments. I absolutely agree on this. I think the 
most relevant issues to group (with this issue) are (as shown in the relevant 
links as well): KAFKA-4726 and KAFKA-3745 (although there is an assignee, I 
haven't seen a PR on that issue and it has been stale for some time). Do you 
agree, or are there other relevant issues that should be added to this group?


> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>    Assignee: Jeyhun Karimov
>  Labels: api, needs-kip, newbie
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.
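
For illustration, the workaround looks roughly like this (a sketch assuming a
KStream<String, String> named stream and the Transformer interface shape of
that era): transform() exposes the key, but forces allocating a new KeyValue
per record even though the key is unchanged.

stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
    @Override public void init(final ProcessorContext context) { }

    @Override public KeyValue<String, String> transform(final String key, final String value) {
        // the key is only read (e.g. for store lookups), yet a new KeyValue
        // object must be allocated to re-emit it unchanged
        return KeyValue.pair(key, value.toUpperCase());
    }

    @Override public KeyValue<String, String> punctuate(final long timestamp) { return null; }
    @Override public void close() { }
});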



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4218) Enable access to key in ValueTransformer

2017-04-28 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4218:
-

Assignee: Jeyhun Karimov

> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>    Assignee: Jeyhun Karimov
>  Labels: api, needs-kip, newbie
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-04-28 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989557#comment-15989557
 ] 

Jeyhun Karimov commented on KAFKA-4785:
---

[~mjsax] Do you think KAFKA-4144 is a blocker for this issue? 

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different breaking this assumption.
> Thus, for reading data from intermediate repartitioning topic, the configured 
> timestamp extractor should not be used, but the record's metadata timestamp 
> should be extracted as record timestamp.
> In order to leverage the same behavior for intermediate user topic (ie, used 
> in {{through()}})  we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-04-28 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov reassigned KAFKA-4785:
-

Assignee: Jeyhun Karimov

> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>Assignee: Jeyhun Karimov
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different breaking this assumption.
> Thus, for reading data from intermediate repartitioning topic, the configured 
> timestamp extractor should not be used, but the record's metadata timestamp 
> should be extracted as record timestamp.
> In order to leverage the same behavior for intermediate user topic (ie, used 
> in {{through()}})  we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Splitting tasks in streams?

2017-04-28 Thread Jeyhun Karimov
Hi Eno,

Thanks for the reply. For me it was important to know that the particular
use-case fits within kafka-streams' boundaries. I would put this in the future
plans, as I don't think now is the appropriate time to introduce this feature
in the streams library. Currently, implementing query optimization (like [1])
on a given topology and effective load balancing (like [2]) would be a good
start toward the goal (described in the previous email).

[1] https://issues.apache.org/jira/browse/KAFKA-4601
[2] https://issues.apache.org/jira/browse/KAFKA-4969

Cheers,
Jeyhun


On Fri, Apr 28, 2017 at 5:42 PM Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Jeyhun,
>
> You make a good observation and I think a discussion/contribution around
> this would be very much appreciated by the community. Are you thinking of a
> KIP perhaps?
>
> Eno
>
> > On 28 Apr 2017, at 16:13, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >
> > Hi community,
> >
> > I have a question regarding the streams library.
> >
> > Currently, in kafka-streams we run the whole topology in one instance and
> > there can be several topologies or tasks on a single node. However, there
> > can be use-cases with very complex topologies with costly operators. So,
> > when we want to scale-up, instead of copying the whole topology to run in
> > parallel, we may need to scale-up specific operators (or subgraphs in
> > tasks) in topology (it depends on a defined cost function).
> > So my question is: is the specified use-case compatible with the
> > kafka-streams motivation, and would the relevant contribution be
> > appreciated by the community?
> >
> >
> > Cheers,
> > Jeyhun
> > --
> > -Cheers
> >
> > Jeyhun
>
> --
-Cheers

Jeyhun


Splitting tasks in streams?

2017-04-28 Thread Jeyhun Karimov
Hi community,

I have a question regarding the streams library.

Currently, in kafka-streams we run the whole topology in one instance and
there can be several topologies or tasks on a single node. However, there
can be use-cases with very complex topologies with costly operators. So,
when we want to scale-up, instead of copying the whole topology to run in
parallel, we may need to scale-up specific operators (or subgraphs in
tasks) in topology (it depends on a defined cost function).
So my question is: is the specified use-case compatible with the
kafka-streams motivation, and would the relevant contribution be appreciated
by the community?


Cheers,
Jeyhun
-- 
-Cheers

Jeyhun


Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov
Dear all,

I am closing this vote now. The KIP got accepted with

+3 binding (Guozhang, Ewen, Gwen)

Thanks all (especially Matthias) for guiding me through my first KIP.


Cheers,
Jeyhun

On Mon, Apr 24, 2017 at 9:32 PM Thomas Becker <tobec...@tivo.com> wrote:

> +1 (non-binding)
>
> On Tue, 2017-02-28 at 08:59 +, Jeyhun Karimov wrote:
> > Dear community,
> >
> > I'd like to start the vote for KIP-123:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=6871
> > 4788
> >
> >
> > Cheers,
> > Jeyhun
> --
>
>
> Tommy Becker
>
> Senior Software Engineer
>
> O +1 919.460.4747
>
> tivo.com
>
>
> 
>
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is prohibited.
> If you are not the intended recipient, please contact the sender
> immediately and permanently delete this email and any attachments. No
> employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
>
-- 
-Cheers

Jeyhun


[jira] [Commented] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15983515#comment-15983515
 ] 

Jeyhun Karimov commented on KAFKA-4144:
---

[~mjsax] I am sorry, I didn't read your email carefully. My bad.

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>    Assignee: Jeyhun Karimov
>  Labels: api, kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.
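
For illustration, the per-source override might be used roughly like this (a
sketch only; the overload shape and parameter order are illustrative, mirroring
how per-source serdes are passed, and MyEventTimeExtractor, keySerde and
valueSerde are assumed to be defined elsewhere):

final KStreamBuilder builder = new KStreamBuilder();
// per-source extractor, analogous to overriding key/value serdes per source
final KStream<String, String> stream =
        builder.stream(new MyEventTimeExtractor(), keySerde, valueSerde, "input-topic");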



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated KAFKA-4144:
--
Status: Reopened  (was: Closed)

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>    Assignee: Jeyhun Karimov
>  Labels: api, kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov closed KAFKA-4144.
-

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>    Assignee: Jeyhun Karimov
>  Labels: api, kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-04-25 Thread Jeyhun Karimov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov resolved KAFKA-4144.
---
Resolution: Fixed
  Reviewer: Matthias J. Sax

> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>    Assignee: Jeyhun Karimov
>  Labels: api, kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> KIP: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-03-24 Thread Jeyhun Karimov
Thanks for the comments Matthias. I updated the KIP and PR.

Cheers,
Jeyhun

On Fri, Mar 24, 2017 at 8:34 AM Eno Thereska <eno.there...@gmail.com> wrote:

> +1 (non-binding)
>
> Thanks
> Eno
>
> > On 24 Mar 2017, at 03:37, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> > Thanks Jeyhun.
> >
> > Can you also update the KIP accordingly. It must contain all changes to
> > public API. Thus, list all parameters that get deprecated and newly
> > added. And add a sentence about backward compatibility.
> >
> >
> > -Matthias
> >
> > On 3/23/17 3:16 AM, Jeyhun Karimov wrote:
> >> Sorry for a super late update. I made an update on related PR.
> >>
> >> Cheers,
> >> Jeyhun
> >>
> >> On Wed, Mar 22, 2017 at 9:09 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >>> Jeyhun,
> >>>
> >>> Could you update the status of this KIP since it has been some time since
> >>> the last vote?
> >>>
> >>> I'm +1 besides the minor comments mentioned above.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Mon, Mar 6, 2017 at 3:14 PM, Jeyhun Karimov <je.kari...@gmail.com>
> >>> wrote:
> >>>
> >>>> Sorry I was late. I will update the javadocs in the related methods to
> >>>> emphasize that TimestampExtractor is stateless.
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>>
> >>>> On Mon, Mar 6, 2017 at 8:17 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>>>
> >>>>> 1) Sounds good.
> >>>>>
> >>>>> 2) Yeah, what I meant is to emphasize in the docs somewhere that
> >>>>> TimestampExtractor should be stateless.
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Sun, Mar 5, 2017 at 4:27 PM, Matthias J. Sax <
> matth...@confluent.io
> >>>>
> >>>>> wrote:
> >>>>>
> >>>>>> Guozhang,
> >>>>>>
> >>>>>> about renaming the config parameters. I like this idea, but want to
> >>>>>> point out that this change should be done in a backward compatible way.
> >>>>>> Thus, we need to keep (and only deprecate) the current parameter names.
> >>>>>>
> >>>>>> I am not sure about (2)? What do you worry about? Using a "stateful
> >>>>>> extractor"? -- this would be an antipattern IMHO. We can clarify that a
> >>>>>> TimestampExtractor should be stateless though (even if this should be
> >>>>>> clear).
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 3/4/17 6:36 PM, Guozhang Wang wrote:
> >>>>>>> Jeyhun,
> >>>>>>>
> >>>>>>> Thanks for proposing this KIP! And sorry for being late to the
> >>>>>>> discussion.
> >>>>>>>
> >>>>>>> I have a general suggestion not directly related to this KIP and a couple
> >>>>>>> of comments for this KIP here:
> >>>>>>>
> >>>>>>> I agree with Mathieu's observation, partly because we now have lots
> >>>>>>> of overloaded functions both in the DSL and in PAPI, and it would be
> >>>>>>> quite confusing to users. As Matthias mentioned, we do have some plans
> >>>>>>> to refactor this API, but just wanted to point out that this KIP may
> >>>>>>> likely urge us to do the API refactoring sooner than planned. My
> >>>>>>> personal preference would be doing that in the next release (i.e.
> >>>>>>> 0.11.0.0 in June).
> >>>>>>>
> >>>>>>>
> >>>>>>> Now some detailed comments:
> >>>>>>>
> >>>>>>> 1. I'd suggest changing TIMESTAMP_EXTRACTOR_CLASS_C

Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-03-23 Thread Jeyhun Karimov
Sorry for a super late update. I made an update on related PR.

Cheers,
Jeyhun

On Wed, Mar 22, 2017 at 9:09 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Jeyhun,
>
> Could you update the status of this KIP since it has been some time since
> the last vote?
>
> I'm +1 besides the minor comments mentioned above.
>
>
> Guozhang
>
>
> On Mon, Mar 6, 2017 at 3:14 PM, Jeyhun Karimov <je.kari...@gmail.com>
> wrote:
>
> > Sorry I was late. I will update the javadocs in the related methods to
> > emphasize that TimestampExtractor is stateless.
> >
> > Cheers,
> > Jeyhun
> >
> > On Mon, Mar 6, 2017 at 8:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > 1) Sounds good.
> > >
> > > 2) Yeah, what I meant is to emphasize in the docs somewhere that
> > > TimestampExtractor should be stateless.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Mar 5, 2017 at 4:27 PM, Matthias J. Sax <matth...@confluent.io
> >
> > > wrote:
> > >
> > > > Guozhang,
> > > >
> > > > about renaming the config parameters. I like this idea, but want to
> > > > point out that this change should be done in a backward compatible way.
> > > > Thus, we need to keep (and only deprecate) the current parameter names.
> > > >
> > > > I am not sure about (2)? What do you worry about? Using a "stateful
> > > > extractor"? -- this would be an antipattern IMHO. We can clarify that a
> > > > TimestampExtractor should be stateless though (even if this should be
> > > > clear).
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 3/4/17 6:36 PM, Guozhang Wang wrote:
> > > > > Jeyhun,
> > > > >
> > > > > Thanks for proposing this KIP! And sorry for being late to the
> > > > > discussion.
> > > > >
> > > > > I have a general suggestion not directly related to this KIP and a couple
> > > > > of comments for this KIP here:
> > > > >
> > > > > I agree with Mathieu's observation, partly because we now have lots
> > > > > of overloaded functions both in the DSL and in PAPI, and it would be
> > > > > quite confusing to users. As Matthias mentioned, we do have some plans
> > > > > to refactor this API, but just wanted to point out that this KIP may
> > > > > likely urge us to do the API refactoring sooner than planned. My
> > > > > personal preference would be doing that in the next release (i.e.
> > > > > 0.11.0.0 in June).
> > > > >
> > > > >
> > > > > Now some detailed comments:
> > > > >
> > > > > 1. I'd suggest changing TIMESTAMP_EXTRACTOR_CLASS_CONFIG to
> > > > > "default.timestamp.extractor" or "global.timestamp.extractor" (also the
> > > > > Java variable name can be changed accordingly) along with this change.
> > > > > In addition, maybe we can piggy-back on this to also rename
> > > > > KEY_SERDE_CLASS_CONFIG/VALUE_SERDE_CLASS_CONFIG to "default.key.." etc
> > > > > in this KIP.
> > > > >
> > > > > 2. Another thing we should consider is that, since we could now
> > > > > potentially use multiple timestamp extractor instances rather than a
> > > > > single one, this may be breaking if a user's customization did some
> > > > > global bookkeeping based on the previous assumption (maybe a wild
> > > > > thought, but e.g. keeping track of some global counts in the extractor
> > > > > as a local variable). We need to clarify this change in the javadoc
> > > > > and also potentially in the upgrade web doc sections.
> > > > >
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Wed, Mar 1, 2017 at 6:09 AM, Michael Noll <mich...@confluent.io
> >
> > > > wrote:
> > > > >
> > > > >> +1 (non-binding)
> > > > >>

Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-03-06 Thread Jeyhun Karimov
Sorry I was late. I will update the javadocs in the related methods to
emphasize that TimestampExtractor is stateless.
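
For example, the javadoc note could be worded along these lines (an
illustrative draft only):

/**
 * ...
 * <p>Implementations should be stateless: Streams may create and use several
 * extractor instances, so an extractor must not rely on global bookkeeping
 * state (e.g. counters kept in instance fields).
 */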

Cheers,
Jeyhun

On Mon, Mar 6, 2017 at 8:17 PM Guozhang Wang <wangg...@gmail.com> wrote:

> 1) Sounds good.
>
> 2) Yeah, what I meant is to emphasize in the docs somewhere that
> TimestampExtractor should be stateless.
>
>
> Guozhang
>
>
> On Sun, Mar 5, 2017 at 4:27 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Guozhang,
> >
> > about renaming the config parameters. I like this idea, but want to
> > point out that this change should be done in a backward compatible way.
> > Thus, we need to keep (and only deprecate) the current parameter names.
> >
> > I am not sure about (2)? What do you worry about? Using a "stateful
> > extractor"? -- this would be an antipattern IMHO. We can clarify that a
> > TimestampExtractor should be stateless though (even if this should be
> > clear).
> >
> >
> > -Matthias
> >
> >
> > On 3/4/17 6:36 PM, Guozhang Wang wrote:
> > > Jeyhun,
> > >
> > > Thanks for proposing this KIP! And sorry for being late to the
> > > discussion.
> > >
> > > I have a general suggestion not directly related to this KIP and a couple
> > > of comments for this KIP here:
> > >
> > > I agree with Mathieu's observation, partly because we now have lots
> > > of overloaded functions both in the DSL and in PAPI, and it would be quite
> > > confusing to users. As Matthias mentioned, we do have some plans to
> > > refactor this API, but just wanted to point out that this KIP may likely
> > > urge us to do the API refactoring sooner than planned. My personal
> > > preference would be doing that in the next release (i.e. 0.11.0.0 in June).
> > >
> > >
> > > Now some detailed comments:
> > >
> > > 1. I'd suggest changing TIMESTAMP_EXTRACTOR_CLASS_CONFIG to
> > > "default.timestamp.extractor" or "global.timestamp.extractor" (also the
> > > Java variable name can be changed accordingly) along with this change. In
> > > addition, maybe we can piggy-back on this to also rename
> > > KEY_SERDE_CLASS_CONFIG/VALUE_SERDE_CLASS_CONFIG to "default.key.." etc in
> > > this KIP.
> > >
> > > 2. Another thing we should consider is that, since we could now potentially
> > > use multiple timestamp extractor instances rather than a single one, this
> > > may be breaking if a user's customization did some global bookkeeping based
> > > on the previous assumption (maybe a wild thought, but e.g. keeping track of
> > > some global counts in the extractor as a local variable). We need to clarify
> > > this change in the javadoc and also potentially in the upgrade web doc
> > > sections.
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Mar 1, 2017 at 6:09 AM, Michael Noll <mich...@confluent.io>
> > wrote:
> > >
> > >> +1 (non-binding)
> > >>
> > >> Thanks for the KIP!
> > >>
> > >> On Wed, Mar 1, 2017 at 1:49 PM, Bill Bejeck <bbej...@gmail.com>
> wrote:
> > >>
> > >>> +1
> > >>>
> > >>> Thanks
> > >>> Bill
> > >>>
> > >>> On Wed, Mar 1, 2017 at 5:06 AM, Eno Thereska <eno.there...@gmail.com
> >
> > >>> wrote:
> > >>>
> > >>>> +1 (non binding).
> > >>>>
> > >>>> Thanks
> > >>>> Eno
> > >>>>> On 28 Feb 2017, at 17:22, Matthias J. Sax <matth...@confluent.io>
> > >>> wrote:
> > >>>>>
> > >>>>> +1
> > >>>>>
> > >>>>> Thanks a lot for the KIP!
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>>
> > >>>>> On 2/28/17 1:35 AM, Damian Guy wrote:
> > >>>>>> Thanks for the KIP Jeyhun!
> > >>>>>>
> > >>>>>> +1
> > >>>>>>
> > >>>>>> On Tue, 28 Feb 2017 at 08:59 Jeyhun Karimov <je.kari...@gmail.com
> >
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Dear community,
> > >>>>>>>
> > >>>>>>> I'd like to start the vote for KIP-123:
> > >>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.
> > >>>> action?pageId=68714788
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Jeyhun
> > >>>>>>> --
> > >>>>>>> -Cheers
> > >>>>>>>
> > >>>>>>> Jeyhun
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >
> > >
> > >
> >
> >
>
>
> --
> -- Guozhang
>
-- 
-Cheers

Jeyhun


Re: [DISCUSS] KIP-123: Allow per stream/table timestamp extractor

2017-02-28 Thread Jeyhun Karimov
Hi Eno,

Thanks for the clarification. I think it is allowed by definition. So if we
want to join a stream that uses wallclock time with a stream that uses
event time, then we can assign the first one a timestamp extractor that
returns the system clock, and for the second stream we can assign a timestamp
extractor that extracts/computes the event time from the record.
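
For illustration (a sketch only; MyEvent and its accessor are made up, and the
two-argument extract() signature of that era is assumed), the wallclock stream
can use the built-in WallclockTimestampExtractor, while the event-time stream
uses something like:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class EventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        return ((MyEvent) record.value()).getEventTimeMs();  // event time carried in the value
    }
}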

Cheers,
Jeyhun

On Tue, Feb 28, 2017 at 11:40 AM Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Jeyhun,
>
> I mean something slightly different. In your motivation you say "joining
> multiple streams/tables that require different timestamp extraction
> methods". I want to understand the scope of this. Is it allowed to have a
> stream that uses wallclock time join a stream that uses event time? (It
> would be good to give some examples in the motivation about scenarios you
> envision). If the join is not allowed, how do you prevent that join from
> happening? Do you throw an exception?
>
> Thanks
> Eno
>
>
> > On 28 Feb 2017, at 10:04, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >
> > Hi Eno,
> >
> > Thanks for the feedback. I think you mean [1]. In this KIP we do not consider
> > the situations you mentioned. So, either we can extend the KIP to solve the
> > mentioned issues or submit 2 PRs incrementally.
> >
> > [1] https://issues.apache.org/jira/browse/KAFKA-4785
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Tue, Feb 28, 2017 at 10:41 AM Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Jeyhun,
> >>
> >> Thanks for the KIP, sorry I'm coming a bit late to the discussion.
> >>
> >> One thing I'd like to understand is whether we can avoid situations
> where
> >> the user is mixing different times (event time vs. wallclock time) in
> their
> >> processing inadvertently. Before this KIP, all the relevant topics have
> one
> >> time stamp extractor so that issue does not come up.
> >>
> >> What will be the behavior if times mismatch, e.g., for joins?
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 22 Feb 2017, at 09:21, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >>>
> >>> Dear community,
> >>>
> >>> I would like to get further feedbacks on this KIP (if any).
> >>>
> >>> Cheers
> >>> Jeyhun
> >>>
> >>> On Wed, Feb 15, 2017 at 2:36 AM Matthias J. Sax <matth...@confluent.io
> >
> >>> wrote:
> >>>
> >>>> Mathieu,
> >>>>
> >>>> I personally agree with your observation, and we have plans to submit
> a
> >>>> KIP like this. If you want to drive this discussion feel free to start
> >>>> the KIP by yourself!
> >>>>
> >>>> Having said that, for this KIP we might want to focus the discussion
> the
> >>>> the actual feature that gets added: allowing to specify different
> >>>> TS-Extractor for different inputs.
> >>>>
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 2/14/17 4:54 PM, Mathieu Fenniak wrote:
> >>>>> Hi Jeyhun,
> >>>>>
> >>>>> This KIP might not be the appropriate time, but my first thought
> >>>>> reading it is that it might make sense to introduce a builder-style API
> >>>>> rather than adding a mix of new method overloads with independent
> >>>>> optional parameters. :-)
> >>>>>
> >>>>> eg. stream(), table(), globalTable(), addSource(), could all accept a
> >>>>> "TopicReference" parameter that can be built like:
> >>>>>
> >>>>
> >>
> TopicReference("my-topic").keySerde(...).valueSerde(...).autoOffsetReset(...).timestampExtractor(...).build().
> >>>>>
> >>>>> Mathieu
> >>>>>
> >>>>>
> >>>>> On Tue, Feb 14, 2017 at 5:31 PM, Jeyhun Karimov <
> je.kari...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Dear community,
> >>>>>>
> >>>>>> I want to share the KIP-123 [1] which is based on issue KAFKA-4144
> >> [2].
> >>>> You
> >>>>>> can check the PR in [3].
> >>>>>>
> >>>>>> I would like to get your comments.
> >>>>>>
> >>>>>> [1]
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> >>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4144
> >>>>>> [3] https://github.com/apache/kafka/pull/2466
> >>>>>>
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Jeyhun
> >>>>>> --
> >>>>>> -Cheers
> >>>>>>
> >>>>>> Jeyhun
> >>>>>>
> >>>>>
> >>>>
> >>>> --
> >>> -Cheers
> >>>
> >>> Jeyhun
> >>
> >> --
> > -Cheers
> >
> > Jeyhun
>
> --
-Cheers

Jeyhun


Re: [DISCUSS] KIP-123: Allow per stream/table timestamp extractor

2017-02-28 Thread Jeyhun Karimov
Hi Eno,

Thanks for the feedback. I think you mean [1]. In this KIP we do not consider
the situations you mentioned. So, either we can extend the KIP to solve the
mentioned issues or submit 2 PRs incrementally.

[1] https://issues.apache.org/jira/browse/KAFKA-4785


Cheers,
Jeyhun

On Tue, Feb 28, 2017 at 10:41 AM Eno Thereska <eno.there...@gmail.com>
wrote:

> Hi Jeyhun,
>
> Thanks for the KIP, sorry I'm coming a bit late to the discussion.
>
> One thing I'd like to understand is whether we can avoid situations where
> the user is mixing different times (event time vs. wallclock time) in their
> processing inadvertently. Before this KIP, all the relevant topics have one
> time stamp extractor so that issue does not come up.
>
> What will be the behavior if times mismatch, e.g., for joins?
>
> Thanks
> Eno
>
> > On 22 Feb 2017, at 09:21, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> >
> > Dear community,
> >
> > I would like to get further feedbacks on this KIP (if any).
> >
> > Cheers
> > Jeyhun
> >
> > On Wed, Feb 15, 2017 at 2:36 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Mathieu,
> >>
> >> I personally agree with your observation, and we have plans to submit a
> >> KIP like this. If you want to drive this discussion feel free to start
> >> the KIP by yourself!
> >>
> >> Having said that, for this KIP we might want to focus the discussion the
> >> the actual feature that gets added: allowing to specify different
> >> TS-Extractor for different inputs.
> >>
> >>
> >>
> >> -Matthias
> >>
> >> On 2/14/17 4:54 PM, Mathieu Fenniak wrote:
> >>> Hi Jeyhun,
> >>>
> >>> This KIP might not be the appropriate time, but my first thought reading
> >>> it is that it might make sense to introduce a builder-style API rather
> >>> than adding a mix of new method overloads with independent optional
> >>> parameters. :-)
> >>>
> >>> eg. stream(), table(), globalTable(), addSource(), could all accept a
> >>> "TopicReference" parameter that can be built like:
> >>>
> >>
> TopicReference("my-topic").keySerde(...).valueSerde(...).autoOffsetReset(...).timestampExtractor(...).build().
> >>>
> >>> Mathieu
> >>>
> >>>
> >>> On Tue, Feb 14, 2017 at 5:31 PM, Jeyhun Karimov <je.kari...@gmail.com>
> >>> wrote:
> >>>
> >>>> Dear community,
> >>>>
> >>>> I want to share the KIP-123 [1] which is based on issue KAFKA-4144
> [2].
> >> You
> >>>> can check the PR in [3].
> >>>>
> >>>> I would like to get your comments.
> >>>>
> >>>> [1]
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> >>>> [2] https://issues.apache.org/jira/browse/KAFKA-4144
> >>>> [3] https://github.com/apache/kafka/pull/2466
> >>>>
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>> --
> >>>> -Cheers
> >>>>
> >>>> Jeyhun
> >>>>
> >>>
> >>
> >> --
> > -Cheers
> >
> > Jeyhun
>
> --
-Cheers

Jeyhun


[VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-02-28 Thread Jeyhun Karimov
Dear community,

I'd like to start the vote for KIP-123:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788


Cheers,
Jeyhun
-- 
-Cheers

Jeyhun


Re: [DISCUSS] KIP-123: Allow per stream/table timestamp extractor

2017-02-22 Thread Jeyhun Karimov
Dear community,

I would like to get further feedbacks on this KIP (if any).

Cheers
Jeyhun

On Wed, Feb 15, 2017 at 2:36 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Mathieu,
>
> I personally agree with your observation, and we have plans to submit a
> KIP like this. If you want to drive this discussion feel free to start
> the KIP by yourself!
>
> Having said that, for this KIP we might want to focus the discussion the
> the actual feature that gets added: allowing to specify different
> TS-Extractor for different inputs.
>
>
>
> -Matthias
>
> On 2/14/17 4:54 PM, Mathieu Fenniak wrote:
> > Hi Jeyhun,
> >
> > This KIP might not be the appropriate time, but my first thought reading
> > it is that it might make sense to introduce a builder-style API rather than
> > adding a mix of new method overloads with independent optional
> > parameters. :-)
> >
> > eg. stream(), table(), globalTable(), addSource(), could all accept a
> > "TopicReference" parameter that can be built like:
> >
> TopicReference("my-topic").keySerde(...).valueSerde(...).autoOffsetReset(...).timestampExtractor(...).build().
> >
> > Mathieu
> >
> >
> > On Tue, Feb 14, 2017 at 5:31 PM, Jeyhun Karimov <je.kari...@gmail.com>
> > wrote:
> >
> >> Dear community,
> >>
> >> I want to share the KIP-123 [1] which is based on issue KAFKA-4144 [2].
> You
> >> can check the PR in [3].
> >>
> >> I would like to get your comments.
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> >> [2] https://issues.apache.org/jira/browse/KAFKA-4144
> >> [3] https://github.com/apache/kafka/pull/2466
> >>
> >>
> >> Cheers,
> >> Jeyhun
> >> --
> >> -Cheers
> >>
> >> Jeyhun
> >>
> >
>
> --
-Cheers

Jeyhun

