Note that while I agree with the initial proposal (withKeySerdes, withJoinType, 
etc), I don't agree with things like .materialize(), .enableCaching(), 
.enableLogging(). 

The former maintain the declarative DSL, while the later break the declarative 
part by mixing system decisions in the DSL.  I think there is a difference 
between the two proposals.

Eno

> On 22 Jun 2017, at 03:46, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> I have been thinking about reducing all these overloaded functions for
> stateful operations (there are some other places that introduces overloaded
> functions but let's focus on these only in this discussion), what I used to
> have is to use some "materialize" function on the KTables, like:
> 
> ---------------------------------------
> 
> // specifying the topology
> 
> KStream stream1 = builder.stream();
> KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
> sessionMerger, sessionWindows);  // do not allow to pass-in a state store
> supplier here any more
> 
> // additional specs along with the topology above
> 
> table1.materialize("queryableStoreName"); // or..
> table1.materialize("queryableStoreName").enableCaching().enableLogging();
> // or..
> table1.materialize(stateStoreSupplier); // add the metrics / logging /
> caching / windowing functionalities on top of the store, or..
> table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); //
> etc..
> 
> ---------------------------------------
> 
> But thinking about it more, I feel Damian's first proposal is better since
> my proposal would likely to break the concatenation (e.g. we may not be
> able to do sth. like "table1.filter().map().groupBy().aggregate()" if we
> want to use different specs for the intermediate filtered KTable).
> 
> 
> But since this is a incompatibility change, and we are going to remove the
> compatibility annotations soon it means we only have one chance and we
> really have to make it right. So I'd call out for anyone try to rewrite
> your examples / demo code with the proposed new API and see if it feel
> natural, for example, if I want to use a different storage engine than the
> default rockDB engine how could I easily specify that with the proposed
> APIs?
> 
> Meanwhile Damian could you provide a formal set of APIs for people to
> exercise on them? Also could you briefly describe how custom storage
> engines could be swapped in with the above APIs?
> 
> 
> 
> Guozhang
> 
> 
> On Wed, Jun 21, 2017 at 9:08 AM, Eno Thereska <eno.there...@gmail.com>
> wrote:
> 
>> To make it clear, it’s outlined by Damian, I just copy pasted what he told
>> me in person :)
>> 
>> Eno
>> 
>>> On Jun 21, 2017, at 4:40 PM, Bill Bejeck <bbej...@gmail.com> wrote:
>>> 
>>> +1 for the approach outlined above by Eno.
>>> 
>>> On Wed, Jun 21, 2017 at 11:28 AM, Damian Guy <damian....@gmail.com>
>> wrote:
>>> 
>>>> Thanks Eno.
>>>> 
>>>> Yes i agree. We could apply this same approach to most of the operations
>>>> where we have multiple overloads, i.e., we have a single method for each
>>>> operation that takes the required parameters and everything else is
>>>> specified as you have done above.
>>>> 
>>>> On Wed, 21 Jun 2017 at 16:24 Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>>> 
>>>>> (cc’ing user-list too)
>>>>> 
>>>>> Given that we already have StateStoreSuppliers that are configurable
>>>> using
>>>>> the fluent-like API, probably it’s worth discussing the other examples
>>>> with
>>>>> joins and serdes first since those have many overloads and are in need
>> of
>>>>> some TLC.
>>>>> 
>>>>> So following your example, I guess you’d have something like:
>>>>> .join()
>>>>>  .withKeySerdes(…)
>>>>>  .withValueSerdes(…)
>>>>>  .withJoinType(“outer”)
>>>>> 
>>>>> etc?
>>>>> 
>>>>> I like the approach since it still remains declarative and it’d reduce
>>>> the
>>>>> number of overloads by quite a bit.
>>>>> 
>>>>> Eno
>>>>> 
>>>>>> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> I'd like to get a discussion going around some of the API choices
>> we've
>>>>>> made in the DLS. In particular those that relate to stateful
>> operations
>>>>>> (though this could expand).
>>>>>> As it stands we lean heavily on overloaded methods in the API, i.e,
>>>> there
>>>>>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and
>>>> i
>>>>>> feel it is only going to get worse as we add more optional params. In
>>>>>> particular we've had some requests to be able to turn caching off, or
>>>>>> change log configs,  on a per operator basis (note this can be done
>> now
>>>>> if
>>>>>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
>>>>>> 
>>>>>> So this is a bit of an open question. How can we change the DSL
>>>> overloads
>>>>>> so that it flows, is simple to use and understand, and is easily
>>>> extended
>>>>>> in the future?
>>>>>> 
>>>>>> One option would be to use a fluent API approach for providing the
>>>>> optional
>>>>>> params, so something like this:
>>>>>> 
>>>>>> groupedStream.count()
>>>>>> .withStoreName("name")
>>>>>> .withCachingEnabled(false)
>>>>>> .withLoggingEnabled(config)
>>>>>> .table()
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> Another option would be to provide a Builder to the count method, so
>> it
>>>>>> would look something like this:
>>>>>> groupedStream.count(new
>>>>>> CountBuilder("storeName").withCachingEnabled(false).build())
>>>>>> 
>>>>>> Another option is to say: Hey we don't need this, what are you on
>>>> about!
>>>>>> 
>>>>>> The above has focussed on state store related overloads, but the same
>>>>> ideas
>>>>>> could  be applied to joins etc, where we presently have many join
>>>> methods
>>>>>> and many overloads.
>>>>>> 
>>>>>> Anyway, i look forward to hearing your opinions.
>>>>>> 
>>>>>> Thanks,
>>>>>> Damian
>>>>> 
>>>>> 
>>>> 
>> 
>> 
> 
> 
> -- 
> -- Guozhang

Reply via email to