There are two issues we want to tackle:

 - too many overloads (for some methods we already have more than 10)
 - improve custom store API
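
To make the first point concrete, here is a minimal sketch of how a single
options object could stand in for a family of overloads. All names here
(CountOptions, GroupedStreamSketch, the withXXX methods) are hypothetical
and only for illustration; they are not part of the Kafka Streams API and
not the proposal itself.

```java
import java.util.Objects;

// Hypothetical options object (an assumption, not a Kafka Streams class).
// Each withXXX() returns a new instance, so a held reference never changes.
final class CountOptions {
    private final String storeName;
    private final boolean cachingEnabled;
    private final boolean loggingEnabled;

    private CountOptions(String storeName, boolean cachingEnabled, boolean loggingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
    }

    // Defaults: no explicit store name, caching and logging switched on.
    static CountOptions defaults() {
        return new CountOptions(null, true, true);
    }

    CountOptions withStoreName(String name) {
        return new CountOptions(Objects.requireNonNull(name), cachingEnabled, loggingEnabled);
    }

    CountOptions withCachingEnabled(boolean enabled) {
        return new CountOptions(storeName, enabled, loggingEnabled);
    }

    CountOptions withLoggingEnabled(boolean enabled) {
        return new CountOptions(storeName, cachingEnabled, enabled);
    }

    String describe() {
        return "store=" + storeName + ", caching=" + cachingEnabled + ", logging=" + loggingEnabled;
    }
}

// Hypothetical stand-in for a grouped stream: one count() method replaces
// one overload per combination of optional parameters.
final class GroupedStreamSketch {
    static String count(CountOptions options) {
        return "count[" + options.describe() + "]";
    }
}
```

Adding a new optional parameter then means adding one withXXX() method
rather than doubling the number of count() overloads.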


On 7/7/17 3:42 PM, Jan Filipiak wrote:
> It makes me want to cry.
> why on earth is the DSL going to expose all its implementation details now?
> especially being materialized or not.
> If we want to take useful steps in that direction, maybe we are looking
> for a way to let the user switch back and forth between PAPI and DSL?
> A change like the one proposed would not eliminate any of my pain points,
> while still being a heck of a lot of work to migrate to.
> Since I am only following this from the point where Eno CC'ed it into
> the users list:
> Can someone please rephrase for me what problem this is trying to solve?
> I don't mean to be rude, but it uses a problematic feature,
> "StateStoreSuppliers in DSL", to justify making it even worse. This gets
> us nowhere in making the configs more flexible; it's just syntactic sugar.
> A low-effort shot: let's add a properties argument to operations that
> would otherwise become too heavily overloaded? Or pull the configs by some
> naming scheme from the overall properties. In addition to that, we get rid
> of StateStoreSuppliers in the DSL and have them also configured by said
> properties.
> => way easier to migrate to, way less risk, way more flexible in the
> future (different implementations of the same operation don't require
> code change to configure)
> Line 184 in particular makes no sense to me. What is a non-materialized
> KTable-KTable join anyway?
> Hope we can discuss more on this.
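
For reference, the "pull the configs by some naming scheme" idea could look
roughly like the sketch below. It assumes the key layout
streams.<applicationId>.joins.<joinName>.<setting> mentioned further down
the thread; the class and method names are hypothetical and nothing like
this exists in Kafka Streams today.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (an assumption for illustration only).
final class OperationConfigSketch {
    // Collects every entry under "streams.<applicationId>.joins.<joinName>."
    // from the overall properties and strips that prefix from the keys.
    static Map<String, String> forJoin(Properties all, String applicationId, String joinName) {
        String prefix = "streams." + applicationId + ".joins." + joinName + ".";
        Map<String, String> result = new HashMap<>();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return result;
    }
}
```

An operation would look up its own settings by name, so changing how a join
is configured would not require a code change in the topology.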
> On 07.07.2017 17:23, Guozhang Wang wrote:
>> I messed up the indentation in the GitHub code repo; this should be
>> easier to read:
>> Guozhang
>> On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang <> wrote:
>>> Hi Damian / Kyle,
>>> I think I agree with you guys about the pros / cons of using the builder
>>> pattern vs. using some "secondary classes". And I'm wondering if we can
>>> take a middle way between these two. I spent some time on a slightly
>>> different approach from Damian's current proposal:
>>> java/org/apache/kafka/streams/
>>> The key idea is to tolerate the final "table()" or "stream()" function to
>>> "upgrade" from the secondary classes to the first-class citizen classes,
>>> while having all the specs inside this function. This proposal also
>>> includes some other refactoring of the builder that people have been
>>> discussing, to reduce the overloaded functions as well. WDYT?
>>> Guozhang
>>> On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy <> wrote:
>>>> Hi Jan,
>>>> Thanks very much for the input.
>>>> On Tue, 4 Jul 2017 at 08:54 Jan Filipiak <>
>>>> wrote:
>>>>> Hi Damian,
>>>>> I do see your point that something needs to change. But I fully agree
>>>>> with Guozhang when he says:
>>>>> ---
>>>>> But since this is an incompatible change, and we are going to remove the
>>>>> compatibility annotations soon, it means we only have one chance and we
>>>>> really have to make it right.
>>>>> ----
>>>> I think we all agree on this one! Hence the discussion.
>>>>> I fear none of the suggestions go far enough to become something that
>>>>> will carry on for very much longer.
>>>>> I am currently working on KAFKA-3705 and am trying to find the easiest
>>>>> way for the user to give me all the required functionality. The easiest
>>>>> interface I could come up with so far can be looked at here.
>>>> de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
>>>> kafka/streams/kstream/internals/
>>>>> And it's already horribly complicated. I am currently unable to find the
>>>>> right abstraction level to have everything fall into place naturally.
>>>> To be fair, that is not a particularly easy problem to solve!
>>>>> To be honest, I already think introducing
>>>>> de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/
>>>>> kafka/streams/kstream/internals/
>>>>> was not ideal and makes everything a mess.
>>>> I'm not sure I agree that it makes everything a mess, but it could have
>>>> been done differently.
>>>>> The JoinType:Whatever is also not really flexible. Two things come to my
>>>>> mind:
>>>>> 1. I don't think we should rule out config-based decisions, say configs
>>>>> like
>>>>>          streams.$applicationID.joins.$joinname.conf = value
>>>> Is this just for config? Or are you suggesting that we could somehow
>>>> "code"
>>>> the join in a config file?
>>>>> This can allow for tremendous changes without a single API change, and
>>>>> IMO it has not been considered enough yet.
>>>>> 2. Push logic from the DSL to the callback classes. A ValueJoiner, for
>>>>> example, can be used to implement different join types as the user
>>>>> wishes.
>>>> Do you have an example of how this might look?
>>>>> As Guozhang said: not breaking users is very important.
>>>> Of course. We want to make it as easy as possible for people to use
>>>> streams.
>>>>> Especially with these changes, plus all the plans I sadly only have in my
>>>>> head, but hopefully the first link can give a glimpse.
>>>>> Thanks for preparing the examples; they made it way clearer to me what
>>>>> exactly we are talking about. I would argue for going a bit slower and
>>>>> more carefully on this one. At some point we need to get it right. Look
>>>>> over at the Hadoop guys with their huge userbase: config files really
>>>>> work well for them.
>>>>> Best Jan
>>>>> On 30.06.2017 09:31, Damian Guy wrote:
>>>>>> Thanks Matthias
>>>>>> On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <>
>>>>> wrote:
>>>>>>> I am just catching up on this thread, so sorry for the long email in
>>>>>>> advance... Also, it's to some extent a dump of thoughts and not always
>>>>>>> a clear proposal. I still need to think about this in more detail. But
>>>>>>> maybe it helps others to get new ideas :)
>>>>>>>>> However, I don't understand your argument about putting aggregate()
>>>>>>>>> after the withXX() -- all the calls to withXX() set optional
>>>>>>>>> parameters for aggregate() and not for groupBy() -- but a
>>>>>>>>> groupBy().withXX() indicates that the withXX() belongs to the
>>>>>>>>> groupBy(). IMHO, this might be quite confusing for developers.
>>>>>>>> I see what you are saying, but the grouped stream is effectively a
>>>>> no-op
>>>>>>>> until you call one of the aggregate/count/reduce etc functions. So
>>>> the
>>>>>>>> optional params are ones that are applicable to any of the
>>>> operations
>>>>> you
>>>>>>>> can perform on this grouped stream. Then the final
>>>>>>>> count()/reduce()/aggregate() call has any of the params that are
>>>>>>>> required/specific to that function.
>>>>>>> I understand your argument, but I don't share the conclusion. If we
>>>>>>> need a "final/terminal" call, the better way might be
>>>>>>> .groupBy().count().withXX().build()
>>>>>>> (with a better name for build() though)
>>>>>> The point is that all the other calls, i.e., withBlah, windowed, etc.,
>>>>>> apply to all the aggregate functions. The terminal call is the actual
>>>>>> type of aggregation you want to do. I personally find this more natural
>>>>>> than groupBy().count().withBlah().build()
>>>>>>>> groupedStream.count(/** non windowed count**/)
>>>>>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>>>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>>>>>> I like this. However, I don't see a reason to have windowed() and
>>>>>>> sessionWindowed(). We should have one top-level `Windows` interface
>>>> that
>>>>>>> both `TimeWindows` and `SessionWindows` implement and just have a
>>>> single
>>>>>>> windowed() method that accepts all `Windows`. (I did not like the
>>>>>>> separation of `SessionWindows` in the first place, and this seems to
>>>> be
>>>>>>> an opportunity to clean this up. It was hard to change when we
>>>>>>> introduced session windows)
>>>>>> Yes - true we should look into that.
>>>>>>> Btw: we do use the imperative groupBy() and groupByKey(), and thus we
>>>>>>> might also want to use windowBy() (instead of windowed()). Not sure
>>>>>>> how important this is, but it seems to be inconsistent otherwise.
>>>>>> Makes sense
>>>>>>> About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I
>>>> think,
>>>>>>> defining an inner/left/outer join is not an optional argument but a
>>>>>>> first class concept and should have a proper representation in the
>>>> API
>>>>>>> (like the current methods join(), leftJoin, outerJoin()).
>>>>>> Yep, i did originally have it as a required param and maybe that is
>>>> what
>>>>> we
>>>>>> go with. It could have a default, but maybe that is confusing.
>>>>>>> About the two join API proposals, the second one has too much
>>>>>>> boilerplate code for my taste. Also, the actual join() operator has
>>>>>>> only one argument, which is weird to me: in my thinking, the main
>>>>>>> operator call should have one parameter per mandatory argument, but
>>>>>>> your proposal puts the mandatory arguments into the
>>>>>>> Joins.streamStreamJoin() call. This is far from intuitive IMHO.
>>>>>> This is the builder pattern, you only need one param as the builder
>>>> has
>>>>>> captured all of the required and optional arguments.
>>>>>>> The first join proposal also seems to align better with the pattern
>>>>>>> suggested for aggregations and having the same pattern for all
>>>> operators
>>>>>>> is important (as you stated already).
>>>>>> This is why i offered two alternatives as i started out with. 1 is
>>>>>> the
>>>>>> builder pattern, the other is the more fluent pattern.
>>>>>>> Coming back to the config vs optional parameter. What about having a
>>>>>>> method withConfig[s](...) that allow to put in the configuration?
>>>>>> Sure, it is currently called withLogConfig() as that is the only
>>>>>> thing
>>>>> that
>>>>>> is really config.
>>>>>>> This also raises the question of whether until() is a windows property.
>>>>>>> Actually, until() seems to be a configuration parameter and thus
>>>>>>> should not have its own method.
>>>>>> Hmmm, i don't agree. Until is a property of the window. It is going
>>>> to be
>>>>>> potentially different for every window operation you do in a streams
>>>> app.
>>>>>>> Browsing through your example DSL branch, I also saw this one:
>>>>>>>> final KTable<Windowed<String>, Long> windowed =
>>>>>>>>    groupedStream.counting()
>>>>>>>>                    .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>                    .table();
>>>>>>> This is an interesting idea, and it reminds me of some feedback: "I
>>>>>>> wanted to count a stream, but there was no count() method -- I first
>>>>>>> needed to figure out that I need to group the stream first to be able
>>>>>>> to count it. It does make sense in hindsight but was not obvious in
>>>>>>> the beginning". Thus, carrying out this thought, we could also do the
>>>>>>> following:
>>>>>>> stream.count().groupedBy().windowedBy().table();
>>>>>>> -> Note, I use "grouped" and "windowed" instead of imperative here,
>>>> as
>>>>>>> it comes after the count()
>>>>>>> This would be more consistent than your proposal (that has grouping
>>>>>>> before but windowing after count()). It might even allow us to enrich
>>>>>>> the API with some syntactic sugar like `stream.count().table()` to
>>>>>>> get the overall count of all records (this would obviously not scale,
>>>>>>> but we could support it -- if not now, maybe later).
>>>>>> I guess i'd prefer
>>>>>> stream.groupBy().windowBy().count()
>>>>>> stream.groupBy().windowBy().reduce()
>>>>>> stream.groupBy().count()
>>>>>> As i said above, everything that happens before the final aggregate
>>>> call
>>>>>> can be applied to any of them. So it makes sense to me to do those
>>>> things
>>>>>> ahead of the final aggregate call.
>>>>>>> Lastly, about the builder pattern. I am convinced that we need some
>>>>>>> "terminal" operator/method that tells us when to add the processor to
>>>>>>> the topology. But I don't see the need for a plain builder pattern,
>>>>>>> which feels alien to me (see my argument about the second join
>>>>>>> proposal). Using .stream() / .table() as used in many examples might
>>>>>>> work. But maybe a more generic name that we can use in all places,
>>>>>>> like build() or apply(), might also be an option.
>>>>>> Sure, a generic name might be ok.
>>>>>>> -Matthias
>>>>>>> On 6/29/17 7:37 AM, Damian Guy wrote:
>>>>>>>> Thanks Kyle.
>>>>>>>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <
>>>>>>>> wrote:
>>>>>>>>> Hi Damian,
>>>>>>>>>>>>> When trying to program in the fluent API that has been
>>>> discussed
>>>>>>> most
>>>>>>>>> it
>>>>>>>>>>>>> feels difficult to know when you will actually get an object
>>>> you
>>>>> can
>>>>>>>>> reuse.
>>>>>>>>>>>>> What if I make one KGroupedStream that I want to reuse, is it
>>>>> legal
>>>>>>> to
>>>>>>>>>>>>> reuse it or does this approach expect you to call grouped each
>>>>> time?
>>>>>>>>>>> I'd anticipate that once you have a KGroupedStream you can
>>>> re-use it
>>>>>>> as
>>>>>>>>> you
>>>>>>>>>>> can today.
>>>>>>>>> You said it yourself in another post that the grouped stream is
>>>>>>>>> effectively a no-op until a count, reduce, or aggregate. The way I
>>>> see
>>>>>>> it
>>>>>>>>> you wouldn’t be able to reuse anything except KStreams and
>>>>>>>>> KTables,
>>>>>>> because
>>>>>>>>> most of this fluent api would continue returning this (this being
>>>> the
>>>>>>>>> builder object currently being manipulated).
>>>>>>>>> So, if you ever store a reference to anything but KStreams and
>>>>>>>>> KTables and you use it in two different ways, then it's possible you
>>>>>>>>> make conflicting withXXX() calls on the same builder.
>>>>>>>> Not necessarily true. It could return a new instance of the builder,
>>>>>>>> i.e., the builders being immutable. So if you held a reference to the
>>>>>>>> builder it would always be the same as it was when it was created.
>>>>>>>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
>>>>>>>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
>>>>>>>>> groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>>>>>>>>> I’ll admit that this shouldn’t happen but some user is going to do
>>>> it
>>>>>>>>> eventually…
>>>>>>>>> Depending on implementation uses of groupedStreamWithDefaultSerdes
>>>>> would
>>>>>>>>> most likely be equivalent to the version withDeclaredSerdes. One
>>>> work
>>>>>>>>> around would be to always make copies of the config objects you
>>>>>>>>> are
>>>>>>>>> building, but this approach has its own problem because now we
>>>> have to
>>>>>>>>> identify which configs are equivalent so we don’t create repeated
>>>>>>>>> processors.
>>>>>>>>> The point of this long winded example is that we always have to be
>>>>>>>>> thinking about all of the possible ways it could be misused by a
>>>> user
>>>>>>>>> (causing them to see hard to diagnose problems).
>>>>>>>> Exactly! That is the point of the discussion really.
>>>>>>>>> In my attempt at a couple methods with builders I feel that I
>>>>>>>>> could
>>>>>>>>> confidently say the user couldn’t really mess it up.
>>>>>>>>>> // Count
>>>>>>>>>> KTable<String, Long> count =
>>>>>>>>>> kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>>>>> The kGroupedStream is reusable and if they attempted to reuse the
>>>>> Count
>>>>>>>>> for some reason it would throw an error message saying that a
>>>>>>>>> store
>>>>>>> named
>>>>>>>>> “my-store” already exists.
>>>>>>>> Yes i agree and i think using builders is my preferred pattern.
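
The "store already exists" safeguard described above can be sketched in a
few lines. This is only an illustration under the assumption that the
topology keeps a set of registered store names; StoreRegistrySketch and
register() are hypothetical names, not the actual Kafka Streams internals.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the topology's store bookkeeping.
final class StoreRegistrySketch {
    private final Set<String> storeNames = new HashSet<>();

    // Registers a store name; reusing a builder that carries the same name
    // fails fast instead of silently creating a conflicting processor.
    void register(String storeName) {
        if (!storeNames.add(storeName)) {
            throw new IllegalStateException("A store named '" + storeName + "' already exists");
        }
    }
}
```

With a check like this, passing the same Count builder twice surfaces as an
immediate error rather than a hard-to-diagnose problem at runtime.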
>>>>>>>> Cheers,
>>>>>>>> Damian
>>>>>>>>> Thanks,
>>>>>>>>> Kyle
>>>>>>>>> From: Damian Guy
>>>>>>>>> Sent: Thursday, June 29, 2017 3:59 AM
>>>>>>>>> To:
>>>>>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>>>>> Hi Kyle,
>>>>>>>>> Thanks for your input. Really appreciated.
>>>>>>>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <
>>>>>>>>> wrote:
>>>>>>>>>> I prefer more of a builder pattern, even though others have voiced
>>>>>>>>>> opposition to it. The reason I like it is that it makes it clear to
>>>>>>>>>> the user that a call to KGroupedStream#count will return a KTable,
>>>>>>>>>> not some intermediate class that I need to understand.
>>>>>>>>> Yes, that makes sense.
>>>>>>>>>> When trying to program in the fluent API that has been discussed
>>>> most
>>>>>>> it
>>>>>>>>>> feels difficult to know when you will actually get an object you
>>>> can
>>>>>>>>> reuse.
>>>>>>>>>> What if I make one KGroupedStream that I want to reuse, is it
>>>> legal
>>>>> to
>>>>>>>>>> reuse it or does this approach expect you to call grouped each
>>>> time?
>>>>>>>>> I'd anticipate that once you have a KGroupedStream you can re-use
>>>> it
>>>>> as
>>>>>>> you
>>>>>>>>> can today.
>>>>>>>>>> This question doesn’t pop into my head at all in the builder
>>>> pattern
>>>>> I
>>>>>>>>>> assume I can reuse everything.
>>>>>>>>>> Finally, I like .groupByKey and .groupBy(KeyValueMapper); I'm not a
>>>>>>>>>> big fan of the grouped.
>>>>>>>>> Yes, grouped() was more for demonstration and because groupBy() and
>>>>>>>>> groupByKey() were taken! So I'd imagine the API would actually want
>>>>>>>>> to be groupByKey(/** no required args***/).withOptionalArg() and
>>>>>>>>> groupBy(KeyValueMapper m).withOptionalArg(...). Of course this all
>>>>>>>>> depends on maintaining backward compatibility.
>>>>>>>>>> Unfortunately, the approach below would require at least 2 (probably
>>>>>>>>>> 3) overloads of each count, reduce, and aggregate (one for returning
>>>>>>>>>> a KTable and one for returning a KTable with a Windowed key; we would
>>>>>>>>>> probably want to split windowed and session-windowed for ease of
>>>>>>>>>> implementation).
>>>>>>>>>> Obviously not exhaustive but enough for you to get the picture.
>>>>>>>>>> Count, Reduce, and Aggregate supply 3 static methods to initialize
>>>>>>>>>> the builder:
>>>>>>>>>> // Count
>>>>>>>>>> KTable<String, Long> count =
>>>>>>>>>> groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>>>>>> // Windowed Count
>>>>>>>>>> KTable<Windowed<String>, Long> windowedCount =
>>>>>>>>>> groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
>>>>>>>>>> // Session Count
>>>>>>>>>> KTable<Windowed<String>, Long> sessionCount =
>>>>>>>>>> groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>>>>>>>>> Above and below, i think i'd prefer it to be:
>>>>>>>>> groupedStream.count(/** non windowed count**/)
>>>>>>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>>>>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>>>>>>>>> // Reduce
>>>>>>>>>> Reducer<Long> reducer;
>>>>>>>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer,
>>>>>>>>>> Reduce.reduce().withQueryableStoreName("my-store"));
>>>>>>>>>> // Aggregate Windowed with Custom Store
>>>>>>>>>> Initializer<String> initializer;
>>>>>>>>>> Aggregator<String, Long, String> aggregator;
>>>>>>>>>> KTable<Windowed<String>, String> aggregate =
>>>>>>>>>> groupedStream.aggregate(initializer, aggregator,
>>>>>>>>>> Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
>>>>>>>>>> // Cogroup SessionWindowed
>>>>>>>>>> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
>>>>>>>>>>           .cogroup(groupedStream2, aggregator2)
>>>>>>>>>>           .aggregate(initializer, aggregator,
>>>>>>>>>> Aggregate.sessionWindowed(SessionWindows.with(10L),
>>>>>>>>>> sessionMerger).withQueryableStoreName("my-store"));
>>>>>>>>>> public class Count {
>>>>>>>>>>       public static class Windowed extends Count {
>>>>>>>>>>           private Windows windows;
>>>>>>>>>>       }
>>>>>>>>>>       public static class SessionWindowed extends Count {
>>>>>>>>>>           private SessionWindows sessionWindows;
>>>>>>>>>>       }
>>>>>>>>>>       public static Count count();
>>>>>>>>>>       public static Windowed windowed(Windows windows);
>>>>>>>>>>       public static SessionWindowed sessionWindowed(SessionWindows sessionWindows);
>>>>>>>>>>       // All withXXX(...) methods.
>>>>>>>>>> }
>>>>>>>>>> public class KGroupedStream {
>>>>>>>>>>       public KTable<K, Long> count(Count count);
>>>>>>>>>>       public KTable<Windowed<K>, Long> count(Count.Windowed count);
>>>>>>>>>>       public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
>>>>>>>>>> …
>>>>>>>>>> }
>>>>>>>>>> Thanks,
>>>>>>>>>> Kyle
>>>>>>>>>> From: Guozhang Wang
>>>>>>>>>> Sent: Wednesday, June 28, 2017 7:45 PM
>>>>>>>>>> To:
>>>>>>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>>>>>> I played the current proposal a bit with
>>>>>>>>>> tree/dsl-experiment <
>>>>>>>> ,
>>>>>>>>>> and here are my observations:
>>>>>>>>>> 1. Personally I prefer
>>>>>>>>>>       " / stream.groupByKey()"
>>>>>>>>>> than
>>>>>>>>>>       " /"
>>>>>>>>>> Since 1) withKeyMapper is not enforced programmatically though it
>>>> is
>>>>>>> not
>>>>>>>>>> "really" optional like others, 2) syntax-wise it reads more
>>>> natural.
>>>>>>>>>> I think it is okay to add the APIs in (
>>>> c/main/java/org/apache/kafka/streams/kstream/
>>>>>>>>>> )
>>>>>>>>>> in KGroupedStream.
>>>>>>>>>> 2. For the "withStateStoreSupplier" API, is the user supposed to pass
>>>>>>>>>> in the innermost state store supplier (e.g. the one whose get()
>>>>>>>>>> returns RocksDBStore), or the outermost supplier with logging /
>>>>>>>>>> metrics / etc.? I think it would be more useful to only require
>>>>>>>>>> users to pass in the inner state store supplier while specifying
>>>>>>>>>> caching / logging through other APIs.
>>>>>>>>>> In addition, the "GroupedWithCustomStore" is a bit suspicious to me:
>>>>>>>>>> we are allowing users to call other APIs like "withQueryableName"
>>>>>>>>>> multiple times, but to call "withStateStoreSupplier" only once at the
>>>>>>>>>> end. Why is that?
>>>>>>>>>> 3. The current DSL seems to be only for aggregations, what about
>>>>> joins?
>>>>>>>>>> 4. I think it is okay to keep the "withLogConfig": for the
>>>>>>>>>> StateStoreSupplier it will still be user code specifying the
>>>> topology
>>>>>>> so
>>>>>>>>> I
>>>>>>>>>> do not see there is a big difference.
>>>>>>>>>> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take
>>>> the
>>>>>>>>>> windowed state store supplier to enforce typing?
>>>>>>>>>> Below are minor ones:
>>>>>>>>>> 6. "withQueryableName": maybe better "withQueryableStateName"?
>>>>>>>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>>>>>>>>>> Guozhang
>>>>>>>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <
>>>>>>>>>> wrote:
>>>>>>>>>>> I see your point about "when to add the processor to the topology".
>>>>>>>>>>> That is indeed an issue. Not sure if we could allow "updates" to the
>>>>>>>>>>> topology...
>>>>>>>>>>> I don't see any problem with having all the withXX() in the KTable
>>>>>>>>>>> interface -- but this might be subjective.
>>>>>>>>>>> However, I don't understand your argument about putting aggregate()
>>>>>>>>>>> after the withXX() -- all the calls to withXX() set optional
>>>>>>>>>>> parameters for aggregate() and not for groupBy() -- but a
>>>>>>>>>>> groupBy().withXX() indicates that the withXX() belongs to the
>>>>>>>>>>> groupBy(). IMHO, this might be quite confusing for developers.
>>>>>>>>>>> -Matthias
>>>>>>>>>>> On 6/28/17 2:55 AM, Damian Guy wrote:
>>>>>>>>>>>>> I also think that mixing optional parameters with configs is a bad
>>>>>>>>>>>>> idea. I have no proposal for this atm but just wanted to mention
>>>>>>>>>>>>> it. Hope to find some time to come up with something.
>>>>>>>>>>>> Yes, i don't like the mix of config either. But the only real
>>>>> config
>>>>>>>>>> here
>>>>>>>>>>>> is the logging config - which we don't really need as it can
>>>>> already
>>>>>>>>> be
>>>>>>>>>>>> done via a custom StateStoreSupplier.
>>>>>>>>>>>>> What I don't like in the current proposal is the
>>>>>>>>>>>>> .grouped().withKeyMapper() -- the current solution with
>>>>>>>>> .groupBy(...)
>>>>>>>>>>>>> and .groupByKey() seems better. For clarity, we could
>>>>>>>>>>>>> rename to
>>>>>>>>>>>>> .groupByNewKey(...) and .groupByCurrentKey() (even if we
>>>>>>>>>>>>> should
>>>>> find
>>>>>>>>>>>>> some better names).
>>>>>>>>>>>> it could be groupByKey(), groupBy() or something different bt
>>>>>>>>>>>>> The proposed pattern "chains" grouping and aggregation too closely
>>>>>>>>>>>>> together. I would rather separate both more than less, i.e., go
>>>>>>>>>>>>> in the opposite direction.
>>>>>>>>>>>>> I am also wondering if we could do something more "fluent". The
>>>>>>>>>>>>> initial proposal was like:
>>>>>>>>>>>>>>> groupedStream.count()
>>>>>>>>>>>>>>>      .withStoreName("name")
>>>>>>>>>>>>>>>      .withCachingEnabled(false)
>>>>>>>>>>>>>>>      .withLoggingEnabled(config)
>>>>>>>>>>>>>>>      .table()
>>>>>>>>>>>>> The .table() statement in the end was kinda alien.
>>>>>>>>>>>> I agree, but then all of the withXXX methods need to be on
>>>> KTable
>>>>>>>>> which
>>>>>>>>>>> is
>>>>>>>>>>>> worse in my opinion. You also need something that is going to
>>>>> "build"
>>>>>>>>>> the
>>>>>>>>>>>> internal processors and add them to the topology.
>>>>>>>>>>>>> The current proposal puts the count() at the end -- i.e., the
>>>>>>>>>>>>> optional parameters for count() have to be specified on the
>>>>>>>>>>>>> .grouped() call -- this does not seem to be the best way either.
>>>>>>>>>>>> I actually prefer this method as you are building a grouped
>>>> stream
>>>>>>>>> that
>>>>>>>>>>> you
>>>>>>>>>>>> will aggregate. So
>>>>>>>>> table.grouped(...).withOptionalStuff().aggregate(..)
>>>>>>>>>>> etc
>>>>>>>>>>>> seems natural to me.
>>>>>>>>>>>>> I did not think this through in detail, but can't we just do
>>>> the
>>>>>>>>>> initial
>>>>>>>>>>>>> proposal with the .table() ?
>>>>>>>>>>>>> groupedStream.count().withStoreName("name").mapValues(...)
>>>>>>>>>>>>> Each .withXXX(...) returns the current KTable and all the
>>>>>>>>>>>>> .withXXX() are just added to the KTable interface. Or do I miss
>>>>>>>>>>>>> any reason why this won't work, or any obvious disadvantage?
>>>>>>>>>>>> See above.
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>> On 6/22/17 4:06 AM, Damian Guy wrote:
>>>>>>>>>>>>>> Thanks everyone. My latest attempt is below. It builds on the
>>>>>>>>> fluent
>>>>>>>>>>>>>> approach, but i think it is slightly nicer.
>>>>>>>>>>>>>> I agree with some of what Eno said about mixing configy stuff
>>>> in
>>>>>>>>> the
>>>>>>>>>>> DSL,
>>>>>>>>>>>>>> but i think that enabling caching and enabling logging are
>>>> things
>>>>>>>>>> that
>>>>>>>>>>>>>> aren't actually config. I'd probably not add
>>>> withLogConfig(...)
>>>>>>>>> (even
>>>>>>>>>>>>>> though it is below) as this is actually config and we already
>>>>> have
>>>>>>>>> a
>>>>>>>>>>> way
>>>>>>>>>>>>> of
>>>>>>>>>>>>>> doing that, via the StateStoreSupplier. Arguably we could use
>>>> the
>>>>>>>>>>>>>> StateStoreSupplier for disabling caching etc, but as it
>>>>>>>>>>>>>> stands
>>>>> that
>>>>>>>>>> is
>>>>>>>>>>> a
>>>>>>>>>>>>>> bit of a tedious process for someone that just wants to use
>>>> the
>>>>>>>>>> default
>>>>>>>>>>>>>> storage engine, but not have caching enabled.
>>>>>>>>>>>>>> There is also an orthogonal concern that Guozhang alluded
>>>> to....
>>>>> If
>>>>>>>>>> you
>>>>>>>>>>>>>> want to plug in a custom storage engine and you want it to be
>>>>>>>>> logged
>>>>>>>>>>> etc,
>>>>>>>>>>>>>> you would currently need to implement that yourself. Ideally
>>>> we
>>>>> can
>>>>>>>>>>>>> provide
>>>>>>>>>>>>>> a way where we will wrap the custom store with logging,
>>>> metrics,
>>>>>>>>>> etc. I
>>>>>>>>>>>>>> need to think about where this fits, it is probably more
>>>>>>>>> appropriate
>>>>>>>>>> on
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> Stores API.
>>>>>>>>>>>>>> final KeyValueMapper<String, String, Long> keyMapper = null;
>>>>>>>>>>>>>> // count with mapped key
>>>>>>>>>>>>>> final KTable<Long, Long> count = stream.grouped()
>>>>>>>>>>>>>>           .withKeyMapper(keyMapper)
>>>>>>>>>>>>>>           .withKeySerde(Serdes.Long())
>>>>>>>>>>>>>>           .withValueSerde(Serdes.String())
>>>>>>>>>>>>>>           .withQueryableName("my-store")
>>>>>>>>>>>>>>           .count();
>>>>>>>>>>>>>> // windowed count
>>>>>>>>>>>>>> final KTable<Windowed<String>, Long> windowedCount =
>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>           .withQueryableName("my-window-store")
>>>>>>>>>>>>>>           .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>>>>>>           .count();
>>>>>>>>>>>>>> // windowed reduce
>>>>>>>>>>>>>> final Reducer<String> windowedReducer = null;
>>>>>>>>>>>>>> final KTable<Windowed<String>, String> windowedReduce =
>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>           .withQueryableName("my-window-store")
>>>>>>>>>>>>>>           .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>>>>>>           .reduce(windowedReducer);
>>>>>>>>>>>>>> final Aggregator<String, String, Long> aggregator = null;
>>>>>>>>>>>>>> final Initializer<Long> init = null;
>>>>>>>>>>>>>> // aggregate
>>>>>>>>>>>>>> final KTable<String, Long> aggregate = stream.grouped()
>>>>>>>>>>>>>>           .withQueryableName("my-aggregate-store")
>>>>>>>>>>>>>>           .aggregate(aggregator, init, Serdes.Long());
>>>>>>>>>>>>>> final StateStoreSupplier<KeyValueStore<String, Long>>
>>>>>>>>>>> stateStoreSupplier
>>>>>>>>>>>>> = null;
>>>>>>>>>>>>>> // aggregate with custom store
>>>>>>>>>>>>>> final KTable<String, Long> aggWithCustomStore =
>>>> stream.grouped()
>>>>>>>>>>>>>>           .withStateStoreSupplier(stateStoreSupplier)
>>>>>>>>>>>>>>           .aggregate(aggregator, init);
>>>>>>>>>>>>>> // disable caching
>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>           .withQueryableName("name")
>>>>>>>>>>>>>>           .withCachingEnabled(false)
>>>>>>>>>>>>>>           .count();
>>>>>>>>>>>>>> // disable logging
>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>           .withQueryableName("q")
>>>>>>>>>>>>>>           .withLoggingEnabled(false)
>>>>>>>>>>>>>>           .count();
>>>>>>>>>>>>>> // override log config
>>>>>>>>>>>>>> final Reducer<String> reducer = null;
>>>>>>>>>>>>>> stream.grouped()
>>>>>>>>>>>>>>           .withLogConfig(Collections.singletonMap("segment.size", "10"))
>>>>>>>>>>>>>>           .reduce(reducer);
>>>>>>>>>>>>>> If anyone wants to play around with this you can find the
>>>>>>>>>>>>>> code
>>>>>>>>> here:
>>>>>>>>>>>>>> Note: It won't actually work as most of the methods just
>>>> return
>>>>>>>>> null.
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <>
>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Thanks Damian. I think both options have pros and cons. And
>>>> both
>>>>>>>>> are
>>>>>>>>>>>>> better
>>>>>>>>>>>>>>> than overload abuse.
>>>>>>>>>>>>>>> The fluent API approach reads better, with no mention of builder or
>>>>>>>>>>>>>>> build anywhere. The main downside is that the method signatures are
>>>>>>>>>>>>>>> a little less clear. By reading the method signature, one doesn't
>>>>>>>>>>>>>>> necessarily know what it returns. Also, one needs to figure out the
>>>>>>>>>>>>>>> special method (`table()` in this case) that gives you what you
>>>>>>>>>>>>>>> actually care about (`KTable` in this case). Not major issues, but
>>>>>>>>>>>>>>> worth mentioning while doing the comparison.
>>>>>>>>>>>>>>> The builder approach avoids the issues mentioned above, but it
>>>>>>>>>>>>>>> doesn't read as well.
>>>>>>>>>>>>>>> Ismael
>>>>>>>>>>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> I'd like to get a discussion going around some of the API choices
>>>>>>>>>>>>>>>> we've made in the DSL. In particular those that relate to stateful
>>>>>>>>>>>>>>>> operations (though this could expand).
>>>>>>>>>>>>>>>> As it stands we lean heavily on overloaded methods in the
>>>> API,
>>>>>>>>> i.e,
>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>> are 9 overloads for KGroupedStream.count(..)! It is
>>>>>>>>>>>>>>>> becoming
>>>>>>>>> noisy
>>>>>>>>>>> and
>>>>>>>>>>>>> i
>>>>>>>>>>>>>>>> feel it is only going to get worse as we add more optional
>>>>>>>>> params.
>>>>>>>>>> In
>>>>>>>>>>>>>>>> particular we've had some requests to be able to turn
>>>> caching
>>>>>>>>> off,
>>>>>>>>>> or
>>>>>>>>>>>>>>>> change log configs,  on a per operator basis (note this can
>>>> be
>>>>>>>>> done
>>>>>>>>>>> now
>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>> you pass in a StateStoreSupplier, but this can be a bit
>>>>>>>>>> cumbersome).
>>>>>>>>>>>>>>>> So this is a bit of an open question. How can we change the
>>>> DSL
>>>>>>>>>>>>> overloads
>>>>>>>>>>>>>>>> so that it flows, is simple to use and understand, and is
>>>>> easily
>>>>>>>>>>>>> extended
>>>>>>>>>>>>>>>> in the future?
>>>>>>>>>>>>>>>> One option would be to use a fluent API approach for
>>>> providing
>>>>>>>>> the
>>>>>>>>>>>>>>> optional
>>>>>>>>>>>>>>>> params, so something like this:
>>>>>>>>>>>>>>>> groupedStream.count()
>>>>>>>>>>>>>>>>      .withStoreName("name")
>>>>>>>>>>>>>>>>      .withCachingEnabled(false)
>>>>>>>>>>>>>>>>      .withLoggingEnabled(config)
>>>>>>>>>>>>>>>>      .table()
>>>>>>>>>>>>>>>> Another option would be to provide a Builder to the count
>>>>> method,
>>>>>>>>>> so
>>>>>>>>>>> it
>>>>>>>>>>>>>>>> would look something like this:
>>>>>>>>>>>>>>>> groupedStream.count(new
>>>>>>>>>>>>>>>> CountBuilder("storeName").withCachingEnabled(false).build())
>>>>>>>>>>>>>>>> Another option is to say: Hey we don't need this, what are
>>>> you
>>>>> on
>>>>>>>>>>>>> about!
>>>>>>>>>>>>>>>> The above has focussed on state store related overloads,
>>>>>>>>>>>>>>>> but
>>>>> the
>>>>>>>>>> same
>>>>>>>>>>>>>>> ideas
>>>>>>>>>>>>>>>> could  be applied to joins etc, where we presently have
>>>>>>>>>>>>>>>> many
>>>>> join
>>>>>>>>>>>>> methods
>>>>>>>>>>>>>>>> and many overloads.
>>>>>>>>>>>>>>>> Anyway, i look forward to hearing your opinions.
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Damian
>>>>>>>>>> -- 
>>>>>>>>>> -- Guozhang
>>> -- 
>>> -- Guozhang
