Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-22 Thread Ivan Ponomarev

Hello John,


1.
-

> Perhaps it would be better to stick with "as" for now
> and just file a Jira to switch them all at the same time [for
> compatibility with Kotlin]


Fully agree! BTW it's really not a big problem: in Kotlin they have a 
standard workaround 
(https://kotlinlang.org/docs/reference/java-interop.html#escaping-for-java-identifiers-that-are-keywords-in-kotlin). 
So actually this should be a very low priority issue, if an issue at all.


> I don't understand how your new proposed
> methods would work any differently than the ones you already
> had proposed in the KIP. It seems like you'd still have to provide
> the generic type parameters on the first static factory call. Can you
> explain how your new interface proposal differs from the existing KIP?

In the KIP, I didn't clarify what methods should be static. Now I 
propose the following methods:


non-static: withChain(Function), withName(String).

static: as(String), with(Function), with(Function, String).

The overloaded `with` version that takes both a Function and a name can 
be used without causing the type inference problem!
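A minimal sketch of why the two-argument factory helps, assuming hypothetical stand-ins `MockStream` and `MockBranched` (these are not the real Kafka classes, and `describe` merely plays the role of `branch(predicate, branched)`). Because the chain and the name are passed in a single static call, the call sits directly in argument position and the parameter type supplies K and V:

```java
import java.util.function.Function;

final class BranchedFactorySketch {
    // Hypothetical stand-in for KStream<K, V>; only the generic shape matters.
    static final class MockStream<K, V> {
        <VR> MockStream<K, VR> mapValues(Function<? super V, ? extends VR> mapper) {
            return new MockStream<>();
        }
    }

    // Hypothetical mock of the method set listed above; not the real Kafka class.
    static final class MockBranched<K, V> {
        final String name;
        final Function<? super MockStream<K, V>, ? extends MockStream<K, V>> chain;

        private MockBranched(String name,
                Function<? super MockStream<K, V>, ? extends MockStream<K, V>> chain) {
            this.name = name;
            this.chain = chain;
        }

        static <K, V> MockBranched<K, V> as(String name) {
            return new MockBranched<>(name, null);
        }

        static <K, V> MockBranched<K, V> with(
                Function<? super MockStream<K, V>, ? extends MockStream<K, V>> chain) {
            return new MockBranched<>(null, chain);
        }

        static <K, V> MockBranched<K, V> with(
                Function<? super MockStream<K, V>, ? extends MockStream<K, V>> chain,
                String name) {
            return new MockBranched<>(name, chain);
        }

        // non-static
        MockBranched<K, V> withChain(
                Function<? super MockStream<K, V>, ? extends MockStream<K, V>> chain) {
            return new MockBranched<>(name, chain);
        }

        // non-static
        MockBranched<K, V> withName(String name) {
            return new MockBranched<>(name, chain);
        }
    }

    // Stand-in for branch(predicate, branched): the parameter type fixes K and V.
    static String describe(MockBranched<String, String> branched) {
        return branched.name + ":" + (branched.chain != null);
    }

    static String run() {
        // One static call carries both the chain and the name, so no explicit
        // type arguments are needed: K = V = String comes from describe's parameter.
        return describe(MockBranched.with(s -> s.mapValues(String::trim), "foo"));
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The same call split into `as("foo").withChain(...)` would make `as("foo")` a method receiver, which is where the inference problem comes from.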


2.


> Regarding making the K,V types covariant also, yes, that would indeed
> be nice, but I'm not sure it will actually work.

What I have in mind is the following example: imagine

static KStream<..., Integer> func(KStream<..., Number> s) {
    return s.mapValues(n -> (Integer) n + 1);
}

BranchedKStream<..., Number> b =
    s.split().branch((k, v) -> isInteger(v),
        // Won't compile!!
        Branched.with(Me::func));

The simple workaround here is to change `func`'s return type from 
KStream<...Integer> to KStream<...Number>.
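The underlying issue is that Java generics are invariant. A rough, self-contained sketch follows, where `Box<V>` is a hypothetical stand-in for the value side of `KStream<K, V>` and `applyStrict` / `applyLoose` are illustrative names only. It shows the invariant signature rejecting a function that returns the narrower type, and a covariant result type accepting it:

```java
import java.util.function.Function;

final class CovarianceDemo {
    // Hypothetical stand-in for KStream<K, V>, reduced to its value type.
    static final class Box<V> {
        final V value;
        Box(V value) { this.value = value; }
    }

    // Returns the narrower type, like `func` in the example above.
    static Box<Integer> incInt(Box<Number> b) {
        return new Box<>(b.value.intValue() + 1);
    }

    // The workaround: declare the wider return type.
    static Box<Number> incNum(Box<Number> b) {
        return new Box<>(b.value.intValue() + 1);
    }

    // Invariant result: the chain must return exactly Box<V>.
    static <V> Box<V> applyStrict(Box<V> in,
            Function<? super Box<V>, ? extends Box<V>> chain) {
        return chain.apply(in);
    }

    // Covariant result: the chain may return a Box of any subtype of V.
    static <V> Box<? extends V> applyLoose(Box<V> in,
            Function<? super Box<V>, ? extends Box<? extends V>> chain) {
        return chain.apply(in);
    }

    static String run() {
        Box<Number> in = new Box<>(41);

        // Rejected by javac, because Box<Integer> is not a Box<Number>:
        // applyStrict(in, CovarianceDemo::incInt);

        Box<Number> strict = applyStrict(in, CovarianceDemo::incNum);
        Box<? extends Number> loose = applyLoose(in, CovarianceDemo::incInt);
        return strict.value + "," + loose.value;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "42,42"
    }
}
```

The price of the looser signature is that callers receive a wildcard type, which is the nested-wildcard territory this thread is wary of.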


[On the other hand, we already agreed to remove `withJavaConsumer` from 
`Branched`, so during code migration I will have to modify my functions' 
return types anyway -- I mean, from `void` to `KStream`!! ]


> the map you're returning is Map<String, KStream<K, V>>, and of course a K is not the
> same as "? extends K", so it doesn't seem compatible.


I think what you actually meant here is that KStream<? extends K, ? extends V> does not fit
as a value for Map<String, KStream<K, V>>. This in particular is not a problem, since
KStream<? extends K, ? extends V> can be safely and explicitly cast to KStream<K, V> and put into the map.


BUT I am really afraid of the pitfalls of nested wildcard types. So maybe 
for now it's better to just admit that the API is not absolutely perfect and 
accept it as is, that is


Function<? super KStream<K, V>, ? extends KStream<K, V>>

Regards,

Ivan


21.05.2020 17:59, John Roesler wrote:

Hello Ivan,

Thanks for the refinement. Actually, I did not know that "as" would
clash with a Kotlin operator. Maybe we should depart from convention
and just avoid methods named "as" in the future.

The convention is that "as(String name)" is used for the static factory
method, whereas "withName(String name)" is an instance method
inherited from NamedOperation. If you wish to propose to avoid "as"
for compatibility with Kotlin, I might suggest "fromName(String name)",
although it's somewhat dubious, since all the other configuration
classes use "as". Perhaps it would be better to stick with "as" for now
and just file a Jira to switch them all at the same time.

Re. 3:
Regarding the type inference problem, yes, it's a blemish on all of our
configuration objects. The problem is that Java infers the type
based on the _first_ method in the chain. While it does consider what
the recipient of the method result wants, it only considers the _next_
recipient.

Thus, if you call as("foo") and immediately assign it to a
typed Branched<K, V> variable, Java infers the type correctly. But
when the "next recipient" is a chained method call, like "withChain",
then the chained method doesn't bound the type (by definition,
withChain is defined on Branched<K, V>), so Java will take
the broadest possible inference and bind the type to
Branched<Object, Object>, at which point it can't be revised anymore.

As a user of Java, this is exceedingly annoying, since it doesn't seem
that hard to recursively consider the entire context when inferring the
generic type parameters, but this is what we have to work with.
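As a rough illustration of this behaviour, here is a self-contained sketch using hypothetical stand-ins (`MockStream` and `MockBranched` are not the real Kafka classes). It contrasts the assignment case, where the target type drives inference, with the chained case, where an explicit type witness is needed:

```java
import java.util.function.Function;

final class InferenceDemo {
    // Hypothetical stand-in for KStream<K, V>; only the generic shape matters.
    static final class MockStream<K, V> {
        <VR> MockStream<K, VR> mapValues(Function<? super V, ? extends VR> mapper) {
            return new MockStream<>();
        }
    }

    // Hypothetical stand-in for Branched<K, V>.
    static final class MockBranched<K, V> {
        final String name;
        final Function<? super MockStream<K, V>, ? extends MockStream<K, V>> chain;

        private MockBranched(String name,
                Function<? super MockStream<K, V>, ? extends MockStream<K, V>> chain) {
            this.name = name;
            this.chain = chain;
        }

        static <K, V> MockBranched<K, V> as(String name) {
            return new MockBranched<>(name, null);
        }

        MockBranched<K, V> withChain(
                Function<? super MockStream<K, V>, ? extends MockStream<K, V>> chain) {
            return new MockBranched<>(name, chain);
        }
    }

    static String run() {
        // Assignment context: the declared variable type tells javac that K = V = String.
        MockBranched<String, String> direct = MockBranched.as("foo");

        // Chained call: as("foo") is only a receiver here, so K and V would
        // default to Object; the explicit type witness restores String.
        MockBranched<String, String> chained =
            MockBranched.<String, String>as("foo")
                .withChain(s -> s.mapValues(String::trim));

        // Without the witness, javac rejects the statement (kept as a comment):
        // MockBranched<String, String> broken =
        //     MockBranched.as("foo").withChain(s -> s.mapValues(String::trim));

        return direct.name + "/" + chained.name + "/" + (chained.chain != null);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```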

To be honest, though, I don't understand how your new proposed
methods would work any differently than the ones you already
had proposed in the KIP. It seems like you'd still have to provide
the generic type parameters on the first static factory call. Can you
explain how your new interface proposal differs from the existing KIP?

Re. 4:
Regarding making the K,V types covariant also, yes, that would indeed
be nice, but I'm not sure it will actually work. You might want to give it a
try. In the past, we've run into some truly strange interactions between the
Java type inferencer and lambdas (and/or anonymous inner classes) in
combination with nested covariant types.

Another issue is that the value type of the map you're returning is
Map<String, KStream<K, V>>, and of course a K is not the same as "? extends K", so it
doesn't seem compatible.

Thanks again,
-John

On Thu, May 21, 2020, at 04:20, Ivan Ponomarev wr

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-21 Thread Ivan Ponomarev

Hi,

Thanks Matthias for your suggestion: yes, I agree that getting rid of 
`with[Java]Consumer` makes this thing 'as simple as possible, but not 
simpler'.


I made some quick API mocking in my IDE and tried to implement examples 
from KIP.


1. Having to return something from lambda is not a very big deal.

2. For a moment I thought that I wouldn't be able to use method references 
for already written stream consumers, but then I realized that I can 
just change my methods from returning void to returning the input 
parameter and use references to them. Not very convenient, but passable.


So, I'm ready to agree: 1) we use only functions, no consumer 2) when 
function returns null, we don't insert it into the resulting map.
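A small sketch of the agreed rule, written against plain `java.util` types rather than the Kafka API (the `collect` helper and the string "streams" are hypothetical, for illustration only). Each named chain function is applied, and a branch whose function returns null is simply left out of the resulting map:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class NullBranchDemo {
    // Hypothetical helper: applies every named chain to the input and keeps
    // only the branches whose chain returned a non-null result.
    static <S> Map<String, S> collect(S input,
            Map<String, Function<? super S, ? extends S>> chains) {
        Map<String, S> result = new LinkedHashMap<>();
        for (Map.Entry<String, Function<? super S, ? extends S>> e : chains.entrySet()) {
            S out = e.getValue().apply(input);
            if (out != null) {
                result.put(e.getKey(), out);
            }
        }
        return result;
    }

    static Map<String, String> run() {
        Map<String, Function<? super String, ? extends String>> chains = new LinkedHashMap<>();
        chains.put("kept", s -> s + "-modified"); // returns the modified "stream"
        chains.put("terminal", s -> null);        // e.g. the chain ended in a sink
        return collect("stream", chains);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {kept=stream-modified}
    }
}
```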


Usually it's better to implement a non-perfect, but workable solution as 
a first approximation. And later we can always add to `Branched` 
anything we want.


3. Do we have any guidelines on how parameter classes like Branched 
should be built? First of all, it seems that `as` now is more preferred 
than `withName` (although as you probably know it clashes with Kotlin's 
`as` operator).


Then, while trying to mock the APIs, I found out that my Java cannot 
infer types in the following construction:


.branch((key, value) -> value == null,
   Branched.as("foo").withChain(s -> s.mapValues(...)))


so I have to write

.branch((key, value) -> value == null,
   Branched.<String, String>as("foo").withChain(s -> s.mapValues(...)))


This is not tolerable IMO, so this is the list of `Branched` methods 
that I came to (will you please validate it):


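static <K, V> Branched<K, V> as(String name);

static <K, V> Branched<K, V> with(
    Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);


static <K, V> Branched<K, V> with(
    Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name);


//non-static!
Branched<K, V> withChain(
    Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);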



4. And one more. What do you think, do we need that flexibility:

Function<? super KStream<K, V>, ? extends KStream<K, V>> chain

vs.

Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain


??

Regards,

Ivan


21.05.2020 6:54, John Roesler wrote:

Thanks for this thought, Matthias,

Your idea has a few aspects I find attractive:
1. There’s no ambiguity at all about what will be in the map, because there’s 
only one thing that could be there, which is whatever is returned from the 
chain function.
2. We keep the API smaller. Thanks to the extensible way this KIP is designed, 
it would be trivially easy to add the “terminal” chain later. As you say, fewer 
concepts leads to an API that is easier to learn.
3. We get to side-step the naming of this method. Although I didn’t complain 
about withJavaConsumer, it was only because I couldn’t think of a better name. 
Still, it’s somewhat unsatisfying to name a method after its argument type, 
since this provides no information at all about what the method does. I was 
willing to accept it because I didn’t have an alternative, but I would be happy 
to skip this method for now to avoid the problem until we have more inspiration.

The only con I see is that it makes the code a little less ergonomic to write 
when you don’t want to return the result of the chain (such as when the chain 
is terminal), since in your example, you have to declare a block with a return 
statement at the end. It’s not ideal, but it doesn’t seem too bad to me.

Lastly, on the null question, I’d be fine with allowing a null result, which 
would just remove the branch from the returned map. It seems nicer than forcing 
people to pick a stream to return when their chain is terminal and they don’t 
want to use the result later.

Thanks again for sharing the idea,
John

On Wed, May 20, 2020, at 18:17, Matthias J. Sax wrote:

Thanks for updating the KIP!

I guess the only open question is about `Branched.withJavaConsumer` and
its relationship to the returned `Map`.

Originally, we discussed two main patterns:

  (1) split a stream and return the substreams for further processing
  (2) split a stream and modify the substreams with in-place method chaining

To combine both patterns we wanted to allow for

   -> split a stream, modify the substreams, and return the _modified_
substreams for further processing


> But is it also an issue? With Kafka Streams, we can split the topology graph at
> any point. Technically, it's OK to do both: feed the KStream to a
> [Java]Consumer AND save it in resulting Map. If one doesn't need the stream in
> the Map, one simply does not extract it from there


That is of course possible. However, it introduces some "hidden" semantics:

  - using `withChain` I get the modified sub-stream
  - using `withJavaConsumer` I get the unmodified sub-stream

This seems to be quite subtle to me.



From my understanding the original idea of `withJavaConsumer` was to
model a terminal operation, ie, it should be similar to:

Branched.withChain(s -> {
   s.to();
   return null;
})

However, I am not sure if we should even allow `withChain()` to return
`null`? IMHO, we should throw an exception for this case to avoid a `key
->

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-15 Thread Ivan Ponomarev

Hello, John, hello Matthias!

Thank you very much for your detailed feedback!

-

John,

> It looks like you missed my reply on Apr 23rd.

For some unknown reason it didn't reach my inbox, fortunately we have 
all the emails on the web.


> 1. Can you propose to deprecate (but not remove) the existing
> ‘branch’ method?


Done, in "Compatibility, Deprecation, and Migration Plan" section.

> 2. [Explain why 'branch' operator is superior to branching directly
> off of the parent KStream for the needs of dynamic branching]


Done, see an ugly counterexample in 'Dynamic Branching' section.

> 3. [`withConsumer` is confusing with Kafka Consumer... maybe `withSink`?]

As Matthias noted, `withSink` can also be confusing. I renamed this 
method to `withJavaConsumer` per Matthias' suggestion.


> 4. ...It seems like there are two disjoint use cases: EITHER using
> chain and the result map OR using just the sink


This is discussed below.

--

Matthias,

> 1. [We should rename `KBranchedStream` -> `BranchedKStream`]

Done.

> 2. [Ambiguous phrase about 'parameterless' version of the `branch`
> method]


Fixed.


> 3. Overview of newly added methods/interfaces

Done in `Proposed Changes` section.


> 4. [Concerning John's note] I don't think that using both
> `withChain()` and `withConsumer()` is the
> issue, as the KIP clearly states that the result of `withChain()` will
> be given to the `Consumer`.


Yes, I agree!

> The issue is really with the `Consumer` and the returned `Map` of
> `defaultBranch()` and `noDefaultBranch()`. Maybe a reasonable
> implementation would be to not add the "branch" to the result map if
> `withConsumer` is used?


But is it also an issue? With Kafka Streams, we can split the topology 
graph at any point. Technically, it's OK to do both: feed the KStream to 
a [Java]Consumer AND save it in resulting Map. If one doesn't need the 
stream in the Map, one simply does not extract it from there :-)


In the current version of KIP it is assumed that the returned map 
contains ALL the branches, either tagged with IDs explicitly set by the 
programmer, or with some default auto-generated ids. Dealing with this 
map is the user's responsibility.


What seems to me to be an issue is introducing exclusions to this 
general rule, like 'swallowing' some streams by provided 
[Java]Consumers. This can make things complicated. What if a user 
provides both the name of the branch and a [Java]Consumer? What do they 
mean in this case? Should we 'swallow' the stream or save it to the map? 
There's no point in 'saving the space' in this map, so maybe just leave 
it as it is?




I rewrote the KIP and also fixed a couple of typos.

Looking forward to your feedback again!

Regards,

Ivan.



08.05.2020 22:55, Matthias J. Sax wrote:

Thanks for updating the KIP!

I also have some minor comments:



(1) We should rename `KBranchedStream` -> `BranchedKStream`

(Most classes follow this naming pattern now, e.g., CoGroupedKStream,
TimeWindowedKStream etc. -- there is just the "legacy" `KGroupedStream`
and `KGroupedTable` that we cannot rename without a breaking change...
so we just keep them.)



(2) Quote:


> Both branch and defaultBranch operations also have overloaded parameterless
> alternatives.


I think `branch()` always needs to take a `Predicate` and assume you
meant that `Branched` is optional. Can you maybe rephrase it accordingly
as `branch()` would not be "parameterless".



(3) Can you maybe add an overview (in the "Public Interface" section) of
newly added and deprecated methods/classes (cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup#KIP-150-Kafka-StreamsCogroup-PublicInterfaces)



(4) What is unclear from the KIP is the interaction of `withConsumer()`
and the finally returned `Map`. This relates to John's
4th comment:


> It seems like there are really two disjoint use cases: EITHER using chain and
> the result map OR using just the sink.


I don't think that using both `withChain()` and `withConsumer()` is the
issue though, as the KIP clearly states that the result of `withChain()`
will be given to the `Consumer`. The issue is really with the `Consumer`
and the returned `Map` of `defaultBranch()` and `noDefaultBranch()`.

Maybe a reasonable implementation would be to not add the "branch" to
the result map if `withConsumer` is used? As long as we clearly document
it in the JavaDocs, this might be fine?



(5) Reply to John's comments:


> 3. Reading the KIP, I found ‘withConsumer’ confusing; I thought you were
> talking about the kafka Consumer interface (which doesn’t make sense, of
> course). I get that you were referring to the java Consumer interface, but we
> should still probably try to avoid the ambiguity. Just throwing out a
> suggestion, how about ‘withSink’?


IMHO, `withSink` has the issue that it might be confused with a "sink
node", ie., writing the KStream to a topic.

Maybe `withJava

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-08 Thread Matthias J. Sax
Thanks for updating the KIP!

I also have some minor comments:



(1) We should rename `KBranchedStream` -> `BranchedKStream`

(Most classes follow this naming pattern now, e.g., CoGroupedKStream,
TimeWindowedKStream etc. -- there is just the "legacy" `KGroupedStream`
and `KGroupedTable` that we cannot rename without a breaking change...
so we just keep them.)



(2) Quote:

> Both branch and defaultBranch operations also have overloaded parameterless 
> alternatives.

I think `branch()` always needs to take a `Predicate` and assume you
meant that `Branched` is optional. Can you maybe rephrase it accordingly
as `branch()` would not be "parameterless".



(3) Can you maybe add an overview (in the "Public Interface" section) of
newly added and deprecated methods/classes (cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup#KIP-150-Kafka-StreamsCogroup-PublicInterfaces)



(4) What is unclear from the KIP is the interaction of `withConsumer()`
and the finally returned `Map`. This relates to John's
4th comment:

> It seems like there are really two disjoint use cases: EITHER using chain and 
> the result map OR using just the sink.

I don't think that using both `withChain()` and `withConsumer()` is the
issue though, as the KIP clearly states that the result of `withChain()`
will be given to the `Consumer`. The issue is really with the `Consumer`
and the returned `Map` of `defaultBranch()` and `noDefaultBranch()`.

Maybe a reasonable implementation would be to not add the "branch" to
the result map if `withConsumer` is used? As long as we clearly document
it in the JavaDocs, this might be fine?



(5) Reply to John's comments:

> 3. Reading the KIP, I found ‘withConsumer’ confusing; I thought you were 
> talking about the kafka Consumer interface (which doesn’t make sense, of 
> course). I get that you were referring to the java Consumer interface, but we 
> should still probably try to avoid the ambiguity. Just throwing out a 
> suggestion, how about ‘withSink’?

IMHO, `withSink` has the issue that it might be confused with a "sink
node", ie., writing the KStream to a topic.

Maybe `withJavaConsumer` would make it less ambiguous?




-Matthias




On 5/8/20 7:13 AM, John Roesler wrote:
> Hi Ivan,
> 
> It looks like you missed my reply on Apr 23rd. I think it’s close, but I had 
> a few last comments. 
> 
> Thanks,
> John
> 
> On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote:
>> Hello everyone,
>>
>> will someone please take a look at the reworked KIP?
>>
>> I believe that now it follows design principles and takes into account 
>> all the arguments discussed here.
>>
>>
>> Regards,
>>
>> Ivan
>>
>>
>> 23.04.2020 2:45, Ivan Ponomarev wrote:
>>> Hi,
>>>
>>> I have read the John's "DSL design principles" and have completely 
>>> rewritten the KIP, see 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>  
>>>
>>>
>>>
>>> This version includes all the previous discussion results and follows 
>>> the design principles, with one exception.
>>>
>>> The exception is
>>>
>>> branch(Predicate predicate, Branched branched)
>>>
>>> which formally violates 'no more than one parameter' rule, but I think 
>>> here it is justified.
>>>
>>> We must provide a predicate for a branch and don't need to provide one 
>>> for the default branch. Thus for both operations we may use a single 
>>> Branched parameter class, with an extra method parameter for `branch`.
>>>
>>> Since predicate is a natural, necessary part of a branch, no 
>>> 'proliferation of overloads, deprecations, etc.' is expected here as it 
>>> is said in the rationale for the 'single parameter rule'.
>>>
>>> WDYT, is this KIP mature enough to begin voting?
>>>
>>> Regards,
>>>
>>> Ivan
>>>
>>> 21.04.2020 2:09, Matthias J. Sax wrote:
>>>> Ivan,
>>>>
>>>> no worries about getting side tracked. Glad to have you back!
>>>>
>>>> The DSL improved further in the meantime and we already have a `Named`
>>>> config object to name operators. It seems reasonable to me to build on
>>>> this.
>>>>
>>>> Furthermore, John did a writeup about "DSL design principles" that we
>>>> want to follow:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
>>>>
>>>>
>>>> -- might be worth checking out.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
>>>>> Hi everyone!
>>>>>
>>>>> Let me revive the discussion of this KIP.
>>>>>
>>>>> I'm very sorry for stopping my participation in the discussion in June
>>>>> 2019. My project work was very intensive then and it didn't leave me
>>>>> spare time. But I think I must finish this, because we invested
>>>>> substantial effort into this discussion and I don't feel entitled to
>>>>> propose other things before this one is finalized.
>>>>>
>>>>> During these months I proceeded with writing and reviewing Kafka
>>>>> Streams-related code. Every time I needed branching, Spring-Kafka's
>>>>> KafkaStreamBra

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-08 Thread John Roesler
Hi Ivan,

It looks like you missed my reply on Apr 23rd. I think it’s close, but I had a 
few last comments. 

Thanks,
John

On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote:
> Hello everyone,
> 
> will someone please take a look at the reworked KIP?
> 
> I believe that now it follows design principles and takes into account 
> all the arguments discussed here.
> 
> 
> Regards,
> 
> Ivan
> 
> 
> 23.04.2020 2:45, Ivan Ponomarev wrote:
> > Hi,
> > 
> > I have read the John's "DSL design principles" and have completely 
> > rewritten the KIP, see 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >  
> > 
> > 
> > 
> > This version includes all the previous discussion results and follows 
> > the design principles, with one exception.
> > 
> > The exception is
> > 
> > branch(Predicate predicate, Branched branched)
> > 
> > which formally violates 'no more than one parameter' rule, but I think 
> > here it is justified.
> > 
> > We must provide a predicate for a branch and don't need to provide one 
> > for the default branch. Thus for both operations we may use a single 
> > Branched parameter class, with an extra method parameter for `branch`.
> > 
> > Since predicate is a natural, necessary part of a branch, no 
> > 'proliferation of overloads, deprecations, etc.' is expected here as it 
> > is said in the rationale for the 'single parameter rule'.
> > 
> > WDYT, is this KIP mature enough to begin voting?
> > 
> > Regards,
> > 
> > Ivan
> > 
> > 21.04.2020 2:09, Matthias J. Sax wrote:
> >> Ivan,
> >>
> >> no worries about getting side tracked. Glad to have you back!
> >>
> >> The DSL improved further in the meantime and we already have a `Named`
> >> config object to name operators. It seems reasonable to me to build on 
> >> this.
> >>
> >> Furthermore, John did a writeup about "DSL design principles" that we
> >> want to follow:
> >> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> >>  
> >>
> >> -- might be worth to checkout.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
> >>> Hi everyone!
> >>>
> >>> Let me revive the discussion of this KIP.
> >>>
> >>> I'm very sorry for stopping my participation in the discussion in June
> >>> 2019. My project work was very intensive then and it didn't leave me
> >>> spare time. But I think I must finish this, because we invested
> >>> substantial effort into this discussion and I don't feel entitled to
> >>> propose other things before this one is finalized.
> >>>
> >>> During these months I proceeded with writing and reviewing Kafka
> >>> Streams-related code. Every time I needed branching, Spring-Kafka's
> >>> KafkaStreamBrancher class of my invention (the original idea for this
> >>> KIP) worked for me -- that's another reason why I gave up pushing the
> >>> KIP forward. When I was coming across the problem with the scope of
> >>> branches, I worked around it this way:
> >>>
> >>> AtomicReference<KStream<K, V>> result = new AtomicReference<>();
> >>> new KafkaStreamBrancher<>()
> >>>  .branch()
> >>>  .defaultBranch(result::set)
> >>>  .onTopOf(someStream);
> >>> result.get()...
> >>>
> >>>
> >>> And yes, of course I don't feel very happy with this approach.
> >>>
> >>> I think that Matthias came up with a bright solution in his post from
> >>> May, 24th 2019. Let me quote it:
> >>>
> >>> KStream#split() -> KBranchedStream
> >>> // branch is not easily accessible in current scope
> >>> KBranchedStream#branch(Predicate, Consumer)
> >>>    -> KBranchedStream
> >>> // assign a name to the branch and
> >>> // return the sub-stream to the current scope later
> >>> //
> >>> // can be simple as `#branch(p, s->s, "name")`
> >>> // or also complex as `#branch(p, s->s.filter(...), "name")`
> >>> KBranchedStream#branch(Predicate, Function, String)
> >>>    -> KBranchedStream
> >>> // default branch is not easily accessible
> >>> // return map of all named sub-stream into current scope
> >>> KBranchedStream#default(Consumer)
> >>>    -> Map
> >>> // assign custom name to default-branch
> >>> // return map of all named sub-stream into current scope
> >>> KBranchedStream#default(Function, String)
> >>>    -> Map
> >>> // assign a default name for default
> >>> // return map of all named sub-stream into current scope
> >>> KBranchedStream#defaultBranch(Function)
> >>>    -> Map
> >>> // return map of all names sub-stream into current scope
> >>> KBranchedStream#noDefaultBranch()
> >>>    -> Map
> >>>
> >>> I believe this would satisfy everyone. Optional names seems to be a good
> >>> idea: when you don't need to have the branches in the same scope, you
> >>> just don't use names and you don't risk making your code brittle. Or,
> >>> you might want to add names just for debugging purposes. Or, finally,
> >>> you might use the returned Map to have the named branches in the
> >>> original scope.
> >>>
> >>> There also was an input from John Roesler on J

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-05-03 Thread Ivan Ponomarev

Hello everyone,

will someone please take a look at the reworked KIP?

I believe that now it follows design principles and takes into account 
all the arguments discussed here.



Regards,

Ivan


23.04.2020 2:45, Ivan Ponomarev wrote:

Hi,

I have read the John's "DSL design principles" and have completely 
rewritten the KIP, see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream 




This version includes all the previous discussion results and follows 
the design principles, with one exception.


The exception is

branch(Predicate predicate, Branched branched)

which formally violates 'no more than one parameter' rule, but I think 
here it is justified.


We must provide a predicate for a branch and don't need to provide one 
for the default branch. Thus for both operations we may use a single 
Branched parameter class, with an extra method parameter for `branch`.


Since predicate is a natural, necessary part of a branch, no 
'proliferation of overloads, deprecations, etc.' is expected here as it 
is said in the rationale for the 'single parameter rule'.


WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

21.04.2020 2:09, Matthias J. Sax wrote:

Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on 
this.


Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar 


-- might be worth checking out.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and it didn't leave me
spare time. But I think I must finish this, because we invested
substantial effort into this discussion and I don't feel entitled to
propose other things before this one is finalized.

During these months I proceeded with writing and reviewing Kafka
Streams-related code. Every time I needed branching, Spring-Kafka's
KafkaStreamBrancher class of my invention (the original idea for this
KIP) worked for me -- that's another reason why I gave up pushing the
KIP forward. When I was coming across the problem with the scope of
branches, I worked around it this way:

AtomicReference<KStream<K, V>> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
 .branch()
 .defaultBranch(result::set)
 .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May, 24th 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
   -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
   -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
   -> Map<String, KStream>
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
   -> Map<String, KStream>
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
   -> Map<String, KStream>
// return map of all named sub-stream into current scope
KBranchedStream#noDefaultBranch()
   -> Map<String, KStream>

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or,
you might want to add names just for debugging purposes. Or, finally,
you might use the returned Map to have the named branches in the
original scope.

There was also input from John Roesler on June 4th, 2019, who
suggested using Named class. I can't comment on this. The idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with Streams API design principles than me.

Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax wrote:
I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to do so.




-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:

Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:

Thanks for the input John!


under your suggestion, it seems that the name is required


If you want to get the `KStream` as p

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-04-23 Thread John Roesler
Hi Ivan,

Welcome back! Thanks for the update. This KIP looks really nice. 

Thanks also for taking into account the grammar. I think you’ve done a good job 
of balancing the proposed grammar’s objectives with the current API’s idioms. 

I have just a few comments remaining:

1. Can you propose to deprecate (but not remove) the existing ‘branch’ method? 
Since the proposed API is presented as a superior alternative, we may as well 
prune so that the total surface area remains constant. 

2. There’s one example (the enum one) that says “this is why this API is 
superior to branching directly off of the parent KStream”, but it doesn’t show 
what it would look like to branch off the parent KStream. Can you add that 
counterexample? My one reservation to the idea of a branch operator at all is 
that you can implicitly branch off of any KTable or KStream. I think it would 
be nice to devote some text to explaining when either one or the other is 
preferable. 

3. Reading the KIP, I found ‘withConsumer’ confusing; I thought you were 
talking about the kafka Consumer interface (which doesn’t make sense, of 
course). I get that you were referring to the java Consumer interface, but we 
should still probably try to avoid the ambiguity. Just throwing out a 
suggestion, how about ‘withSink’?

4. More of an aside, it seems slightly convoluted to have withChain and 
withConsumer, since the sink operation could theoretically just go on the end 
of the chain. But I can see that you wanted to capture the chain and return it 
in the KStream-valued Map, which is incompatible with a terminal chain. It 
seems like there are really two disjoint use cases: EITHER using chain and the 
result map OR using just the sink. The API doesn’t codify this, though, which 
might be confusing. I really have no idea of what to do differently, though. 
Unless you’re feeling more creative than I am, maybe we’ll just plan to clarify 
this in the JavaDoc. 

Thanks again!
-John

On Wed, Apr 22, 2020, at 18:45, Ivan Ponomarev wrote:
> Hi,
> 
> I have read John's "DSL design principles" and have completely 
> rewritten the KIP, see 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> 
> 
> This version includes all the previous discussion results and follows 
> the design principles, with one exception.
> 
> The exception is
> 
> branch(Predicate predicate, Branched branched)
> 
> which formally violates the 'no more than one parameter' rule, but I think 
> here it is justified.
> 
> We must provide a predicate for a branch and don't need to provide one 
> for the default branch. Thus for both operations we may use a single 
> Branched parameter class, with an extra method parameter for `branch`.
> 
> Since predicate is a natural, necessary part of a branch, no 
> 'proliferation of overloads, deprecations, etc.' is expected here as it 
> is said in the rationale for the 'single parameter rule'.
> 
> WDYT, is this KIP mature enough to begin voting?
> 
> Regards,
> 
> Ivan
> 
> 21.04.2020 2:09, Matthias J. Sax wrote:
> > Ivan,
> > 
> > no worries about getting side tracked. Glad to have you back!
> > 
> > The DSL improved further in the meantime and we already have a `Named`
> > config object to name operators. It seems reasonable to me to build on this.
> > 
> > Furthermore, John did a writeup about "DSL design principles" that we
> > want to follow:
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
> > -- might be worth checking out.
> > 
> > 
> > -Matthias
> > 
> > 
> > On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
> >> Hi everyone!
> >>
> >> Let me revive the discussion of this KIP.
> >>
> >> I'm very sorry for stopping my participation in the discussion in June
> >> 2019. My project work was very intensive then and it didn't leave me
> >> spare time. But I think I must finish this, because we invested
> >> substantial effort into this discussion and I don't feel entitled to
> >> propose other things before this one is finalized.
> >>
> >> During these months I proceeded with writing and reviewing Kafka
> >> Streams-related code. Every time I needed branching, Spring-Kafka's
> >> KafkaStreamBrancher class of my invention (the original idea for this
> >> KIP) worked for me -- that's another reason why I gave up pushing the
> >> KIP forward. When I was coming across the problem with the scope of
> >> branches, I worked around it this way:
> >>
> >> AtomicReference> result = new AtomicReference<>();
> >> new KafkaStreamBrancher<>()
> >>      .branch()
> >>      .defaultBranch(result::set)
> >>      .onTopOf(someStream);
> >> result.get()...
> >>
> >>
> >> And yes, of course I don't feel very happy with this approach.
> >>
> >> I think that Matthias came up with a bright solution in his post from
> >> May, 24th 2019. Let me quote it:
> >>
> >> KStream#split() -> KBranchedStream
> >> // branch is not easily accessible in current scope
> >> KBranchedStream#branch(Pred

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-04-22 Thread Ivan Ponomarev

Hi,

I have read John's "DSL design principles" and have completely 
rewritten the KIP, see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream



This version includes all the previous discussion results and follows 
the design principles, with one exception.


The exception is

branch(Predicate predicate, Branched branched)

which formally violates the 'no more than one parameter' rule, but I think 
here it is justified.


We must provide a predicate for a branch and don't need to provide one 
for the default branch. Thus for both operations we may use a single 
Branched parameter class, with an extra method parameter for `branch`.


Since predicate is a natural, necessary part of a branch, no 
'proliferation of overloads, deprecations, etc.' is expected here as it 
is said in the rationale for the 'single parameter rule'.
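
To illustrate the shape of that two-parameter signature, here is a minimal, self-contained sketch in plain Java. It does not use Kafka Streams at all: `ToyBranched` and `ToyBranchedStream` are hypothetical stand-ins that model a stream as an in-memory list, and the `as(name)` factory is assumed only for illustration. The point is just that `branch` takes a predicate plus the parameter object, while the default branch takes the parameter object alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the parameter class; here it only carries a name.
final class ToyBranched {
    final String name;

    private ToyBranched(String name) {
        this.name = name;
    }

    static ToyBranched as(String name) {
        return new ToyBranched(name);
    }
}

// Toy in-memory model (an assumption, not the Kafka Streams API):
// each element goes to the first branch whose predicate matches.
final class ToyBranchedStream<T> {
    private final List<T> remaining;
    private final Map<String, List<T>> result = new LinkedHashMap<>();

    ToyBranchedStream(List<T> items) {
        this.remaining = new ArrayList<>(items);
    }

    // Two parameters: the predicate is essential to a branch,
    // everything else travels in the single parameter object.
    ToyBranchedStream<T> branch(Predicate<T> predicate, ToyBranched branched) {
        List<T> matched = new ArrayList<>();
        for (Iterator<T> it = remaining.iterator(); it.hasNext();) {
            T item = it.next();
            if (predicate.test(item)) {
                matched.add(item);
                it.remove();
            }
        }
        result.put(branched.name, matched);
        return this;
    }

    // The default branch needs no predicate, so the same parameter
    // class is passed on its own.
    Map<String, List<T>> defaultBranch(ToyBranched branched) {
        result.put(branched.name, new ArrayList<>(remaining));
        remaining.clear();
        return result;
    }
}

final class BranchSignatureSketch {
    static Map<String, List<Integer>> demo() {
        return new ToyBranchedStream<>(Arrays.asList(1, 2, 3, 4, 5, 6))
            .branch(n -> n % 2 == 0, ToyBranched.as("even"))
            .branch(n -> n > 3, ToyBranched.as("big-odd"))
            .defaultBranch(ToyBranched.as("rest"));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this toy model no further overloads are needed when new options appear: they would be added to the parameter class rather than to `branch` itself.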


WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

21.04.2020 2:09, Matthias J. Sax wrote:

Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on this.

Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
-- might be worth checking out.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and it didn't leave me
spare time. But I think I must finish this, because we invested
substantial effort into this discussion and I don't feel entitled to
propose other things before this one is finalized.

During these months I proceeded with writing and reviewing Kafka
Streams-related code. Every time I needed branching, Spring-Kafka's
KafkaStreamBrancher class of my invention (the original idea for this
KIP) worked for me -- that's another reason why I gave up pushing the
KIP forward. When I was coming across the problem with the scope of
branches, I worked around it this way:

AtomicReference> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
     .branch()
     .defaultBranch(result::set)
     .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May, 24th 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
   -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
   -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
   -> Map
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
   -> Map
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
   -> Map
// return map of all named sub-stream into current scope
KBranchedStream#noDefaultBranch()
   -> Map

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or,
you might want to add names just for debugging purposes. Or, finally,
you might use the returned Map to have the named branches in the
original scope.
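
As a rough illustration of how the optional names play out, here is a minimal sketch in plain Java. It is not Kafka Streams code: `ToySplit` is a hypothetical class that treats a "stream" as an in-memory list, it covers only a subset of the quoted overloads, and the "default" key used for the default branch is an assumption. Branches given a `Consumer` are handled in place and stay out of the returned map; branches given a `Function` and a name come back in the map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model (an assumption, not the real DSL): a "stream" is a List<T>.
final class ToySplit<T> {
    private final List<T> remaining;
    private final Map<String, List<T>> named = new LinkedHashMap<>();

    ToySplit(List<T> items) {
        this.remaining = new ArrayList<>(items);
    }

    // Removes and returns the elements that match the predicate.
    private List<T> take(Predicate<T> predicate) {
        List<T> matched = new ArrayList<>();
        for (Iterator<T> it = remaining.iterator(); it.hasNext();) {
            T item = it.next();
            if (predicate.test(item)) {
                matched.add(item);
                it.remove();
            }
        }
        return matched;
    }

    // Unnamed branch: consumed in place, not part of the returned map.
    ToySplit<T> branch(Predicate<T> predicate, Consumer<List<T>> chain) {
        chain.accept(take(predicate));
        return this;
    }

    // Named branch: the chained result is returned to the caller's scope.
    ToySplit<T> branch(Predicate<T> predicate,
                       Function<List<T>, List<T>> chain,
                       String name) {
        named.put(name, chain.apply(take(predicate)));
        return this;
    }

    // Default branch under an assumed default name.
    Map<String, List<T>> defaultBranch(Function<List<T>, List<T>> chain) {
        named.put("default", chain.apply(new ArrayList<>(remaining)));
        remaining.clear();
        return named;
    }

    // No default branch: unmatched elements are dropped.
    Map<String, List<T>> noDefaultBranch() {
        remaining.clear();
        return named;
    }
}

final class HybridBranchSketch {
    static final List<String> ERRORS_SEEN = new ArrayList<>();

    static Map<String, List<String>> demo() {
        ERRORS_SEEN.clear();
        return new ToySplit<>(
                Arrays.asList("error: disk", "warn: cpu", "info: ok", "warn: mem"))
            .branch(s -> s.startsWith("error"), ERRORS_SEEN::addAll)
            .branch(s -> s.startsWith("warn"), s -> s, "warnings")
            .defaultBranch(s -> s);
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(ERRORS_SEEN);
    }
}
```

Because the two `branch` overloads differ in arity, the compiler decides at each call site whether a branch is named, so the map can only contain keys that were written out explicitly (plus the assumed default).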

There also was an input from John Roesler on June 4th, 2019, who
suggested using Named class. I can't comment on this. The idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with Streams API design principles than me.

Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax wrote:

I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to do so.



-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:

Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:

Thanks for the input John!


under your suggestion, it seems that the name is required


If you want to get the `KStream` as part of the `Map` back using a
`Function`, yes. If you follow the "embedded chaining" pattern using a
`Consumer`, no.

Allowing for a default name via `split()` can of course be done.
Similarly, using `Named` instead of `String` is possible.

I wan

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-04-20 Thread Matthias J. Sax
Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on this.

Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
-- might be worth checking out.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
> Hi everyone!
> 
> Let me revive the discussion of this KIP.
> 
> I'm very sorry for stopping my participation in the discussion in June
> 2019. My project work was very intensive then and it didn't leave me
> spare time. But I think I must finish this, because we invested
> substantial effort into this discussion and I don't feel entitled to
> propose other things before this one is finalized.
> 
> During these months I proceeded with writing and reviewing Kafka
> Streams-related code. Every time I needed branching, Spring-Kafka's
> KafkaStreamBrancher class of my invention (the original idea for this
> KIP) worked for me -- that's another reason why I gave up pushing the
> KIP forward. When I was coming across the problem with the scope of
> branches, I worked around it this way:
> 
> AtomicReference> result = new AtomicReference<>();
> new KafkaStreamBrancher<>()
>     .branch()
>     .defaultBranch(result::set)
>     .onTopOf(someStream);
> result.get()...
> 
> 
> And yes, of course I don't feel very happy with this approach.
> 
> I think that Matthias came up with a bright solution in his post from
> May, 24th 2019. Let me quote it:
> 
> KStream#split() -> KBranchedStream
> // branch is not easily accessible in current scope
> KBranchedStream#branch(Predicate, Consumer)
>   -> KBranchedStream
> // assign a name to the branch and
> // return the sub-stream to the current scope later
> //
> // can be simple as `#branch(p, s->s, "name")`
> // or also complex as `#branch(p, s->s.filter(...), "name")`
> KBranchedStream#branch(Predicate, Function, String)
>   -> KBranchedStream
> // default branch is not easily accessible
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Consumer)
>   -> Map
> // assign custom name to default-branch
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Function, String)
>   -> Map
> // assign a default name for default
> // return map of all named sub-stream into current scope
> KBranchedStream#defaultBranch(Function)
>   -> Map
> // return map of all named sub-stream into current scope
> KBranchedStream#noDefaultBranch()
>   -> Map
> 
> I believe this would satisfy everyone. Optional names seem to be a good
> idea: when you don't need to have the branches in the same scope, you
> just don't use names and you don't risk making your code brittle. Or,
> you might want to add names just for debugging purposes. Or, finally,
> you might use the returned Map to have the named branches in the
> original scope.
> 
> There also was an input from John Roesler on June 4th, 2019, who
> suggested using Named class. I can't comment on this. The idea seems
> reasonable, but in this matter I'd rather trust people who are more
> familiar with Streams API design principles than me.
> 
> Regards,
> 
> Ivan
> 
> 
> 
> 08.10.2019 1:38, Matthias J. Sax wrote:
>> I am moving this KIP into "inactive status". Feel free to resume the KIP
>> at any point.
>>
>> If anybody else is interested in picking up this KIP, feel free to do so.
>>
>>
>>
>> -Matthias
>>
>> On 7/11/19 4:00 PM, Matthias J. Sax wrote:
>>> Ivan,
>>>
>>> did you see my last reply? What do you think about my proposal to mix
>>> both approaches and try to get best-of-both worlds?
>>>
>>>
>>> -Matthias
>>>
>>> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>>>> Thanks for the input John!
>>>>
>>>>> under your suggestion, it seems that the name is required
>>>>
>>>> If you want to get the `KStream` as part of the `Map` back using a
>>>> `Function`, yes. If you follow the "embedded chaining" pattern using a
>>>> `Consumer`, no.
>>>>
>>>> Allowing for a default name via `split()` can of course be done.
>>>> Similarly, using `Named` instead of `String` is possible.
>>>>
>>>> I wanted to sketch out a high level proposal to merge both patterns
>>>> only. Your suggestions to align the new API with the existing API make
>>>> total sense.
>>>>
>>>>
>>>>
>>>> One follow up question: Would `Named` be optional or required in
>>>> `split()` and `branch()`? It's unclear from your example.
>>>>
>>>> If both are mandatory, what do we gain by it? The returned `Map` only
>>>> contains the corresponding branches, so why should we prefix all of
>>>> them? If only `Named` is mandatory in `branch()`, but optional in
>>>> `split()`, the same question arises?
>>>>
>>>> Requiring `Named` in `split()` seems only to make sense, if `Named` is
>>>> optional in `branch()` and we generate `-X` suffix using a counter for
>>>> different b

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-04-17 Thread Ivan Ponomarev

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June 
2019. My project work was very intensive then and it didn't leave me 
spare time. But I think I must finish this, because we invested 
substantial effort into this discussion and I don't feel entitled to 
propose other things before this one is finalized.


During these months I proceeded with writing and reviewing Kafka 
Streams-related code. Every time I needed branching, Spring-Kafka's 
KafkaStreamBrancher class of my invention (the original idea for this 
KIP) worked for me -- that's another reason why I gave up pushing the 
KIP forward. When I was coming across the problem with the scope of 
branches, I worked around it this way:


AtomicReference> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
.branch()
.defaultBranch(result::set)
.onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from 
May, 24th 2019. Let me quote it:


KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
  -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
  -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
  -> Map
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
  -> Map
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
  -> Map
// return map of all named sub-stream into current scope
KBranchedStream#noDefaultBranch()
  -> Map

I believe this would satisfy everyone. Optional names seem to be a good 
idea: when you don't need to have the branches in the same scope, you 
just don't use names and you don't risk making your code brittle. Or, 
you might want to add names just for debugging purposes. Or, finally, 
you might use the returned Map to have the named branches in the 
original scope.


There also was an input from John Roesler on June 4th, 2019, who 
suggested using Named class. I can't comment on this. The idea seems 
reasonable, but in this matter I'd rather trust people who are more 
familiar with Streams API design principles than me.


Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax wrote:

I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to do so.



-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:

Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:

Thanks for the input John!


under your suggestion, it seems that the name is required


If you want to get the `KStream` as part of the `Map` back using a
`Function`, yes. If you follow the "embedded chaining" pattern using a
`Consumer`, no.

Allowing for a default name via `split()` can of course be done.
Similarly, using `Named` instead of `String` is possible.

I wanted to sketch out a high level proposal to merge both patterns
only. Your suggestions to align the new API with the existing API make
total sense.



One follow up question: Would `Named` be optional or required in
`split()` and `branch()`? It's unclear from your example.

If both are mandatory, what do we gain by it? The returned `Map` only
contains the corresponding branches, so why should we prefix all of
them? If only `Named` is mandatory in `branch()`, but optional in
`split()`, the same question arises?

Requiring `Named` in `split()` seems only to make sense, if `Named` is
optional in `branch()` and we generate `-X` suffix using a counter for
different branch name. However, this might lead to the problem of
changing names if branches are added/removed. Also, how would the names
be generated if `Consumer` is mixed in (ie, not all branches are
returned in the `Map`).

If `Named` is optional for both, it could happen that a user forgets to
specify a name for a branch, which would lead to runtime issues.


Hence, I am actually in favor of not allowing a default name, keeping
`split()` without parameters, and making `Named` in `branch()` required if a
`Function` is used. This makes it explicit to the user that specifying a
name is required if a `Function` is used.



About


KBranchedStream#branch(BranchConfig)


I don't think that the branching predicate is a configuration and hence
would not include it in a configuration object.



Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-10-07 Thread Matthias J. Sax
I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to do so.



-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> Ivan,
> 
> did you see my last reply? What do you think about my proposal to mix
> both approaches and try to get best-of-both worlds?
> 
> 
> -Matthias
> 
> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>> Thanks for the input John!
>>
>>> under your suggestion, it seems that the name is required
>>
>> If you want to get the `KStream` as part of the `Map` back using a
>> `Function`, yes. If you follow the "embedded chaining" pattern using a
>> `Consumer`, no.
>>
>> Allowing for a default name via `split()` can of course be done.
>> Similarly, using `Named` instead of `String` is possible.
>>
>> I wanted to sketch out a high level proposal to merge both patterns
>> only. Your suggestions to align the new API with the existing API make
>> total sense.
>>
>>
>>
>> One follow up question: Would `Named` be optional or required in
>> `split()` and `branch()`? It's unclear from your example.
>>
>> If both are mandatory, what do we gain by it? The returned `Map` only
>> contains the corresponding branches, so why should we prefix all of
>> them? If only `Named` is mandatory in `branch()`, but optional in
>> `split()`, the same question arises?
>>
>> Requiring `Named` in `split()` seems only to make sense, if `Named` is
>> optional in `branch()` and we generate `-X` suffix using a counter for
>> different branch name. However, this might lead to the problem of
>> changing names if branches are added/removed. Also, how would the names
>> be generated if `Consumer` is mixed in (ie, not all branches are
>> returned in the `Map`).
>>
>> If `Named` is optional for both, it could happen that a user forgets to
>> specify a name for a branch, which would lead to runtime issues.
>>
>>
>> Hence, I am actually in favor of not allowing a default name, keeping
>> `split()` without parameters, and making `Named` in `branch()` required if a
>> `Function` is used. This makes it explicit to the user that specifying a
>> name is required if a `Function` is used.
>>
>>
>>
>> About
>>
>>> KBranchedStream#branch(BranchConfig)
>>
>> I don't think that the branching predicate is a configuration and hence
>> would not include it in a configuration object.
>>
>>> withChain(...);
>>
>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
>> seem to be a configuration. We also cannot prevent a user from calling
>> `withName()` in combination with `withChain()`, which does not make sense
>> IMHO. We could of course throw an RTE, but not having a compile-time check
>> seems less appealing. Also, it could happen that neither `withChain()`
>> nor `withName()` is called and the branch is missing in the returned
>> `Map`, which would lead to runtime issues, too.
>>
>> Hence, I don't think that we should add `BranchConfig`. A config object
>> is helpful if each configuration can be set independently of all others,
>> but this seems not to be the case here. If we add new configuration
>> later, we can also just move forward by deprecating the methods that
>> accept `Named` and adding new methods that accept `BranchConfig` (that
>> would of course implement `Named`).
>>
>>
>> Thoughts?
>>
>>
>> @Ivan, what do you think about the general idea to blend the two main
>> approaches of returning a `Map` plus an "embedded chaining"?
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/4/19 10:33 AM, John Roesler wrote:
>>> Thanks for the idea, Matthias, it does seem like this would satisfy
>>> everyone. Returning the map from the terminal operations also solves
>>> the problem of merging/joining the branched streams, if we want to add
>>> support for the complement later on.
>>>
>>> Under your suggestion, it seems that the name is required. Otherwise,
>>> we wouldn't have keys for the map to return. I think this is actually
>>> not too bad, since experience has taught us that, although names for
>>> operations are not required to define stream processing logic, it does
>>> significantly improve the operational experience when you can map the
>>> topology, logs, metrics, etc. back to the source code. Since you
>>> wouldn't (have to) reference the name to chain extra processing onto
>>> the branch (thanks to the second argument), you can avoid the
>>> "unchecked name" problem that Ivan pointed out.
>>>
>>> In the current implementation of Branch, you can name the branch
>>> operator itself, and then all the branches get index-suffixed names
>>> built from the branch operator name. I guess under this proposal, we
>>> could naturally append the branch name to the branching operator name,
>>> like this:
>>>
>>>stream.split(Named.withName("mysplit")) //creates node "mysplit"
>>>   .branch(..., ..., "abranch") // creates node "mysplit-abranch"
>>>   .defaultBranch(...) // creates node "mysplit-default"
>>>
>>> It does make me wonder about the D

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-07-29 Thread Matthias J. Sax
What is the status of this KIP?

-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:
> Ivan,
> 
> did you see my last reply? What do you think about my proposal to mix
> both approaches and try to get best-of-both worlds?
> 
> 
> -Matthias
> 
> On 6/11/19 3:56 PM, Matthias J. Sax wrote:
>> Thanks for the input John!
>>
>>> under your suggestion, it seems that the name is required
>>
>> If you want to get the `KStream` as part of the `Map` back using a
>> `Function`, yes. If you follow the "embedded chaining" pattern using a
>> `Consumer`, no.
>>
>> Allowing for a default name via `split()` can of course be done.
>> Similarly, using `Named` instead of `String` is possible.
>>
>> I wanted to sketch out a high level proposal to merge both patterns
>> only. Your suggestions to align the new API with the existing API make
>> total sense.
>>
>>
>>
>> One follow up question: Would `Named` be optional or required in
>> `split()` and `branch()`? It's unclear from your example.
>>
>> If both are mandatory, what do we gain by it? The returned `Map` only
>> contains the corresponding branches, so why should we prefix all of
>> them? If only `Named` is mandatory in `branch()`, but optional in
>> `split()`, the same question arises?
>>
>> Requiring `Named` in `split()` seems only to make sense, if `Named` is
>> optional in `branch()` and we generate `-X` suffix using a counter for
>> different branch name. However, this might lead to the problem of
>> changing names if branches are added/removed. Also, how would the names
>> be generated if `Consumer` is mixed in (ie, not all branches are
>> returned in the `Map`).
>>
>> If `Named` is optional for both, it could happen that a user forgets to
>> specify a name for a branch, which would lead to runtime issues.
>>
>>
>> Hence, I am actually in favor of not allowing a default name, keeping
>> `split()` without parameters, and making `Named` in `branch()` required if a
>> `Function` is used. This makes it explicit to the user that specifying a
>> name is required if a `Function` is used.
>>
>>
>>
>> About
>>
>>> KBranchedStream#branch(BranchConfig)
>>
>> I don't think that the branching predicate is a configuration and hence
>> would not include it in a configuration object.
>>
>>> withChain(...);
>>
>> Similarly, `withChain()` (that would only take a `Consumer`?) does not
>> seem to be a configuration. We also cannot prevent a user from calling
>> `withName()` in combination with `withChain()`, which does not make sense
>> IMHO. We could of course throw an RTE, but not having a compile-time check
>> seems less appealing. Also, it could happen that neither `withChain()`
>> nor `withName()` is called and the branch is missing in the returned
>> `Map`, which would lead to runtime issues, too.
>>
>> Hence, I don't think that we should add `BranchConfig`. A config object
>> is helpful if each configuration can be set independently of all others,
>> but this seems not to be the case here. If we add new configuration
>> later, we can also just move forward by deprecating the methods that
>> accept `Named` and adding new methods that accept `BranchConfig` (that
>> would of course implement `Named`).
>>
>>
>> Thoughts?
>>
>>
>> @Ivan, what do you think about the general idea to blend the two main
>> approaches of returning a `Map` plus an "embedded chaining"?
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/4/19 10:33 AM, John Roesler wrote:
>>> Thanks for the idea, Matthias, it does seem like this would satisfy
>>> everyone. Returning the map from the terminal operations also solves
>>> the problem of merging/joining the branched streams, if we want to add
>>> support for the complement later on.
>>>
>>> Under your suggestion, it seems that the name is required. Otherwise,
>>> we wouldn't have keys for the map to return. I think this is actually
>>> not too bad, since experience has taught us that, although names for
>>> operations are not required to define stream processing logic, it does
>>> significantly improve the operational experience when you can map the
>>> topology, logs, metrics, etc. back to the source code. Since you
>>> wouldn't (have to) reference the name to chain extra processing onto
>>> the branch (thanks to the second argument), you can avoid the
>>> "unchecked name" problem that Ivan pointed out.
>>>
>>> In the current implementation of Branch, you can name the branch
>>> operator itself, and then all the branches get index-suffixed names
>>> built from the branch operator name. I guess under this proposal, we
>>> could naturally append the branch name to the branching operator name,
>>> like this:
>>>
>>>stream.split(Named.withName("mysplit")) //creates node "mysplit"
>>>   .branch(..., ..., "abranch") // creates node "mysplit-abranch"
>>>   .defaultBranch(...) // creates node "mysplit-default"
>>>
>>> It does make me wonder about the DSL syntax itself, though.
>>>
>>> We don't have a defined grammar, so there's plenty of room to debate
>>> the "best" syntax in the 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-07-11 Thread Matthias J. Sax
Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:
> Thanks for the input John!
> 
>> under your suggestion, it seems that the name is required
> 
> If you want to get the `KStream` as part of the `Map` back using a
> `Function`, yes. If you follow the "embedded chaining" pattern using a
> `Consumer`, no.
> 
> Allowing for a default name via `split()` can of course be done.
> Similarly, using `Named` instead of `String` is possible.
> 
> I wanted to sketch out a high level proposal to merge both patterns
> only. Your suggestions to align the new API with the existing API make
> total sense.
> 
> 
> 
> One follow up question: Would `Named` be optional or required in
> `split()` and `branch()`? It's unclear from your example.
> 
> If both are mandatory, what do we gain by it? The returned `Map` only
> contains the corresponding branches, so why should we prefix all of
> them? If only `Named` is mandatory in `branch()`, but optional in
> `split()`, the same question arises?
> 
> Requiring `Named` in `split()` seems only to make sense, if `Named` is
> optional in `branch()` and we generate `-X` suffix using a counter for
> different branch name. However, this might lead to the problem of
> changing names if branches are added/removed. Also, how would the names
> be generated if `Consumer` is mixed in (ie, not all branches are
> returned in the `Map`).
> 
> If `Named` is optional for both, it could happen that a user forgets to
> specify a name for a branch, which would lead to runtime issues.
> 
> 
> Hence, I am actually in favor of not allowing a default name, keeping
> `split()` without parameters, and making `Named` in `branch()` required if a
> `Function` is used. This makes it explicit to the user that specifying a
> name is required if a `Function` is used.
> 
> 
> 
> About
> 
>> KBranchedStream#branch(BranchConfig)
> 
> I don't think that the branching predicate is a configuration and hence
> would not include it in a configuration object.
> 
>> withChain(...);
> 
> Similarly, `withChain()` (that would only take a `Consumer`?) does not
> seem to be a configuration. We also cannot prevent a user from calling
> `withName()` in combination with `withChain()`, which does not make sense
> IMHO. We could of course throw an RTE, but not having a compile-time check
> seems less appealing. Also, it could happen that neither `withChain()`
> nor `withName()` is called and the branch is missing in the returned
> `Map`, which would lead to runtime issues, too.
> 
> Hence, I don't think that we should add `BranchConfig`. A config object
> is helpful if each configuration can be set independently of all others,
> but this seems not to be the case here. If we add new configuration
> later, we can also just move forward by deprecating the methods that
> accept `Named` and adding new methods that accept `BranchConfig` (that
> would of course implement `Named`).
> 
> 
> Thoughts?
> 
> 
> @Ivan, what do you think about the general idea to blend the two main
> approaches of returning a `Map` plus an "embedded chaining"?
> 
> 
> 
> -Matthias
> 
> 
> 
> On 6/4/19 10:33 AM, John Roesler wrote:
>> Thanks for the idea, Matthias, it does seem like this would satisfy
>> everyone. Returning the map from the terminal operations also solves
>> the problem of merging/joining the branched streams, if we want to add
>> support for the complement later on.
>>
>> Under your suggestion, it seems that the name is required. Otherwise,
>> we wouldn't have keys for the map to return. I think this is actually
>> not too bad, since experience has taught us that, although names for
>> operations are not required to define stream processing logic, it does
>> significantly improve the operational experience when you can map the
>> topology, logs, metrics, etc. back to the source code. Since you
>> wouldn't (have to) reference the name to chain extra processing onto
>> the branch (thanks to the second argument), you can avoid the
>> "unchecked name" problem that Ivan pointed out.
>>
>> In the current implementation of Branch, you can name the branch
>> operator itself, and then all the branches get index-suffixed names
>> built from the branch operator name. I guess under this proposal, we
>> could naturally append the branch name to the branching operator name,
>> like this:
>>
>>    stream.split(Named.withName("mysplit")) // creates node "mysplit"
>>          .branch(..., ..., "abranch")      // creates node "mysplit-abranch"
>>          .defaultBranch(...)               // creates node "mysplit-default"
>>
>> It does make me wonder about the DSL syntax itself, though.
>>
>> We don't have a defined grammar, so there's plenty of room to debate
>> the "best" syntax in the context of each operation, but in general,
>> the KStream DSL operators follow this pattern:
>>
>> operator(function, config_object?) OR operator(config_object)
>>
>> where config_o

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-06-11 Thread Matthias J. Sax
Thanks for the input John!

> under your suggestion, it seems that the name is required

If you want to get the `KStream` as part of the `Map` back using a
`Function`, yes. If you follow the "embedded chaining" pattern using a
`Consumer`, no.

Allowing for a default name via `split()` can of course be done.
Similarly, using `Named` instead of `String` is possible.

I only wanted to sketch out a high-level proposal to merge both patterns.
Your suggestions to align the new API with the existing API make
total sense.



One follow up question: Would `Named` be optional or required in
`split()` and `branch()`? It's unclear from your example.

If both are mandatory, what do we gain by it? The returned `Map` only
contains the corresponding branches, so why should we prefix all of
them? If `Named` is mandatory only in `branch()` but optional in
`split()`, the same question arises.

Requiring `Named` in `split()` only seems to make sense if `Named` is
optional in `branch()` and we generate a `-X` suffix, using a counter, for
each branch name. However, this might lead to the problem of names
changing if branches are added or removed. Also, how would the names
be generated if a `Consumer` is mixed in (i.e., not all branches are
returned in the `Map`)?
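
To make the renaming concern concrete, here is a minimal, self-contained
sketch. It does not use Kafka Streams: `IndexedBranchNames`, `indexedNames`
and the predicate labels are hypothetical, and the `<split name>-<counter>`
scheme is only an assumption about how such suffixes might be generated.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class IndexedBranchNames {
    // Assumed scheme: every branch gets "<splitName>-<n>", where n is its
    // 1-based position in the order the branches were declared.
    static Map<String, String> indexedNames(String splitName, List<String> branchLabels) {
        Map<String, String> names = new LinkedHashMap<>();
        int counter = 1;
        for (String label : branchLabels) {
            names.put(label, splitName + "-" + counter++);
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> before =
            indexedNames("mysplit", Arrays.asList("isError", "isWarn"));
        // A new branch is declared in front of the existing ones.
        Map<String, String> after =
            indexedNames("mysplit", Arrays.asList("isFatal", "isError", "isWarn"));

        System.out.println(before.get("isWarn")); // prints "mysplit-2"
        System.out.println(after.get("isWarn"));  // prints "mysplit-3"
    }
}
```

The "isWarn" branch itself did not change, yet its generated name did, so
anything keyed on that name (map lookups, metrics, logs) would silently move.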

If `Named` is optional for both, it could happen that a user forgets to
specify a name for a branch, which would lead to runtime issues.


Hence, I am actually in favor of not allowing a default name, keeping
`split()` without a parameter, and making `Named` in `branch()` required if a
`Function` is used. This makes it explicit to the user that specifying a
name is required if a `Function` is used.



About

> KBranchedStream#branch(BranchConfig)

I don't think that the branching predicate is a configuration and hence
would not include it in a configuration object.

> withChain(...);

Similarly, `withChain()` (that would only take a `Consumer`?) does not
seem to be a configuration. We also cannot prevent a user from calling
`withName()` in combination with `withChain()`, which does not make sense
IMHO. We could of course throw an RTE, but not having a compile-time check
seems less appealing. Also, it could happen that neither `withChain()`
nor `withName()` is called and the branch is missing in the returned
`Map`, which would lead to runtime issues, too.

Hence, I don't think that we should add `BranchConfig`. A config object
is helpful if each configuration can be set independently of all others,
but this seems not to be the case here. If we add new configuration
later, we can also just move forward by deprecating the methods that
accept `Named` and adding new methods that accept `BranchConfig` (which
would of course implement `Named`).


Thoughts?


@Ivan, what do you think about the general idea to blend the two main
approaches of returning a `Map` plus an "embedded chaining"?



-Matthias



On 6/4/19 10:33 AM, John Roesler wrote:
> Thanks for the idea, Matthias, it does seem like this would satisfy
> everyone. Returning the map from the terminal operations also solves
> the problem of merging/joining the branched streams, if we want to add
> support for the complement later on.
> 
> Under your suggestion, it seems that the name is required. Otherwise,
> we wouldn't have keys for the map to return. I think this is actually
> not too bad, since experience has taught us that, although names for
> operations are not required to define stream processing logic, it does
> significantly improve the operational experience when you can map the
> topology, logs, metrics, etc. back to the source code. Since you
> wouldn't (have to) reference the name to chain extra processing onto
> the branch (thanks to the second argument), you can avoid the
> "unchecked name" problem that Ivan pointed out.
> 
> In the current implementation of Branch, you can name the branch
> operator itself, and then all the branches get index-suffixed names
> built from the branch operator name. I guess under this proposal, we
> could naturally append the branch name to the branching operator name,
> like this:
> 
>    stream.split(Named.withName("mysplit")) // creates node "mysplit"
>          .branch(..., ..., "abranch")      // creates node "mysplit-abranch"
>          .defaultBranch(...)               // creates node "mysplit-default"
> 
> It does make me wonder about the DSL syntax itself, though.
> 
> We don't have a defined grammar, so there's plenty of room to debate
> the "best" syntax in the context of each operation, but in general,
> the KStream DSL operators follow this pattern:
> 
> operator(function, config_object?) OR operator(config_object)
> 
> where config_object is often just Named in the "function" variant.
> Even when the config_object isn't a Named, but some other config
> class, that config class _always_ implements NamedOperation.
> 
> Here, we're introducing a totally different pattern:
> 
>   operator(function, function, string)
> 
> where the string is the name.
> My first question is whether the name should inst

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-06-04 Thread John Roesler
Thanks for the idea, Matthias, it does seem like this would satisfy
everyone. Returning the map from the terminal operations also solves
the problem of merging/joining the branched streams, if we want to add
support for the complement later on.

Under your suggestion, it seems that the name is required. Otherwise,
we wouldn't have keys for the map to return. I think this is actually
not too bad, since experience has taught us that, although names for
operations are not required to define stream processing logic, it does
significantly improve the operational experience when you can map the
topology, logs, metrics, etc. back to the source code. Since you
wouldn't (have to) reference the name to chain extra processing onto
the branch (thanks to the second argument), you can avoid the
"unchecked name" problem that Ivan pointed out.

In the current implementation of Branch, you can name the branch
operator itself, and then all the branches get index-suffixed names
built from the branch operator name. I guess under this proposal, we
could naturally append the branch name to the branching operator name,
like this:

   stream.split(Named.withName("mysplit")) // creates node "mysplit"
         .branch(..., ..., "abranch")      // creates node "mysplit-abranch"
         .defaultBranch(...)               // creates node "mysplit-default"

It does make me wonder about the DSL syntax itself, though.

We don't have a defined grammar, so there's plenty of room to debate
the "best" syntax in the context of each operation, but in general,
the KStream DSL operators follow this pattern:

operator(function, config_object?) OR operator(config_object)

where config_object is often just Named in the "function" variant.
Even when the config_object isn't a Named, but some other config
class, that config class _always_ implements NamedOperation.

Here, we're introducing a totally different pattern:

  operator(function, function, string)

where the string is the name.
My first question is whether the name should instead be specified with
the NamedOperation interface.

My second question is whether we should just roll all these arguments
up into a config object like:

   KBranchedStream#branch(BranchConfig)

   interface BranchConfig extends NamedOperation {
     withPredicate(...);
     withChain(...);
     withName(...);
   }

Although I guess we'd like to call BranchConfig something more like
"Branched", even if I don't particularly like that pattern.

This makes the source code a little noisier, but it also makes us more
future-proof, as we can deal with a wide range of alternatives purely
in the config interface, and never have to deal with adding overloads
to the KBranchedStream if/when we decide we want the name to be
optional, or the KStream->KStream to be optional.
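
As a rough illustration of the config-object idea, here is a minimal sketch
in plain Java. It is not the Kafka Streams API: `BranchConfigSketch` is a
hypothetical name, `V` stands in for the record value type and `S` for the
stream type, and the real interface would extend NamedOperation rather than
expose an `Optional` name.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical immutable config object: each with*() call returns a new copy.
final class BranchConfigSketch<V, S> {
    private final Predicate<V> predicate;
    private final Function<S, S> chain;
    private final String name; // null means "no name given"

    private BranchConfigSketch(Predicate<V> predicate, Function<S, S> chain, String name) {
        this.predicate = predicate;
        this.chain = chain;
        this.name = name;
    }

    // Static factory: the chain defaults to identity and the name is unset.
    static <V, S> BranchConfigSketch<V, S> withPredicate(Predicate<V> predicate) {
        return new BranchConfigSketch<V, S>(predicate, s -> s, null);
    }

    BranchConfigSketch<V, S> withChain(Function<S, S> newChain) {
        return new BranchConfigSketch<V, S>(predicate, newChain, name);
    }

    BranchConfigSketch<V, S> withName(String newName) {
        return new BranchConfigSketch<V, S>(predicate, chain, newName);
    }

    Predicate<V> predicate() { return predicate; }
    Function<S, S> chain() { return chain; }
    Optional<String> name() { return Optional.ofNullable(name); }
}
```

A call site could then look like
`BranchConfigSketch.<Integer, String>withPredicate(v -> v > 0).withName("positive")`.
The explicit type witness is needed in this sketch because the first static
call has no other source for `V` and `S`. Nothing in the types forces a
caller to supply a name or a chain, so in this shape a missing name could
only be detected at run time.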

WDYT?

Thanks,
-John

On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
 wrote:
>
> Matthias: I think that's pretty reasonable from my point of view. Good
> suggestion.
>
> On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax 
> wrote:
>
> > Interesting discussion.
> >
> > I am wondering, if we cannot unify the advantage of both approaches:
> >
> >
> >
> > KStream#split() -> KBranchedStream
> >
> > // branch is not easily accessible in current scope
> > KBranchedStream#branch(Predicate, Consumer)
> >   -> KBranchedStream
> >
> > // assign a name to the branch and
> > // return the sub-stream to the current scope later
> > //
> > // can be simple as `#branch(p, s->s, "name")`
> > // or also complex as `#branch(p, s->s.filter(...), "name")`
> > KBranchedStream#branch(Predicate, Function, String)
> >   -> KBranchedStream
> >
> > // default branch is not easily accessible
> > // return map of all named sub-stream into current scope
> > KBranchedStream#default(Consumer)
> >   -> Map
> >
> > // assign custom name to default-branch
> > // return map of all named sub-stream into current scope
> > KBranchedStream#default(Function, String)
> >   -> Map
> >
> > // assign a default name for default
> > // return map of all named sub-stream into current scope
> > KBranchedStream#defaultBranch(Function)
> >   -> Map
> >
> > // return map of all names sub-stream into current scope
> > KBranchedStream#noDefaultBranch()
> >   -> Map
> >
> >
> >
> > Hence, for each sub-stream, the user can pick to add a name and return
> > the branch "result" to the calling scope or not. The implementation can
> > also check at runtime that all returned names are unique. The returned
> > Map can be empty and it's also optional to use the Map.
> >
> > To me, it seems like a good way to get best of both worlds.
> >
> > Thoughts?
> >
> >
> >
> > -Matthias
> >
> >
> >
> >
> > On 5/6/19 5:15 PM, John Roesler wrote:
> > > Ivan,
> > >
> > > That's a very good point about the "start" operator in the dynamic case.
> > > I had no problem with "split()"; I was just questioning the necessity.
> > > Since you've provided a proof of necessity, I'm in favor of the
> > > "split()" start operator. Thanks!
> > >
> > > Separately, I

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-24 Thread Michael Drogalis
Matthias: I think that's pretty reasonable from my point of view. Good
suggestion.

On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax 
wrote:

> Interesting discussion.
>
> I am wondering, if we cannot unify the advantage of both approaches:
>
>
>
> KStream#split() -> KBranchedStream
>
> // branch is not easily accessible in current scope
> KBranchedStream#branch(Predicate, Consumer)
>   -> KBranchedStream
>
> // assign a name to the branch and
> // return the sub-stream to the current scope later
> //
> // can be simple as `#branch(p, s->s, "name")`
> // or also complex as `#branch(p, s->s.filter(...), "name")`
> KBranchedStream#branch(Predicate, Function, String)
>   -> KBranchedStream
>
> // default branch is not easily accessible
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Consumer)
>   -> Map
>
> // assign custom name to default-branch
> // return map of all named sub-stream into current scope
> KBranchedStream#default(Function, String)
>   -> Map
>
> // assign a default name for default
> // return map of all named sub-stream into current scope
> KBranchedStream#defaultBranch(Function)
>   -> Map
>
> // return map of all names sub-stream into current scope
> KBranchedStream#noDefaultBranch()
>   -> Map
>
>
>
> Hence, for each sub-stream, the user can pick to add a name and return
> the branch "result" to the calling scope or not. The implementation can
> also check at runtime that all returned names are unique. The returned
> Map can be empty and it's also optional to use the Map.
>
> To me, it seems like a good way to get best of both worlds.
>
> Thoughts?
>
>
>
> -Matthias
>
>
>
>
> On 5/6/19 5:15 PM, John Roesler wrote:
> > Ivan,
> >
> > That's a very good point about the "start" operator in the dynamic case.
> > I had no problem with "split()"; I was just questioning the necessity.
> > Since you've provided a proof of necessity, I'm in favor of the
> > "split()" start operator. Thanks!
> >
> > Separately, I'm interested to see where the present discussion leads.
> > I've written enough Javascript code in my life to be suspicious of
> > nested closures. You have a good point about using method references (or
> > indeed function literals also work). It should be validating that this
> > was also the JS community's first approach to flattening the logic when
> > their nested closure situation got out of hand. Unfortunately, it's
> > replacing nesting with redirection, both of which disrupt code
> > readability (but in different ways for different reasons). In other
> > words, I agree that function references is *the* first-order solution if
> > the nested code does indeed become a problem.
> >
> > However, the history of JS also tells us that function references aren't
> > the end of the story either, and you can see that by observing that
> > there have been two follow-on eras, as they continue trying to cope with
> > the consequences of living in such a callback-heavy language. First, you
> > have Futures/Promises, which essentially let you convert nested code to
> > method-chained code (Observables/FP is a popular variation on this).
> > Most lately, you have async/await, which is an effort to apply language
> > (not just API) syntax to the problem, and offer the "flattest" possible
> > programming style to solve the problem (because you get back to just one
> > code block per functional unit).
> >
> > Stream-processing is a different domain, and Java+KStreams is nowhere
> > near as callback heavy as JS, so I don't think we have to take the JS
> > story for granted, but then again, I think we can derive some valuable
> > lessons by looking sideways to adjacent domains. I'm just bringing this
> > up to inspire further/deeper discussion. At the same time, just like JS,
> > we can afford to take an iterative approach to the problem.
> >
> > Separately again, I'm interested in the post-branch merge (and I'd also
> > add join) problem that Paul brought up. We can clearly punt on it, by
> > terminating the nested branches with sink operators. But is there a DSL
> > way to do it?
> >
> > Thanks again for your driving this,
> > -John
> >
> > On Thu, May 2, 2019 at 7:39 PM Paul Whalen wrote:
> >
> > Ivan, I’ll definitely forfeit my point on the clumsiness of the
> > branch(predicate, consumer) solution, I don’t see any real drawbacks
> > for the dynamic case.
> >
> > IMO the one trade off to consider at this point is the scope
> > question. I don’t know if I totally agree that “we rarely need them
> > in the same scope” since merging the branches back together later
> > seems like a perfectly plausible use case that can be a lot nicer
> > when the branched streams are in the same scope. That being said,
> > for the reasons Ivan listed, I think it is overall the better
> > solution - working around the scope thing is easy enough if you need
> > to.
> >
> > > On May 2, 2019, at 7:00 PM, Ivan

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-23 Thread Matthias J. Sax
Interesting discussion.

I am wondering whether we can unify the advantages of both approaches:



KStream#split() -> KBranchedStream

// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
  -> KBranchedStream

// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
  -> KBranchedStream

// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Consumer)
  -> Map

// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function, String)
  -> Map

// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function)
  -> Map

// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch()
  -> Map



Hence, for each sub-stream, the user can pick to add a name and return
the branch "result" to the calling scope or not. The implementation can
also check at runtime that all returned names are unique. The returned
Map can be empty and it's also optional to use the Map.
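
A minimal, self-contained sketch of how this blended API might behave. It
deliberately avoids Kafka Streams: a "stream" is just a `List<T>`,
`BranchedSketch` and `BranchedSketchDemo` are hypothetical names, and the
first-match routing and the duplicate-name check are assumptions based on
the description above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy stand-in for the proposed KBranchedStream: each element goes to the
// first branch whose predicate matches it.
final class BranchedSketch<T> {
    private List<T> remaining;
    private final Map<String, List<T>> named = new LinkedHashMap<>();

    BranchedSketch(List<T> source) {
        this.remaining = new ArrayList<>(source);
    }

    // "Embedded chaining": the branch is consumed in place, not returned.
    BranchedSketch<T> branch(Predicate<T> predicate, Consumer<List<T>> chain) {
        chain.accept(take(predicate));
        return this;
    }

    // Named branch: the chained result is returned later through the Map.
    BranchedSketch<T> branch(Predicate<T> predicate,
                             Function<List<T>, List<T>> chain, String name) {
        register(name, chain.apply(take(predicate)));
        return this;
    }

    // Terminal operation: everything not matched so far goes to the default.
    Map<String, List<T>> defaultBranch(Function<List<T>, List<T>> chain, String name) {
        register(name, chain.apply(remaining));
        return named;
    }

    // Terminal operation without a default branch.
    Map<String, List<T>> noDefaultBranch() {
        return named;
    }

    private List<T> take(Predicate<T> predicate) {
        List<T> matched = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T item : remaining) {
            if (predicate.test(item)) matched.add(item); else rest.add(item);
        }
        remaining = rest;
        return matched;
    }

    // Run-time uniqueness check on branch names.
    private void register(String name, List<T> branch) {
        if (named.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate branch name: " + name);
        }
        named.put(name, branch);
    }
}

final class BranchedSketchDemo {
    static Map<String, List<Integer>> run(List<Integer> sink) {
        return new BranchedSketch<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6))
            .branch(n -> n % 3 == 0, sink::addAll)   // consumed in place
            .branch(n -> n % 2 == 0, s -> s, "even") // returned under "even"
            .defaultBranch(s -> s, "rest");          // returned under "rest"
    }
}
```

With an empty `ArrayList` passed as `sink`, the sink ends up holding 3 and 6,
while the returned map contains only the two named branches: "even" with 2
and 4, and "rest" with 1 and 5.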

To me, it seems like a good way to get best of both worlds.

Thoughts?



-Matthias




On 5/6/19 5:15 PM, John Roesler wrote:
> Ivan,
> 
> That's a very good point about the "start" operator in the dynamic case.
> I had no problem with "split()"; I was just questioning the necessity.
> Since you've provided a proof of necessity, I'm in favor of the
> "split()" start operator. Thanks!
> 
> Separately, I'm interested to see where the present discussion leads.
> I've written enough Javascript code in my life to be suspicious of
> nested closures. You have a good point about using method references (or
> indeed function literals also work). It should be validating that this
> was also the JS community's first approach to flattening the logic when
> their nested closure situation got out of hand. Unfortunately, it's
> replacing nesting with redirection, both of which disrupt code
> readability (but in different ways for different reasons). In other
> words, I agree that function references is *the* first-order solution if
> the nested code does indeed become a problem.
> 
> However, the history of JS also tells us that function references aren't
> the end of the story either, and you can see that by observing that
> there have been two follow-on eras, as they continue trying to cope with
> the consequences of living in such a callback-heavy language. First, you
> have Futures/Promises, which essentially let you convert nested code to
> method-chained code (Observables/FP is a popular variation on this).
> Most lately, you have async/await, which is an effort to apply language
> (not just API) syntax to the problem, and offer the "flattest" possible
> programming style to solve the problem (because you get back to just one
> code block per functional unit).
> 
> Stream-processing is a different domain, and Java+KStreams is nowhere
> near as callback heavy as JS, so I don't think we have to take the JS
> story for granted, but then again, I think we can derive some valuable
> lessons by looking sideways to adjacent domains. I'm just bringing this
> up to inspire further/deeper discussion. At the same time, just like JS,
> we can afford to take an iterative approach to the problem.
> 
> Separately again, I'm interested in the post-branch merge (and I'd also
> add join) problem that Paul brought up. We can clearly punt on it, by
> terminating the nested branches with sink operators. But is there a DSL
> way to do it?
> 
> Thanks again for your driving this,
> -John 
> 
> On Thu, May 2, 2019 at 7:39 PM Paul Whalen wrote:
> 
> Ivan, I’ll definitely forfeit my point on the clumsiness of the
> branch(predicate, consumer) solution, I don’t see any real drawbacks
> for the dynamic case.
> 
> IMO the one trade off to consider at this point is the scope
> question. I don’t know if I totally agree that “we rarely need them
> in the same scope” since merging the branches back together later
> seems like a perfectly plausible use case that can be a lot nicer
> when the branched streams are in the same scope. That being said,
> for the reasons Ivan listed, I think it is overall the better
> solution - working around the scope thing is easy enough if you need
> to.
> 
> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
>  wrote:
> >
> > Hello everyone, thank you all for joining the discussion!
> >
> > Well, I don't think the idea of named branches, be it a
> LinkedHashMap (no other Map will do, because order of definition
> matters) or `branch` method  taking name and Consumer has more
> advantages than drawbacks.
> 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-06 Thread John Roesler
Ivan,

That's a very good point about the "start" operator in the dynamic case. I
had no problem with "split()"; I was just questioning the necessity. Since
you've provided a proof of necessity, I'm in favor of the "split()" start
operator. Thanks!

Separately, I'm interested to see where the present discussion leads. I've
written enough JavaScript code in my life to be suspicious of nested
closures. You have a good point about using method references (or indeed
function literals also work). It should be validating that this was also
the JS community's first approach to flattening the logic when their nested
closure situation got out of hand. Unfortunately, it's replacing nesting
with redirection, both of which disrupt code readability (but in different
ways for different reasons). In other words, I agree that function
references are *the* first-order solution if the nested code does indeed
become a problem.

However, the history of JS also tells us that function references aren't
the end of the story either, and you can see that by observing that there
have been two follow-on eras, as they continue trying to cope with the
consequences of living in such a callback-heavy language. First, you have
Futures/Promises, which essentially let you convert nested code to
method-chained code (Observables/FP is a popular variation on this). Most
lately, you have async/await, which is an effort to apply language (not
just API) syntax to the problem, and offer the "flattest" possible
programming style to solve the problem (because you get back to just one
code block per functional unit).

Stream-processing is a different domain, and Java+KStreams is nowhere near
as callback heavy as JS, so I don't think we have to take the JS story for
granted, but then again, I think we can derive some valuable lessons by
looking sideways to adjacent domains. I'm just bringing this up to inspire
further/deeper discussion. At the same time, just like JS, we can afford to
take an iterative approach to the problem.

Separately again, I'm interested in the post-branch merge (and I'd also add
join) problem that Paul brought up. We can clearly punt on it, by
terminating the nested branches with sink operators. But is there a DSL way
to do it?

Thanks again for driving this,
-John

On Thu, May 2, 2019 at 7:39 PM Paul Whalen  wrote:

> Ivan, I’ll definitely forfeit my point on the clumsiness of the
> branch(predicate, consumer) solution, I don’t see any real drawbacks for
> the dynamic case.
>
> IMO the one trade off to consider at this point is the scope question. I
> don’t know if I totally agree that “we rarely need them in the same scope”
> since merging the branches back together later seems like a perfectly
> plausible use case that can be a lot nicer when the branched streams are in
> the same scope. That being said, for the reasons Ivan listed, I think it is
> overall the better solution - working around the scope thing is easy enough
> if you need to.
>
> > On May 2, 2019, at 7:00 PM, Ivan Ponomarev 
> wrote:
> >
> > Hello everyone, thank you all for joining the discussion!
> >
> > Well, I don't think the idea of named branches, be it a LinkedHashMap
> (no other Map will do, because order of definition matters) or `branch`
> method  taking name and Consumer has more advantages than drawbacks.
> >
> > In my opinion, the only real positive outcome from Michael's proposal is
> that all the returned branches are in the same scope. But 1) we rarely need
> them in the same scope 2) there is a workaround for the scope problem,
> described in the KIP.
> >
> > 'Inlining the complex logic' is not a problem, because we can use method
> references instead of lambdas. In real world scenarios you tend to split
> the complex logic to methods anyway, so the code is going to be clean.
> >
> > The drawbacks are strong. The cohesion between predicates and handlers
> is lost. We have to define predicates in one place, and handlers in
> another. This opens the door for bugs:
> >
> > - what if we forget to define a handler for a name? or a name for a
> handler?
> > - what if we misspell a name?
> > - what if we copy-paste and duplicate a name?
> >
> > What Michael propose would have been totally OK if we had been writing
> the API in Lua, Ruby or Python. In those languages the "dynamic naming"
> approach would have looked most concise and beautiful. But in Java we
> expect all the problems related to identifiers to be eliminated in compile
> time.
> >
> > Do we have to invent duck-typing for the Java API?
> >
> > And if we do, what advantage are we supposed to get besides having all
> the branches in the same scope? Michael, maybe I'm missing your point?
> >
> > ---
> >
> > Earlier in this discussion John Roesler also proposed to do without
> "start branching" operator, and later Paul mentioned that in the case when
> we have to add a dynamic number of branches, the current KIP is 'clumsier'
> compared to Michael's 'Map' solution. Let me address both c

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-02 Thread Paul Whalen
Ivan, I’ll definitely forfeit my point on the clumsiness of the
branch(predicate, consumer) solution; I don’t see any real drawbacks for the
dynamic case.

IMO the one trade-off to consider at this point is the scope question. I don’t
know if I totally agree that “we rarely need them in the same scope” since 
merging the branches back together later seems like a perfectly plausible use 
case that can be a lot nicer when the branched streams are in the same scope. 
That being said, for the reasons Ivan listed, I think it is overall the better 
solution - working around the scope thing is easy enough if you need to. 

> On May 2, 2019, at 7:00 PM, Ivan Ponomarev  wrote:
> 
> Hello everyone, thank you all for joining the discussion!
> 
> Well, I don't think the idea of named branches, be it a LinkedHashMap (no 
> other Map will do, because order of definition matters) or `branch` method  
> taking name and Consumer has more advantages than drawbacks.
> 
> In my opinion, the only real positive outcome from Michael's proposal is that 
> all the returned branches are in the same scope. But 1) we rarely need them 
> in the same scope 2) there is a workaround for the scope problem, described 
> in the KIP.
> 
> 'Inlining the complex logic' is not a problem, because we can use method 
> references instead of lambdas. In real world scenarios you tend to split the 
> complex logic to methods anyway, so the code is going to be clean.
> 
> The drawbacks are strong. The cohesion between predicates and handlers is 
> lost. We have to define predicates in one place, and handlers in another. 
> This opens the door for bugs:
> 
> - what if we forget to define a handler for a name? or a name for a handler?
> - what if we misspell a name?
> - what if we copy-paste and duplicate a name?
> 
> What Michael propose would have been totally OK if we had been writing the 
> API in Lua, Ruby or Python. In those languages the "dynamic naming" approach 
> would have looked most concise and beautiful. But in Java we expect all the 
> problems related to identifiers to be eliminated in compile time.
> 
> Do we have to invent duck-typing for the Java API?
> 
> And if we do, what advantage are we supposed to get besides having all the 
> branches in the same scope? Michael, maybe I'm missing your point?
> 
> ---
> 
> Earlier in this discussion John Roesler also proposed to do without "start 
> branching" operator, and later Paul mentioned that in the case when we have 
> to add a dynamic number of branches, the current KIP is 'clumsier' compared 
> to Michael's 'Map' solution. Let me address both comments here.
> 
> 1) "Start branching" operator (I think that *split* is a good name for it 
> indeed) is critical when we need to do a dynamic branching, see example below.
> 
> 2) No, dynamic branching in current KIP is not clumsy at all. Imagine a 
> real-world scenario when you need one branch per enum value (say, 
> RecordType). You can have something like this:
> 
> /* John: if we had to start with stream.branch(...) here, it would have
>    been much messier. */
> KBranchedStream branched = stream.split();
> 
> /* Not clumsy at all :-) */
> for (RecordType recordType : RecordType.values())
>     branched = branched.branch((k, v) -> v.getRecType() == recordType,
>                                recordType::processRecords);
> 
> Regards,
> 
> Ivan
> 
> 
> 02.05.2019 14:40, Matthias J. Sax пишет:
>> I also agree with Michael's observation about the core problem of
>> current `branch()` implementation.
>> 
>> However, I also don't like to pass in a clumsy Map object. My thinking
>> was more aligned with Paul's proposal to just add a name to each
>> `branch()` statement and return a `Map`.
>> 
>> It makes the code easier to read, and also make the order of
>> `Predicates` (that is essential) easier to grasp.
>> 
>> Map> branches = stream.split()
>>     .branch("branchOne", Predicate)
>>     .branch("branchTwo", Predicate)
>>     .defaultBranch("defaultBranch");
>> 
>> An open question is the case for which no defaultBranch() should be
>> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
>> and the call to `defaultBranch()` that returns the `Map` is mandatory
>> (what is not the case atm). Or is this actually not a real problem,
>> because users can just ignore the branch returned by `defaultBranch()`
>> in the result `Map` ?
>> 
>> 
>> About "inlining": So far, it seems to be a matter of personal
>> preference. I can see arguments for both, but no "killer argument" yet
>> that clearly make the case for one or the other.
>> 
>> 
>> -Matthias
>> 
>>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>> Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
>>> with the full downstream topology be defined inline - it can be a method 
>>> reference as with Ivan’s original suggestion.  The advantage of putting the 
>>> predicate and its downstream logic (Consumer) together in branch() is that 
>>> they are requi

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-02 Thread Ivan Ponomarev

Hello everyone, thank you all for joining the discussion!

Well, I don't think the idea of named branches, be it a LinkedHashMap 
(no other Map will do, because order of definition matters) or a `branch` 
method taking a name and a Consumer, has more advantages than drawbacks.


In my opinion, the only real positive outcome from Michael's proposal is 
that all the returned branches are in the same scope. But 1) we rarely 
need them in the same scope 2) there is a workaround for the scope 
problem, described in the KIP.


'Inlining the complex logic' is not a problem, because we can use method 
references instead of lambdas. In real world scenarios you tend to split 
the complex logic to methods anyway, so the code is going to be clean.


The drawbacks are strong. The cohesion between predicates and handlers 
is lost. We have to define predicates in one place, and handlers in 
another. This opens the door for bugs:


- what if we forget to define a handler for a name? or a name for a handler?
- what if we misspell a name?
- what if we copy-paste and duplicate a name?
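
To make these risks concrete, here is a minimal sketch that uses only 
`java.util`. It is not Kafka Streams code: plain strings stand in for 
streams, and the class name and branch names are hypothetical. It only 
illustrates that a name-keyed Map defers such mistakes to run time.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NamedBranchPitfalls {
    // Strings stand in for streams; all names here are made up.
    static Map<String, String> defineBranches() {
        Map<String, String> branches = new LinkedHashMap<>();
        branches.put("branchOne", "stream-1");
        branches.put("branchTwo", "stream-2");
        // Copy-paste slip: reusing a name silently replaces the earlier entry.
        branches.put("branchOne", "stream-3");
        return branches;
    }

    // Looks a branch up by name; a misspelled name compiles and yields null.
    static String lookup(String name) {
        return defineBranches().get(name);
    }

    public static void main(String[] args) {
        System.out.println(defineBranches().size()); // 2, not 3
        System.out.println(lookup("branchOne"));     // stream-3
        System.out.println(lookup("branchTow"));     // null
    }
}
```

None of the three mistakes is reported by the compiler in this sketch; 
they only show up as a missing or unexpected value at run time.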

What Michael proposes would have been totally OK if we had been writing 
the API in Lua, Ruby or Python. In those languages the "dynamic naming" 
approach would have looked most concise and beautiful. But in Java we 
expect all the problems related to identifiers to be eliminated at 
compile time.


Do we have to invent duck-typing for the Java API?

And if we do, what advantage are we supposed to get besides having all 
the branches in the same scope? Michael, maybe I'm missing your point?


---

Earlier in this discussion John Roesler also proposed to do without a 
"start branching" operator, and later Paul mentioned that in the case 
when we have to add a dynamic number of branches, the current KIP is 
'clumsier' compared to Michael's 'Map' solution. Let me address both 
comments here.


1) The "start branching" operator (I think that *split* is a good name for 
it indeed) is critical when we need to do dynamic branching, see the 
example below.


2) No, dynamic branching in the current KIP is not clumsy at all. Imagine a 
real-world scenario where you need one branch per enum value (say, 
RecordType). You can have something like this:


/* John: if we had to start with stream.branch(...) here, it would have
been much messier. */

KBranchedStream branched = stream.split();

/* Not clumsy at all :-) */
for (RecordType recordType : RecordType.values())
    branched = branched.branch((k, v) -> v.getRecType() == recordType,
                               recordType::processRecords);
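
For readers who want to run the loop pattern above, here is a 
self-contained sketch. It does not use Kafka Streams: the `Branched` 
class is a hypothetical stand-in for the proposed branched stream, and 
the records are simplified to the enum values themselves. It only shows 
why a separate starting object makes the loop body uniform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class DynamicBranchDemo {
    enum RecordType { ORDER, PAYMENT, REFUND }

    // Hypothetical stand-in: each branch() returns a new object with one more rule.
    static final class Branched<T> {
        private final List<Predicate<T>> predicates;
        private final List<Consumer<T>> handlers;

        Branched() {
            this(new ArrayList<>(), new ArrayList<>());
        }

        private Branched(List<Predicate<T>> predicates, List<Consumer<T>> handlers) {
            this.predicates = predicates;
            this.handlers = handlers;
        }

        Branched<T> branch(Predicate<T> predicate, Consumer<T> handler) {
            List<Predicate<T>> p = new ArrayList<>(predicates);
            List<Consumer<T>> h = new ArrayList<>(handlers);
            p.add(predicate);
            h.add(handler);
            return new Branched<>(p, h);
        }

        // Sends the value to the first branch whose predicate matches.
        boolean route(T value) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(value)) {
                    handlers.get(i).accept(value);
                    return true;
                }
            }
            return false;
        }
    }

    // Builds one branch per enum value in a loop, then routes the input.
    static List<String> run(List<RecordType> input) {
        List<String> log = new ArrayList<>();
        Branched<RecordType> branched = new Branched<>(); // plays the role of split()
        for (RecordType type : RecordType.values()) {
            branched = branched.branch(v -> v == type, v -> log.add(type + " handled " + v));
        }
        for (RecordType record : input) {
            branched.route(record);
        }
        return log;
    }

    public static void main(String[] args) {
        List<RecordType> input = new ArrayList<>();
        input.add(RecordType.PAYMENT);
        input.add(RecordType.ORDER);
        System.out.println(run(input));
    }
}
```

Because the chain starts from an object that already has the branched 
type, the first iteration needs no special case.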

Regards,

Ivan


02.05.2019 14:40, Matthias J. Sax wrote:

I also agree with Michael's observation about the core problem of
current `branch()` implementation.

However, I also don't like to pass in a clumsy Map object. My thinking
was more aligned with Paul's proposal to just add a name to each
`branch()` statement and return a `Map`.

It makes the code easier to read, and also make the order of
`Predicates` (that is essential) easier to grasp.


Map> branches = stream.split()
.branch("branchOne", Predicate)
.branch( "branchTwo", Predicate)
.defaultBranch("defaultBranch");

An open question is the case for which no defaultBranch() should be
specified. Atm, `split()` and `branch()` would return `BranchedKStream`
and the call to `defaultBranch()` that returns the `Map` is mandatory
(what is not the case atm). Or is this actually not a real problem,
because users can just ignore the branch returned by `defaultBranch()`
in the result `Map` ?


About "inlining": So far, it seems to be a matter of personal
preference. I can see arguments for both, but no "killer argument" yet
that clearly make the case for one or the other.


-Matthias

On 5/1/19 6:26 PM, Paul Whalen wrote:

Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
with the full downstream topology be defined inline - it can be a method 
reference as with Ivan’s original suggestion.  The advantage of putting the 
predicate and its downstream logic (Consumer) together in branch() is that they 
are required to be near to each other.

Ultimately the downstream code has to live somewhere, and deep branch trees 
will be hard to read regardless.


On May 1, 2019, at 1:07 PM, Michael Drogalis  
wrote:

I'm less enthusiastic about inlining the branch logic with its downstream
functionality. Programs that have deep branch trees will quickly become
harder to read as a single unit.


On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen  wrote:

Also +1 on the issues/goals as Michael outlined them, I think that sets a
great framework for the discussion.

Regarding the SortedMap solution, my understanding is that the current
proposal in the KIP is what is in my PR which (pending naming decisions) is
roughly this:

stream.split()
.branch(Predicate, Consumer>)
.branch(Predicate, Consumer>)
.defaultBranch(Consumer>);

Obviously some ordering is necessary, since branching as a construct
doesn't work without it

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-02 Thread Matthias J. Sax
I also agree with Michael's observation about the core problem of
current `branch()` implementation.

However, I also don't like to pass in a clumsy Map object. My thinking
was more aligned with Paul's proposal to just add a name to each
`branch()` statement and return a `Map`.

It makes the code easier to read, and also makes the order of
`Predicates` (that is essential) easier to grasp.

Map<String, KStream<K, V>> branches = stream.split()
    .branch("branchOne", Predicate)
    .branch("branchTwo", Predicate)
    .defaultBranch("defaultBranch");

An open question is the case for which no defaultBranch() should be
specified. Atm, `split()` and `branch()` would return `BranchedKStream`
and the call to `defaultBranch()` that returns the `Map` is mandatory
(which is not the case atm). Or is this actually not a real problem,
because users can just ignore the branch returned by `defaultBranch()`
in the result `Map`?


About "inlining": So far, it seems to be a matter of personal
preference. I can see arguments for both, but no "killer argument" yet
that clearly makes the case for one or the other.


-Matthias

On 5/1/19 6:26 PM, Paul Whalen wrote:
> Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
> with the full downstream topology be defined inline - it can be a method 
> reference as with Ivan’s original suggestion.  The advantage of putting the 
> predicate and its downstream logic (Consumer) together in branch() is that 
> they are required to be near to each other. 
> 
> Ultimately the downstream code has to live somewhere, and deep branch trees 
> will be hard to read regardless.
> 
>> On May 1, 2019, at 1:07 PM, Michael Drogalis  
>> wrote:
>>
>> I'm less enthusiastic about inlining the branch logic with its downstream
>> functionality. Programs that have deep branch trees will quickly become
>> harder to read as a single unit.
>>
>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen  wrote:
>>>
>>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
>>> great framework for the discussion.
>>>
>>> Regarding the SortedMap solution, my understanding is that the current
>>> proposal in the KIP is what is in my PR which (pending naming decisions) is
>>> roughly this:
>>>
>>> stream.split()
>>>.branch(Predicate, Consumer>)
>>>.branch(Predicate, Consumer>)
>>>.defaultBranch(Consumer>);
>>>
>>> Obviously some ordering is necessary, since branching as a construct
>>> doesn't work without it, but this solution seems like it provides as much
>>> associativity as the SortedMap solution, because each branch() call
>>> directly associates the "conditional" with the "code block."  The value it
>>> provides over the KIP solution is the accessing of streams in the same
>>> scope.
>>>
>>> The KIP solution is less "dynamic" than the SortedMap solution in the sense
>>> that it is slightly clumsier to add a dynamic number of branches, but it is
>>> certainly possible.  It seems to me like the API should favor the "static"
>>> case anyway, and should make it simple and readable to fluently declare and
>>> access your branches in-line.  It also makes it impossible to ignore a
>>> branch, and it is possible to build an (almost) identical SortedMap
>>> solution on top of it.
>>>
>>> I could also see a middle ground where instead of a raw SortedMap being
>>> taken in, branch() takes a name and not a Consumer.  Something like this:
>>>
>>> Map> branches = stream.split()
>>>.branch("branchOne", Predicate)
>>>.branch( "branchTwo", Predicate)
>>>.defaultBranch("defaultBranch", Consumer>);
>>>
>>> Pros for that solution:
>>> - accessing branched KStreams in same scope
>>> - no double brace initialization, hopefully slightly more readable than
>>> SortedMap
>>>
>>> Cons
>>> - downstream branch logic cannot be specified inline which makes it harder
>>> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>>> - you can forget to "handle" one of the branched streams (like existing
>>> API and SortedMap, but unlike the KIP)
>>>
>>> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
>>> it).
>>>
>>> Overall I'm curious how important it is to be able to easily access the
>>> branched KStream in the same scope as the original.  It's possible that it
>>> doesn't need to be handled directly by the API, but instead left up to the
>>> user.  I'm sort of in the middle on it.
>>>
>>> Paul
>>>
>>>
>>>
>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman 
>>> wrote:
>>>
>>>> I'd like to +1 what Michael said about the issues with the existing
>>> branch
>>>> method, I agree with what he's outlined and I think we should proceed by
>>>> trying to alleviate these problems. Specifically it seems important to be
>>>> able to cleanly access the individual branches (eg by mapping
>>>> name->stream), which I thought was the original intention of this KIP.
>>>>
>>>> That said, I don't think we should so easily give in to the double brace
>>>> ant

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-01 Thread Paul Whalen
Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
with the full downstream topology be defined inline - it can be a method 
reference as with Ivan’s original suggestion.  The advantage of putting the 
predicate and its downstream logic (Consumer) together in branch() is that they 
are required to be near to each other. 

Ultimately the downstream code has to live somewhere, and deep branch trees 
will be hard to read regardless.

> On May 1, 2019, at 1:07 PM, Michael Drogalis  
> wrote:
> 
> I'm less enthusiastic about inlining the branch logic with its downstream
> functionality. Programs that have deep branch trees will quickly become
> harder to read as a single unit.
> 
>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen  wrote:
>> 
>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
>> great framework for the discussion.
>> 
>> Regarding the SortedMap solution, my understanding is that the current
>> proposal in the KIP is what is in my PR which (pending naming decisions) is
>> roughly this:
>> 
>> stream.split()
>>.branch(Predicate, Consumer>)
>>.branch(Predicate, Consumer>)
>>.defaultBranch(Consumer>);
>> 
>> Obviously some ordering is necessary, since branching as a construct
>> doesn't work without it, but this solution seems like it provides as much
>> associativity as the SortedMap solution, because each branch() call
>> directly associates the "conditional" with the "code block."  The value it
>> provides over the KIP solution is the accessing of streams in the same
>> scope.
>> 
>> The KIP solution is less "dynamic" than the SortedMap solution in the sense
>> that it is slightly clumsier to add a dynamic number of branches, but it is
>> certainly possible.  It seems to me like the API should favor the "static"
>> case anyway, and should make it simple and readable to fluently declare and
>> access your branches in-line.  It also makes it impossible to ignore a
>> branch, and it is possible to build an (almost) identical SortedMap
>> solution on top of it.
>> 
>> I could also see a middle ground where instead of a raw SortedMap being
>> taken in, branch() takes a name and not a Consumer.  Something like this:
>> 
>> Map> branches = stream.split()
>>.branch("branchOne", Predicate)
>>.branch( "branchTwo", Predicate)
>>.defaultBranch("defaultBranch", Consumer>);
>> 
>> Pros for that solution:
>> - accessing branched KStreams in same scope
>> - no double brace initialization, hopefully slightly more readable than
>> SortedMap
>> 
>> Cons
>> - downstream branch logic cannot be specified inline which makes it harder
>> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>> - you can forget to "handle" one of the branched streams (like existing
>> API and SortedMap, but unlike the KIP)
>> 
>> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
>> it).
>> 
>> Overall I'm curious how important it is to be able to easily access the
>> branched KStream in the same scope as the original.  It's possible that it
>> doesn't need to be handled directly by the API, but instead left up to the
>> user.  I'm sort of in the middle on it.
>> 
>> Paul
>> 
>> 
>> 
>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman 
>> wrote:
>> 
>>> I'd like to +1 what Michael said about the issues with the existing
>> branch
>>> method, I agree with what he's outlined and I think we should proceed by
>>> trying to alleviate these problems. Specifically it seems important to be
>>> able to cleanly access the individual branches (eg by mapping
>>> name->stream), which I thought was the original intention of this KIP.
>>> 
>>> That said, I don't think we should so easily give in to the double brace
>>> anti-pattern or force ours users into it if at all possible to
>> avoid...just
>>> my two cents.
>>> 
>>> Cheers,
>>> Sophie
>>> 
>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>> michael.droga...@confluent.io> wrote:
>>> 
>>>> I’d like to propose a different way of thinking about this. To me,
>> there
>>>> are three problems with the existing branch signature:
>>>>
>>>> 1. If you use it the way most people do, Java raises unsafe type
>>> warnings.
>>>> 2. The way in which you use the stream branches is positionally coupled
>>> to
>>>> the ordering of the conditionals.
>>>> 3. It is brittle to extend existing branch calls with additional code
>>>> paths.
>>>>
>>>> Using associative constructs instead of relying on ordered constructs
>>> would
>>>> be a stronger approach. Consider a signature that instead looks like
>>> this:
>>>>
>>>> Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);
>>>>
>>>> Branches are given names in a map, and as a result, the API returns a
>>>> mapping of names to streams. The ordering of the conditionals is
>>> maintained
>>>> because it’s a sorted map. Insert order determines the order of
>>> evaluation.
>>>>
>>>> This solves problem 1 because there are no more varargs. It s

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-01 Thread Michael Drogalis
I'm less enthusiastic about inlining the branch logic with its downstream
functionality. Programs that have deep branch trees will quickly become
harder to read as a single unit.

On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen  wrote:

> Also +1 on the issues/goals as Michael outlined them, I think that sets a
> great framework for the discussion.
>
> Regarding the SortedMap solution, my understanding is that the current
> proposal in the KIP is what is in my PR which (pending naming decisions) is
> roughly this:
>
> stream.split()
> .branch(Predicate, Consumer>)
> .branch(Predicate, Consumer>)
> .defaultBranch(Consumer>);
>
> Obviously some ordering is necessary, since branching as a construct
> doesn't work without it, but this solution seems like it provides as much
> associativity as the SortedMap solution, because each branch() call
> directly associates the "conditional" with the "code block."  The value it
> provides over the KIP solution is the accessing of streams in the same
> scope.
>
> The KIP solution is less "dynamic" than the SortedMap solution in the sense
> that it is slightly clumsier to add a dynamic number of branches, but it is
> certainly possible.  It seems to me like the API should favor the "static"
> case anyway, and should make it simple and readable to fluently declare and
> access your branches in-line.  It also makes it impossible to ignore a
> branch, and it is possible to build an (almost) identical SortedMap
> solution on top of it.
>
> I could also see a middle ground where instead of a raw SortedMap being
> taken in, branch() takes a name and not a Consumer.  Something like this:
>
> Map> branches = stream.split()
> .branch("branchOne", Predicate)
> .branch( "branchTwo", Predicate)
> .defaultBranch("defaultBranch", Consumer>);
>
> Pros for that solution:
>  - accessing branched KStreams in same scope
>  - no double brace initialization, hopefully slightly more readable than
> SortedMap
>
> Cons
>  - downstream branch logic cannot be specified inline which makes it harder
> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>  - you can forget to "handle" one of the branched streams (like existing
> API and SortedMap, but unlike the KIP)
>
> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
> it).
>
> Overall I'm curious how important it is to be able to easily access the
> branched KStream in the same scope as the original.  It's possible that it
> doesn't need to be handled directly by the API, but instead left up to the
> user.  I'm sort of in the middle on it.
>
> Paul
>
>
>
> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman 
> wrote:
>
> > I'd like to +1 what Michael said about the issues with the existing
> branch
> > method, I agree with what he's outlined and I think we should proceed by
> > trying to alleviate these problems. Specifically it seems important to be
> > able to cleanly access the individual branches (eg by mapping
> > name->stream), which I thought was the original intention of this KIP.
> >
> > That said, I don't think we should so easily give in to the double brace
> > anti-pattern or force ours users into it if at all possible to
> avoid...just
> > my two cents.
> >
> > Cheers,
> > Sophie
> >
> > On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> > michael.droga...@confluent.io> wrote:
> >
> > > I’d like to propose a different way of thinking about this. To me,
> there
> > > are three problems with the existing branch signature:
> > >
> > > 1. If you use it the way most people do, Java raises unsafe type
> > warnings.
> > > 2. The way in which you use the stream branches is positionally coupled
> > to
> > > the ordering of the conditionals.
> > > 3. It is brittle to extend existing branch calls with additional code
> > > paths.
> > >
> > > Using associative constructs instead of relying on ordered constructs
> > would
> > > be a stronger approach. Consider a signature that instead looks like
> > this:
> > >
> > > Map> KStream#branch(SortedMap > > super K,? super V>>);
> > >
> > > Branches are given names in a map, and as a result, the API returns a
> > > mapping of names to streams. The ordering of the conditionals is
> > maintained
> > > because it’s a sorted map. Insert order determines the order of
> > evaluation.
> > >
> > > This solves problem 1 because there are no more varargs. It solves
> > problem
> > > 2 because you no longer lean on ordering to access the branch you’re
> > > interested in. It solves problem 3 because you can introduce another
> > > conditional by simply attaching another name to the structure, rather
> > than
> > > messing with the existing indices.
> > >
> > > One of the drawbacks is that creating the map inline is historically
> > > awkward in Java. I know it’s an anti-pattern to use voluminously, but
> > > double brace initialization would clean up the aesthetics.
> > >
> > > On Tue, Apr 30, 2019 at 9:10 AM John Roesler 
> wrote:
> > >
> > >

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread Paul Whalen
Also +1 on the issues/goals as Michael outlined them, I think that sets a
great framework for the discussion.

Regarding the SortedMap solution, my understanding is that the current
proposal in the KIP is what is in my PR which (pending naming decisions) is
roughly this:

stream.split()
    .branch(Predicate, Consumer<KStream<K, V>>)
    .branch(Predicate, Consumer<KStream<K, V>>)
    .defaultBranch(Consumer<KStream<K, V>>);

Obviously some ordering is necessary, since branching as a construct
doesn't work without it, but this solution seems like it provides as much
associativity as the SortedMap solution, because each branch() call
directly associates the "conditional" with the "code block."  The value it
provides over the KIP solution is the accessing of streams in the same
scope.

The KIP solution is less "dynamic" than the SortedMap solution in the sense
that it is slightly clumsier to add a dynamic number of branches, but it is
certainly possible.  It seems to me like the API should favor the "static"
case anyway, and should make it simple and readable to fluently declare and
access your branches in-line.  It also makes it impossible to ignore a
branch, and it is possible to build an (almost) identical SortedMap
solution on top of it.

I could also see a middle ground where instead of a raw SortedMap being
taken in, branch() takes a name and not a Consumer.  Something like this:

Map<String, KStream<K, V>> branches = stream.split()
    .branch("branchOne", Predicate)
    .branch("branchTwo", Predicate)
    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);

Pros for that solution:
 - accessing branched KStreams in same scope
 - no double brace initialization, hopefully slightly more readable than
SortedMap

Cons
 - downstream branch logic cannot be specified inline which makes it harder
to read top to bottom (like existing API and SortedMap, but unlike the KIP)
 - you can forget to "handle" one of the branched streams (like existing
API and SortedMap, but unlike the KIP)

(KBranchedStreams could even work *both* ways but perhaps that's overdoing
it).

Overall I'm curious how important it is to be able to easily access the
branched KStream in the same scope as the original.  It's possible that it
doesn't need to be handled directly by the API, but instead left up to the
user.  I'm sort of in the middle on it.

Paul



On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman 
wrote:

> I'd like to +1 what Michael said about the issues with the existing branch
> method, I agree with what he's outlined and I think we should proceed by
> trying to alleviate these problems. Specifically it seems important to be
> able to cleanly access the individual branches (eg by mapping
> name->stream), which I thought was the original intention of this KIP.
>
> That said, I don't think we should so easily give in to the double brace
> anti-pattern or force ours users into it if at all possible to avoid...just
> my two cents.
>
> Cheers,
> Sophie
>
> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> michael.droga...@confluent.io> wrote:
>
> > I’d like to propose a different way of thinking about this. To me, there
> > are three problems with the existing branch signature:
> >
> > 1. If you use it the way most people do, Java raises unsafe type
> warnings.
> > 2. The way in which you use the stream branches is positionally coupled
> to
> > the ordering of the conditionals.
> > 3. It is brittle to extend existing branch calls with additional code
> > paths.
> >
> > Using associative constructs instead of relying on ordered constructs
> would
> > be a stronger approach. Consider a signature that instead looks like
> this:
> >
> > Map> KStream#branch(SortedMap > super K,? super V>>);
> >
> > Branches are given names in a map, and as a result, the API returns a
> > mapping of names to streams. The ordering of the conditionals is
> maintained
> > because it’s a sorted map. Insert order determines the order of
> evaluation.
> >
> > This solves problem 1 because there are no more varargs. It solves
> problem
> > 2 because you no longer lean on ordering to access the branch you’re
> > interested in. It solves problem 3 because you can introduce another
> > conditional by simply attaching another name to the structure, rather
> than
> > messing with the existing indices.
> >
> > One of the drawbacks is that creating the map inline is historically
> > awkward in Java. I know it’s an anti-pattern to use voluminously, but
> > double brace initialization would clean up the aesthetics.
> >
> > On Tue, Apr 30, 2019 at 9:10 AM John Roesler  wrote:
> >
> > > Hi Ivan,
> > >
> > > Thanks for the update.
> > >
> > > FWIW, I agree with Matthias that the current "start branching" operator
> > is
> > > confusing when named the same way as the actual branches. "Split" seems
> > > like a good name. Alternatively, we can do without a "start branching"
> > > operator at all, and just do:
> > >
> > > stream
> > >   .branch(Predicate)
> > >   .branch(Predicate)
> > >   .defaultBranch();
> > >
> > > Tentativ

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread Sophie Blee-Goldman
I'd like to +1 what Michael said about the issues with the existing branch
method, I agree with what he's outlined and I think we should proceed by
trying to alleviate these problems. Specifically it seems important to be
able to cleanly access the individual branches (eg by mapping
name->stream), which I thought was the original intention of this KIP.

That said, I don't think we should so easily give in to the double brace
anti-pattern or force our users into it if at all possible to avoid...just
my two cents.

Cheers,
Sophie

On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
michael.droga...@confluent.io> wrote:

> I’d like to propose a different way of thinking about this. To me, there
> are three problems with the existing branch signature:
>
> 1. If you use it the way most people do, Java raises unsafe type warnings.
> 2. The way in which you use the stream branches is positionally coupled to
> the ordering of the conditionals.
> 3. It is brittle to extend existing branch calls with additional code
> paths.
>
> Using associative constructs instead of relying on ordered constructs would
> be a stronger approach. Consider a signature that instead looks like this:
>
> Map> KStream#branch(SortedMap super K,? super V>>);
>
> Branches are given names in a map, and as a result, the API returns a
> mapping of names to streams. The ordering of the conditionals is maintained
> because it’s a sorted map. Insert order determines the order of evaluation.
>
> This solves problem 1 because there are no more varargs. It solves problem
> 2 because you no longer lean on ordering to access the branch you’re
> interested in. It solves problem 3 because you can introduce another
> conditional by simply attaching another name to the structure, rather than
> messing with the existing indices.
>
> One of the drawbacks is that creating the map inline is historically
> awkward in Java. I know it’s an anti-pattern to use voluminously, but
> double brace initialization would clean up the aesthetics.
>
> On Tue, Apr 30, 2019 at 9:10 AM John Roesler  wrote:
>
> > Hi Ivan,
> >
> > Thanks for the update.
> >
> > FWIW, I agree with Matthias that the current "start branching" operator
> is
> > confusing when named the same way as the actual branches. "Split" seems
> > like a good name. Alternatively, we can do without a "start branching"
> > operator at all, and just do:
> >
> > stream
> >   .branch(Predicate)
> >   .branch(Predicate)
> >   .defaultBranch();
> >
> > Tentatively, I think that this branching operation should be terminal.
> That
> > way, we don't create ambiguity about how to use it. That is, `branch`
> > should return `KBranchedStream`, while `defaultBranch` is `void`, to
> > enforce that it comes last, and that there is only one definition of the
> > default branch. Potentially, we should log a warning if there's no
> default,
> > and additionally log a warning (or throw an exception) if a record falls
> > through with no default.
> >
> > Thoughts?
> >
> > Thanks,
> > -John
> >
> > On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for updating the KIP and your answers.
> > >
> > >
> > > >  this is to make the name similar to String#split
> > > >> that also returns an array, right?
> > >
> > > The intent was to avoid name duplication. The return type should _not_
> > > be an array.
> > >
> > > The current proposal is
> > >
> > > stream.branch()
> > >   .branch(Predicate)
> > >   .branch(Predicate)
> > >   .defaultBranch();
> > >
> > > IMHO, this reads a little odd, because the first `branch()` does not
> > > take any parameters and has different semantics than the later
> > > `branch()` calls. Note, that from the code snippet above, it's hidden
> > > that the first call is `KStream#branch()` while the others are
> > > `KBranchedStream#branch()` what makes reading the code harder.
> > >
> > > Because I suggested to rename `addBranch()` -> `branch()`, I thought it
> > > might be better to also rename `KStream#branch()` to avoid the naming
> > > overlap that seems to be confusing. The following reads much cleaner to
> > me:
> > >
> > > stream.split()
> > >   .branch(Predicate)
> > >   .branch(Predicate)
> > >   .defaultBranch();
> > >
> > > Maybe there is a better alternative to `split()` though to avoid the
> > > naming overlap.
> > >
> > >
> > > > 'default' is, however, a reserved word, so unfortunately we cannot
> have
> > > a method with such name :-)
> > >
> > > Bummer. Didn't consider this. Maybe we can still come up with a short
> > name?
> > >
> > >
> > > Can you add the interface `KBranchedStream` to the KIP with all its
> > > methods? It will be part of public API and should be contained in the
> > > KIP. For example, it's unclear atm, what the return type of
> > > `defaultBranch()` is.
> > >
> > >
> > > You did not comment on the idea to add a `KBranchedStream#get(int
> index)
> > > -> KStream` method to get the individually branched-KStreams. Wou

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread Michael Drogalis
I’d like to propose a different way of thinking about this. To me, there
are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to
the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code paths.
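
As a hedged illustration of problems 2 and 3, here is a toy model of a 
varargs, position-based branch. It is not the Kafka Streams 
implementation: `branch` below is a hypothetical helper over a plain 
list of integers, and it returns a list of lists rather than an array.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class PositionalBranchDemo {
    static final List<Integer> INPUT = Arrays.asList(1, 2, 3, 4);

    // Result i holds the values whose first matching predicate is predicate i.
    @SafeVarargs // generic varargs: without this, javac reports unchecked warnings
    static List<List<Integer>> branch(List<Integer> input, Predicate<Integer>... predicates) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            out.add(new ArrayList<>());
        }
        for (Integer value : input) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    out.get(i).add(value);
                    break; // first match wins
                }
            }
        }
        return out;
    }

    // Index 0 is the "even" branch.
    static List<Integer> before() {
        return branch(INPUT, v -> v % 2 == 0, v -> true).get(0);
    }

    // A predicate is added in front, so index 0 now means "greater than 3".
    static List<Integer> afterInsert() {
        return branch(INPUT, v -> v > 3, v -> v % 2 == 0, v -> true).get(0);
    }

    public static void main(String[] args) {
        System.out.println(before());      // [2, 4]
        System.out.println(afterInsert()); // [4]
    }
}
```

The caller's `get(0)` did not change, but its meaning did, which is the 
positional coupling described in points 2 and 3.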

Using associative constructs instead of relying on ordered constructs would
be a stronger approach. Consider a signature that instead looks like this:

Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);

Branches are given names in a map, and as a result, the API returns a
mapping of names to streams. The ordering of the conditionals is maintained
because it’s a sorted map. Insert order determines the order of evaluation.
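
A note on ordering, offered as a small check rather than as part of the 
proposal: in the JDK, a `SortedMap` implementation such as `TreeMap` 
iterates in key order, while `LinkedHashMap` iterates in insertion 
order. The sketch below uses only `java.util`; the branch names are 
made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BranchOrderDemo {
    // Inserts three hypothetical branch names in a fixed order and
    // returns the keys in the map's own iteration order.
    static List<String> keysAfterInsert(Map<String, Integer> map) {
        map.put("errors", 1);
        map.put("audit", 2);
        map.put("default", 3);
        return new ArrayList<>(map.keySet());
    }

    public static void main(String[] args) {
        // TreeMap (a SortedMap): sorted by key.
        System.out.println(keysAfterInsert(new TreeMap<>()));       // [audit, default, errors]
        // LinkedHashMap: insertion order is preserved.
        System.out.println(keysAfterInsert(new LinkedHashMap<>())); // [errors, audit, default]
    }
}
```

If the order of evaluation is meant to follow the order of definition, 
an insertion-ordered map is the closer fit.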

This solves problem 1 because there are no more varargs. It solves problem
2 because you no longer lean on ordering to access the branch you’re
interested in. It solves problem 3 because you can introduce another
conditional by simply attaching another name to the structure, rather than
messing with the existing indices.

One of the drawbacks is that creating the map inline is historically
awkward in Java. I know it’s an anti-pattern to use voluminously, but
double brace initialization would clean up the aesthetics.
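
For reference, a minimal sketch of what double brace initialization 
looks like and what it creates. It uses only `java.util`; the class 
name, keys and values are hypothetical placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DoubleBraceDemo {
    static Map<String, Integer> doubleBrace() {
        // Outer braces: an anonymous subclass of LinkedHashMap.
        // Inner braces: an instance initializer that runs at construction.
        return new LinkedHashMap<String, Integer>() {{
            put("branchOne", 1);
            put("branchTwo", 2);
        }};
    }

    static Map<String, Integer> plain() {
        Map<String, Integer> map = new LinkedHashMap<>();
        map.put("branchOne", 1);
        map.put("branchTwo", 2);
        return map;
    }

    public static void main(String[] args) {
        System.out.println(doubleBrace().equals(plain()));                   // true
        System.out.println(doubleBrace().getClass() == LinkedHashMap.class); // false
        System.out.println(plain().getClass() == LinkedHashMap.class);       // true
    }
}
```

The contents are equal, but each double-brace site defines an extra 
anonymous class, which is one reason the idiom is often discouraged.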

On Tue, Apr 30, 2019 at 9:10 AM John Roesler  wrote:

> Hi Ivan,
>
> Thanks for the update.
>
> FWIW, I agree with Matthias that the current "start branching" operator is
> confusing when named the same way as the actual branches. "Split" seems
> like a good name. Alternatively, we can do without a "start branching"
> operator at all, and just do:
>
> stream
>   .branch(Predicate)
>   .branch(Predicate)
>   .defaultBranch();
>
> Tentatively, I think that this branching operation should be terminal. That
> way, we don't create ambiguity about how to use it. That is, `branch`
> should return `KBranchedStream`, while `defaultBranch` is `void`, to
> enforce that it comes last, and that there is only one definition of the
> default branch. Potentially, we should log a warning if there's no default,
> and additionally log a warning (or throw an exception) if a record falls
> through with no default.
>
> Thoughts?
>
> Thanks,
> -John
>
> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax 
> wrote:
>
> > Thanks for updating the KIP and your answers.
> >
> >
> > >  this is to make the name similar to String#split
> > >> that also returns an array, right?
> >
> > The intent was to avoid name duplication. The return type should _not_
> > be an array.
> >
> > The current proposal is
> >
> > stream.branch()
> >   .branch(Predicate)
> >   .branch(Predicate)
> >   .defaultBranch();
> >
> > IMHO, this reads a little odd, because the first `branch()` does not
> > take any parameters and has different semantics than the later
> > `branch()` calls. Note that from the code snippet above, it's hidden
> > that the first call is `KStream#branch()` while the others are
> > `KBranchedStream#branch()`, which makes reading the code harder.
> >
> > Because I suggested renaming `addBranch()` -> `branch()`, I thought it
> > might be better to also rename `KStream#branch()` to avoid the naming
> > overlap that seems to be confusing. The following reads much cleaner to
> me:
> >
> > stream.split()
> >   .branch(Predicate)
> >   .branch(Predicate)
> >   .defaultBranch();
> >
> > Maybe there is a better alternative to `split()` though to avoid the
> > naming overlap.
> >
> >
> > > 'default' is, however, a reserved word, so unfortunately we cannot have
> > a method with such name :-)
> >
> > Bummer. Didn't consider this. Maybe we can still come up with a short
> name?
> >
> >
> > Can you add the interface `KBranchedStream` to the KIP with all its
> > methods? It will be part of public API and should be contained in the
> > KIP. For example, it's unclear atm, what the return type of
> > `defaultBranch()` is.
> >
> >
> > You did not comment on the idea to add a `KBranchedStream#get(int index)
> > -> KStream` method to get the individually branched-KStreams. Would be
> > nice to get your feedback about it. It seems you suggest that users
> > would need to write custom utility code otherwise, to access them. We
> > should discuss the pros and cons of both approaches. It feels
> > "incomplete" to me atm, if the API has no built-in support to get the
> > branched-KStreams directly.
> >
> >
> >
> > -Matthias
> >
> >
> > On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > > Hi all!
> > >
> > > I have updated the KIP-418 according to the new vision.
> > >
> > > Matthias, thanks for your comment!
> > >
> > >> Renaming KStream#branch() -> #split()
> > >
> > > I can see your point: this is to make the name similar to String#split
> > > that also returns an array, right? But is it worth the loss of
> backwards
> > > compatibility? We can have overloaded branch() as well without
> affecting
> > > the existing code. Maybe the old ar

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread John Roesler
Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is
confusing when named the same way as the actual branches. "Split" seems
like a good name. Alternatively, we can do without a "start branching"
operator at all, and just do:

stream
  .branch(Predicate)
  .branch(Predicate)
  .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That
way, we don't create ambiguity about how to use it. That is, `branch`
should return `KBranchedStream`, while `defaultBranch` is `void`, to
enforce that it comes last, and that there is only one definition of the
default branch. Potentially, we should log a warning if there's no default,
and additionally log a warning (or throw an exception) if a record falls
through with no default.
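
A rough sketch of what "terminal" could mean in practice, under assumptions: `TerminalBranchSketch` is a hypothetical stand-in that splits a plain list, and the `Consumer` arguments are added only so the result is observable. The point is the return types: `branch` returns the builder, `defaultBranch` returns `void`, so the compiler rejects anything chained after it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Illustration only: TerminalBranchSketch is a hypothetical stand-in that
// splits a list of records; it is not the KBranchedStream from the KIP.
class TerminalBranchSketch<T> {
    private final List<T> records;
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> sinks = new ArrayList<>();

    TerminalBranchSketch(List<T> records) {
        this.records = records;
    }

    // Non-terminal: returns this, so more branches can be chained.
    TerminalBranchSketch<T> branch(Predicate<T> predicate, Consumer<List<T>> sink) {
        predicates.add(predicate);
        sinks.add(sink);
        return this;
    }

    // Terminal: returns void, so nothing can be chained after it.
    void defaultBranch(Consumer<List<T>> defaultSink) {
        // One bucket per predicate plus one trailing bucket for the default.
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i <= predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (T record : records) {
            int target = predicates.size(); // index of the default bucket
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    target = i; // first matching predicate wins
                    break;
                }
            }
            buckets.get(target).add(record);
        }
        for (int i = 0; i < sinks.size(); i++) {
            sinks.get(i).accept(buckets.get(i));
        }
        defaultSink.accept(buckets.get(predicates.size()));
    }
}
```

With this shape, a line such as `x.defaultBranch(sink).branch(...)` does not compile, which is the enforcement described above.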

Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax 
wrote:

> Thanks for updating the KIP and your answers.
>
>
> >  this is to make the name similar to String#split
> >> that also returns an array, right?
>
> The intent was to avoid name duplication. The return type should _not_
> be an array.
>
> The current proposal is
>
> stream.branch()
>   .branch(Predicate)
>   .branch(Predicate)
>   .defaultBranch();
>
> IMHO, this reads a little odd, because the first `branch()` does not
> take any parameters and has different semantics than the later
> `branch()` calls. Note that from the code snippet above, it's hidden
> that the first call is `KStream#branch()` while the others are
> `KBranchedStream#branch()`, which makes reading the code harder.
>
> Because I suggested renaming `addBranch()` -> `branch()`, I thought it
> might be better to also rename `KStream#branch()` to avoid the naming
> overlap that seems to be confusing. The following reads much cleaner to me:
>
> stream.split()
>   .branch(Predicate)
>   .branch(Predicate)
>   .defaultBranch();
>
> Maybe there is a better alternative to `split()` though to avoid the
> naming overlap.
>
>
> > 'default' is, however, a reserved word, so unfortunately we cannot have
> a method with such name :-)
>
> Bummer. Didn't consider this. Maybe we can still come up with a short name?
>
>
> Can you add the interface `KBranchedStream` to the KIP with all its
> methods? It will be part of public API and should be contained in the
> KIP. For example, it's unclear atm, what the return type of
> `defaultBranch()` is.
>
>
> You did not comment on the idea to add a `KBranchedStream#get(int index)
> -> KStream` method to get the individually branched-KStreams. Would be
> nice to get your feedback about it. It seems you suggest that users
> would need to write custom utility code otherwise, to access them. We
> should discuss the pros and cons of both approaches. It feels
> "incomplete" to me atm, if the API has no built-in support to get the
> branched-KStreams directly.
>
>
>
> -Matthias
>
>
> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > Hi all!
> >
> > I have updated the KIP-418 according to the new vision.
> >
> > Matthias, thanks for your comment!
> >
> >> Renaming KStream#branch() -> #split()
> >
> > I can see your point: this is to make the name similar to String#split
> > that also returns an array, right? But is it worth the loss of backwards
> > compatibility? We can have overloaded branch() as well without affecting
> > the existing code. Maybe the old array-based `branch` method should be
> > deprecated, but this is a subject for discussion.
> >
> >> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> > KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >
> > Totally agree with 'addBranch->branch' rename. 'default' is, however, a
> > reserved word, so unfortunately we cannot have a method with such name
> :-)
> >
> >> defaultBranch() does take an `Predicate` as argument, but I think that
> > is not required?
> >
> > Absolutely! I think that was just copy-paste error or something.
> >
> > Dear colleagues,
> >
> > please revise the new version of the KIP and Paul's PR
> > (https://github.com/apache/kafka/pull/6512)
> >
> > Any new suggestions/objections?
> >
> > Regards,
> >
> > Ivan
> >
> >
> > 11.04.2019 11:47, Matthias J. Sax wrote:
> >> Thanks for driving the discussion of this KIP. It seems that everybody
> >> agrees that the current branch() method using arrays is not optimal.
> >>
> >> I had a quick look into the PR and I like the overall proposal. There
> >> are some minor things we need to consider. I would recommend the
> >> following renaming:
> >>
> >> KStream#branch() -> #split()
> >> KBranchedStream#addBranch() -> BranchingKStream#branch()
> >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >>
> >> It's just a suggestion to get slightly shorter method names.
> >>
> >> In the current PR, defaultBranch() does take an `Predicate` as argument,
> >> but I think that is not required?
> >>
> >> Also, we should consider KIP-307, that was recently accepted 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-26 Thread Matthias J. Sax
Thanks for updating the KIP and your answers.


>  this is to make the name similar to String#split
>> that also returns an array, right?

The intent was to avoid name duplication. The return type should _not_
be an array.

The current proposal is

stream.branch()
  .branch(Predicate)
  .branch(Predicate)
  .defaultBranch();

IMHO, this reads a little odd, because the first `branch()` does not
take any parameters and has different semantics than the later
`branch()` calls. Note that from the code snippet above, it's hidden
that the first call is `KStream#branch()` while the others are
`KBranchedStream#branch()`, which makes reading the code harder.

Because I suggested renaming `addBranch()` -> `branch()`, I thought it
might be better to also rename `KStream#branch()` to avoid the naming
overlap that seems to be confusing. The following reads much cleaner to me:

stream.split()
  .branch(Predicate)
  .branch(Predicate)
  .defaultBranch();

Maybe there is a better alternative to `split()` though to avoid the
naming overlap.


> 'default' is, however, a reserved word, so unfortunately we cannot have a 
> method with such name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short name?


Can you add the interface `KBranchedStream` to the KIP with all its
methods? It will be part of public API and should be contained in the
KIP. For example, it's unclear atm, what the return type of
`defaultBranch()` is.


You did not comment on the idea to add a `KBranchedStream#get(int index)
-> KStream` method to get the individually branched-KStreams. Would be
nice to get your feedback about it. It seems you suggest that users
would need to write custom utility code otherwise, to access them. We
should discuss the pros and cons of both approaches. It feels
"incomplete" to me atm, if the API has no built-in support to get the
branched-KStreams directly.



-Matthias


On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> Hi all!
> 
> I have updated the KIP-418 according to the new vision.
> 
> Matthias, thanks for your comment!
> 
>> Renaming KStream#branch() -> #split()
> 
> I can see your point: this is to make the name similar to String#split
> that also returns an array, right? But is it worth the loss of backwards
> compatibility? We can have overloaded branch() as well without affecting
> the existing code. Maybe the old array-based `branch` method should be
> deprecated, but this is a subject for discussion.
> 
>> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> 
> Totally agree with 'addBranch->branch' rename. 'default' is, however, a
> reserved word, so unfortunately we cannot have a method with such name :-)
> 
>> defaultBranch() does take an `Predicate` as argument, but I think that
> is not required?
> 
> Absolutely! I think that was just copy-paste error or something.
> 
> Dear colleagues,
> 
> please revise the new version of the KIP and Paul's PR
> (https://github.com/apache/kafka/pull/6512)
> 
> Any new suggestions/objections?
> 
> Regards,
> 
> Ivan
> 
> 
> 11.04.2019 11:47, Matthias J. Sax wrote:
>> Thanks for driving the discussion of this KIP. It seems that everybody
>> agrees that the current branch() method using arrays is not optimal.
>>
>> I had a quick look into the PR and I like the overall proposal. There
>> are some minor things we need to consider. I would recommend the
>> following renaming:
>>
>> KStream#branch() -> #split()
>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>
>> It's just a suggestion to get slightly shorter method names.
>>
>> In the current PR, defaultBranch() does take an `Predicate` as argument,
>> but I think that is not required?
>>
>> Also, we should consider KIP-307, that was recently accepted and is
>> currently implemented:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>
>> Ie, we should add overloads that accepted a `Named` parameter.
>>
>>
>> For the issue that the created `KStream` object are in different scopes:
>> could we extend `KBranchedStream` with a `get(int index)` method that
>> returns the corresponding "branched" result `KStream` object? Maybe, the
>> second argument of `addBranch()` should not be a `Consumer` but
>> a `Function` and `get()` could return whatever the
>> `Function` returns?
>>
>>
>> Finally, I would also suggest to update the KIP with the current
>> proposal. That makes it easier to review.
>>
>>
>> -Matthias
>>
>>
>>
>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>> Ivan,
>>>
>>> I'm a bit of a novice here as well, but I think it makes sense for you to
>>> revise the KIP and continue the discussion.  Obviously we'll need some
>>> buy-in from committers that have actual binding votes on whether the KIP
>>> could be adopted.  It would be great to hear if they think this is a 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-13 Thread Ivan Ponomarev

Hi all!

I have updated the KIP-418 according to the new vision.

Matthias, thanks for your comment!


> Renaming KStream#branch() -> #split()


I can see your point: this is to make the name similar to String#split 
that also returns an array, right? But is it worth the loss of backwards 
compatibility? We can have overloaded branch() as well without affecting 
the existing code. Maybe the old array-based `branch` method should be 
deprecated, but this is a subject for discussion.


> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(), 
KBranchedStream#defaultBranch() -> BranchingKStream#default()


Totally agree with the 'addBranch->branch' rename. 'default' is, however, a
reserved word, so unfortunately we cannot have a method with such a name :-)


> defaultBranch() does take an `Predicate` as argument, but I think 
that is not required?


Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR
(https://github.com/apache/kafka/pull/6512)


Any new suggestions/objections?

Regards,

Ivan


11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There
are some minor things we need to consider. I would recommend the
following renaming:

KStream#branch() -> #split()
KBranchedStream#addBranch() -> BranchingKStream#branch()
KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take an `Predicate` as argument,
but I think that is not required?

Also, we should consider KIP-307, that was recently accepted and is
currently implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

Ie, we should add overloads that accepted a `Named` parameter.


For the issue that the created `KStream` object are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe, the
second argument of `addBranch()` should not be a `Consumer` but
a `Function` and `get()` could return whatever the
`Function` returns?


Finally, I would also suggest to update the KIP with the current
proposal. That makes it easier to review.


-Matthias



On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to
revise the KIP and continue the discussion.  Obviously we'll need some
buy-in from committers that have actual binding votes on whether the KIP
could be adopted.  It would be great to hear if they think this is a good
idea overall.  I'm not sure if that happens just by starting a vote, or if
there is generally some indication of interest beforehand.

That being said, I'll continue the discussion a bit: assuming we do move
forward the solution of "stream.branch() returns KBranchedStream", do we
deprecate "stream.branch(...) returns KStream[]"?  I would favor
deprecating, since having two mutually exclusive APIs that accomplish the
same thing is confusing, especially when they're fairly similar anyway.  We
just need to be sure we're not making something impossible/difficult that
is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a
little sloppy overall in terms of naming and clarity. In particular,
passing in the "predicates" and "children" lists which get modified in
KBranchedStream but read from all the way KStreamLazyBranch is a bit
complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev  wrote:


Hi Paul!

I read your code carefully and now I am fully convinced: your proposal
looks better and should work. We just have to document the crucial fact
that KStream consumers are invoked as they're added. And then it's all
going to be very nice.

What shall we do now? I should re-write the KIP and resume the
discussion here, right?

Why do you say that your PR 'should not be even a starting point if
we go in this direction'? To me it looks like a good starting point. But
as a novice in this project I might miss some important details.

Regards,

Ivan


28.03.2019 17:38, Paul Whalen wrote:

Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution

supports this. The couponIssuer::set* consumers will be invoked as they’re
added, not during streamsBuilder.build(). So the user still ought to be
able to call couponIssuer.coupons() afterward and depend on the branched
streams having been set.

The issue I mean to point out is that it is hard to access the branched

streams in the same scope as the original stream (that is, not inside the
couponIssuer), which is a problem with both proposed solutions. It can be
worked around though.

[A

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-11 Thread Matthias J. Sax
Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There
are some minor things we need to consider. I would recommend the
following renaming:

KStream#branch() -> #split()
KBranchedStream#addBranch() -> BranchingKStream#branch()
KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take an `Predicate` as argument,
but I think that is not required?

Also, we should consider KIP-307, that was recently accepted and is
currently implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.


For the issue that the created `KStream` objects are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe, the
second argument of `addBranch()` should not be a `Consumer` but
a `Function` and `get()` could return whatever the
`Function` returns?
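
As a rough illustration of the `Function` plus `get(int index)` idea, under assumptions: `IndexedBranchSketch` is a hypothetical class, plain lists stand in for `KStream`, and the generic types are my guess at the intent. Each branch stores whatever its function returns, and `get` hands it back in the enclosing scope.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Illustration only: IndexedBranchSketch is hypothetical; lists stand in
// for KStream, and the signatures are assumptions for the sake of the sketch.
class IndexedBranchSketch<T> {
    private final List<T> remaining;
    private final List<List<T>> results = new ArrayList<>();

    IndexedBranchSketch(List<T> records) {
        this.remaining = new ArrayList<>(records);
    }

    // Moves matching records out of the remaining input, applies the
    // function to them, and stores whatever the function returns.
    IndexedBranchSketch<T> addBranch(Predicate<T> predicate,
                                     Function<List<T>, List<T>> chain) {
        List<T> matched = new ArrayList<>();
        Iterator<T> it = remaining.iterator();
        while (it.hasNext()) {
            T record = it.next();
            if (predicate.test(record)) {
                matched.add(record);
                it.remove();
            }
        }
        results.add(chain.apply(matched));
        return this;
    }

    // Returns the result of the branch that was added at the given position.
    List<T> get(int index) {
        return results.get(index);
    }
}
```

The trade-off is that `get(int)` is still positional, so reordering `addBranch` calls silently changes what each index means.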


Finally, I would also suggest updating the KIP with the current
proposal. That makes it easier to review.


-Matthias



On 3/31/19 12:22 PM, Paul Whalen wrote:
> Ivan,
> 
> I'm a bit of a novice here as well, but I think it makes sense for you to
> revise the KIP and continue the discussion.  Obviously we'll need some
> buy-in from committers that have actual binding votes on whether the KIP
> could be adopted.  It would be great to hear if they think this is a good
> idea overall.  I'm not sure if that happens just by starting a vote, or if
> there is generally some indication of interest beforehand.
> 
> That being said, I'll continue the discussion a bit: assuming we do move
> forward the solution of "stream.branch() returns KBranchedStream", do we
> deprecate "stream.branch(...) returns KStream[]"?  I would favor
> deprecating, since having two mutually exclusive APIs that accomplish the
> same thing is confusing, especially when they're fairly similar anyway.  We
> just need to be sure we're not making something impossible/difficult that
> is currently possible/easy.
> 
> Regarding my PR - I think the general structure would work, it's just a
> little sloppy overall in terms of naming and clarity. In particular,
> passing in the "predicates" and "children" lists which get modified in
> KBranchedStream but read from all the way KStreamLazyBranch is a bit
> complicated to follow.
> 
> Thanks,
> Paul
> 
> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev  wrote:
> 
>> Hi Paul!
>>
>> I read your code carefully and now I am fully convinced: your proposal
>> looks better and should work. We just have to document the crucial fact
>> that KStream consumers are invoked as they're added. And then it's all
>> going to be very nice.
>>
>> What shall we do now? I should re-write the KIP and resume the
>> discussion here, right?
>>
>> Why do you say that your PR 'should not be even a starting point if
>> we go in this direction'? To me it looks like a good starting point. But
>> as a novice in this project I might miss some important details.
>>
>> Regards,
>>
>> Ivan
>>
>>
>> 28.03.2019 17:38, Paul Whalen wrote:
>>> Ivan,
>>>
>>> Maybe I’m missing the point, but I believe the stream.branch() solution
>> supports this. The couponIssuer::set* consumers will be invoked as they’re
>> added, not during streamsBuilder.build(). So the user still ought to be
>> able to call couponIssuer.coupons() afterward and depend on the branched
>> streams having been set.
>>>
>>> The issue I mean to point out is that it is hard to access the branched
>> streams in the same scope as the original stream (that is, not inside the
>> couponIssuer), which is a problem with both proposed solutions. It can be
>> worked around though.
>>>
>>> [Also, great to hear additional interest in 401, I’m excited to hear
>> your thoughts!]
>>>
>>> Paul
>>>
>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:
>>>>
>>>> Hi Paul!
>>>>
>>>> The idea to postpone the wiring of branches to the
>> streamsBuilder.build() also looked great to me at first glance, but ---
>>>>
>>>>> the newly branched streams are not available in the same scope as each
>> other.  That is, if we wanted to merge them back together again I don't see
>> a way to do that.
>>>>
>>>> You just took the words right out of my mouth, I was just going to
>> write in detail about this issue.
>>>>
>>>> Consider the example from Bill's book, p. 101: say we need to identify
>> customers who have bought coffee and made a purchase in the electronics
>> store to give them coupons.
>>>>
>>>> This is the code I usually write under these circumstances using my
>> 'brancher' class:
>>>>
>>>> @Setter
>>>> class CouponIssuer{
>>>>    private KStream<> coffePurchases;
 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-31 Thread Paul Whalen
Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to
revise the KIP and continue the discussion.  Obviously we'll need some
buy-in from committers that have actual binding votes on whether the KIP
could be adopted.  It would be great to hear if they think this is a good
idea overall.  I'm not sure if that happens just by starting a vote, or if
there is generally some indication of interest beforehand.

That being said, I'll continue the discussion a bit: assuming we do move
forward with the solution of "stream.branch() returns KBranchedStream", do we
deprecate "stream.branch(...) returns KStream[]"?  I would favor
deprecating, since having two mutually exclusive APIs that accomplish the
same thing is confusing, especially when they're fairly similar anyway.  We
just need to be sure we're not making something impossible/difficult that
is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a
little sloppy overall in terms of naming and clarity. In particular,
passing in the "predicates" and "children" lists which get modified in
KBranchedStream but read from all the way in KStreamLazyBranch is a bit
complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev  wrote:

> Hi Paul!
>
> I read your code carefully and now I am fully convinced: your proposal
> looks better and should work. We just have to document the crucial fact
> that KStream consumers are invoked as they're added. And then it's all
> going to be very nice.
>
> What shall we do now? I should re-write the KIP and resume the
> discussion here, right?
>
> Why do you say that your PR 'should not be even a starting point if
> we go in this direction'? To me it looks like a good starting point. But
> as a novice in this project I might miss some important details.
>
> Regards,
>
> Ivan
>
>
> 28.03.2019 17:38, Paul Whalen wrote:
> > Ivan,
> >
> > Maybe I’m missing the point, but I believe the stream.branch() solution
> supports this. The couponIssuer::set* consumers will be invoked as they’re
> added, not during streamsBuilder.build(). So the user still ought to be
> able to call couponIssuer.coupons() afterward and depend on the branched
> streams having been set.
> >
> > The issue I mean to point out is that it is hard to access the branched
> streams in the same scope as the original stream (that is, not inside the
> couponIssuer), which is a problem with both proposed solutions. It can be
> worked around though.
> >
> > [Also, great to hear additional interest in 401, I’m excited to hear
> your thoughts!]
> >
> > Paul
> >
> >> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:
> >>
> >> Hi Paul!
> >>
> >> The idea to postpone the wiring of branches to the
> streamsBuilder.build() also looked great to me at first glance, but ---
> >>
> >>> the newly branched streams are not available in the same scope as each
> other.  That is, if we wanted to merge them back together again I don't see
> a way to do that.
> >>
> >> You just took the words right out of my mouth, I was just going to
> write in detail about this issue.
> >>
> >> Consider the example from Bill's book, p. 101: say we need to identify
> customers who have bought coffee and made a purchase in the electronics
> store to give them coupons.
> >>
> >> This is the code I usually write under these circumstances using my
> 'brancher' class:
> >>
> >> @Setter
> >> class CouponIssuer{
> >>private KStream<> coffePurchases;
> >>private KStream<> electronicsPurchases;
> >>
> >>KStream<...> coupons(){
> >>return coffePurchases.join(electronicsPurchases...)...whatever
> >>
> >>/*In the real world the code here can be complex, so creation of
> a separate CouponIssuer class is fully justified, in order to separate
> classes' responsibilities.*/
> >>
> >>   }
> >> }
> >>
> >> CouponIssuer couponIssuer = new CouponIssuer();
> >>
> >> new KafkaStreamsBrancher<>()
> >>  .branch(predicate1, couponIssuer::setCoffePurchases)
> >>  .branch(predicate2, couponIssuer::setElectronicsPurchases)
> >>  .onTopOf(transactionStream);
> >>
> >> /*Alas, this won't work if we're going to wire up everything later,
> without the terminal operation!!!*/
> >> couponIssuer.coupons()...
> >>
> >> Does this make sense?  In order to properly initialize the CouponIssuer
> we need the terminal operation to be called before streamsBuilder.build()
> is called.
> >>
> >>
> >> [BTW Paul, I just found out that your KIP-401 is essentially the next
> KIP I was going to write here. I have some thoughts based on my experience,
> so I will join the discussion on KIP-401 soon.]
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >> 28.03.2019 6:29, Paul Whalen wrote:
> >>> Ivan,
> >>> I tried to make a very rough proof of concept of a fluent API based
> off of
> >>> KStream here (https://github.com/apache/kafka/pull/6512), and I think
> I
> >>> succeeded at removing both cons.
> >>> - Compatibility: I was i

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-29 Thread Ivan Ponomarev

Hi Paul!

I read your code carefully and now I am fully convinced: your proposal 
looks better and should work. We just have to document the crucial fact 
that KStream consumers are invoked as they're added. And then it's all 
going to be very nice.
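
To spell out the "invoked as they're added" fact, here is a minimal sketch under assumptions: `EagerConsumerSketch`, `Branch` and `Holder` are hypothetical names, `Holder` plays the role of a setter-based class like CouponIssuer, and predicates are left out because only the timing matters here. It is not the code from the PR.

```java
import java.util.function.Consumer;

// Illustration only: all names here are hypothetical stand-ins, and the
// predicate argument is omitted because only the invocation timing matters.
class EagerConsumerSketch {

    // Stand-in for a branched KStream; it only carries a label.
    static final class Branch {
        final String label;

        Branch(String label) {
            this.label = label;
        }
    }

    // Setter-based holder, playing the role of a class like CouponIssuer.
    static final class Holder {
        Branch first;
        Branch second;

        void setFirst(Branch b) {
            first = b;
        }

        void setSecond(Branch b) {
            second = b;
        }

        boolean isReady() {
            return first != null && second != null;
        }
    }

    private int count = 0;

    // The consumer runs right here, while the chain is being built,
    // not at some later build step.
    EagerConsumerSketch branch(Consumer<Branch> consumer) {
        consumer.accept(new Branch("branch-" + count++));
        return this;
    }
}
```

Because each consumer has already run when `branch` returns, the holder is fully populated as soon as the chain ends, with no terminal call needed.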


What shall we do now? I should re-write the KIP and resume the 
discussion here, right?


Why do you say that your PR 'should not be even a starting point if
we go in this direction'? To me it looks like a good starting point. But 
as a novice in this project I might miss some important details.


Regards,

Ivan


28.03.2019 17:38, Paul Whalen wrote:

Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution 
supports this. The couponIssuer::set* consumers will be invoked as they’re 
added, not during streamsBuilder.build(). So the user still ought to be able to 
call couponIssuer.coupons() afterward and depend on the branched streams having 
been set.

The issue I mean to point out is that it is hard to access the branched streams 
in the same scope as the original stream (that is, not inside the 
couponIssuer), which is a problem with both proposed solutions. It can be 
worked around though.

[Also, great to hear additional interest in 401, I’m excited to hear your 
thoughts!]

Paul


On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:

Hi Paul!

The idea to postpone the wiring of branches to the streamsBuilder.build() also 
looked great to me at first glance, but ---


the newly branched streams are not available in the same scope as each other.  
That is, if we wanted to merge them back together again I don't see a way to do 
that.


You just took the words right out of my mouth, I was just going to write in
detail about this issue.

Consider the example from Bill's book, p. 101: say we need to identify 
customers who have bought coffee and made a purchase in the electronics store 
to give them coupons.

This is the code I usually write under these circumstances using my 'brancher' 
class:

@Setter
class CouponIssuer{
   private KStream<> coffePurchases;
   private KStream<> electronicsPurchases;

   KStream<...> coupons(){
   return coffePurchases.join(electronicsPurchases...)...whatever

   /*In the real world the code here can be complex, so creation of a 
separate CouponIssuer class is fully justified, in order to separate classes' 
responsibilities.*/

  }
}

CouponIssuer couponIssuer = new CouponIssuer();

new KafkaStreamsBrancher<>()
 .branch(predicate1, couponIssuer::setCoffePurchases)
 .branch(predicate2, couponIssuer::setElectronicsPurchases)
 .onTopOf(transactionStream);

/*Alas, this won't work if we're going to wire up everything later, without the 
terminal operation!!!*/
couponIssuer.coupons()...

Does this make sense?  In order to properly initialize the CouponIssuer we need 
the terminal operation to be called before streamsBuilder.build() is called.


[BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was 
going to write here. I have some thoughts based on my experience, so I will 
join the discussion on KIP-401 soon.]

Regards,

Ivan

28.03.2019 6:29, Paul Whalen wrote:

Ivan,
I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.
- Compatibility: I was incorrect earlier about compatibility issues,
there aren't any direct ones.  I was unaware that Java is smart enough to
distinguish between a branch(varargs...) returning one thing and branch()
with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it.  We can just
build up the branches in the KBranchedStream who shares its state with the
ProcessorSupplier that will actually do the branching.  It's not terribly
pretty in its current form, but I think it demonstrates its feasibility.
To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.
I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.
Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:
OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that branch API needs
improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

new KafkaStreamsBrancher<..>()
.branch(predicate1, ks ->.

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-28 Thread Paul Whalen
Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution 
supports this. The couponIssuer::set* consumers will be invoked as they’re 
added, not during streamsBuilder.build(). So the user still ought to be able to 
call couponIssuer.coupons() afterward and depend on the branched streams having 
been set.
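
A minimal sketch of that eager behaviour, assuming hypothetical stand-in types (Node, EagerBranched, Holder; none of them come from the pull request or from Kafka Streams). It only shows that a consumer passed to branch() can run at the moment it is added:

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a branched stream; not a Kafka Streams type.
class Node {
    final String name;
    Node(String name) { this.name = name; }
}

// Hypothetical fluent brancher that hands each child to its consumer immediately.
class EagerBranched {
    private final Node parent;
    private int count = 0;
    EagerBranched(Node parent) { this.parent = parent; }

    EagerBranched branch(Consumer<Node> consumer) {
        Node child = new Node(parent.name + "-branch" + count++);
        consumer.accept(child); // runs now, not when the topology is built
        return this;
    }
}

// Plays the role of couponIssuer: it only stores what it is given.
class Holder {
    Node first, second;
    void setFirst(Node n) { first = n; }
    void setSecond(Node n) { second = n; }
}

class EagerDemo {
    public static void main(String[] args) {
        Holder holder = new Holder();
        new EagerBranched(new Node("tx"))
            .branch(holder::setFirst)
            .branch(holder::setSecond);
        // Both fields are already populated, with no terminal call.
        System.out.println(holder.first.name + " " + holder.second.name);
    }
}
```

In this model the holder can be used right after the chain, which is the property the paragraph above relies on.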

The issue I mean to point out is that it is hard to access the branched streams 
in the same scope as the original stream (that is, not inside the 
couponIssuer), which is a problem with both proposed solutions. It can be 
worked around though. 

[Also, great to hear additional interest in 401, I’m excited to hear your 
thoughts!]

Paul

> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:
> 
> Hi Paul!
> 
> The idea to postpone the wiring of branches to the streamsBuilder.build() 
> also looked great to me at first glance, but ---
> 
> > the newly branched streams are not available in the same scope as each 
> > other.  That is, if we wanted to merge them back together again I don't see 
> > a way to do that.
> 
> You just took the words right out of my mouth; I was just going to write in 
> detail about this issue.
> 
> Consider the example from Bill's book, p. 101: say we need to identify 
> customers who have bought coffee and made a purchase in the electronics store 
> to give them coupons.
> 
> This is the code I usually write under these circumstances using my 
> 'brancher' class:
> 
> @Setter
> class CouponIssuer{
>   private KStream<> coffePurchases;
>   private KStream<> electronicsPurchases;
> 
>   KStream<...> coupons(){
>   return coffePurchases.join(electronicsPurchases...)...whatever
> 
>   /*In the real world the code here can be complex, so creation of a 
> separate CouponIssuer class is fully justified, in order to separate classes' 
> responsibilities.*/
> 
>  }
> }
> 
> CouponIssuer couponIssuer = new CouponIssuer();
> 
> new KafkaStreamsBrancher<>()
> .branch(predicate1, couponIssuer::setCoffePurchases)
> .branch(predicate2, couponIssuer::setElectronicsPurchases)
> .onTopOf(transactionStream);
> 
> /*Alas, this won't work if we're going to wire up everything later, without 
> the terminal operation!!!*/
> couponIssuer.coupons()...
> 
> Does this make sense?  In order to properly initialize the CouponIssuer we 
> need the terminal operation to be called before streamsBuilder.build() is 
> called.
> 
> 
> [BTW Paul, I just found out that your KIP-401 is essentially the next KIP I 
> was going to write here. I have some thoughts based on my experience, so I 
> will join the discussion on KIP-401 soon.]
> 
> Regards,
> 
> Ivan
> 
> 28.03.2019 6:29, Paul Whalen wrote:
>> Ivan,
>> I tried to make a very rough proof of concept of a fluent API based off of
>> KStream here (https://github.com/apache/kafka/pull/6512), and I think I
>> succeeded at removing both cons.
>>- Compatibility: I was incorrect earlier about compatibility issues,
>>there aren't any direct ones.  I was unaware that Java is smart enough to
>>distinguish between a branch(varargs...) returning one thing and branch()
>>with no arguments returning another thing.
>>- Requiring a terminal method: We don't actually need it.  We can just
>>build up the branches in the KBranchedStream who shares its state with the
>>ProcessorSupplier that will actually do the branching.  It's not terribly
>>pretty in its current form, but I think it demonstrates its feasibility.
>> To be clear, I don't think that pull request should be final or even a
>> starting point if we go in this direction, I just wanted to see how
>> challenging it would be to get the API working.
>> I will say though, that I'm not sure the existing solution could be
>> deprecated in favor of this, which I had originally suggested was a
>> possibility.  The reason is that the newly branched streams are not
>> available in the same scope as each other.  That is, if we wanted to merge
>> them back together again I don't see a way to do that.  The KIP proposal
>> has the same issue, though - all this means is that for either solution,
>> deprecating the existing branch(...) is not on the table.
>> Thanks,
>> Paul
>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:
>>> OK, let me summarize what we have discussed up to this point.
>>> 
>>> First, it seems that it's commonly agreed that branch API needs
>>> improvement. Motivation is given in the KIP.
>>> 
>>> There are two potential ways to do it:
>>> 
>>> 1. (as originally proposed)
>>> 
>>> new KafkaStreamsBrancher<..>()
>>>.branch(predicate1, ks ->..)
>>>.branch(predicate2, ks->..)
>>>.defaultBranch(ks->..) //optional
>>>.onTopOf(stream).mapValues(...) //onTopOf returns its argument
>>> 
>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
>>> all the necessary ingredients are provided.
>>> 
>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts the
>>> fluency of other KSt

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-28 Thread Ivan Ponomarev

Hi Paul!

The idea to postpone the wiring of branches to the 
streamsBuilder.build() also looked great to me at first glance, but ---


> the newly branched streams are not available in the same scope as 
> each other.  That is, if we wanted to merge them back together again I 
> don't see a way to do that.


You just took the words right out of my mouth; I was just going to write 
in detail about this issue.


Consider the example from Bill's book, p. 101: say we need to identify 
customers who have bought coffee and made a purchase in the electronics 
store to give them coupons.


This is the code I usually write under these circumstances using my 
'brancher' class:


@Setter
class CouponIssuer{
   private KStream<> coffePurchases;
   private KStream<> electronicsPurchases;

   KStream<...> coupons(){
   return coffePurchases.join(electronicsPurchases...)...whatever

   /*In the real world the code here can be complex, so creation of 
a separate CouponIssuer class is fully justified, in order to separate 
classes' responsibilities.*/


  }
}

CouponIssuer couponIssuer = new CouponIssuer();

new KafkaStreamsBrancher<>()
 .branch(predicate1, couponIssuer::setCoffePurchases)
 .branch(predicate2, couponIssuer::setElectronicsPurchases)
 .onTopOf(transactionStream);

/*Alas, this won't work if we're going to wire up everything later, 
without the terminal operation!!!*/

couponIssuer.coupons()...

Does this make sense?  In order to properly initialize the CouponIssuer 
we need the terminal operation to be called before 
streamsBuilder.build() is called.
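
The ordering concern can be reproduced without Kafka. The following is a sketch under stated assumptions: Issuer and DeferredBrancher are hypothetical stand-ins (a "stream" is modelled as a plain String), and the brancher deliberately defers all wiring to its terminal onTopOf() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of CouponIssuer: a "stream" is just a String here.
class Issuer {
    String coffee, electronics;
    void setCoffee(String s) { coffee = s; }
    void setElectronics(String s) { electronics = s; }

    String coupons() {
        if (coffee == null || electronics == null) {
            throw new IllegalStateException("branches not wired yet");
        }
        return coffee + "+" + electronics;
    }
}

// Hypothetical brancher that only remembers consumers until the terminal call.
class DeferredBrancher {
    private final List<Consumer<String>> consumers = new ArrayList<>();

    DeferredBrancher branch(Consumer<String> consumer) {
        consumers.add(consumer);
        return this;
    }

    // Terminal operation: only here do the consumers receive their branches.
    void onTopOf(String source) {
        for (int i = 0; i < consumers.size(); i++) {
            consumers.get(i).accept(source + "-branch" + i);
        }
    }
}

class WiringDemo {
    public static void main(String[] args) {
        Issuer issuer = new Issuer();
        DeferredBrancher brancher = new DeferredBrancher()
            .branch(issuer::setCoffee)
            .branch(issuer::setElectronics);
        try {
            issuer.coupons(); // too early: nothing has been wired
        } catch (IllegalStateException e) {
            System.out.println("before terminal op: " + e.getMessage());
        }
        brancher.onTopOf("tx");
        System.out.println(issuer.coupons());
    }
}
```

If the wiring moved from onTopOf() into a later build step, the first coupons() call in this model is what user code would hit.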



[BTW Paul, I just found out that your KIP-401 is essentially the next 
KIP I was going to write here. I have some thoughts based on my 
experience, so I will join the discussion on KIP-401 soon.]


Regards,

Ivan

28.03.2019 6:29, Paul Whalen wrote:

Ivan,

I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues,
there aren't any direct ones.  I was unaware that Java is smart enough to
distinguish between a branch(varargs...) returning one thing and branch()
with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it.  We can just
build up the branches in the KBranchedStream who shares its state with the
ProcessorSupplier that will actually do the branching.  It's not terribly
pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:


OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that branch API needs
improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

new KafkaStreamsBrancher<..>()
.branch(predicate1, ks ->..)
.branch(predicate2, ks->..)
.defaultBranch(ks->..) //optional
.onTopOf(stream).mapValues(...) //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until
all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts the
fluency of other KStream methods.

2. (as Paul proposes)

stream
.branch(predicate1, ks ->...)
.branch(predicate2, ks->...)
.defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
noDefault() return void

PROS: Generally follows the way KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and
noDefault()). And for a user it is very easy to miss the fact that one
of the terminal methods should be called. If these methods are not
called, we can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

27.03.2019 18:46, Ivan Ponomarev wrote:



25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about
stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way.
Maybe we all should think further.

Let me comment on two of your ideas.


user could specify a terminal method that assumes n

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-27 Thread Paul Whalen
Ivan,

I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

   - Compatibility: I was incorrect earlier about compatibility issues,
   there aren't any direct ones.  I was unaware that Java is smart enough to
   distinguish between a branch(varargs...) returning one thing and branch()
   with no arguments returning another thing.
   - Requiring a terminal method: We don't actually need it.  We can just
   build up the branches in the KBranchedStream who shares its state with the
   ProcessorSupplier that will actually do the branching.  It's not terribly
   pretty in its current form, but I think it demonstrates its feasibility.
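
The overload observation in the first bullet can be checked in isolation. A minimal sketch, assuming hypothetical stand-in classes (FakeStream and FakeBranched are illustrative names, not the types from the pull request): Java's overload resolution considers fixed-arity methods before variable-arity ones, so a call with no arguments binds to the no-argument method.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical builder returned by the new no-argument branch().
class FakeBranched {
    final List<String> names = new ArrayList<>();
    FakeBranched addBranch(String name) {
        names.add(name);
        return this;
    }
}

// Hypothetical stand-in for a stream with both overloads side by side.
class FakeStream {
    // Existing-style method: varargs, returns an array.
    String[] branch(String... predicates) {
        return predicates.clone();
    }

    // New-style method: no arguments, returns a builder.
    FakeBranched branch() {
        return new FakeBranched();
    }
}

class OverloadDemo {
    public static void main(String[] args) {
        FakeStream stream = new FakeStream();
        FakeBranched builder = stream.branch();        // binds to the no-arg overload
        String[] array = stream.branch("p1", "p2");    // binds to the varargs overload
        System.out.println(builder.names.size() + " " + array.length);
    }
}
```

The assignment to FakeBranched compiles only because the no-argument call resolves to the second method, which is the behaviour the bullet describes.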

To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:

> OK, let me summarize what we have discussed up to this point.
>
> First, it seems that it's commonly agreed that branch API needs
> improvement. Motivation is given in the KIP.
>
> There are two potential ways to do it:
>
> 1. (as originally proposed)
>
> new KafkaStreamsBrancher<..>()
>.branch(predicate1, ks ->..)
>.branch(predicate2, ks->..)
>.defaultBranch(ks->..) //optional
>.onTopOf(stream).mapValues(...) //onTopOf returns its argument
>
> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
> all the necessary ingredients are provided.
>
> CONS: The need to create a KafkaStreamsBrancher instance contrasts the
> fluency of other KStream methods.
>
> 2. (as Paul proposes)
>
> stream
>.branch(predicate1, ks ->...)
>.branch(predicate2, ks->...)
>.defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
> noDefault() return void
>
> PROS: Generally follows the way KStreams interface is defined.
>
> CONS: We need to define two terminal methods (defaultBranch(ks->) and
> noDefault()). And for a user it is very easy to miss the fact that one
> of the terminal methods should be called. If these methods are not
> called, we can throw an exception at runtime.
>
> Colleagues, what are your thoughts? Can we do better?
>
> Regards,
>
> Ivan
>
> 27.03.2019 18:46, Ivan Ponomarev wrote:
> >
> >
> > 25.03.2019 17:43, Ivan Ponomarev wrote:
> >> Paul,
> >>
> >> I see your point when you are talking about
> >> stream..branch..branch...default..
> >>
> >> Still, I believe that this cannot be implemented the easy way.
> >> Maybe we all should think further.
> >>
> >> Let me comment on two of your ideas.
> >>
> >>> user could specify a terminal method that assumes nothing will reach
> >>> the default branch,
> >> throwing an exception if such a case occurs.
> >>
> >> 1) OK, apparently this should not be the only option besides
> >> `default`, because there are scenarios when we want to just silently
> >> drop the messages that didn't match any predicate. 2) Throwing an
> >> exception in the middle of data flow processing looks like a bad idea.
> >> In stream processing paradigm, I would prefer to emit a special
> >> message to a dedicated stream. This is exactly where `default` can be
> >> used.
> >>
> >>> it would be fairly easy for the InternalTopologyBuilder to track
> >>> dangling
> >> branches that haven't been terminated and raise a clear error before it
> >> becomes an issue.
> >>
> >> You mean a runtime exception, when the program is compiled and run?
> >> Well,  I'd prefer an API that simply won't compile if used
> >> incorrectly. Can we build such an API as a method chain starting from
> >> KStream object? There is a huge cost difference between runtime and
> >> compile-time errors. Even if a failure is uncovered instantly by unit
> >> tests, it costs the project more than a compilation failure.
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >>
> >> 25.03.2019 0:38, Paul Whalen wrote:
> >>> Ivan,
> >>>
> >>> Good point about the terminal operation being required.  But is that
> >>> really
> >>> such a bad thing?  If the user doesn't want a defaultBranch they can
> >>> call
> >>> some other terminal method (noDefaultBranch()?) just as easily.  In
> >>> fact I
> >>> think it creates an opportunity for a nicer API - a user could specify
> a
> >>> terminal method that assumes nothing will reach the default branch,
> >>> throwing an exception if such a case occurs.  That seems

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-27 Thread Ivan Ponomarev

OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that branch API needs 
improvement. Motivation is given in the KIP.


There are two potential ways to do it:

1. (as originally proposed)

new KafkaStreamsBrancher<..>()
  .branch(predicate1, ks ->..)
  .branch(predicate2, ks->..)
  .defaultBranch(ks->..) //optional
  .onTopOf(stream).mapValues(...) //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until 
all the necessary ingredients are provided.


CONS: The need to create a KafkaStreamsBrancher instance contrasts the 
fluency of other KStream methods.


2. (as Paul proposes)

stream
  .branch(predicate1, ks ->...)
  .branch(predicate2, ks->...)
  .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and 
noDefault() return void


PROS: Generally follows the way KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and 
noDefault()). And for a user it is very easy to miss the fact that one 
of the terminal methods should be called. If these methods are not 
called, we can throw an exception at runtime.


Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

27.03.2019 18:46, Ivan Ponomarev wrote:



25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about 
stream..branch..branch...default..


Still, I believe that this cannot be implemented the easy way. 
Maybe we all should think further.


Let me comment on two of your ideas.

user could specify a terminal method that assumes nothing will reach 
the default branch,

throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides 
`default`, because there are scenarios when we want to just silently 
drop the messages that didn't match any predicate. 2) Throwing an 
exception in the middle of data flow processing looks like a bad idea. 
In stream processing paradigm, I would prefer to emit a special 
message to a dedicated stream. This is exactly where `default` can be 
used.


it would be fairly easy for the InternalTopologyBuilder to track 
dangling

branches that haven't been terminated and raise a clear error before it
becomes an issue.

You mean a runtime exception, when the program is compiled and run? 
Well,  I'd prefer an API that simply won't compile if used 
incorrectly. Can we build such an API as a method chain starting from 
KStream object? There is a huge cost difference between runtime and 
compile-time errors. Even if a failure is uncovered instantly by unit 
tests, it costs the project more than a compilation failure.


Regards,

Ivan


25.03.2019 0:38, Paul Whalen wrote:

Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you desire.


Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev wrote:



Hello Paul,

I'm afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something
more with the original branch after branching.

I understand your point that the need of special object construction
contrasts the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.

Regards,

Ivan



24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts with the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the br

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-25 Thread Ivan Ponomarev

Paul,

I see your point when you are talking about 
stream..branch..branch...default..


Still, I believe that this cannot be implemented the easy way. Maybe 
we all should think further.


Let me comment on two of your ideas.


> user could specify a terminal method that assumes nothing will reach the 
> default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, 
because there are scenarios when we want to just silently drop the 
messages that didn't match any predicate. 2) Throwing an exception in 
the middle of data flow processing looks like a bad idea. In stream 
processing paradigm, I would prefer to emit a special message to a 
dedicated stream. This is exactly where `default` can be used.



> it would be fairly easy for the InternalTopologyBuilder to track dangling
> branches that haven't been terminated and raise a clear error before it
> becomes an issue.

You mean a runtime exception, when the program is compiled and run? 
Well,  I'd prefer an API that simply won't compile if used incorrectly. 
Can we build such an API as a method chain starting from KStream object? 
There is a huge cost difference between runtime and compile-time errors. 
Even if a failure is uncovered instantly by unit tests, it costs the 
project more than a compilation failure.


Regards,

Ivan


25.03.2019 0:38, Paul Whalen wrote:

Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev  wrote:


Hello Paul,

I'm afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something
more with the original branch after branching.

I understand your point that the need of special object construction
contrasts the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.

Regards,

Ivan



24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts with the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream?

Like:

stream.branch()
  .addBranch(predicate1, this::handle1)
  .addBranch(predicate2, this::handle2)
  .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev wrote:


Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream ks){
  ks.filter().mapValues(...)
}


void handleSecondCase(KStream ks){
  ks.selectKey(...).groupByKey()...
}

..
new KafkaStreamsBrancher()
 .addBranch(predicate1, 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-24 Thread Paul Whalen
Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.
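
One way such build-time tracking could look, as a rough sketch only: TopologyRegistry and BranchChain are hypothetical names invented for this illustration and do not reflect how InternalTopologyBuilder or StreamsBuilder are actually implemented. Each chain registers itself when started and deregisters when a terminal method is called, so the build step can reject anything left open.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical registry standing in for the builder that wires the topology.
class TopologyRegistry {
    private final Set<BranchChain> open = new HashSet<>();

    BranchChain startBranching(String name) {
        BranchChain chain = new BranchChain(this, name);
        open.add(chain); // tracked as dangling until terminated
        return chain;
    }

    void terminated(BranchChain chain) {
        open.remove(chain);
    }

    // The "build step": fail fast if any chain never saw a terminal call.
    void build() {
        if (!open.isEmpty()) {
            throw new IllegalStateException(
                open.size() + " branch chain(s) missing a terminal call");
        }
    }
}

// Hypothetical fluent chain; predicates are reduced to names for brevity.
class BranchChain {
    private final TopologyRegistry registry;
    final String name;

    BranchChain(TopologyRegistry registry, String name) {
        this.registry = registry;
        this.name = name;
    }

    BranchChain branch(String predicateName) {
        return this; // a real implementation would record the predicate
    }

    void noDefaultBranch() {
        registry.terminated(this); // terminal method closes the chain
    }
}

class DanglingDemo {
    public static void main(String[] args) {
        TopologyRegistry registry = new TopologyRegistry();
        BranchChain chain = registry.startBranching("split").branch("p1");
        try {
            registry.build();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        chain.noDefaultBranch();
        registry.build(); // succeeds once the chain is terminated
        System.out.println("built");
    }
}
```

The error here still surfaces at runtime, although before any records are processed.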

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev  wrote:

> Hello Paul,
>
> I'm afraid this won't work because we do not always need the
> defaultBranch. And without a terminal operation we don't know when to
> finalize and build the 'branch switch'.
>
> In my proposal, onTopOf returns its argument, so we can do something
> more with the original branch after branching.
>
> I understand your point that the need of special object construction
> contrasts the fluency of most KStream methods. But here we have a
> special case: we build the switch to split the flow, so I think this is
> still idiomatic.
>
> Regards,
>
> Ivan
>
>
>
> 24.03.2019 4:02, Paul Whalen wrote:
> > Ivan,
> >
> > I think it's a great idea to improve this API, but I find the onTopOf()
> > mechanism a little confusing since it contrasts the fluency of other
> > KStream method calls.  Ideally I'd like to just call a method on the
> stream
> > so it still reads top to bottom if the branch cases are defined fluently.
> > I think the addBranch(predicate, handleCase) is very nice and the right
> way
> > to do things, but what if we flipped around how we specify the source
> > stream.
> >
> > Like:
> >
> > stream.branch()
> >  .addBranch(predicate1, this::handle1)
> >  .addBranch(predicate2, this::handle2)
> >  .defaultBranch(this::handleDefault);
> >
> > Where branch() returns a KBranchedStreams or KStreamBrancher or
> something,
> > which is added to by addBranch() and terminated by defaultBranch() (which
> > returns void).  This is obviously incompatible with the current API, so
> the
> > new stream.branch() would have to have a different name, but that seems
> > like a fairly small problem - we could call it something like branched()
> or
> > branchedStreams() and deprecate the old API.
> >
> > Does this satisfy the motivations of your KIP?  It seems like it does to
> > me, allowing for clear in-line branching while also allowing you to
> > dynamically build branches off of KBranchedStreams if desired.
> >
> > Thanks,
> > Paul
> >
> >
> >
> > On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> 
> > wrote:
> >
> >> Hi Bill,
> >>
> >> Thank you for your reply!
> >>
> >> This is how I usually do it:
> >>
> >> void handleFirstCase(KStream ks){
> >>  ks.filter().mapValues(...)
> >> }
> >>
> >>
> >> void handleSecondCase(KStream ks){
> >>  ks.selectKey(...).groupByKey()...
> >> }
> >>
> >> ..
> >> new KafkaStreamsBrancher()
> >> .addBranch(predicate1, this::handleFirstCase)
> >> .addBranch(predicate2, this::handleSecondCase)
> >> .onTopOf()
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >> 22.03.2019 1:34, Bill Bejeck wrote:
> >>> Hi Ivan,
> >>>
> >>> Thanks for the KIP.
> >>>
> >>> I have one question, the KafkaStreamsBrancher takes a Consumer as a
> >> second
> >>> argument which returns nothing, and the example in the KIP shows each
> >>> stream from the branch using a terminal node (KafkaStreams#to() in this
> >>> case).
> >>>
> >>> Maybe I've missed something, but how would we handle the case where the
> >>> user has created a branch but wants to continue processing and not
> >>> necessarily use a terminal node on the branched stream immediately?
> >>>
> >>> For example, using today's logic as is if we had something like this:
> >>>
> >>> KStream[] branches = originalStream.branch(predicate1,
> >>> predicate2);
> >>> branches[0].filter().mapValues(...)..
> >>> branches[1].selectKey(...).groupByKey().
> >>>
> >>>
> >>> Thanks!
> >>> Bill
> >>>
> >>>
> >>>
> >>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck  wrote:
> >>>
>  All,
> 
>  I'd like to jump-start the discussio

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-24 Thread Ivan Ponomarev

Hello Paul,

I'm afraid this won't work because we do not always need the 
defaultBranch. And without a terminal operation we don't know when to 
finalize and build the 'branch switch'.


In my proposal, onTopOf returns its argument, so we can do something 
more with the original branch after branching.
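
To make the role of the terminal call concrete, here is a minimal sketch in plain Java. It is a toy model only: `ToyBrancher` is a hypothetical name, it operates on in-memory lists rather than on KStream, and the first-match routing is an assumption carried over from how `KStream#branch` is documented to behave. It illustrates two points from the paragraphs above: the set of branches is only complete once `onTopOf` is called, and `onTopOf` returns its argument so the caller can keep using the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy stand-in for the proposed brancher; NOT the Kafka Streams API.
class ToyBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    // Only records the branch; nothing is routed yet.
    ToyBrancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal operation: the full list of branches is known only here,
    // so this is where the "switch" is built and applied.
    List<T> onTopOf(List<T> source) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (T item : source) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    buckets.get(i).add(item);
                    break; // first matching predicate wins (assumption)
                }
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        return source; // the argument is handed back unchanged
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 11);
        List<Integer> same = new ToyBrancher<Integer>()
                .addBranch(n -> n % 2 == 0, evens -> System.out.println("even: " + evens))
                .addBranch(n -> n > 10, big -> System.out.println("big: " + big))
                .onTopOf(source);
        System.out.println("still usable: " + (same == source));
    }
}
```

Items that match no predicate are simply not handed to any handler in this sketch, which is why no default branch is required.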


I understand your point that the need for special object construction
contrasts with the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.


Regards,

Ivan




Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-23 Thread Paul Whalen
Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts with the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream.

Like:

stream.branch()
.addBranch(predicate1, this::handle1)
.addBranch(predicate2, this::handle2)
.defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.
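
The shape described above can be sketched in plain Java as follows. This is a toy model under stated assumptions, not a proposal for the actual implementation: `ToyStream` and `ToyBranched` are hypothetical names, the data is an in-memory list rather than a KStream, `branched()` is used as one of the candidate names mentioned above, and first-match routing is assumed. All work is deferred to `defaultBranch()`, which returns void and acts as the terminator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy stand-in for a stream; NOT the Kafka Streams API.
class ToyStream<T> {
    private final List<T> items;

    ToyStream(List<T> items) {
        this.items = items;
    }

    // Entry point on the stream itself, so the chain reads top to bottom.
    ToyBranched<T> branched() {
        return new ToyBranched<>(items);
    }
}

// Hypothetical builder returned by branched().
class ToyBranched<T> {
    private final List<T> source;
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    ToyBranched(List<T> source) {
        this.source = source;
    }

    ToyBranched<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminates the chain: routes every item, then passes the leftovers
    // to the default handler.
    void defaultBranch(Consumer<List<T>> defaultHandler) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> rest = new ArrayList<>();
        for (T item : source) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(item)) {
                    buckets.get(i).add(item);
                    matched = true; // first matching predicate wins (assumption)
                }
            }
            if (!matched) {
                rest.add(item);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(rest);
    }
}
```

Note that in this sketch nothing happens unless `defaultBranch()` is called, so a caller who has no use for a default branch would still have to end the chain with it.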

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev 
wrote:

> Hi Bill,
>
> Thank you for your reply!
>
> This is how I usually do it:
>
> void handleFirstCase(KStream ks){
> ks.filter().mapValues(...)
> }
>
>
> void handleSecondCase(KStream ks){
> ks.selectKey(...).groupByKey()...
> }
>
> ..
> new KafkaStreamsBrancher()
>.addBranch(predicate1, this::handleFirstCase)
>.addBranch(predicate2, this::handleSecondCase)
>.onTopOf()
>
> Regards,
>
> Ivan
>
> 22.03.2019 1:34, Bill Bejeck wrote:
> > Hi Ivan,
> >
> > Thanks for the KIP.
> >
> > I have one question, the KafkaStreamsBrancher takes a Consumer as a
> second
> > argument which returns nothing, and the example in the KIP shows each
> > stream from the branch using a terminal node (KafkaStreams#to() in this
> > case).
> >
> > Maybe I've missed something, but how would we handle the case where the
> > user has created a branch but wants to continue processing and not
> > necessarily use a terminal node on the branched stream immediately?
> >
> > For example, using today's logic as is if we had something like this:
> >
> > KStream[] branches = originalStream.branch(predicate1,
> > predicate2);
> > branches[0].filter().mapValues(...)..
> > branches[1].selectKey(...).groupByKey().
> >
> >
> > Thanks!
> > Bill
> >
> >
> >
> > On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck  wrote:
> >
> >> All,
> >>
> >> I'd like to jump-start the discussion for KIP-418.
> >>
> >> Here's the original message:
> >>
> >> Hello,
> >>
> >> I'd like to start a discussion about KIP-418. Please take a look at the
> >> KIP if you can, I would appreciate any feedback :)
> >>
> >> KIP-418:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>
> >> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> >>
> >> PR#6164: https://github.com/apache/kafka/pull/6164
> >>
> >> Regards,
> >>
> >> Ivan Ponomarev
> >>
> >>
> >
>
>


Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-23 Thread Ivan Ponomarev

Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream ks){
ks.filter().mapValues(...)
}


void handleSecondCase(KStream ks){
ks.selectKey(...).groupByKey()...
}

..
new KafkaStreamsBrancher()
  .addBranch(predicate1, this::handleFirstCase)
  .addBranch(predicate2, this::handleSecondCase)
  .onTopOf()

Regards,

Ivan

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-21 Thread Bill Bejeck
Hi Ivan,

Thanks for the KIP.

I have one question, the KafkaStreamsBrancher takes a Consumer as a second
argument which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KafkaStreams#to() in this
case).

Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is, suppose we had something like this:

KStream[] branches = originalStream.branch(predicate1,
predicate2);
branches[0].filter().mapValues(...)..
branches[1].selectKey(...).groupByKey().
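
For comparison, the index-based style in the snippet above can be modelled in plain Java. This is a toy sketch only: `ToyArrayBranch` is a hypothetical helper working on in-memory lists, not on KStream, and it returns a list of lists instead of an array purely to avoid generic array creation in the sketch. It assumes the routing `KStream#branch` is documented to have: each record goes to the first predicate it matches, and records matching none are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of index-based branching; NOT the Kafka Streams API.
class ToyArrayBranch {

    // Returns one bucket per predicate, in the order the predicates were given.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> source, Predicate<T>... predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            branches.add(new ArrayList<>());
        }
        for (T item : source) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(item)) {
                    branches.get(i).add(item);
                    break; // first match wins; unmatched items are dropped
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 11);
        List<List<Integer>> branches =
                ToyArrayBranch.<Integer>branch(source, n -> n % 2 == 0, n -> n > 10);
        // Downstream code is tied to the predicates only by position.
        System.out.println("branch 0: " + branches.get(0));
        System.out.println("branch 1: " + branches.get(1));
    }
}
```

The caller can keep processing each branch without a terminal node, but the link between a predicate and its downstream logic is only the index, so reordering the predicates silently changes which handler sees which records.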


Thanks!
Bill



On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck  wrote:

> All,
>
> I'd like to jump-start the discussion for KIP-418.
>
> Here's the original message:
>
> Hello,
>
> I'd like to start a discussion about KIP-418. Please take a look at the
> KIP if you can, I would appreciate any feedback :)
>
> KIP-418: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>
> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
>
> PR#6164: https://github.com/apache/kafka/pull/6164
>
> Regards,
>
> Ivan Ponomarev
>
>