Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-24 Thread Michael Drogalis
easier to read, and also make the order of
> > >> `Predicates` (that is essential) easier to grasp.
> > >>
> > >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> > >>>>>>   .branch("branchOne", Predicate)
> > >>>>>>   .branch("branchTwo", Predicate)
> > >>>>>>   .defaultBranch("defaultBranch");
> > >> An open question is the case for which no defaultBranch() should be
> > >> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
> > >> and the call to `defaultBranch()` that returns the `Map` is mandatory
> > >> (which is not the case atm). Or is this actually not a real problem,
> > >> because users can just ignore the branch returned by `defaultBranch()`
> > >> in the result `Map`?
> > >>
> > >>
> > >> About "inlining": So far, it seems to be a matter of personal
> > >> preference. I can see arguments for both, but no "killer argument" yet
> > >> that clearly makes the case for one or the other.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> > >>> Perhaps inlining is the wrong terminology. It doesn’t require that a
> > >>> lambda with the full downstream topology be defined inline; it can be a
> > >>> method reference, as with Ivan’s original suggestion. The advantage of
> > >>> putting the predicate and its downstream logic (Consumer) together in
> > >>> branch() is that they are required to be near each other.
> > >>>
> > >>> Ultimately the downstream code has to live somewhere, and deep branch
> > >>> trees will be hard to read regardless.
> > >>>
> > >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
> > >>>> <michael.droga...@confluent.io> wrote:
> > >>>>
> > >>>> I'm less enthusiastic about inlining the branch logic with its
> > >>>> downstream functionality. Programs that have deep branch trees will
> > >>>> quickly become harder to read as a single unit.
> > >>>>
> > >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:
> > >>>>>
> > >>>>> Also +1 on the issues/goals as Michael outlined them, I think that
> > >>>>> sets a great framework for the discussion.
> > >>>>>
> > >>>>> Regarding the SortedMap solution, my understanding is that the
> > >>>>> current proposal in the KIP is what is in my PR which (pending naming
> > >>>>> decisions) is roughly this:
> > >>>>>
> > >>>>> stream.split()
> > >>>>>   .branch(Predicate, Consumer<KStream<K, V>>)
> > >>>>>   .branch(Predicate, Consumer<KStream<K, V>>)
> > >>>>>   .defaultBranch(Consumer<KStream<K, V>>);
> > >>>>>
> > >>>>> Obviously some ordering is necessary, since branching as a construct
> > >>>>> doesn't work without it, but this solution seems like it provides as
> > >>>>> much associativity as the SortedMap solution, because each branch()
> > >>>>> call directly associates the "conditional" with the "code block." The
> > >>>>> value it provides over the KIP solution is the accessing of streams in
> > >>>>> the same scope.
> > >>>>>
> > >>>>> The KIP solution is less "dynamic" than the SortedMap solution in the
> > >>>>> sense that it is slightly clumsier to add a dynamic number of branches,
> > >>>>> but it is certainly possible. It seems to me like the API should favor
> > >>>>> the "static" case anyway, and should make it simple and readable to
> > >>>>> fluently declare and access your branches in-line. It also

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-05-01 Thread Michael Drogalis
I'm less enthusiastic about inlining the branch logic with its downstream
functionality. Programs that have deep branch trees will quickly become
harder to read as a single unit.

On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen  wrote:

> Also +1 on the issues/goals as Michael outlined them, I think that sets a
> great framework for the discussion.
>
> Regarding the SortedMap solution, my understanding is that the current
> proposal in the KIP is what is in my PR which (pending naming decisions) is
> roughly this:
>
> stream.split()
> .branch(Predicate, Consumer<KStream<K, V>>)
> .branch(Predicate, Consumer<KStream<K, V>>)
> .defaultBranch(Consumer<KStream<K, V>>);
>
> Obviously some ordering is necessary, since branching as a construct
> doesn't work without it, but this solution seems like it provides as much
> associativity as the SortedMap solution, because each branch() call
> directly associates the "conditional" with the "code block."  The value it
> provides over the KIP solution is the accessing of streams in the same
> scope.
>
> The KIP solution is less "dynamic" than the SortedMap solution in the sense
> that it is slightly clumsier to add a dynamic number of branches, but it is
> certainly possible.  It seems to me like the API should favor the "static"
> case anyway, and should make it simple and readable to fluently declare and
> access your branches in-line.  It also makes it impossible to ignore a
> branch, and it is possible to build an (almost) identical SortedMap
> solution on top of it.
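The consumer-style chain described above can be modeled without Kafka at all. What follows is a minimal sketch. It assumes a plain in-memory `List<T>` in place of `KStream<K, V>` and `java.util.function.Predicate<T>` in place of Kafka's two-argument `Predicate<K, V>`. The class `ConsumerBranchSketch` is hypothetical and is not the API in the KIP or the PR. Each `branch()` call pairs a condition with the code that handles its matches. Records go to the first matching branch in declaration order, and leftovers go to the default consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for a split()/branch()/defaultBranch() chain that
// pairs each predicate with its downstream logic. Works on a List<T>, not a KStream.
final class ConsumerBranchSketch<T> {
    private final List<T> records;
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> consumers = new ArrayList<>();

    private ConsumerBranchSketch(List<T> records) {
        this.records = records;
    }

    static <T> ConsumerBranchSketch<T> split(List<T> records) {
        return new ConsumerBranchSketch<>(records);
    }

    // Associates a condition directly with the code that handles its matches.
    ConsumerBranchSketch<T> branch(Predicate<? super T> predicate, Consumer<List<T>> consumer) {
        predicates.add(predicate);
        consumers.add(consumer);
        return this;
    }

    // Terminal call: routes each record to the first matching branch (in
    // declaration order) and hands unmatched records to the default consumer.
    void defaultBranch(Consumer<List<T>> defaultConsumer) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> unmatched = new ArrayList<>();
        for (T record : records) {
            boolean routed = false;
            for (int i = 0; i < predicates.size() && !routed; i++) {
                if (predicates.get(i).test(record)) {
                    buckets.get(i).add(record);
                    routed = true;
                }
            }
            if (!routed) {
                unmatched.add(record);
            }
        }
        for (int i = 0; i < consumers.size(); i++) {
            consumers.get(i).accept(buckets.get(i));
        }
        defaultConsumer.accept(unmatched);
    }
}
```

Because each handler is just a `Consumer`, a method reference such as `small::addAll` works as well as an inline lambda. That matches the point made in this thread about method references versus inlining.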
>
> I could also see a middle ground where instead of a raw SortedMap being
> taken in, branch() takes a name and not a Consumer.  Something like this:
>
> Map<String, KStream<K, V>> branches = stream.split()
> .branch("branchOne", Predicate)
> .branch("branchTwo", Predicate)
> .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>
> Pros for that solution:
>  - accessing branched KStreams in same scope
>  - no double brace initialization, hopefully slightly more readable than
> SortedMap
>
> Cons
>  - downstream branch logic cannot be specified inline which makes it harder
> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>  - you can forget to "handle" one of the branched streams (like existing
> API and SortedMap, but unlike the KIP)
>
> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
> it).
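The named-branch middle ground can be sketched the same way. This is again a hypothetical, collection-based model: `NamedBranchSketch` is not a Kafka type. It assumes that `branch(name, predicate)` registers branches in declaration order and that `defaultBranch(name)` returns a map from branch names to the records routed there.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of "branch() takes a name, the chain returns a Map".
final class NamedBranchSketch<T> {
    private final List<T> records;
    // LinkedHashMap keeps branches in declaration order, which is also evaluation order.
    private final Map<String, Predicate<? super T>> predicates = new LinkedHashMap<>();

    private NamedBranchSketch(List<T> records) {
        this.records = records;
    }

    static <T> NamedBranchSketch<T> split(List<T> records) {
        return new NamedBranchSketch<>(records);
    }

    NamedBranchSketch<T> branch(String name, Predicate<? super T> predicate) {
        if (predicates.putIfAbsent(name, predicate) != null) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
        return this;
    }

    // Terminal call: returns name -> records, including the default branch.
    Map<String, List<T>> defaultBranch(String defaultName) {
        if (predicates.containsKey(defaultName)) {
            throw new IllegalArgumentException("default name clashes with a branch: " + defaultName);
        }
        Map<String, List<T>> result = new LinkedHashMap<>();
        for (String name : predicates.keySet()) {
            result.put(name, new ArrayList<>());
        }
        result.put(defaultName, new ArrayList<>());
        for (T record : records) {
            String target = defaultName; // used when no predicate matches
            for (Map.Entry<String, Predicate<? super T>> entry : predicates.entrySet()) {
                if (entry.getValue().test(record)) {
                    target = entry.getKey(); // first match wins
                    break;
                }
            }
            result.get(target).add(record);
        }
        return result;
    }
}
```

Nothing forces the caller to ever read `branches.get("defaultBranch")`, so a branch can still be silently ignored here. That is the second con listed above.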
>
> Overall I'm curious how important it is to be able to easily access the
> branched KStream in the same scope as the original.  It's possible that it
> doesn't need to be handled directly by the API, but instead left up to the
> user.  I'm sort of in the middle on it.
>
> Paul
>
>
>
> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman 
> wrote:
>
> > I'd like to +1 what Michael said about the issues with the existing branch
> > method. I agree with what he's outlined and I think we should proceed by
> > trying to alleviate these problems. Specifically, it seems important to be
> > able to cleanly access the individual branches (e.g. by mapping
> > name->stream), which I thought was the original intention of this KIP.
> >
> > That said, I don't think we should so easily give in to the double brace
> > anti-pattern or force our users into it if at all possible to avoid... just
> > my two cents.
> >
> > Cheers,
> > Sophie
> >
> > On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> > michael.droga...@confluent.io> wrote:
> >
> > > I’d like to propose a different way of thinking about this. To me, there
> > > are three problems with the existing branch signature:
> > >
> > > 1. If you use it the way most people do, Java raises unsafe type warnings.
> > > 2. The way in which you use the stream branches is positionally coupled to
> > > the ordering of the conditionals.
> > > 3. It is brittle to extend existing branch calls with additional code
> > > paths.
> > >
> > > Using associative constructs instead of relying on ordered constructs would
> > > be a stronger approach. Consider a signature that instead looks like this:
> > >
> > > Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);
> > >
> > > Branches are given names in a map, and as a result, the API returns a
> > > mapping of names to streams. The ordering of the conditionals is maintained
> > > because it’s a sorted map. Insert order determines the order of evaluation.
> > >
> > > This solves problem 1 becaus

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-04-30 Thread Michael Drogalis
I’d like to propose a different way of thinking about this. To me, there
are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to
the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code paths.

Using associative constructs instead of relying on ordered constructs would
be a stronger approach. Consider a signature that instead looks like this:

Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);

Branches are given names in a map, and as a result, the API returns a
mapping of names to streams. The ordering of the conditionals is maintained
because it’s a sorted map. Insert order determines the order of evaluation.
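One detail worth checking against the signature above: a `SortedMap` iterates in the order of its keys (natural ordering or a supplied `Comparator`), not in insertion order. With a `TreeMap`, the evaluation order would follow the branch names. If insertion order is meant to decide evaluation order, a `LinkedHashMap` preserves it. Below is a small sketch with plain `java.util` types. The class and branch names are made up for the example, and `java.util.function.Predicate<String>` stands in for Kafka's `Predicate<K, V>`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

final class OrderingSketch {
    // Returns the name of the first branch whose predicate matches, following
    // the map's own iteration order; null if nothing matches.
    static String firstMatch(Map<String, Predicate<String>> branches, String value) {
        for (Map.Entry<String, Predicate<String>> entry : branches.entrySet()) {
            if (entry.getValue().test(value)) {
                return entry.getKey();
            }
        }
        return null;
    }

    // Inserts a specific branch first and a catch-all branch second.
    static Map<String, Predicate<String>> fill(Map<String, Predicate<String>> branches) {
        branches.put("specific", country -> country.startsWith("united"));
        branches.put("catchAll", country -> true);
        return branches;
    }

    // Same insertions, evaluated under key order (TreeMap) and insertion order (LinkedHashMap).
    static String[] compare(String value) {
        String byKeyOrder = firstMatch(fill(new TreeMap<>()), value);
        String byInsertionOrder = firstMatch(fill(new LinkedHashMap<>()), value);
        return new String[] {byKeyOrder, byInsertionOrder};
    }
}
```

Here `compare("united states")` yields `catchAll` for the `TreeMap`, because "catchAll" sorts before "specific". It yields `specific` for the `LinkedHashMap`, where the more specific branch was inserted first.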

This solves problem 1 because there are no more varargs. It solves problem
2 because you no longer lean on ordering to access the branch you’re
interested in. It solves problem 3 because you can introduce another
conditional by simply attaching another name to the structure, rather than
messing with the existing indices.

One of the drawbacks is that creating the map inline is historically
awkward in Java. I know it’s an anti-pattern to use voluminously, but
double brace initialization would clean up the aesthetics.
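For reference, double brace initialization creates an anonymous subclass of the map and fills it from an instance initializer block. Part of why it is considered an anti-pattern is that every use defines a new class. When used inside an instance method, that anonymous class may also hold a reference to the enclosing object. The sketch below compares it with plain initialization. It uses `java.util.function.Predicate<String>` as a stand-in for Kafka's `Predicate<K, V>` and a `LinkedHashMap` so insertion order is kept; the branch names are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

final class DoubleBraceSketch {
    // Double brace initialization: the outer braces declare an anonymous
    // subclass of LinkedHashMap, the inner braces are its instance initializer.
    static Map<String, Predicate<String>> doubleBrace() {
        return new LinkedHashMap<String, Predicate<String>>() {{
            put("unitedStates", country -> "united states".equals(country));
            put("germany", country -> "germany".equals(country));
        }};
    }

    // The same map built without an extra anonymous class.
    static Map<String, Predicate<String>> plain() {
        Map<String, Predicate<String>> branches = new LinkedHashMap<>();
        branches.put("unitedStates", country -> "united states".equals(country));
        branches.put("germany", country -> "germany".equals(country));
        return branches;
    }
}
```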

On Tue, Apr 30, 2019 at 9:10 AM John Roesler  wrote:

> Hi Ivan,
>
> Thanks for the update.
>
> FWIW, I agree with Matthias that the current "start branching" operator is
> confusing when named the same way as the actual branches. "Split" seems
> like a good name. Alternatively, we can do without a "start branching"
> operator at all, and just do:
>
> stream
>   .branch(Predicate)
>   .branch(Predicate)
>   .defaultBranch();
>
> Tentatively, I think that this branching operation should be terminal. That
> way, we don't create ambiguity about how to use it. That is, `branch`
> should return `KBranchedStream`, while `defaultBranch` is `void`, to
> enforce that it comes last, and that there is only one definition of the
> default branch. Potentially, we should log a warning if there's no default,
> and additionally log a warning (or throw an exception) if a record falls
> through with no default.
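Below is a minimal sketch of the terminal variant described above, modeled on single records instead of a KStream. The `TerminalBranchSketch` class and its `route` method are hypothetical. `branch()` returns the builder for chaining, while `defaultBranch()` returns `void` and may be declared only once. A record that matches no branch throws when no default exists; the sketch takes the "throw an exception" option rather than logging a warning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of a terminal branching chain; not a Kafka type.
final class TerminalBranchSketch<T> {
    private final List<Predicate<? super T>> predicates = new ArrayList<>();
    private final List<Consumer<? super T>> handlers = new ArrayList<>();
    private Consumer<? super T> defaultHandler; // stays null until defaultBranch() is called

    // Returns the builder so further branches can be chained.
    TerminalBranchSketch<T> branch(Predicate<? super T> predicate, Consumer<? super T> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Returns void, so nothing can be chained after it; a second call is rejected.
    void defaultBranch(Consumer<? super T> handler) {
        if (defaultHandler != null) {
            throw new IllegalStateException("default branch already defined");
        }
        defaultHandler = handler;
    }

    // Sends a record to the first matching branch, else to the default branch;
    // with no default branch, an unmatched record is treated as an error.
    void route(T record) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(record)) {
                handlers.get(i).accept(record);
                return;
            }
        }
        if (defaultHandler == null) {
            throw new IllegalStateException("record matched no branch and no default is defined: " + record);
        }
        defaultHandler.accept(record);
    }
}
```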
>
> Thoughts?
>
> Thanks,
> -John
>
> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax 
> wrote:
>
> > Thanks for updating the KIP and your answers.
> >
> >
> > >  this is to make the name similar to String#split
> > >> that also returns an array, right?
> >
> > The intent was to avoid name duplication. The return type should _not_
> > be an array.
> >
> > The current proposal is
> >
> > stream.branch()
> >   .branch(Predicate)
> >   .branch(Predicate)
> >   .defaultBranch();
> >
> > IMHO, this reads a little odd, because the first `branch()` does not
> > take any parameters and has different semantics than the later
> > `branch()` calls. Note that from the code snippet above, it's hidden
> > that the first call is `KStream#branch()` while the others are
> > `KBranchedStream#branch()`, which makes reading the code harder.
> >
> > Because I suggested renaming `addBranch()` -> `branch()`, I thought it
> > might be better to also rename `KStream#branch()` to avoid the naming
> > overlap that seems to be confusing. The following reads much cleaner to
> me:
> >
> > stream.split()
> >   .branch(Predicate)
> >   .branch(Predicate)
> >   .defaultBranch();
> >
> > Maybe there is a better alternative to `split()` though to avoid the
> > naming overlap.
> >
> >
> > > 'default' is, however, a reserved word, so unfortunately we cannot have
> > a method with such name :-)
> >
> > Bummer. Didn't consider this. Maybe we can still come up with a short
> name?
> >
> >
> > Can you add the interface `KBranchedStream` to the KIP with all its
> > methods? It will be part of public API and should be contained in the
> > KIP. For example, it's unclear atm, what the return type of
> > `defaultBranch()` is.
> >
> >
> > You did not comment on the idea to add a `KBranchedStream#get(int index)
> > -> KStream` method to get the individually branched-KStreams. Would be
> > nice to get your feedback about it. It seems you suggest that users
> > would need to write custom utility code otherwise, to access them. We
> > should discuss the pros and cons of both approaches. It feels
> > "incomplete" to me atm, if the API has no built-in support to get the
> > branched-KStreams directly.
> >
> >
> >
> > -Matthias
> >
> >
> > On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > > Hi all!
> > >
> > > I have updated the KIP-418 according to the new vision.
> > >
> > > Matthias, thanks for your comment!
> > >
> > >> Renaming KStream#branch() -> #split()
> > >
> > > I can see your point: this is to make the name similar to String#split
> > > that also returns an array, right? But is it worth the loss of
> backwards
> > > compatibility? We can have overloaded branch() as well without
> affecting
> > > the existing code. Maybe the old 

[jira] [Created] (KAFKA-8297) Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics

2019-04-26 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8297:
---

 Summary: Kafka Streams ConsumerRecordFactory yields difficult 
compiler error about generics
 Key: KAFKA-8297
 URL: https://issues.apache.org/jira/browse/KAFKA-8297
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Michael Drogalis


When using the ConsumerRecordFactory, it's convenient to specify a default 
topic to create records with:

{code:java}
ConsumerRecordFactory<String, User> inputFactory =
    new ConsumerRecordFactory<>(inputTopic, keySerializer, valueSerializer);
{code}

However, when the factory is used to create a record with a String key:

{code:java}
inputFactory.create("any string", user)
{code}

Compilation fails with the following error:

{code:java}
Ambiguous method call. Both:

create(String, User) in ConsumerRecordFactory and
create(String, User) in ConsumerRecordFactory match
{code}

At first glance, this is a really confusing error to see during compilation. 
What's happening is that there are two clashing signatures for `create`: 
create(K, V) and create(String, V), where the String in the latter is a topic 
name. When K is itself String, both signatures accept exactly the same arguments.

It seems like fixing this would require breaking the existing interface. This 
is a really opaque problem to hit though, and it would be great if we could 
avoid having users encounter this.
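The clash can be reproduced with a small, hypothetical model of the factory. `RecordFactorySketch` and `SketchRecord` are made up for illustration and are not Kafka classes. Once `K` is `String`, the keyed `create(K, V)` and the topic-taking `create(String, V)` have identical parameter types, so a two-argument call with a string key cannot be resolved. Passing the topic explicitly through a three-argument overload sidesteps the ambiguity. The real `ConsumerRecordFactory` appears to offer an analogous `create(topicName, key, value)` overload; treat that as an assumption to verify against the javadoc for your version.

```java
// Hypothetical, simplified model of the overload clash; NOT Kafka's classes.
final class SketchRecord<K, V> {
    final String topic;
    final K key;
    final V value;

    SketchRecord(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

final class RecordFactorySketch<K, V> {
    private final String defaultTopic;

    RecordFactorySketch(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    // Keyed record on the default topic.
    SketchRecord<K, V> create(K key, V value) {
        return new SketchRecord<>(defaultTopic, key, value);
    }

    // Unkeyed record on an explicitly named topic.
    SketchRecord<K, V> create(String topic, V value) {
        return new SketchRecord<>(topic, null, value);
    }

    // Keyed record on an explicitly named topic. Its arity differs from the
    // two-argument overloads, so it never competes with them.
    SketchRecord<K, V> create(String topic, K key, V value) {
        return new SketchRecord<>(topic, key, value);
    }

    // With K = String, this line would not compile (javac: "reference to create is ambiguous"):
    //   new RecordFactorySketch<String, Integer>("input-topic").create("any string", 42);
}
```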



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8296) Kafka Streams branch method raises type warnings

2019-04-26 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8296:
---

 Summary: Kafka Streams branch method raises type warnings
 Key: KAFKA-8296
 URL: https://issues.apache.org/jira/browse/KAFKA-8296
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Michael Drogalis


Because the branch method in the DSL takes varargs, using it as follows raises 
an unchecked type warning:

{code:java}
KStream<String, User>[] branches = builder.<String, User>stream(inputTopic)
    .branch((key, user) -> "united states".equals(user.getCountry()),
            (key, user) -> "germany".equals(user.getCountry()),
            (key, user) -> "mexico".equals(user.getCountry()),
            (key, user) -> true);
{code}

The compiler warns with:

{code:java}
Warning:(39, 24) java: unchecked generic array creation for varargs parameter 
of type org.apache.kafka.streams.kstream.Predicate[]
{code}

This is an unfortunate consequence of how Java generics interact with varargs: 
every call has to create an array of a generic type, which the compiler flags 
as unchecked. We could possibly fix this by adding the @SafeVarargs annotation 
to the branch method signatures.
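A generic varargs parameter forces the compiler to create an array of a parameterized type at each call site, and that is what the unchecked warning reports. `@SafeVarargs` suppresses the warning, but Java only permits it on static methods, final instance methods and constructors (plus private instance methods from Java 9). It therefore cannot be placed on an abstract interface method such as `KStream#branch`.

The collection-based sketch below uses a hypothetical `VarargsBranchSketch`, with `java.util.function.Predicate<T>` standing in for Kafka's `Predicate<K, V>`. It shows the annotation on a static method. It also implements the routing `branch` documents: first match wins, and records that match nothing are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class VarargsBranchSketch {
    // @SafeVarargs is legal here because the method is static; it asserts that
    // the method never stores into or leaks the generic varargs array.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> records, Predicate<? super T>... predicates) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    outputs.get(i).add(record); // first match wins
                    break;
                }
            }
            // A record that matches no predicate is dropped.
        }
        return outputs;
    }
}
```

The positional result (`outputs.get(0)`, `outputs.get(1)`, ...) keeps the coupling between predicate order and branch access that the thread discusses.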






[jira] [Created] (KAFKA-8213) KStreams interactive query documentation typo

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8213:
---

 Summary: KStreams interactive query documentation typo
 Key: KAFKA-8213
 URL: https://issues.apache.org/jira/browse/KAFKA-8213
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Michael Drogalis


In [the Interactive Queries 
docs|https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app],
 we have a minor typo:

Actual: You can use the corresponding local data in other parts of your 
application code, as long as it doesn’t required calling the Kafka Streams API.
Expected: You can use the corresponding local data in other parts of your 
application code, as long as it doesn’t require calling the Kafka Streams API.





[jira] [Created] (KAFKA-8212) KStreams documentation Maven artifact table is cut off

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8212:
---

 Summary: KStreams documentation Maven artifact table is cut off
 Key: KAFKA-8212
 URL: https://issues.apache.org/jira/browse/KAFKA-8212
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Michael Drogalis
 Attachments: Screen Shot 2019-04-10 at 2.04.09 PM.png

In the [Writing a Streams Application 
doc|https://kafka.apache.org/21/documentation/streams/developer-guide/write-streams.html],
 the section "LIBRARIES AND MAVEN ARTIFACTS" has a table that lists out the 
Maven artifacts. The values in the group ID column overflow and are cut off by 
the column boundary, even on a very large monitor.

Note that the artifact ID column seems to have its word break property set correctly. 
See the attached image.







[jira] [Created] (KAFKA-8210) Missing link for KStreams in Streams DSL docs

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8210:
---

 Summary: Missing link for KStreams in Streams DSL docs
 Key: KAFKA-8210
 URL: https://issues.apache.org/jira/browse/KAFKA-8210
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Michael Drogalis


In [the Streams DSL 
docs|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html],
 there is some text under the KTable section that reads: "We have already seen 
an example of a changelog stream in the section streams_concepts_duality."

"streams_concepts_duality" seems to indicate that it should be a link, but it 
is not.





[jira] [Created] (KAFKA-8209) Wrong link for KStreams DSL in Core Concepts doc

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8209:
---

 Summary: Wrong link for KStreams DSL in Core Concepts doc
 Key: KAFKA-8209
 URL: https://issues.apache.org/jira/browse/KAFKA-8209
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Reporter: Michael Drogalis


In the [core concepts 
doc|https://kafka.apache.org/21/documentation/streams/core-concepts], there is 
a link in the "States" section for "Kafka Streams DSL" that points to the wrong 
page.

Actual: 
https://kafka.apache.org/21/documentation/streams/developer-guide/#streams_dsl
Expected: 
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html





[jira] [Created] (KAFKA-8208) Broken link for out-of-order data in KStreams Core Concepts doc

2019-04-10 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8208:
---

 Summary: Broken link for out-of-order data in KStreams Core 
Concepts doc
 Key: KAFKA-8208
 URL: https://issues.apache.org/jira/browse/KAFKA-8208
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Reporter: Michael Drogalis


In the [core concepts 
doc|https://kafka.apache.org/21/documentation/streams/core-concepts], there is 
a link in the "Out-of-Order Handling" section for "out-of-order data". It points 
to https://kafka.apache.org/21/documentation/streams/tbd, which returns a 404.





[jira] [Created] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput

2019-04-08 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8200:
---

 Summary: TopologyTestDriver should offer an iterable signature of 
readOutput
 Key: KAFKA-8200
 URL: https://issues.apache.org/jira/browse/KAFKA-8200
 Project: Kafka
  Issue Type: Improvement
Reporter: Michael Drogalis


When using the TopologyTestDriver, one examines the output on a topic with the 
readOutput method. This method returns one record at a time, until no more 
records can be found, at which point it returns null.

Many times, the usage pattern around readOutput will involve writing a loop to 
extract a number of records from the topic, building up a list of records, 
until it returns null.

It would be helpful to offer an iterable signature of readOutput, which returns 
either an iterator or list over the records that are currently available in the 
topic. This would remove the need for users to write that loop themselves 
each time.

Such a signature might look like:

{code:java}
public Iterable<ProducerRecord<byte[], byte[]>> readOutput(java.lang.String topic);
{code}
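Until such an overload exists, the loop can be factored into a small generic helper. In the sketch below, `DrainSketch.drain` is a hypothetical name. It calls a supplier until it returns `null`, matching the `readOutput` contract described above. Against a real `TopologyTestDriver` it could presumably be called as `DrainSketch.drain(() -> testDriver.readOutput("output-topic"))`, assuming the single-argument `readOutput(String)` overload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class DrainSketch {
    // Collects items from a "read one, null when empty" source into a list.
    static <T> List<T> drain(Supplier<T> readOne) {
        List<T> records = new ArrayList<>();
        for (T record = readOne.get(); record != null; record = readOne.get()) {
            records.add(record);
        }
        return records;
    }
}
```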






[jira] [Created] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"

2019-04-05 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8198:
---

 Summary: KStreams testing docs use non-existent method "pipe"
 Key: KAFKA-8198
 URL: https://issues.apache.org/jira/browse/KAFKA-8198
 Project: Kafka
  Issue Type: Bug
Reporter: Michael Drogalis


In [the testing docs for 
KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html],
 we use the following code snippet:

{code:java}
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
    "input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
{code}

As of Apache Kafka 2.2.0, this method no longer exists. We should correct the 
docs to use the pipeInput method.






[jira] [Created] (KAFKA-8181) Streams docs on serialization include Avro header, but no content

2019-04-01 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-8181:
---

 Summary: Streams docs on serialization include Avro header, but no 
content
 Key: KAFKA-8181
 URL: https://issues.apache.org/jira/browse/KAFKA-8181
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.2.0
Reporter: Michael Drogalis


On [the documentation for data types and 
serialization|https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html],
 Avro is listed in the table of contents as something supported out of the box. 
The link is dead, though, because there is no content. We should either remove 
the header or supply the content.





[jira] [Created] (KAFKA-7855) Kafka Streams Maven Archetype quickstart fails to compile out of the box

2019-01-22 Thread Michael Drogalis (JIRA)
Michael Drogalis created KAFKA-7855:
---

 Summary: Kafka Streams Maven Archetype quickstart fails to compile 
out of the box
 Key: KAFKA-7855
 URL: https://issues.apache.org/jira/browse/KAFKA-7855
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
 Environment: Java 8, OS X 10.13.6
Reporter: Michael Drogalis
 Attachments: output.log

When I follow the [quickstart 
tutorial|https://kafka.apache.org/21/documentation/streams/tutorial] and issue 
the command to set up a new Maven project, the generated example fails to 
compile. Adding a Produced.with() on the source seems to fix this. I've 
attached the compiler output.

 


