Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2018-02-16 Thread Jan Filipiak

Update:

I want to give a quick update on what I found while porting the 0.10 version
to 1.0.


1. It is difficult to provide a stock CombinedKey Serde.
We effectively wrap two Serdes for the key, but we do not have good topic
names to feed into the Avro Serde for K1 and K2 when both live in the same topic.
We also cannot carry the Serdes along from the creation of the
table and remember the topic name, because of whitelist subscriptions.

2. We should drop the idea of the key splitter and combiner.
I cannot find a good place to handle this in a single layer; it seems to
spread throughout the codebase. I think that is because it is an oddity
and a break in the architecture to have something like this. Maybe one
introduces it in a later step, but it is very messy to have it in the
first step, and it is consuming about 80% of the effort put into the KIP.

3. Caching is messing with my head very heavily at the moment. I have
full control over the RocksDB holding the right side (B), so I can make
it not cache, which is good. I do inherit the store of the left side
(A), and I have no control over its caching behaviour.

Let me elaborate:

Say a tuple A,B got emitted after joining, and the delete for A goes into
the cache.

After that, the B record is deleted as well.
B's join processor would look up A and see `null` while computing the
old and new value
(at this point we can execute the joiner with A being null and still emit
something, but it is not going to represent the actual oldValue).

Then A's cache flushes;
it doesn't see B, so it is also not going to put a proper oldValue.

The output can then not be used for, say, any aggregate, as a delete
would not reliably find the old aggregate it needs to be removed from.
filter will also break, as it stops null,null changes from
propagating. So to me it looks pretty clear that caching with join
breaks KTable semantics, be it my new join or the
currently existing ones.


4. I further want to propose that I leave out IQ support in the first
step. Copy-pasting the if(storeName == null) that is in almost every
processor is not ideal. I want to lift it to the topology level in
the next step (adding a new processor that will maintain the
user-provided store as a downstream processor).
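
To make point 3 concrete, here is a minimal sketch in plain Java (no Kafka classes), assuming a downstream aggregate that retracts the old value and adds the new one for every change it receives. Change and apply are hypothetical stand-ins for illustration, not the Streams internals.

```java
import java.util.HashMap;
import java.util.Map;

final class OldValueSketch {
    // Hypothetical stand-in for the (newValue, oldValue) pair a table forwards downstream.
    static final class Change {
        final Integer newValue;
        final Integer oldValue;

        Change(Integer newValue, Integer oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Downstream sum per group: retract the old contribution, then add the new one.
    static void apply(Map<String, Integer> sums, String group, Change change) {
        int sum = sums.getOrDefault(group, 0);
        if (change.oldValue != null) {
            sum -= change.oldValue;
        }
        if (change.newValue != null) {
            sum += change.newValue;
        }
        sums.put(group, sum);
    }

    public static void main(String[] args) {
        Map<String, Integer> sums = new HashMap<>();
        apply(sums, "g", new Change(5, null));    // joined tuple (A,B) is emitted
        apply(sums, "g", new Change(null, null)); // delete arrives without its oldValue
        // The aggregate still contains the 5 that should have been retracted.
        System.out.println("sum after lossy delete: " + sums.get("g"));
    }
}
```

With a proper delete, new Change(null, 5), the sum would return to 0; with the null,null change it cannot.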


That is where I stand currently. I would appreciate feedback on all the 
points


Best Jan







On 27.10.2017 06:38, Jan Filipiak wrote:

Hello everyone,

this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows
the Streams DSL to perform KTableKTable-Joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible
and have a good solution afterwards I invite everyone to read through
the KIP I put together and discuss it here in this Thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward
to everyone's opinion!

Please keep the discussion on the mailing list rather than commenting
on the wiki (wiki discussions get unwieldy fast).

Best
Jan







Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-12-18 Thread Jan Filipiak

Hi Guozhang,

 thanks for the update.

On 15.12.2017 22:54, Guozhang Wang wrote:

Jan,

Thanks for the updated KIP, and the raised questions. Here are my thoughts
around the "back and forth mapper" approach on your wiki:

1) regarding the key-value types of KTableValueGetter, we do not
necessarily enforce its template K, V to be the same as its calling
Processor, although today in all implementations we happen to do so.
So I think it is ok to extend this internal implementation to allow getter
and forwarding with different types.
I am not entirely sure what you mean by this. The dependency is only there
because the downstream processor is going to invoke the ValueGetter with the
downstream key. If this key is of a different type, we would run into a
runtime exception. We would have to introduce a fourth generic type, the
"new key type", which would also be the access type of the ValueGetter.
I like introducing this a lot, but I don't think it works without the
fourth generic type.

and then we have

KTableProcessorSupplier extends ProcessorSupplier {

    view ValueGetterSupplier()
    processor ProcessorSupplier()

}

This would conveniently allow for some flatmap() on KTable, which is a
neat thing IMO.


2) regarding the KTableProcessorSupplier enforcing to return the same
key/value types of its "KTableValueGetterSupplier view();" the key
observation to note is that "ProcessorSupplier" inside "KTableImpl" does not enforce to have the same key-value types of the KTable, i.e.
we can use a "ProcessorSupplier" inside the impl of a `KTable`. I think that should help getting around the issue.
I think it has nothing really to do with the places where the
ProcessorSupplier is referenced; rather, this quirk sits inside
KTableProcessorSupplier. Regardless of the scope of usage, I cannot
come up with a KTableProcessorSupplier that changes keys while
maintaining all invariants (being queryable). One
can jump across with a ProcessorSupplier, where it is obvious that you
can't have a ValueGetterSupplier, but this is
rather a hack.

Why is it only inside KTableProcessorSupplier? We process key K, and
then we forward K1, but our ValueGetterSupplier
can only have K as its generic type and will therefore crash if we invoke the
ValueGetter.
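
A minimal sketch of the mismatch described above, in plain Java without any Streams classes. It assumes a supplier with a fourth type parameter for the forwarded key, so that the getter is typed on the key downstream processors actually hold. All names here (KeyChangingSupplier, Rekey, and the ValueGetter declared below) are hypothetical illustrations, not the real internal interfaces.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

final class FourthGenericSketch {
    // Hypothetical getter, typed on the forwarded key KO rather than the received key K.
    interface ValueGetter<KO, VO> {
        VO get(KO key);
    }

    // Hypothetical supplier with four type parameters:
    // K, V are what the processor receives; KO, VO are what it forwards and serves.
    interface KeyChangingSupplier<K, V, KO, VO> {
        Map.Entry<KO, VO> process(K key, V value);

        ValueGetter<KO, VO> view();
    }

    // Example: receives Integer keys, forwards and stores String keys.
    static final class Rekey implements KeyChangingSupplier<Integer, String, String, String> {
        private final Map<String, String> store = new HashMap<>();

        @Override
        public Map.Entry<String, String> process(Integer key, String value) {
            String outKey = "user-" + key;
            store.put(outKey, value);
            return new AbstractMap.SimpleImmutableEntry<>(outKey, value);
        }

        @Override
        public ValueGetter<String, String> view() {
            // Downstream looks values up with the forwarded key type, so no cast is needed.
            return store::get;
        }
    }

    public static void main(String[] args) {
        Rekey rekey = new Rekey();
        Map.Entry<String, String> forwarded = rekey.process(1, "alice");
        System.out.println(rekey.view().get(forwarded.getKey()));
    }
}
```

With only K available for the getter, view() would have to accept Integer while downstream holds String keys, which is the runtime failure mentioned above.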




3) About the alternative KTable::mapKeys(), I think the major issue is that
this mapKeys() cannot enforce users to always call it to get the
"non-combined" Key, and hence users may still need to consider the serde of
"CombinedKey" if they do not call mapKeys and then directly pipe it to the
output, while this approach enforce them to always "map" it before trying
to write it to anywhere externally exposable.

It cannot force them, but folks who want this can do it. People who are
fine with any CombinedKey type could just let it be forwarded as such.

A new aspect that I had not thought of yet is that, of course, in a
to() call they could pass in a CombinedKeySerde of their own. I think
this flexibility is a plus rather than a minus. What do you think?


4) A very minor comment on the wiki page itself, about the "back and forth
mapper" section: the parameter names "customCombinedKey" and "combinedKey"
seems a bit hard to understand to normal users; should we consider renaming
them to something more understandable? For example, "outputKeyCombiner" and
"outputKeySpliter"?

yes your naming is superior.




Guozhang


On Thu, Dec 7, 2017 at 3:58 AM, Jan Filipiak 
wrote:


On 05.12.2017 00:42, Matthias J. Sax wrote:


Jan,

The KTableValueGetter thing is a valid point. I think we would need a
backwards mapper (or merge both into one and sacrifices lambdas?).
Another alternative would be, to drop the optimization and materialize
the KTable.operator() result... (not a great solution either). I am
personally fine with a backwards mapper (we should call it KeySplitter).

2. I am not sure if we can pull it of w/o said forth generic type in

KTable (that I am in favour of btw)


Not sure if I can follow here. I am personally not worried about the

number of generic types -- it's just to have a clear definition what
each passed parameter does.


I need to double check this again. Its good that we are open to introduce
a new one
I think it will not work currently as a KTableProcessorSupplier when asked
for a
ValueGetterSupplier it can only return a ValueGetter Supplier that has the
same Keytype
as the key it receives in the process method. Even though it would forward
a different
key type and therefore KTables key Type can't change. I am thinking how to
pull this off but I see little chance

But I am always in big favour of introducing the forth type OutputKey, it
would become
straight forward then. I hope you can follow.

+ It won't solves peoples problem having CombinedKey on the wire and not

being able to inspect the topic with say there default tools.


I see your point, but do we 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-12-15 Thread Guozhang Wang
Jan,

Thanks for the updated KIP, and the raised questions. Here are my thoughts
around the "back and forth mapper" approach on your wiki:

1) regarding the key-value types of KTableValueGetter, we do not
necessarily enforce its template K, V to be the same as its calling
Processor, although today in all implementations we happen to do so.
So I think it is ok to extend this internal implementation to allow getter
and forwarding with different types.

2) regarding the KTableProcessorSupplier enforcing to return the same
key/value types of its "KTableValueGetterSupplier view();" the key
observation to note is that "ProcessorSupplier" inside "KTableImpl" does not enforce to have the same key-value types of the KTable, i.e.
we can use a "ProcessorSupplier" inside the impl of a `KTable`. I think that should help getting around the issue.

3) About the alternative KTable::mapKeys(), I think the major issue is that
this mapKeys() cannot enforce users to always call it to get the
"non-combined" Key, and hence users may still need to consider the serde of
"CombinedKey" if they do not call mapKeys and then directly pipe it to the
output, while this approach enforce them to always "map" it before trying
to write it to anywhere externally exposable.

4) A very minor comment on the wiki page itself, about the "back and forth
mapper" section: the parameter names "customCombinedKey" and "combinedKey"
seems a bit hard to understand to normal users; should we consider renaming
them to something more understandable? For example, "outputKeyCombiner" and
"outputKeySpliter"?



Guozhang


On Thu, Dec 7, 2017 at 3:58 AM, Jan Filipiak 
wrote:

>
> On 05.12.2017 00:42, Matthias J. Sax wrote:
>
>> Jan,
>>
>> The KTableValueGetter thing is a valid point. I think we would need a
>> backwards mapper (or merge both into one and sacrifices lambdas?).
>> Another alternative would be, to drop the optimization and materialize
>> the KTable.operator() result... (not a great solution either). I am
>> personally fine with a backwards mapper (we should call it KeySplitter).
>>
>> 2. I am not sure if we can pull it of w/o said forth generic type in
 KTable (that I am in favour of btw)

>>> Not sure if I can follow here. I am personally not worried about the
>> number of generic types -- it's just to have a clear definition what
>> each passed parameter does.
>>
> I need to double check this again. Its good that we are open to introduce
> a new one
> I think it will not work currently as a KTableProcessorSupplier when asked
> for a
> ValueGetterSupplier it can only return a ValueGetter Supplier that has the
> same Keytype
> as the key it receives in the process method. Even though it would forward
> a different
> key type and therefore KTables key Type can't change. I am thinking how to
> pull this off but I see little chance
>
> But I am always in big favour of introducing the forth type OutputKey, it
> would become
> straight forward then. I hope you can follow.
>
> + It won't solves peoples problem having CombinedKey on the wire and not
>>> being able to inspect the topic with say there default tools.
>>>
>> I see your point, but do we not have this issue always? To make range
>> scan work, we need to serialize the prefix (K1) and suffix (K)
>> independently from each other. IMHO, it would be too much of a burden to
>> the user, to provide a single serialized for K0 that guaranteed the
>> ordering we need. Still, advanced user can provide custom Serde for the
>> changelog topic via `Joined` -- and they can serialize as they wish (ie,
>> get CombinedKey, convert internally to K0 and serialized -- but
>> this is an opt-in).
>>
>> I think, this actually aligns with what you are saying. However, I think
>> the #prefix() call is not the best idea. We can just use Serde for
>> this (if users overwrite CombinedKey-Serde, it must overwrite Serde
>> too and can return the proper perfix (or do I miss something?).
>>
> I can't follow. For the stock implementation user would get
> they wouldn't need prefix. Users had not to define it we can implement
> that ourself by just getting K1 Serde.
>
> But to Override with a custom Serde that prefix method is needed as an
> indicator if only a prefix or the full thing is to be rendered.
>
>
>>   - Id rather introduce KTable::mapKeys() or something (4th generic in
>>> Ktable?) than overloading. It is better SOCs wise.
>>>
>> What overload are you talking about? From my understanding, we want to
>> add one single method (or maybe one for inner,left,outter each), but I
>> don't see any overloads atm?
>>
> The back and forth mapper would get an overload
>
>>
>> Also, `KTable.mapKeys()` would have the issue, that one could create an
>> invalid KTable with key collisions. I would rather shield users to shoot
>> themselves in the foot.
>>
> This mapkeys would not be used to remove the actual values but to get rid
> of the CombinedKey-type.
> 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-12-12 Thread Jan Filipiak

Hi, I updated the KIP.

I would be open to this:

We mark the "less intrusive" and the "back and forth mapper" approaches as
rejected alternatives

and implement the two remaining methods.

Any thoughts?

Best Jan


On 07.12.2017 12:58, Jan Filipiak wrote:


On 05.12.2017 00:42, Matthias J. Sax wrote:

Jan,

The KTableValueGetter thing is a valid point. I think we would need a
backwards mapper (or merge both into one and sacrifices lambdas?).
Another alternative would be, to drop the optimization and materialize
the KTable.operator() result... (not a great solution either). I am
personally fine with a backwards mapper (we should call it KeySplitter).


2. I am not sure if we can pull it of w/o said forth generic type in
KTable (that I am in favour of btw)

Not sure if I can follow here. I am personally not worried about the
number of generic types -- it's just to have a clear definition what
each passed parameter does.
I need to double check this again. Its good that we are open to 
introduce a new one
I think it will not work currently as a KTableProcessorSupplier when 
asked for a
ValueGetterSupplier it can only return a ValueGetter Supplier that has 
the same Keytype
as the key it receives in the process method. Even though it would 
forward a different
key type and therefore KTables key Type can't change. I am thinking 
how to pull this off but I see little chance


But I am always in big favour of introducing the forth type OutputKey, 
it would become

straight forward then. I hope you can follow.

+ It won't solves peoples problem having CombinedKey on the wire and 
not being able to inspect the topic with say there default tools.

I see your point, but do we not have this issue always? To make range
scan work, we need to serialize the prefix (K1) and suffix (K)
independently from each other. IMHO, it would be too much of a burden to
the user, to provide a single serialized for K0 that guaranteed the
ordering we need. Still, advanced user can provide custom Serde for the
changelog topic via `Joined` -- and they can serialize as they wish (ie,
get CombinedKey, convert internally to K0 and serialized -- but
this is an opt-in).

I think, this actually aligns with what you are saying. However, I think
the #prefix() call is not the best idea. We can just use Serde for
this (if users overwrite CombinedKey-Serde, it must overwrite Serde
too and can return the proper perfix (or do I miss something?).

I can't follow. For the stock implementation user would get
they wouldn't need prefix. Users had not to define it we can implement
that ourself by just getting K1 Serde.

But to Override with a custom Serde that prefix method is needed as an
indicator if only a prefix or the full thing is to be rendered.



  - Id rather introduce KTable::mapKeys() or something (4th generic 
in Ktable?) than overloading. It is better SOCs wise.

What overload are you talking about? From my understanding, we want to
add one single method (or maybe one for inner,left,outter each), but I
don't see any overloads atm?

The back and forth mapper would get an overload


Also, `KTable.mapKeys()` would have the issue, that one could create an
invalid KTable with key collisions. I would rather shield users to shoot
themselves in the foot.
This mapkeys would not be used to remove the actual values but to get 
rid of the CombinedKey-type.
Users can shoot themself with the proposed back and forth mapper you 
suggested.






Side remark:

In the KIP, in the Step-by-Step table (that I really like a lot!) I
think in line 5 (input A, with key A2 arrives, the columns "state B
materialized" and "state B other task" should not be empty but the same
as in line 4?

Will double check tonight. totally plausible i messed this up!

best Jan




-Matthias


On 11/25/17 8:56 PM, Jan Filipiak wrote:

Hi Matthias,

2 things that pop into my mind sunday morning. Can we provide an
KTableValueGetter when key in the store is different from the key
forwarded?
1. we would need a backwards mapper
2. I am not sure if we can pull it of w/o said forth generic type in
KTable (that I am in favour of btw)

+ It won't solves peoples problem having CombinedKey on the wire and 
not

beeing able to inspect the topic with say there default tools.
  - Id rather introduce KTable::mapKeys() or something (4th generic in
Ktable?) than overloading. It is better SOCs wise.

I am thinking more into an overload where we replace the Comined key
Serde. So people can use a default CombinedKey Serde
but could provide an own implementation that would internally use K0 
vor

serialisation and deserialisation. One could implement
a ##prefix() into this call to make explicit that we only want the
prefix rendered. This would take CombinedKey logic out of publicly 
visible
data. A Stock CombinedKey Serde that would be used by default could 
also

handle the JSON users correctly.

Users would still get CombinedKey back. The downside of getting these
nested deeply is probably mitgated 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-12-07 Thread Jan Filipiak


On 05.12.2017 00:42, Matthias J. Sax wrote:

Jan,

The KTableValueGetter thing is a valid point. I think we would need a
backwards mapper (or merge both into one and sacrifices lambdas?).
Another alternative would be, to drop the optimization and materialize
the KTable.operator() result... (not a great solution either). I am
personally fine with a backwards mapper (we should call it KeySplitter).


2. I am not sure if we can pull it of w/o said forth generic type in
KTable (that I am in favour of btw)

Not sure if I can follow here. I am personally not worried about the
number of generic types -- it's just to have a clear definition what
each passed parameter does.
I need to double-check this again. It's good that we are open to
introducing a new one.
I think it will not work currently, as a KTableProcessorSupplier, when
asked for a ValueGetterSupplier, can only return a ValueGetterSupplier
that has the same key type as the key it receives in the process method,
even though it would forward a different key type; therefore the KTable's
key type can't change. I am thinking about how to pull this off, but I
see little chance.

But I am always strongly in favour of introducing the fourth type OutputKey;
it would become straightforward then. I hope you can follow.


+ It won't solves peoples problem having CombinedKey on the wire and not being 
able to inspect the topic with say there default tools.

I see your point, but do we not have this issue always? To make range
scan work, we need to serialize the prefix (K1) and suffix (K)
independently from each other. IMHO, it would be too much of a burden to
the user, to provide a single serialized for K0 that guaranteed the
ordering we need. Still, advanced user can provide custom Serde for the
changelog topic via `Joined` -- and they can serialize as they wish (ie,
get CombinedKey, convert internally to K0 and serialized -- but
this is an opt-in).

I think, this actually aligns with what you are saying. However, I think
the #prefix() call is not the best idea. We can just use Serde for
this (if users overwrite CombinedKey-Serde, it must overwrite Serde
too and can return the proper perfix (or do I miss something?).

I can't follow. For the stock implementation user would get
they wouldn't need prefix. Users would not have to define it; we can implement
that ourselves by just getting the K1 Serde.

But to override with a custom Serde, that prefix method is needed as an
indicator of whether only a prefix or the full thing is to be rendered.




  - Id rather introduce KTable::mapKeys() or something (4th generic in Ktable?) 
than overloading. It is better SOCs wise.

What overload are you talking about? From my understanding, we want to
add one single method (or maybe one for inner,left,outter each), but I
don't see any overloads atm?

The back and forth mapper would get an overload


Also, `KTable.mapKeys()` would have the issue, that one could create an
invalid KTable with key collisions. I would rather shield users to shoot
themselves in the foot.
This mapKeys would not be used to remove the actual values but to get
rid of the CombinedKey type.
Users can shoot themselves in the foot with the proposed back and forth
mapper you suggested.






Side remark:

In the KIP, in the Step-by-Step table (that I really like a lot!) I
think in line 5 (input A, with key A2 arrives, the columns "state B
materialized" and "state B other task" should not be empty but the same
as in line 4?

Will double-check tonight. Totally plausible I messed this up!

best Jan




-Matthias


On 11/25/17 8:56 PM, Jan Filipiak wrote:

Hi Matthias,

2 things that pop into my mind sunday morning. Can we provide an
KTableValueGetter when key in the store is different from the key
forwarded?
1. we would need a backwards mapper
2. I am not sure if we can pull it of w/o said forth generic type in
KTable (that I am in favour of btw)

+ It won't solves peoples problem having CombinedKey on the wire and not
beeing able to inspect the topic with say there default tools.
  - Id rather introduce KTable::mapKeys() or something (4th generic in
Ktable?) than overloading. It is better SOCs wise.

I am thinking more into an overload where we replace the Comined key
Serde. So people can use a default CombinedKey Serde
but could provide an own implementation that would internally use K0 vor
serialisation and deserialisation. One could implement
a ##prefix() into this call to make explicit that we only want the
prefix rendered. This would take CombinedKey logic out of publicly visible
data. A Stock CombinedKey Serde that would be used by default could also
handle the JSON users correctly.

Users would still get CombinedKey back. The downside of getting these
nested deeply is probably mitgated by users doing a group by
in the very next step to get rid of A's key again.

That is what I was able to come up with so far.
Let me know. what you think




On 22.11.2017 00:14, Matthias J. Sax wrote:

Jan,

Thanks for explaining the Serde issue! This makes a lot 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-12-04 Thread Matthias J. Sax
Jan,

The KTableValueGetter thing is a valid point. I think we would need a
backwards mapper (or merge both into one and sacrifice lambdas?).
Another alternative would be to drop the optimization and materialize
the KTable.operator() result... (not a great solution either). I am
personally fine with a backwards mapper (we should call it KeySplitter).

>> 2. I am not sure if we can pull it of w/o said forth generic type in
>> KTable (that I am in favour of btw)

Not sure if I can follow here. I am personally not worried about the
number of generic types -- it's just to have a clear definition what
each passed parameter does.

> + It won't solves peoples problem having CombinedKey on the wire and not 
> being able to inspect the topic with say there default tools. 

I see your point, but do we not always have this issue? To make range
scans work, we need to serialize the prefix (K1) and suffix (K)
independently from each other. IMHO, it would be too much of a burden on
the user to provide a single serializer for K0 that guarantees the
ordering we need. Still, advanced users can provide a custom Serde for the
changelog topic via `Joined` -- and they can serialize as they wish (i.e.,
get CombinedKey, convert internally to K0 and serialize -- but
this is an opt-in).
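
The range-scan requirement can be sketched in plain Java, assuming a length-prefixed layout (4-byte length of the serialized K1, then K1, then K) and a store sorted in unsigned bytewise order. The class and method names are hypothetical, a TreeMap stands in for the RocksDB store, and String keys are used only to keep the example self-contained.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PrefixScanSketch {
    // Unsigned lexicographic byte order, so keys sharing a prefix are stored contiguously.
    static final Comparator<byte[]> BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    static TreeMap<byte[], String> newStore() {
        return new TreeMap<>(BYTES);
    }

    // Serialized prefix: length of K1 followed by K1 itself.
    static byte[] prefix(String k1) {
        byte[] p = k1.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + p.length).putInt(p.length).put(p).array();
    }

    // Combined key: the serialized prefix followed by the independently serialized suffix.
    static byte[] combined(String k1, String k) {
        byte[] p = prefix(k1);
        byte[] s = k.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(p.length + s.length).put(p).put(s).array();
    }

    static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // Seek to the prefix and read forward until the first key that no longer starts with it.
    static List<String> scan(TreeMap<byte[], String> store, String k1) {
        byte[] p = prefix(k1);
        List<String> out = new ArrayList<>();
        for (Map.Entry<byte[], String> e : store.tailMap(p, true).entrySet()) {
            if (!startsWith(e.getKey(), p)) {
                break;
            }
            out.add(e.getValue());
        }
        return out;
    }
}
```

The length prefix is one possible design choice: it keeps a scan for "b1" from also matching "b10", which a plain concatenation of the two serialized keys would not guarantee.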

I think this actually aligns with what you are saying. However, I think
the #prefix() call is not the best idea. We can just use Serde for
this (if users overwrite the CombinedKey Serde, they must overwrite Serde
too and can return the proper prefix), or am I missing something?

>  - Id rather introduce KTable::mapKeys() or something (4th generic in 
> Ktable?) than overloading. It is better SOCs wise. 

What overload are you talking about? From my understanding, we want to
add one single method (or maybe one each for inner, left, and outer), but I
don't see any overloads atm.

Also, `KTable.mapKeys()` would have the issue that one could create an
invalid KTable with key collisions. I would rather shield users from shooting
themselves in the foot.
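
The collision concern can be illustrated with a small plain-Java sketch, assuming a table is modelled as a Map and mapKeys as a function applied to every key. MapKeysSketch and its mapKeys method are hypothetical and only mimic the proposed operation; values are assumed to be non-null.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class MapKeysSketch {
    // Re-keys a table; fails loudly when two source keys map to the same target key.
    static <K, KO, V> Map<KO, V> mapKeys(Map<K, V> table, Function<K, KO> mapper) {
        Map<KO, V> out = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : table.entrySet()) {
            KO newKey = mapper.apply(e.getKey());
            if (out.putIfAbsent(newKey, e.getValue()) != null) {
                throw new IllegalStateException("key collision on " + newKey);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> joined = new LinkedHashMap<>();
        joined.put("b1|a1", "v1");
        joined.put("b1|a2", "v2");
        // Keeping both key parts is injective and therefore safe.
        System.out.println(mapKeys(joined, k -> k.replace('|', '-')));
        // Dropping the second key part would collapse both rows onto "b1" and throw.
    }
}
```

A mapper that only changes the key's type, which is the use Jan describes, stays injective; one that discards part of the key does not.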



Side remark:

In the KIP, in the Step-by-Step table (that I really like a lot!) I
think in line 5 (input A, with key A2 arrives, the columns "state B
materialized" and "state B other task" should not be empty but the same
as in line 4?



-Matthias


On 11/25/17 8:56 PM, Jan Filipiak wrote:
> Hi Matthias,
> 
> 2 things that pop into my mind sunday morning. Can we provide an
> KTableValueGetter when key in the store is different from the key
> forwarded?
> 1. we would need a backwards mapper
> 2. I am not sure if we can pull it of w/o said forth generic type in
> KTable (that I am in favour of btw)
> 
> + It won't solves peoples problem having CombinedKey on the wire and not
> beeing able to inspect the topic with say there default tools.
>  - Id rather introduce KTable::mapKeys() or something (4th generic in
> Ktable?) than overloading. It is better SOCs wise.
> 
> I am thinking more into an overload where we replace the Comined key
> Serde. So people can use a default CombinedKey Serde
> but could provide an own implementation that would internally use K0 vor
> serialisation and deserialisation. One could implement
> a ##prefix() into this call to make explicit that we only want the
> prefix rendered. This would take CombinedKey logic out of publicly visible
> data. A Stock CombinedKey Serde that would be used by default could also
> handle the JSON users correctly.
> 
> Users would still get CombinedKey back. The downside of getting these
> nested deeply is probably mitgated by users doing a group by
> in the very next step to get rid of A's key again.
> 
> That is what I was able to come up with so far.
> Let me know. what you think
> 
> 
> 
> 
> On 22.11.2017 00:14, Matthias J. Sax wrote:
>> Jan,
>>
>> Thanks for explaining the Serde issue! This makes a lot of sense.
>>
>> I discussed with Guozhang about this issue and came up with the
>> following idea that bridges both APIs:
>>
>> We still introduce CombinedKey as a public interface and exploit it to
>> manage the key in the store and the changelog topic. For this case we
>> can construct a suitable Serde internally based on the Serdes of both
>> keys that are combined.
>>
>> However, the type of the result table is user defined and can be
>> anything. To bridge between the CombinedKey and the user defined result
>> type, users need to hand in a `ValueMapper` that
>> convert the CombinedKey into the desired result type.
>>
>> Thus, the method signature would be something like
>>
>>>  KTable oneToManyJoin(> KTable other,
>>>  ValueMapper keyExtractor,> ValueJoiner
>>> joiner,
>>>  ValueMapper, KO> resultKeyMapper);
>> The interface parameters are still easy to understand and don't leak
>> implementation details IMHO.
>>
>> WDYT about this idea?
>>
>>
>> -Matthias
>>
>>
>> On 11/19/17 11:28 AM, Guozhang Wang wrote:
>>> Hello Jan,
>>>
>>> I think I get your 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-25 Thread Jan Filipiak

Hi Matthias,

Two things that popped into my mind Sunday morning. Can we provide a
KTableValueGetter when the key in the store is different from the key
forwarded?

1. We would need a backwards mapper.
2. I am not sure if we can pull it off without said fourth generic type in
KTable (which I am in favour of, btw).


+ It won't solve people's problem of having CombinedKey on the wire and not
being able to inspect the topic with, say, their default tools.
 - I'd rather introduce KTable::mapKeys() or something (4th generic in
KTable?) than overloading. It is better in terms of separation of concerns.


I am thinking more of an overload where we replace the CombinedKey
Serde. So people can use a default CombinedKey Serde
but could provide their own implementation that would internally use K0 for
serialisation and deserialisation. One could implement
a #prefix() in this call to make explicit that we only want the
prefix rendered. This would take CombinedKey logic out of publicly visible
data. A stock CombinedKey Serde that would be used by default could also
handle the JSON users correctly.


Users would still get CombinedKey back. The downside of getting these
nested deeply is probably mitigated by users doing a group by
in the very next step to get rid of A's key again.

That is what I was able to come up with so far.
Let me know what you think.




On 22.11.2017 00:14, Matthias J. Sax wrote:

Jan,

Thanks for explaining the Serde issue! This makes a lot of sense.

I discussed with Guozhang about this issue and came up with the
following idea that bridges both APIs:

We still introduce CombinedKey as a public interface and exploit it to
manage the key in the store and the changelog topic. For this case we
can construct a suitable Serde internally based on the Serdes of both
keys that are combined.

However, the type of the result table is user defined and can be
anything. To bridge between the CombinedKey and the user defined result
type, users need to hand in a `ValueMapper` that
convert the CombinedKey into the desired result type.

Thus, the method signature would be something like


 KTable oneToManyJoin(KTable other,
                      ValueMapper keyExtractor,
                      ValueJoiner joiner,
                      ValueMapper, KO> resultKeyMapper);

The interface parameters are still easy to understand and don't leak
implementation details IMHO.
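
How the three function parameters would cooperate can be sketched with in-memory maps. The type parameters in the archived signature above were lost, so the generics below are an assumption, and java.util.function types stand in for ValueMapper and ValueJoiner. CombinedKey here is a hypothetical two-field holder, and the join is a one-shot inner join rather than an incrementally maintained table.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

final class OneToManyJoinSketch {
    // Hypothetical holder pairing the key of the "one" side with the key of the "many" side.
    static final class CombinedKey<K, K1> {
        final K oneKey;
        final K1 manyKey;

        CombinedKey(K oneKey, K1 manyKey) {
            this.oneKey = oneKey;
            this.manyKey = manyKey;
        }
    }

    static <K, V, K1, V1, KO, VO> Map<KO, VO> oneToManyJoin(
            Map<K, V> one,
            Map<K1, V1> many,
            Function<V1, K> keyExtractor,
            BiFunction<V, V1, VO> joiner,
            Function<CombinedKey<K, K1>, KO> resultKeyMapper) {
        Map<KO, VO> result = new HashMap<>();
        for (Map.Entry<K1, V1> row : many.entrySet()) {
            // Extract the foreign key from the value of the "many" side.
            K foreignKey = keyExtractor.apply(row.getValue());
            V match = one.get(foreignKey);
            if (match != null) {
                // The user-supplied mapper turns the internal CombinedKey into the result key.
                KO outKey = resultKeyMapper.apply(new CombinedKey<>(foreignKey, row.getKey()));
                result.put(outKey, joiner.apply(match, row.getValue()));
            }
        }
        return result;
    }
}
```

In this sketch the CombinedKey never leaves the join: callers only see KO, which is the bridging role the resultKeyMapper is meant to play.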

WDYT about this idea?


-Matthias


On 11/19/17 11:28 AM, Guozhang Wang wrote:

Hello Jan,

I think I get your point about the cumbersome that CombinedKey would
introduce for serialization and tooling based on serdes. What I'm still
wondering is the underlying of joinPrefixFakers mapper: from your latest
comment it seems this mapper will be a one-time mapper: we use this to map
the original resulted KTable, V0> to KTable and
then that mapper can be thrown away and be forgotten. Is that true? My
original thought is that you propose to carry this mapper all the way along
the rest of the topology to "abstract" the underlying combined keys.

If it is the other way (i.e. the former approach), then the diagram of
these two approaches would be different: for the less intrusive approach we
would add one more step in this diagram to always do a mapping after the
"task perform join" block.

Also another minor comment on the internal topic: I think many readers may
not get the schema of this topic, so it is better to indicate that what
would be the key of this internal topic used for compaction, and what would
be used as the partition-key.

Guozhang


On Sat, Nov 18, 2017 at 2:30 PM, Jan Filipiak 
wrote:


-> it think the relationships between the different used types, K0,K1,KO
should be explains explicitly (all information is there implicitly, but
one need to think hard to figure it out)


I'm probably blind for this. can you help me here? how would you formulate
this?

Thanks,

Jan


On 16.11.2017 23:18, Matthias J. Sax wrote:


Hi,

I am just catching up on this discussion and did re-read the KIP and
discussion thread.

In contrast to you, I prefer the second approach with CombinedKey as
return type for the following reasons:

   1) the oneToManyJoin() method had less parameter
   2) those parameters are easy to understand
   3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
and the return type KO leaks internal implementation details from my
point of view)
   4) user can get their own KO type by extending CombinedKey interface
(this would also address the nesting issue Trevor pointed out)

That's unclear to me is, why you care about JSON serdes? What is the
problem with regard to prefix? It seems I am missing something here.

I also don't understand the argument about "the user can stick with his
default serde or his standard way of serializing"? If we have
`CombinedKey` as output, the use just provide the serdes for both input
combined-key types individually, and we can reuse both 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-21 Thread Guozhang Wang
Just to clarify: although "CombinedKey" will still be exposed as a public API,
it would not be used in any of the returned key types, so users do not need
to worry about providing a serde for it at all. It will only be used in the
Mapper parameter, and internally the Streams library would know how to
serialize and deserialize it if ever necessary.


Guozhang

On Tue, Nov 21, 2017 at 3:14 PM, Matthias J. Sax 
wrote:

> Jan,
>
> Thanks for explaining the Serde issue! This makes a lot of sense.
>
> I discussed with Guozhang about this issue and came up with the
> following idea that bridges both APIs:
>
> We still introduce CombinedKey as a public interface and exploit it to
> manage the key in the store and the changelog topic. For this case we
> can construct a suitable Serde internally based on the Serdes of both
> keys that are combined.
>
> However, the type of the result table is user defined and can be
> anything. To bridge between the CombinedKey and the user defined result
> type, users need to hand in a `ValueMapper` that
> convert the CombinedKey into the desired result type.
>
> Thus, the method signature would be something like
>
> > <K1, V1, KO, VO> KTable<KO, VO> oneToManyJoin(
> >     KTable<K1, V1> other,
> >     ValueMapper<V1, K0> keyExtractor,
> >     ValueJoiner<V0, V1, VO> joiner,
> >     ValueMapper<CombinedKey<K0, K1>, KO> resultKeyMapper);
>
> The interface parameters are still easy to understand and don't leak
> implementation details IMHO.
>
> WDYT about this idea?
>
>
> -Matthias
>
>
> On 11/19/17 11:28 AM, Guozhang Wang wrote:
> > Hello Jan,
> >
> > I think I get your point about the cumbersome that CombinedKey would
> > introduce for serialization and tooling based on serdes. What I'm still
> > wondering is the underlying of joinPrefixFakers mapper: from your latest
> > comment it seems this mapper will be a one-time mapper: we use this to
> map
> > the original resulted KTable, V0> to KTable and
> > then that mapper can be thrown away and be forgotten. Is that true? My
> > original thought is that you propose to carry this mapper all the way
> along
> > the rest of the topology to "abstract" the underlying combined keys.
> >
> > If it is the other way (i.e. the former approach), then the diagram of
> > these two approaches would be different: for the less intrusive approach
> we
> > would add one more step in this diagram to always do a mapping after the
> > "task perform join" block.
> >
> > Also another minor comment on the internal topic: I think many readers
> may
> > not get the schema of this topic, so it is better to indicate that what
> > would be the key of this internal topic used for compaction, and what
> would
> > be used as the partition-key.
> >
> > Guozhang
> >
> >
> > On Sat, Nov 18, 2017 at 2:30 PM, Jan Filipiak 
> > wrote:
> >
> >> -> it think the relationships between the different used types, K0,K1,KO
> >> should be explains explicitly (all information is there implicitly, but
> >> one need to think hard to figure it out)
> >>
> >>
> >> I'm probably blind for this. can you help me here? how would you
> formulate
> >> this?
> >>
> >> Thanks,
> >>
> >> Jan
> >>
> >>
> >> On 16.11.2017 23:18, Matthias J. Sax wrote:
> >>
> >>> Hi,
> >>>
> >>> I am just catching up on this discussion and did re-read the KIP and
> >>> discussion thread.
> >>>
> >>> In contrast to you, I prefer the second approach with CombinedKey as
> >>> return type for the following reasons:
> >>>
> >>>   1) the oneToManyJoin() method had less parameter
> >>>   2) those parameters are easy to understand
> >>>   3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
> >>> and the return type KO leaks internal implementation details from my
> >>> point of view)
> >>>   4) user can get their own KO type by extending CombinedKey interface
> >>> (this would also address the nesting issue Trevor pointed out)
> >>>
> >>> That's unclear to me is, why you care about JSON serdes? What is the
> >>> problem with regard to prefix? It seems I am missing something here.
> >>>
> >>> I also don't understand the argument about "the user can stick with his
> >>> default serde or his standard way of serializing"? If we have
> >>> `CombinedKey` as output, the use just provide the serdes for both input
> >>> combined-key types individually, and we can reuse both internally to do
> >>> the rest. This seems to be a way simpler API. With the KO output type
> >>> approach, users need to write an entirely new serde for KO in contrast.
> >>>
> >>> Finally, @Jan, there are still some open comments you did not address
> >>> and the KIP wiki page needs some updates. Would be great if you could
> do
> >>> this.
> >>>
> >>> Can you also explicitly describe the data layout of the store that is
> >>> used to do the range scans?
> >>>
> >>> Additionally:
> >>>
> >>> -> some arrows in the algorithm diagram are missing
> >>> -> was are those XXX in the diagram
> >>> -> 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-21 Thread Matthias J. Sax
Jan,

Thanks for explaining the Serde issue! This makes a lot of sense.

I discussed this issue with Guozhang and we came up with the
following idea that bridges both APIs:

We still introduce CombinedKey as a public interface and exploit it to
manage the key in the store and the changelog topic. For this case we
can construct a suitable Serde internally based on the Serdes of both
keys that are combined.
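
To illustrate how a combined-key serializer could be derived from the two
component serializers, here is a minimal sketch. It is not Kafka Streams
code: the class name CombinedKeyBytes, the use of plain functions in place
of Serde instances, and the byte layout (a 4-byte length of the first key,
then both keys) are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical helper, not part of Kafka Streams.
final class CombinedKeyBytes {
    private CombinedKeyBytes() {}

    // Assumed layout: [4-byte length of first key][first key bytes][second key bytes].
    static <A, B> byte[] serialize(A first, B second,
                                   Function<A, byte[]> firstSerializer,
                                   Function<B, byte[]> secondSerializer) {
        byte[] a = firstSerializer.apply(first);
        byte[] b = secondSerializer.apply(second);
        return ByteBuffer.allocate(Integer.BYTES + a.length + b.length)
                .putInt(a.length)
                .put(a)
                .put(b)
                .array();
    }

    // Bytes that every combined key with this first key starts with.
    static <A> byte[] prefix(A first, Function<A, byte[]> firstSerializer) {
        byte[] a = firstSerializer.apply(first);
        return ByteBuffer.allocate(Integer.BYTES + a.length)
                .putInt(a.length)
                .put(a)
                .array();
    }

    // True if `data` begins with the bytes of `prefix`.
    static boolean startsWith(byte[] data, byte[] prefix) {
        return data.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(data, prefix.length), prefix);
    }
}
```

With such a layout the user only supplies the serializers of the two input
keys; the length field keeps the first key "a" from matching a combined key
whose first key is "ab".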

However, the type of the result table is user defined and can be
anything. To bridge between the CombinedKey and the user-defined result
type, users need to hand in a `ValueMapper<CombinedKey<K0, K1>, KO>` that
converts the CombinedKey into the desired result type.

Thus, the method signature would be something like

> <K1, V1, KO, VO> KTable<KO, VO> oneToManyJoin(
>     KTable<K1, V1> other,
>     ValueMapper<V1, K0> keyExtractor,
>     ValueJoiner<V0, V1, VO> joiner,
>     ValueMapper<CombinedKey<K0, K1>, KO> resultKeyMapper);

The interface parameters are still easy to understand and don't leak
implementation details IMHO.
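
A rough in-memory sketch of how the four parameters would interact, assuming
plain maps in place of KTables and java.util.function types in place of
ValueMapper and ValueJoiner. CombinedKeySketch and OneToManyJoinSketch are
hypothetical names; nothing here is the actual Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for the proposed CombinedKey interface.
interface CombinedKeySketch<K0, K1> {
    K0 leftKey();
    K1 rightKey();
}

final class OneToManyJoinSketch {
    private OneToManyJoinSketch() {}

    // Joins each row of `other` to the row of `self` that keyExtractor points at.
    static <K0, V0, K1, V1, KO, VO> Map<KO, VO> oneToManyJoin(
            Map<K0, V0> self,
            Map<K1, V1> other,
            Function<V1, K0> keyExtractor,
            BiFunction<V0, V1, VO> joiner,
            Function<CombinedKeySketch<K0, K1>, KO> resultKeyMapper) {
        Map<KO, VO> result = new LinkedHashMap<>();
        for (Map.Entry<K1, V1> row : other.entrySet()) {
            K0 foreignKey = keyExtractor.apply(row.getValue());
            V0 match = self.get(foreignKey);
            if (match == null) {
                continue; // inner join: rows without a partner produce no output
            }
            K1 otherKey = row.getKey();
            CombinedKeySketch<K0, K1> combined = new CombinedKeySketch<K0, K1>() {
                @Override public K0 leftKey() { return foreignKey; }
                @Override public K1 rightKey() { return otherKey; }
            };
            // The user-supplied mapper decides what the result key looks like.
            result.put(resultKeyMapper.apply(combined), joiner.apply(match, row.getValue()));
        }
        return result;
    }
}
```

The caller never has to serialize the combined key: it only sees it inside
resultKeyMapper and can, for example, return leftKey() + "|" + rightKey() as
a plain String key.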

WDYT about this idea?


-Matthias


On 11/19/17 11:28 AM, Guozhang Wang wrote:
> Hello Jan,
> 
> I think I get your point about the cumbersome that CombinedKey would
> introduce for serialization and tooling based on serdes. What I'm still
> wondering is the underlying of joinPrefixFakers mapper: from your latest
> comment it seems this mapper will be a one-time mapper: we use this to map
> the original resulted KTable, V0> to KTable and
> then that mapper can be thrown away and be forgotten. Is that true? My
> original thought is that you propose to carry this mapper all the way along
> the rest of the topology to "abstract" the underlying combined keys.
> 
> If it is the other way (i.e. the former approach), then the diagram of
> these two approaches would be different: for the less intrusive approach we
> would add one more step in this diagram to always do a mapping after the
> "task perform join" block.
> 
> Also another minor comment on the internal topic: I think many readers may
> not get the schema of this topic, so it is better to indicate that what
> would be the key of this internal topic used for compaction, and what would
> be used as the partition-key.
> 
> Guozhang
> 
> 
> On Sat, Nov 18, 2017 at 2:30 PM, Jan Filipiak 
> wrote:
> 
>> -> it think the relationships between the different used types, K0,K1,KO
>> should be explains explicitly (all information is there implicitly, but
>> one need to think hard to figure it out)
>>
>>
>> I'm probably blind for this. can you help me here? how would you formulate
>> this?
>>
>> Thanks,
>>
>> Jan
>>
>>
>> On 16.11.2017 23:18, Matthias J. Sax wrote:
>>
>>> Hi,
>>>
>>> I am just catching up on this discussion and did re-read the KIP and
>>> discussion thread.
>>>
>>> In contrast to you, I prefer the second approach with CombinedKey as
>>> return type for the following reasons:
>>>
>>>   1) the oneToManyJoin() method had less parameter
>>>   2) those parameters are easy to understand
>>>   3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
>>> and the return type KO leaks internal implementation details from my
>>> point of view)
>>>   4) user can get their own KO type by extending CombinedKey interface
>>> (this would also address the nesting issue Trevor pointed out)
>>>
>>> That's unclear to me is, why you care about JSON serdes? What is the
>>> problem with regard to prefix? It seems I am missing something here.
>>>
>>> I also don't understand the argument about "the user can stick with his
>>> default serde or his standard way of serializing"? If we have
>>> `CombinedKey` as output, the use just provide the serdes for both input
>>> combined-key types individually, and we can reuse both internally to do
>>> the rest. This seems to be a way simpler API. With the KO output type
>>> approach, users need to write an entirely new serde for KO in contrast.
>>>
>>> Finally, @Jan, there are still some open comments you did not address
>>> and the KIP wiki page needs some updates. Would be great if you could do
>>> this.
>>>
>>> Can you also explicitly describe the data layout of the store that is
>>> used to do the range scans?
>>>
>>> Additionally:
>>>
>>> -> some arrows in the algorithm diagram are missing
>>> -> was are those XXX in the diagram
>>> -> can you finish the "Step by Step" example
>>> -> it think the relationships between the different used types, K0,K1,KO
>>> should be explains explicitly (all information is there implicitly, but
>>> one need to think hard to figure it out)
>>>
>>>
>>> Last but not least:
>>>
>>>> But no one is really interested.
>>> Don't understand this statement...
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 11/16/17 9:05 AM, Jan Filipiak wrote:
>>>
 We are running this perfectly fine. for us the smaller table changes
 rather infrequent say. only a few times per day. The performance of the
 flush is way lower than the computing 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-20 Thread Matthias J. Sax
Just list what each type is:

K0: key type of first/this table
K1: key type of second/other table
KO: key type of result table (concatenation of both input keys)


Something like this (not sure if the example above is correct; it's
just for illustration).


-Matthias


On 11/18/17 2:30 PM, Jan Filipiak wrote:
> -> it think the relationships between the different used types, K0,K1,KO
> should be explains explicitly (all information is there implicitly, but
> one need to think hard to figure it out)
> 
> 
> I'm probably blind for this. can you help me here? how would you
> formulate this?
> 
> Thanks,
> 
> Jan
> 
> 
> On 16.11.2017 23:18, Matthias J. Sax wrote:
>> Hi,
>>
>> I am just catching up on this discussion and did re-read the KIP and
>> discussion thread.
>>
>> In contrast to you, I prefer the second approach with CombinedKey as
>> return type for the following reasons:
>>
>>   1) the oneToManyJoin() method had less parameter
>>   2) those parameters are easy to understand
>>   3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
>> and the return type KO leaks internal implementation details from my
>> point of view)
>>   4) user can get their own KO type by extending CombinedKey interface
>> (this would also address the nesting issue Trevor pointed out)
>>
>> That's unclear to me is, why you care about JSON serdes? What is the
>> problem with regard to prefix? It seems I am missing something here.
>>
>> I also don't understand the argument about "the user can stick with his
>> default serde or his standard way of serializing"? If we have
>> `CombinedKey` as output, the use just provide the serdes for both input
>> combined-key types individually, and we can reuse both internally to do
>> the rest. This seems to be a way simpler API. With the KO output type
>> approach, users need to write an entirely new serde for KO in contrast.
>>
>> Finally, @Jan, there are still some open comments you did not address
>> and the KIP wiki page needs some updates. Would be great if you could do
>> this.
>>
>> Can you also explicitly describe the data layout of the store that is
>> used to do the range scans?
>>
>> Additionally:
>>
>> -> some arrows in the algorithm diagram are missing
>> -> was are those XXX in the diagram
>> -> can you finish the "Step by Step" example
>> -> it think the relationships between the different used types, K0,K1,KO
>> should be explains explicitly (all information is there implicitly, but
>> one need to think hard to figure it out)
>>
>>
>> Last but not least:
>>
>>> But noone is really interested.
>> Don't understand this statement...
>>
>>
>>
>> -Matthias
>>
>>
>> On 11/16/17 9:05 AM, Jan Filipiak wrote:
>>> We are running this perfectly fine. for us the smaller table changes
>>> rather infrequent say. only a few times per day. The performance of the
>>> flush is way lower than the computing power you need to bring to the
>>> table to account for all the records beeing emmited after the one single
>>> update.
>>>
>>> On 16.11.2017 18:02, Trevor Huey wrote:
 Ah, I think I see the problem now. Thanks for the explanation. That is
 tricky. As you said, it seems the easiest solution would just be to
 flush the cache. I wonder how big of a performance hit that'd be...

 On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak > wrote:

  Hi Trevor,

  I am leaning towards the less intrusive approach myself. Infact
  that is how we implemented our Internal API for this and how we
  run it in production.
  getting more voices towards this solution makes me really happy.
  The reason its a problem for Prefix and not for Range is the
  following. Imagine the intrusive approach. They key of the RockDB
  would be CombinedKey and the prefix scan would take an A, and
  the range scan would take an CombinedKey still. As you can
  see with the intrusive approach the keys are actually different
  types for different queries. With the less intrusive apporach we
  use the same type and rely on Serde Invariances. For us this works
  nice (protobuf) might bite some JSON users.

  Hope it makes it clear

  Best Jan


  On 16.11.2017 16:39, Trevor Huey wrote:
>  1. Going over KIP-213, I am leaning toward the "less intrusive"
>  approach. In my use case, I am planning on performing a sequence
>  of several oneToMany joins, From my understanding, the more
>  intrusive approach would result in several nested levels of
>  CombinedKey's. For example, consider Tables A, B, C, D with
>  corresponding keys KA, KB, KC. Joining A and B would produce
>  CombinedKey. Then joining that result on C would produce
>  CombinedKey>. My "keyOtherSerde" in this
>  case 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-19 Thread Guozhang Wang
Hello Jan,

I think I get your point about the burden that CombinedKey would
introduce for serialization and for tooling based on serdes. What I'm still
wondering about is the role of the joinPrefixFaker mapper: from your latest
comment it seems this will be a one-time mapper: we use it to map
the original result KTable<CombinedKey<K0, K1>, V0> to KTable<KO, V0>, and
then that mapper can be thrown away and forgotten. Is that true? My
original thought was that you proposed to carry this mapper all the way along
the rest of the topology to "abstract" the underlying combined keys.

If it is the other way (i.e. the former approach), then the diagram of
these two approaches would be different: for the less intrusive approach we
would add one more step in this diagram to always do a mapping after the
"task perform join" block.

Also another minor comment on the internal topic: I think many readers may
not get the schema of this topic, so it would be better to state what
the key of this internal topic used for compaction would be, and what would
be used as the partition key.

Guozhang


On Sat, Nov 18, 2017 at 2:30 PM, Jan Filipiak 
wrote:

> -> it think the relationships between the different used types, K0,K1,KO
> should be explains explicitly (all information is there implicitly, but
> one need to think hard to figure it out)
>
>
> I'm probably blind for this. can you help me here? how would you formulate
> this?
>
> Thanks,
>
> Jan
>
>
> On 16.11.2017 23:18, Matthias J. Sax wrote:
>
>> Hi,
>>
>> I am just catching up on this discussion and did re-read the KIP and
>> discussion thread.
>>
>> In contrast to you, I prefer the second approach with CombinedKey as
>> return type for the following reasons:
>>
>>   1) the oneToManyJoin() method had less parameter
>>   2) those parameters are easy to understand
>>   3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
>> and the return type KO leaks internal implementation details from my
>> point of view)
>>   4) user can get their own KO type by extending CombinedKey interface
>> (this would also address the nesting issue Trevor pointed out)
>>
>> That's unclear to me is, why you care about JSON serdes? What is the
>> problem with regard to prefix? It seems I am missing something here.
>>
>> I also don't understand the argument about "the user can stick with his
>> default serde or his standard way of serializing"? If we have
>> `CombinedKey` as output, the use just provide the serdes for both input
>> combined-key types individually, and we can reuse both internally to do
>> the rest. This seems to be a way simpler API. With the KO output type
>> approach, users need to write an entirely new serde for KO in contrast.
>>
>> Finally, @Jan, there are still some open comments you did not address
>> and the KIP wiki page needs some updates. Would be great if you could do
>> this.
>>
>> Can you also explicitly describe the data layout of the store that is
>> used to do the range scans?
>>
>> Additionally:
>>
>> -> some arrows in the algorithm diagram are missing
>> -> was are those XXX in the diagram
>> -> can you finish the "Step by Step" example
>> -> it think the relationships between the different used types, K0,K1,KO
>> should be explains explicitly (all information is there implicitly, but
>> one need to think hard to figure it out)
>>
>>
>> Last but not least:
>>
>>> But no one is really interested.
>> Don't understand this statement...
>>
>>
>>
>> -Matthias
>>
>>
>> On 11/16/17 9:05 AM, Jan Filipiak wrote:
>>
>>> We are running this perfectly fine. for us the smaller table changes
>>> rather infrequent say. only a few times per day. The performance of the
>>> flush is way lower than the computing power you need to bring to the
>>> table to account for all the records beeing emmited after the one single
>>> update.
>>>
>>> On 16.11.2017 18:02, Trevor Huey wrote:
>>>
 Ah, I think I see the problem now. Thanks for the explanation. That is
 tricky. As you said, it seems the easiest solution would just be to
 flush the cache. I wonder how big of a performance hit that'd be...

 On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak > wrote:

  Hi Trevor,

  I am leaning towards the less intrusive approach myself. Infact
  that is how we implemented our Internal API for this and how we
  run it in production.
  getting more voices towards this solution makes me really happy.
  The reason its a problem for Prefix and not for Range is the
  following. Imagine the intrusive approach. They key of the RockDB
  would be CombinedKey and the prefix scan would take an A, and
  the range scan would take an CombinedKey still. As you can
  see with the intrusive approach the keys are actually different
  types for different queries. With the less 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-18 Thread Jan Filipiak

-> I think the relationships between the different used types, K0, K1, KO,
should be explained explicitly (all information is there implicitly, but
one needs to think hard to figure it out)


I'm probably blind to this. Can you help me here? How would you 
formulate this?


Thanks,

Jan


On 16.11.2017 23:18, Matthias J. Sax wrote:

Hi,

I am just catching up on this discussion and did re-read the KIP and
discussion thread.

In contrast to you, I prefer the second approach with CombinedKey as
return type for the following reasons:

  1) the oneToManyJoin() method had less parameter
  2) those parameters are easy to understand
  3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
and the return type KO leaks internal implementation details from my
point of view)
  4) user can get their own KO type by extending CombinedKey interface
(this would also address the nesting issue Trevor pointed out)

That's unclear to me is, why you care about JSON serdes? What is the
problem with regard to prefix? It seems I am missing something here.

I also don't understand the argument about "the user can stick with his
default serde or his standard way of serializing"? If we have
`CombinedKey` as output, the use just provide the serdes for both input
combined-key types individually, and we can reuse both internally to do
the rest. This seems to be a way simpler API. With the KO output type
approach, users need to write an entirely new serde for KO in contrast.

Finally, @Jan, there are still some open comments you did not address
and the KIP wiki page needs some updates. Would be great if you could do
this.

Can you also explicitly describe the data layout of the store that is
used to do the range scans?

Additionally:

-> some arrows in the algorithm diagram are missing
-> was are those XXX in the diagram
-> can you finish the "Step by Step" example
-> it think the relationships between the different used types, K0,K1,KO
should be explains explicitly (all information is there implicitly, but
one need to think hard to figure it out)


Last but not least:


But noone is really interested.

Don't understand this statement...



-Matthias


On 11/16/17 9:05 AM, Jan Filipiak wrote:

We are running this perfectly fine. for us the smaller table changes
rather infrequent say. only a few times per day. The performance of the
flush is way lower than the computing power you need to bring to the
table to account for all the records beeing emmited after the one single
update.

On 16.11.2017 18:02, Trevor Huey wrote:

Ah, I think I see the problem now. Thanks for the explanation. That is
tricky. As you said, it seems the easiest solution would just be to
flush the cache. I wonder how big of a performance hit that'd be...

On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak > wrote:

 Hi Trevor,

 I am leaning towards the less intrusive approach myself. In fact,
 that is how we implemented our internal API for this and how we
 run it in production.
 Getting more voices in favour of this solution makes me really happy.
 The reason it's a problem for prefix and not for range is the
 following. Imagine the intrusive approach. The key of the RocksDB store
 would be CombinedKey<A, B>, the prefix scan would take an A, and
 the range scan would still take a CombinedKey<A, B>. As you can
 see, with the intrusive approach the keys are actually different
 types for different queries. With the less intrusive approach we
 use the same type and rely on serde invariances. For us this works
 nicely (protobuf), but it might bite some JSON users.
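
 A small sketch of what a prefix scan over a sorted store amounts to. A
 TreeMap with String keys stands in for the RocksDB store and its
 serialized keys; the "|" separator and the class name PrefixScanSketch
 are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical illustration; a TreeMap stands in for the sorted store.
final class PrefixScanSketch {
    private PrefixScanSketch() {}

    // Collects the values of all entries whose key starts with `prefix`.
    static List<String> prefixScan(NavigableMap<String, String> store, String prefix) {
        List<String> values = new ArrayList<>();
        // Keys sharing a prefix are contiguous in sorted order,
        // starting at the first key that is >= the prefix itself.
        for (Map.Entry<String, String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!entry.getKey().startsWith(prefix)) {
                break;
            }
            values.add(entry.getValue());
        }
        return values;
    }

    // Example store: two rows under first key "a", one under "b".
    static NavigableMap<String, String> exampleStore() {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("a|1", "row-1");
        store.put("a|2", "row-2");
        store.put("b|3", "row-3");
        return store;
    }
}
```

 The scan only works if the serialized form of the partial key really is a
 prefix of the serialized full key, which is the invariance referred to
 above.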

 Hope it makes it clear

 Best Jan


 On 16.11.2017 16:39, Trevor Huey wrote:

 1. Going over KIP-213, I am leaning toward the "less intrusive"
 approach. In my use case, I am planning on performing a sequence
 of several oneToMany joins. From my understanding, the more
 intrusive approach would result in several nested levels of
 CombinedKeys. For example, consider tables A, B, C, D with
 corresponding keys KA, KB, KC. Joining A and B would produce
 CombinedKey<KA, KB>. Then joining that result on C would produce
 CombinedKey<KC, CombinedKey<KA, KB>>. My "keyOtherSerde" in this
 case would need to be capable of deserializing
 CombinedKey<KA, KB>. This would just get worse the more tables I join. I realize
 that it's easier to shoot yourself in the foot with the less
 intrusive approach, but as you said, "the user can stick with
 his default serde or his standard way of serializing". In the
 simplest case where the keys are just strings, they can do simple
 string concatenation and Serdes.String(). It also allows the user
 to create and use their own version of CombinedKey if they feel
 so inclined.

 2. Why is there a problem for prefix, but not for range?
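
 To make the nesting concern in point 1 concrete, here is a sketch that
 flattens nested combined keys into a single string, as one would do for
 use with Serdes.String(). PairKey and FlattenKeysSketch are hypothetical
 names, and the "|" delimiter is an assumption that only works if it never
 occurs inside a key.

```java
// Hypothetical stand-in for a nested CombinedKey.
final class PairKey<L, R> {
    final L left;
    final R right;

    PairKey(L left, R right) {
        this.left = left;
        this.right = right;
    }
}

final class FlattenKeysSketch {
    private FlattenKeysSketch() {}

    // Recursively turns arbitrarily nested PairKeys into "k1|k2|k3...".
    static String flatten(Object key) {
        if (key instanceof PairKey) {
            PairKey<?, ?> pair = (PairKey<?, ?>) key;
            return flatten(pair.left) + "|" + flatten(pair.right);
        }
        return String.valueOf(key);
    }
}
```

 Flattening after every join keeps the key type the same (a String) no
 matter how many tables are joined, at the cost of the user owning the
 concatenation scheme.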


Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-18 Thread Jan Filipiak


On 17.11.2017 06:59, Guozhang Wang wrote:

Thanks for the explanation Jan. Off the top of my head I'm leaning towards the
"more intrusive" approach to resolve the race condition issue we discussed
above. Matthias has made some arguments for this approach already, so I will
not reiterate them here. To me, the "ValueMapper
joinPrefixFaker" actually leaks the same amount of internal
implementation detail as the more intrusive approach, but in a
less clear way. So I'd rather just make this clear to users than try to abstract
it away in an awkward way.

Again: the benefits of __not__ introducing a new "Wrapper" type are huge!
We spent a lot of effort to get rid of Change<> in our topics. We also 
do not want CombinedKeys.
I make one suggestion: let's keep thinking about how to make this more precise
without introducing new Kafka-Streams-only types.
As you can see, the vote currently stands at 2 / 2. People who use Kafka Streams
like the less intrusive approach; people who develop it like the more 
intrusive one.

The prettiest thing might not be the thing that gives the most bang for the buck
out there.

Best Jan


Also I'm not clear what you mean by "CombinedKey would require an
additional mapping to what the less intrusive method has". If you meant
that users are forced to provide a new serde for this combo key, could
that be avoided by having the library automatically generate a serde for it
until the user changes this key later in the topology (e.g. via a map()
function), in which they can "flatten" this combo key into a flat key?

*@Trevor: *for your case of chaining multiple joins, I think a better
way is to call `oneToManyJoin().map().oneToManyJoin().map()...` than to
specify a sequence of joinPrefixFakers, as they will also be chained up
together (remember we have to keep this object along the rest of the
topology), which will make serdes even harder?

Hi,

that was the map I was talking about. Last time I checked, KTable only 
had 3 generic types.
For this I think it would require 4 types: KeyIn, KeyOut, ValueIn, ValueOut. 
I have been very much in favour of
adding this basically forever. Maybe this opens up some discussion, but 
without it, mapping the keys of a
KTable is not possible. I once again recommend peeking over at the 
MapReduce/Tez folks, who have had the
concept of these 4 generics basically forever.



Similar to Matthias's question, the "XXX" markers are a bit confusing to me.

Sorry!!!


Guozhang


On Thu, Nov 16, 2017 at 2:18 PM, Matthias J. Sax 
wrote:


Hi,

I am just catching up on this discussion and did re-read the KIP and
discussion thread.

In contrast to you, I prefer the second approach with CombinedKey as
return type for the following reasons:

  1) the oneToManyJoin() method had less parameter
  2) those parameters are easy to understand
  3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
and the return type KO leaks internal implementation details from my
point of view)
  4) user can get their own KO type by extending CombinedKey interface
(this would also address the nesting issue Trevor pointed out)

That's unclear to me is, why you care about JSON serdes? What is the
problem with regard to prefix? It seems I am missing something here.

I also don't understand the argument about "the user can stick with his
default serde or his standard way of serializing"? If we have
`CombinedKey` as output, the use just provide the serdes for both input
combined-key types individually, and we can reuse both internally to do
the rest. This seems to be a way simpler API. With the KO output type
approach, users need to write an entirely new serde for KO in contrast.

Finally, @Jan, there are still some open comments you did not address
and the KIP wiki page needs some updates. Would be great if you could do
this.

Can you also explicitly describe the data layout of the store that is
used to do the range scans?

Additionally:

-> some arrows in the algorithm diagram are missing
-> was are those XXX in the diagram
-> can you finish the "Step by Step" example
-> it think the relationships between the different used types, K0,K1,KO
should be explains explicitly (all information is there implicitly, but
one need to think hard to figure it out)


Last but not least:


But noone is really interested.

Don't understand this statement...



-Matthias


On 11/16/17 9:05 AM, Jan Filipiak wrote:

We are running this perfectly fine. for us the smaller table changes
rather infrequent say. only a few times per day. The performance of the
flush is way lower than the computing power you need to bring to the
table to account for all the records beeing emmited after the one single
update.

On 16.11.2017 18:02, Trevor Huey wrote:

Ah, I think I see the problem now. Thanks for the explanation. That is
tricky. As you said, it seems the easiest solution would just be to
flush the cache. I wonder how big of a performance hit that'd be...

On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-18 Thread Jan Filipiak

Hi Matthias

answers to the questions inline.

On 16.11.2017 23:18, Matthias J. Sax wrote:

Hi,

I am just catching up on this discussion and did re-read the KIP and
discussion thread.

In contrast to you, I prefer the second approach with CombinedKey as
return type for the following reasons:

  1) the oneToManyJoin() method had less parameter

yeah I like that!

  2) those parameters are easy to understand

The big benefit really!

  3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
and the return type KO leaks internal implementation details from my
point of view)
It does, and so does CombinedKey. I am a firm believer in the principle of 
leaky abstractions.
I think this is okay given the non-triviality of what it tries to 
abstract away.



  4) user can get their own KO type by extending CombinedKey interface
(this would also address the nesting issue Trevor pointed out)
They cannot do so easily. Say you use protobuf (as we do) and your classes 
are generated by a compiler such as protoc: you cannot easily
have it generate subclasses of CombinedKey. I think it's great that we 
have Trevor's opinion here as a second user's perspective.
To get stuff going it's sometimes easier to deal with the implications of 
your API (which are there anyway) instead of fighting your current,
established toolset to adapt to some new scheme (like CombinedKeys). In 
the end there is a reason we run it in production with the
less intrusive approach: it is way less intrusive into our 
current toolchain and does not require us to adapt to some "Kafka Streams"
specifics. We have tools to inspect topics; if the key would suddenly 
be a CombinedKey of two protobuf messages, we can't use our default 
toolchain. This argument is very relevant for us. To give it some gravitas: 
we also rewrote the KTable::groupBy that comes with stock
0.10.0.1 to repartition without serializing Change<> and to have log 
compaction enabled, so that Streams topics are not treated differently from any other.
For us this is very important. We want to upstream this to be able to 
use it instead of our reflection-based PAPI setup. We would not take the 
stock one into production.

That's unclear to me is, why you care about JSON serdes? What is the
problem with regard to prefix? It seems I am missing something here.

Say you have a CombinedKey A,B with values "a" and "b".
If you take our Protobuf Serde, it is going to be
"a""b"
and without the "b" field set
"a"
As you can see, it is a perfect prefix.
But with JSON
you would get
{ "A" => "a", "B" => "b" }
and without the B field
{ "A" => "a" }
As you can see, it will not be a prefix.
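
A small sketch of the prefix argument above, written in Python for brevity. The `concat_serialize` function is a hypothetical stand-in for a field-by-field binary encoding (it is not the actual protobuf wire format), and the field names A and B simply follow the example; the JSON side uses the standard `json` module.

```python
import json

def concat_serialize(a, b=None):
    """Hypothetical field-by-field encoding: each set field is written as a
    one-byte length followed by its UTF-8 bytes, in field order.
    Assumes every field is shorter than 256 bytes."""
    out = bytearray()
    for field in (a, b):
        if field is not None:
            data = field.encode("utf-8")
            out.append(len(data))
            out.extend(data)
    return bytes(out)

def json_serialize(a, b=None):
    """JSON object encoding of the same key; unset fields are omitted."""
    obj = {"A": a}
    if b is not None:
        obj["B"] = b
    return json.dumps(obj).encode("utf-8")

full_concat = concat_serialize("a", "b")
only_a_concat = concat_serialize("a")
full_json = json_serialize("a", "b")
only_a_json = json_serialize("a")

# The A-only bytes are a prefix of the full key only for the binary encoding;
# the JSON form closes the object with "}" where the full key continues.
concat_is_prefix = full_concat.startswith(only_a_concat)
json_is_prefix = full_json.startswith(only_a_json)
print(concat_is_prefix, json_is_prefix)
```

The closing brace is what breaks the JSON case: the A-only document is a complete object, so its bytes cannot be extended into the full key.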

I also don't understand the argument about "the user can stick with his
default serde or his standard way of serializing"? If we have
`CombinedKey` as output, the use just provide the serdes for both input
combined-key types individually, and we can reuse both internally to do
the rest. This seems to be a way simpler API. With the KO output type
approach, users need to write an entirely new serde for KO in contrast.

Finally, @Jan, there are still some open comments you did not address
and the KIP wiki page needs some updates. Would be great if you could do
this.

Can you also explicitly describe the data layout of the store that is
used to do the range scans?

Additionally:

-> some arrows in the algorithm diagram are missing
-> was are those XXX in the diagram
-> can you finish the "Step by Step" example
I can't find the missing arrows. Those XX's are not really relevant; I will 
focus on the step-by-step example.

-> it think the relationships between the different used types, K0,K1,KO
should be explains explicitly (all information is there implicitly, but
one need to think hard to figure it out)

will add this

Last but not least:


But noone is really interested.
This was the first time someone took the effort to address the most 
pressing issue for moving this forward.

I counted this as not being interested before.

Don't understand this statement...



-Matthias


On 11/16/17 9:05 AM, Jan Filipiak wrote:

We are running this perfectly fine. for us the smaller table changes
rather infrequent say. only a few times per day. The performance of the
flush is way lower than the computing power you need to bring to the
table to account for all the records beeing emmited after the one single
update.

On 16.11.2017 18:02, Trevor Huey wrote:

Ah, I think I see the problem now. Thanks for the explanation. That is
tricky. As you said, it seems the easiest solution would just be to
flush the cache. I wonder how big of a performance hit that'd be...

On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak wrote:

 Hi Trevor,

 I am leaning towards the less intrusive approach myself. Infact
 that is how we implemented our Internal API for this and how we
 run it in production.
 getting more voices towards this solution makes me really happy.
 The reason its a problem for Prefix and not 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-16 Thread Guozhang Wang
Thanks for the explanation, Jan. Off the top of my head, I'm leaning towards the
"more intrusive" approach to resolve the race condition issue we discussed
above. Matthias has already made some arguments for this approach, so I will
not reiterate them here. I find that the "ValueMapper
joinPrefixFaker" actually leaks the same amount of internal
implementation detail as the more intrusive approach, but in a
less clear way. So I'd rather make this explicit to users than try to abstract
it in an awkward way.

Also, I'm not clear what you mean by "CombinedKey would require an
additional mapping to what the less intrusive method has". If you meant
that users are forced to provide a new serde for this combo key, could
that be avoided by having the library automatically generate a serde for it,
until the user changes this key later in the topology (e.g. via a map()
function), at which point they can "flatten" this combo key into a flat key?

*@Trevor: *for your case of concatenating multiple joins, I think a better
way is to call `oneToManyJoin().map().oneToManyJoin().map()...` rather than
specifying a sequence of joinPrefixFakers, as they will also be chained up
together (remember we have to keep this object along the rest of the
topology), which will make the serde even harder?

Similar to Matthias's question, the "XXX" markers are a bit confusing to me.

Guozhang


On Thu, Nov 16, 2017 at 2:18 PM, Matthias J. Sax 
wrote:

> Hi,
>
> I am just catching up on this discussion and did re-read the KIP and
> discussion thread.
>
> In contrast to you, I prefer the second approach with CombinedKey as
> return type for the following reasons:
>
>  1) the oneToManyJoin() method had less parameter
>  2) those parameters are easy to understand
>  3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
> and the return type KO leaks internal implementation details from my
> point of view)
>  4) user can get their own KO type by extending CombinedKey interface
> (this would also address the nesting issue Trevor pointed out)
>
> That's unclear to me is, why you care about JSON serdes? What is the
> problem with regard to prefix? It seems I am missing something here.
>
> I also don't understand the argument about "the user can stick with his
> default serde or his standard way of serializing"? If we have
> `CombinedKey` as output, the use just provide the serdes for both input
> combined-key types individually, and we can reuse both internally to do
> the rest. This seems to be a way simpler API. With the KO output type
> approach, users need to write an entirely new serde for KO in contrast.
>
> Finally, @Jan, there are still some open comments you did not address
> and the KIP wiki page needs some updates. Would be great if you could do
> this.
>
> Can you also explicitly describe the data layout of the store that is
> used to do the range scans?
>
> Additionally:
>
> -> some arrows in the algorithm diagram are missing
> -> was are those XXX in the diagram
> -> can you finish the "Step by Step" example
> -> it think the relationships between the different used types, K0,K1,KO
> should be explains explicitly (all information is there implicitly, but
> one need to think hard to figure it out)
>
>
> Last but not least:
>
> > But noone is really interested.
>
> Don't understand this statement...
>
>
>
> -Matthias
>
>
> On 11/16/17 9:05 AM, Jan Filipiak wrote:
> > We are running this perfectly fine. for us the smaller table changes
> > rather infrequent say. only a few times per day. The performance of the
> > flush is way lower than the computing power you need to bring to the
> > table to account for all the records beeing emmited after the one single
> > update.
> >
> > On 16.11.2017 18:02, Trevor Huey wrote:
> >> Ah, I think I see the problem now. Thanks for the explanation. That is
> >> tricky. As you said, it seems the easiest solution would just be to
> >> flush the cache. I wonder how big of a performance hit that'd be...
> >>
> >> On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak wrote:
> >>
> >> Hi Trevor,
> >>
> >> I am leaning towards the less intrusive approach myself. Infact
> >> that is how we implemented our Internal API for this and how we
> >> run it in production.
> >> getting more voices towards this solution makes me really happy.
> >> The reason its a problem for Prefix and not for Range is the
> >> following. Imagine the intrusive approach. They key of the RockDB
> >> would be CombinedKey and the prefix scan would take an A, and
> >> the range scan would take an CombinedKey still. As you can
> >> see with the intrusive approach the keys are actually different
> >> types for different queries. With the less intrusive apporach we
> >> use the same type and rely on Serde Invariances. For us this works
> >> nice (protobuf) might bite some JSON users.
> >>
> >> 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-16 Thread Matthias J. Sax
Hi,

I am just catching up on this discussion and did re-read the KIP and
discussion thread.

In contrast to you, I prefer the second approach with CombinedKey as
return type for the following reasons:

 1) the oneToManyJoin() method has fewer parameters
 2) those parameters are easy to understand
 3) we hide implementation details (joinPrefixFaker, leftKeyExtractor,
and the return type KO leaks internal implementation details from my
point of view)
 4) user can get their own KO type by extending CombinedKey interface
(this would also address the nesting issue Trevor pointed out)

What's unclear to me is why you care about JSON serdes. What is the
problem with regard to prefix? It seems I am missing something here.

I also don't understand the argument about "the user can stick with his
default serde or his standard way of serializing". If we have
`CombinedKey` as output, the user just provides the serdes for both input
combined-key types individually, and we can reuse both internally to do
the rest. This seems to be a much simpler API. With the KO output type
approach, in contrast, users need to write an entirely new serde for KO.

Finally, @Jan, there are still some open comments you did not address
and the KIP wiki page needs some updates. Would be great if you could do
this.

Can you also explicitly describe the data layout of the store that is
used to do the range scans?

Additionally:

-> some arrows in the algorithm diagram are missing
-> what are those XXX in the diagram
-> can you finish the "Step by Step" example
-> I think the relationships between the different types used, K0, K1, KO,
should be explained explicitly (all information is there implicitly, but
one needs to think hard to figure it out)


Last but not least:

> But noone is really interested. 

Don't understand this statement...



-Matthias


On 11/16/17 9:05 AM, Jan Filipiak wrote:
> We are running this perfectly fine. for us the smaller table changes
> rather infrequent say. only a few times per day. The performance of the
> flush is way lower than the computing power you need to bring to the
> table to account for all the records beeing emmited after the one single
> update.
> 
> On 16.11.2017 18:02, Trevor Huey wrote:
>> Ah, I think I see the problem now. Thanks for the explanation. That is
>> tricky. As you said, it seems the easiest solution would just be to
>> flush the cache. I wonder how big of a performance hit that'd be...
>>
>> On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak wrote:
>>
>>     Hi Trevor,
>>
>>     I am leaning towards the less intrusive approach myself. Infact
>>     that is how we implemented our Internal API for this and how we
>>     run it in production.
>>     getting more voices towards this solution makes me really happy.
>>     The reason its a problem for Prefix and not for Range is the
>>     following. Imagine the intrusive approach. They key of the RockDB
>>     would be CombinedKey and the prefix scan would take an A, and
>>     the range scan would take an CombinedKey still. As you can
>>     see with the intrusive approach the keys are actually different
>>     types for different queries. With the less intrusive apporach we
>>     use the same type and rely on Serde Invariances. For us this works
>>     nice (protobuf) might bite some JSON users.
>>
>>     Hope it makes it clear
>>
>>     Best Jan
>>
>>
>>     On 16.11.2017 16:39, Trevor Huey wrote:
>>>     1. Going over KIP-213, I am leaning toward the "less intrusive"
>>>     approach. In my use case, I am planning on performing a sequence
>>>     of several oneToMany joins, From my understanding, the more
>>>     intrusive approach would result in several nested levels of
>>>     CombinedKey's. For example, consider Tables A, B, C, D with
>>>     corresponding keys KA, KB, KC. Joining A and B would produce
>>>     CombinedKey<KA, KB>. Then joining that result on C would produce
>>>     CombinedKey<KC, CombinedKey<KA, KB>>. My "keyOtherSerde" in this
>>>     case would need to be capable of deserializing CombinedKey<KA,
>>>     KB>. This would just get worse the more tables I join. I realize
>>>     that it's easier to shoot yourself in the foot with the less
>>>     intrusive approach, but as you said, " the user can stick with
>>>     his default serde or his standard way of serializing". In the
>>>     simplest case where the keys are just strings, they can do simple
>>>     string concatenation and Serdes.String(). It also allows the user
>>>     to create and use their own version of CombinedKey if they feel
>>>     so inclined.
>>>
>>>     2. Why is there a problem for prefix, but not for range?
>>>    
>>> https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1L162
>>>
>>>
>>>
>>>     On Thu, Nov 16, 2017 at 2:57 AM Jan Filipiak wrote:
>>>
>>>     Hi Trevor,
>>>
>>> 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-16 Thread Trevor Huey
1. Going over KIP-213, I am leaning toward the "less intrusive" approach.
In my use case, I am planning on performing a sequence of several oneToMany
joins. From my understanding, the more intrusive approach would result in
several nested levels of CombinedKeys. For example, consider Tables A, B,
C, D with corresponding keys KA, KB, KC. Joining A and B would produce
CombinedKey<KA, KB>. Then joining that result on C would produce
CombinedKey<KC, CombinedKey<KA, KB>>. My "keyOtherSerde" in this case would
need to be capable of deserializing CombinedKey<KA, KB>. This would just
get worse the more tables I join. I realize that it's easier to shoot
yourself in the foot with the less intrusive approach, but as you said, "the
user can stick with his default serde or his standard way of serializing".
In the simplest case where the keys are just strings, they can do simple
string concatenation and Serdes.String(). It also allows the user to create
and use their own version of CombinedKey if they feel so inclined.

2. Why is there a problem for prefix, but not for range?
https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1L162
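
To illustrate point 1, here is a minimal sketch of the flat-key idea with plain string keys, written in Python for brevity. The `combine` helper and the `|` separator are assumptions made for illustration and are not part of KIP-213. The last lines show one way the foot-gun mentioned above can appear when keys are concatenated without a separator.

```python
def combine(*keys, sep="|"):
    """Hypothetical flat combined key: join the individual string keys.
    Assumes the separator never occurs inside a key."""
    for key in keys:
        if sep and sep in key:
            raise ValueError(f"key {key!r} contains separator {sep!r}")
    return sep.join(keys)

# Chained joins keep a flat string key instead of nested wrapper types.
ab = combine("a1", "b7")          # key after joining A and B
abc = combine("a1", "b7", "c3")   # key after also joining C

# Foot-gun: without a separator, different key pairs can collide.
collision = combine("a", "bc", sep="") == combine("ab", "c", sep="")
print(ab, abc, collision)
```

With a separator that cannot occur in a key, the key of the first join stays a string prefix of the key of the second join, so the same string serde can be used throughout.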


On Thu, Nov 16, 2017 at 2:57 AM Jan Filipiak 
wrote:

> Hi Trevor,
>
> thank you very much for your interested. Too keep discussion mailing list
> focused and not Jira or Confluence I decided to reply here.
>
> 1. its tricky activity is indeed very low. In the KIP-213 there are 2
> proposals about the return type of the join. I would like to settle on one.
> Unfortunatly its controversal and I don't want to have the discussion
> after I settled on one way and implemented it. But noone is really
> interested.
> So discussing with YOU, what your preferred return type would look would
> be very helpfull already.
>
> 2.
> The most difficult part is implementing
> this
> https://github.com/apache/kafka/pull/3720/files#diff-ac41b4dfb9fc6bb707d966477317783cR68
> here
> https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1R244
> and here
> https://github.com/apache/kafka/pull/3720/files#diff-b1a1281dce5219fd0cb5afad380d9438R207
> One can get an easy shot by just flushing the underlying rocks and using
> Rocks for range scan.
> But as you can see the implementation depends on the API. For wich way the
> API discussion goes
> I would implement this differently.
>
> 3.
> I only have so and so much time to work on this. I filed the KIP because I
> want to pull it through and I am pretty confident that I can do it.
> But I am still waiting for the full discussion to happen on this. To get
> the discussion forward it seems to be that I need to fill out the table in
> the KIP entirly (the one describing the events, change modifications and
> output). Feel free to continue the discussion w/o the table. I want
> to finish the table during next week.
>
> Best Jan thank you for your interest!
>
> _ Jira Quote __
>
> Jan Filipiak
> 
> Please bear with me while I try to get caught up. I'm not yet familiar with
> the Kafka code base. I have a few questions to try to figure out how I can
> get involved:
> 1. It seems like we need to get buy-in on your KIP-213? It doesn't seem
> like there's been much activity on it besides yourself in a while. What's
> your current plan of attack for getting that approved?
> 2. I know you said that the most difficult part is yet to be done. Is
> there some code you can point me toward so I can start digging in and
> better understand why this is so difficult?
> 3. This issue has been open since May '16. How far out do you think we are
> from getting this implemented?
>


Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-16 Thread Jan Filipiak
We are running this perfectly fine. For us the smaller table changes 
rather infrequently, say only a few times per day. The cost of the 
flush is far lower than the computing power you need to bring to the 
table to account for all the records being emitted after that one single 
update.


On 16.11.2017 18:02, Trevor Huey wrote:
Ah, I think I see the problem now. Thanks for the explanation. That is 
tricky. As you said, it seems the easiest solution would just be to 
flush the cache. I wonder how big of a performance hit that'd be...


On Thu, Nov 16, 2017 at 9:07 AM Jan Filipiak wrote:


Hi Trevor,

I am leaning towards the less intrusive approach myself. Infact
that is how we implemented our Internal API for this and how we
run it in production.
getting more voices towards this solution makes me really happy.
The reason its a problem for Prefix and not for Range is the
following. Imagine the intrusive approach. They key of the RockDB
would be CombinedKey and the prefix scan would take an A, and
the range scan would take an CombinedKey still. As you can
see with the intrusive approach the keys are actually different
types for different queries. With the less intrusive apporach we
use the same type and rely on Serde Invariances. For us this works
nice (protobuf) might bite some JSON users.

Hope it makes it clear

Best Jan
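
The prefix scan described above can be sketched as follows, in Python for brevity. A sorted in-memory list stands in for the RocksDB store (this is not the RocksDB API), and `encode_key` is a hypothetical length-prefixed serde whose A-only form is a byte prefix of every full key with the same A. Both are assumptions for illustration only.

```python
import bisect

def encode_key(a, b=None):
    """Hypothetical serde: one-byte length plus UTF-8 bytes per set field.
    Assumes every field is shorter than 256 bytes."""
    out = bytearray()
    for field in (a, b):
        if field is not None:
            data = field.encode("utf-8")
            out.append(len(data))
            out.extend(data)
    return bytes(out)

def prefix_scan(sorted_keys, prefix):
    """Seek to the first key >= prefix, then read while keys share the prefix."""
    start = bisect.bisect_left(sorted_keys, prefix)
    matches = []
    for k in sorted_keys[start:]:
        if not k.startswith(prefix):
            break  # keys are sorted, so no later key can match
        matches.append(k)
    return matches

store = sorted([
    encode_key("a", "x"),
    encode_key("a", "y"),
    encode_key("ab", "z"),
    encode_key("b", "x"),
])

# The scan takes a key of the same type as the stored keys, with only A set.
hits = prefix_scan(store, encode_key("a"))
print(hits)
```

The scan input here has the same type as the stored keys, which is the point of the less intrusive approach; it only works because the serde guarantees the prefix property.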


On 16.11.2017 16:39, Trevor Huey wrote:

1. Going over KIP-213, I am leaning toward the "less intrusive"
approach. In my use case, I am planning on performing a sequence
of several oneToMany joins, From my understanding, the more
intrusive approach would result in several nested levels of
CombinedKey's. For example, consider Tables A, B, C, D with
corresponding keys KA, KB, KC. Joining A and B would produce
CombinedKey<KA, KB>. Then joining that result on C would produce
CombinedKey<KC, CombinedKey<KA, KB>>. My "keyOtherSerde" in this
case would need to be capable of deserializing CombinedKey<KA,
KB>. This would just get worse the more tables I join. I realize
that it's easier to shoot yourself in the foot with the less
intrusive approach, but as you said, " the user can stick with
his default serde or his standard way of serializing". In the
simplest case where the keys are just strings, they can do simple
string concatenation and Serdes.String(). It also allows the user
to create and use their own version of CombinedKey if they feel
so inclined.

2. Why is there a problem for prefix, but not for range?

https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1L162


On Thu, Nov 16, 2017 at 2:57 AM Jan Filipiak wrote:

Hi Trevor,

thank you very much for your interested. Too keep discussion
mailing list focused and not Jira or Confluence I decided to
reply here.

1. its tricky activity is indeed very low. In the KIP-213
there are 2 proposals about the return type of the join. I
would like to settle on one.
Unfortunatly its controversal and I don't want to have the
discussion after I settled on one way and implemented it. But
noone is really interested.
So discussing with YOU, what your preferred return type would
look would be very helpfull already.

2.
The most difficult part is implementing
this

https://github.com/apache/kafka/pull/3720/files#diff-ac41b4dfb9fc6bb707d966477317783cR68
here

https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1R244
and here

https://github.com/apache/kafka/pull/3720/files#diff-b1a1281dce5219fd0cb5afad380d9438R207
One can get an easy shot by just flushing the underlying
rocks and using Rocks for range scan.
But as you can see the implementation depends on the API. For
wich way the API discussion goes
I would implement this differently.

3.
I only have so and so much time to work on this. I filed the
KIP because I want to pull it through and I am pretty
confident that I can do it.
But I am still waiting for the full discussion to happen on
this. To get the discussion forward it seems to be that I
need to fill out the table in
the KIP entirly (the one describing the events, change
modifications and output). Feel free to continue the
discussion w/o the table. I want
to finish the table during next week.

Best Jan thank you for your interest!

_ Jira Quote __

Jan Filipiak

Please bear with me while I try to get 

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-16 Thread Jan Filipiak

Hi Trevor,

Thank you very much for your interest. To keep the discussion focused on 
the mailing list and not on Jira or Confluence, I decided to reply here.


1. It's tricky; activity is indeed very low. In KIP-213 there are 2 
proposals for the return type of the join. I would like to settle on one.
Unfortunately it's controversial, and I don't want to have the discussion 
after I have settled on one way and implemented it. But no one is really 
interested.
So discussing with YOU what your preferred return type would look like 
would already be very helpful.


2.
The most difficult part is implementing
this 
https://github.com/apache/kafka/pull/3720/files#diff-ac41b4dfb9fc6bb707d966477317783cR68
here 
https://github.com/apache/kafka/pull/3720/files#diff-8f863b74c3c5a0b989e89d00c149aef1R244
and here 
https://github.com/apache/kafka/pull/3720/files#diff-b1a1281dce5219fd0cb5afad380d9438R207
One can take an easy shortcut by just flushing the underlying Rocks and 
using Rocks for the range scan.
But as you can see, the implementation depends on the API. Depending on 
which way the API discussion goes, I would implement this differently.

3.
I only have so much time to work on this. I filed the KIP because 
I want to pull it through, and I am pretty confident that I can do it.
But I am still waiting for the full discussion to happen on this. To move 
the discussion forward, it seems that I need to fill out the table in
the KIP entirely (the one describing the events, change modifications and 
output). Feel free to continue the discussion without the table. I want
to finish the table during next week.

Best Jan, thank you for your interest!

_ Jira Quote __

Jan Filipiak 
 
Please bear with me while I try to get caught up. I'm not yet familiar 
with the Kafka code base. I have a few questions to try to figure out 
how I can get involved:
1. It seems like we need to get buy-in on your KIP-213? It doesn't seem 
like there's been much activity on it besides yourself in a while. 
What's your current plan of attack for getting that approved?
2. I know you said that the most difficult part is yet to be done. Is 
there some code you can point me toward so I can start digging in and 
better understand why this is so difficult?
3. This issue has been open since May '16. How far out do you think we 
are from getting this implemented?


Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-06 Thread Matthias J. Sax
>>>>>>> actually materializing both A and B right? If yes could you
>>>>>>> update the
>>>>>>> diagram?
>>>>>>>
>>>>>>> 7. This is a meta question: "in the sink, only use A's key to
>>>>>>> determine
>>>>>>> partition" I think we had the discussion long time ago, that if
>>>>>>> we are
>>>>>>> sending the old and new entries of the pair to different partitions,
>>>>>>> their
>>>>>>> ordering may get reversed later when reading from the join operator
>>>>>>> (i.e.
>>>>>>> the "Materialize B" block in your diagram). How did you address that
>>>>>>> with
>>>>>>> this proposal?
>>>>>>>
>>>>>>> 8. "B records with a 'null' A-key value would be silently dropped"
>>>>>>> Where
>>>>>>> are we dropping it, do we drop it at the first sub-topology (i.e the
>>>>>>> "Repartition by A's key" block)?
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Nov 1, 2017 at 12:18 PM, Jan Filipiak <
>>>>>>> jan.filip...@trivago.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi thanks for the feedback
>>>>>>>> On 01.11.2017 12:58, Damian Guy wrote:
>>>>>>>>
>>>>>>>> Hi Jan, Thanks for the KIP!
>>>>>>>>> In both alternatives the API will need to use the `Joined` class
>>>>>>>>> rather
>>>>>>>>> than than passing in `Serde`s. Also, as with all other joins etc,
>>>>>>>>> there
>>>>>>>>> probably should be an overload that doesn't require any `Serdes`.
>>>>>>>>>
>>>>>>>>> Will check again how current API looks. I remember loosing the
>>>>>>>> argument
>>>>>>>> with this IQ overloads things.
>>>>>>>> Didn't expect something to have happend already so I just copied
>>>>>>>> from
>>>>>>>> the
>>>>>>>> PR. Will update.
>>>>>>>> Will also add the overload.
>>>>>>>>
>>>>>>>> It isn't clear to me what `joinPrefixFaker` is doing? In the
>>>>>>>> comment
>>>>>>>>> it
>>>>>>>>> says "returning an outputKey that when serialized only produces a
>>>>>>>>> prefix
>>>>>>>>> of
>>>>>>>>> the output key which is the same serializing K" So why not just
>>>>>>>>> use
>>>>>>>>> "K" ?
>>>>>>>>>
>>>>>>>>> The faker in fact returns K wich can be serialized by the Key
>>>>>>>>> Serde
>>>>>>>> in the
>>>>>>>> rocks. But it needs to only contain A's key and it needs to be a
>>>>>>>> strict
>>>>>>>> prefix
>>>>>>>> byte[] of all K with this A's key. We gonna seek there with an
>>>>>>>> RocksIterator and continue to read as long as the "faked key"
>>>>>>>> serialized
>>>>>>>> form is a prefix
>>>>>>>> This is easy todo for Avro + Protobuf +  custom Serdes and Hadoop
>>>>>>>> Writables. Its a nightmare for JSON serdes.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I think if you explain what A and B are in the beginning, it makes
>>>>>>>>> sense
>>>>>>>>>
>>>>>>>>>> to
>>>>>>>>>> use them since readers would know who they reference.
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak
>>>>>>>>>> <jan.filip...@trivago.com
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks for the remarks. hope I didn't miss any.
>>>>>>>>>>> Not even sure if it makes sense to introduce A and B or just
>>>>>>>>>>> stick
>>>>>>>>>>> with
>>>>>>>>>>> "this ktable", "other ktable"
>>>>>>>>>>>
>>>>>>>>>>> Thank you
>>>>>>>>>>> Jan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 27.10.2017 06:58, Ted Yu wrote:
>>>>>>>>>>>
>>>>>>>>>>> Do you mind addressing my previous comments ?
>>>>>>>>>>>
>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
>>>>>>>>>>>> DISCUSS+KIP+213+Support+non+key+joining+in+KTable
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <
>>>>>>>>>>>> jan.filip...@trivago.com
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> this is the new discussion thread after the ID-clash.
>>>>>>>>>>>>> Best
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>> __
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Kafka-users,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I want to continue with the development of KAFKA-3705, which
>>>>>>>>>>>>> allows
>>>>>>>>>>>>> the
>>>>>>>>>>>>> Streams DSL to perform KTableKTable-Joins when the KTables
>>>>>>>>>>>>> have a
>>>>>>>>>>>>> one-to-many relationship.
>>>>>>>>>>>>> To make sure we cover the requirements of as many users as
>>>>>>>>>>>>> possible
>>>>>>>>>>>>> and
>>>>>>>>>>>>> have a good solution afterwards I invite everyone to read
>>>>>>>>>>>>> through the
>>>>>>>>>>>>> KIP I
>>>>>>>>>>>>> put together and discuss it here in this Thread.
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
>>>>>>>>>>>>> Support+non-key+joining+in+KTable
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-3705
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/3720
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think a public discussion and vote on a solution is exactly
>>>>>>>>>>>>> what is
>>>>>>>>>>>>> needed to bring this feauture into kafka-streams. I am looking
>>>>>>>>>>>>> forward
>>>>>>>>>>>>>
>>>>>>>>>>>>> to
>>>>>>>>>>> everyones opinion!
>>>>>>>>>>>
>>>>>>>>>>>> Please keep the discussion on the mailing list rather than
>>>>>>>>>>>>> commenting
>>>>>>>>>>>>>
>>>>>>>>>>>>> on
>>>>>>>>>>> the wiki (wiki discussions get unwieldy fast).
>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>>> Jan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
> 





Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-06 Thread Jan Filipiak
"this ktable", "other ktable"

Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?


http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <
jan.filip...@trivago.com
wrote:

Hello everyone,

this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which
allows
the
Streams DSL to perform KTableKTable-Joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as
possible
and
have a good solution afterwards I invite everyone to read
through the
KIP I
put together and discuss it here in this Thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+
Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly
what is
needed to bring this feauture into kafka-streams. I am looking
forward

to

everyones opinion!


Please keep the discussion on the mailing list rather than

commenting

on

the wiki (wiki discussions get unwieldy fast).


Best

Jan










Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-06 Thread Ted Yu
 Hi thanks for the feedback
>>>>>>
>>>>>> On 01.11.2017 12:58, Damian Guy wrote:
>>>>>>
>>>>>> Hi Jan, Thanks for the KIP!
>>>>>>>
>>>>>>> In both alternatives the API will need to use the `Joined` class
>>>>>>> rather
>>>>>>> than than passing in `Serde`s. Also, as with all other joins etc,
>>>>>>> there
>>>>>>> probably should be an overload that doesn't require any `Serdes`.
>>>>>>>
>>>>>>> Will check again how current API looks. I remember loosing the
>>>>>> argument
>>>>>> with this IQ overloads things.
>>>>>> Didn't expect something to have happend already so I just copied from
>>>>>> the
>>>>>> PR. Will update.
>>>>>> Will also add the overload.
>>>>>>
>>>>>>> It isn't clear to me what `joinPrefixFaker` is doing? In the comment
>>>>>>> it says "returning an outputKey that when serialized only produces a
>>>>>>> prefix of the output key which is the same serializing K" So why not
>>>>>>> just use "K" ?
>>>>>>>
>>>>>> The faker in fact returns a K which can be serialized by the key Serde
>>>>>> in the RocksDB store. But it needs to contain only A's key, and it needs
>>>>>> to be a strict prefix byte[] of all K with this A's key. We are going to
>>>>>> seek there with a RocksIterator and continue to read as long as the
>>>>>> "faked key" serialized form is a prefix.
>>>>>> This is easy to do for Avro + Protobuf + custom Serdes and Hadoop
>>>>>> Writables. It's a nightmare for JSON serdes.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>> Damian
>>>>>>>
>>>>>>>
>>>>>>> On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>> I think if you explain what A and B are in the beginning, it makes
>>>>>>> sense
>>>>>>>
>>>>>>>> to
>>>>>>>> use them since readers would know who they reference.
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>> On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak
>>>>>>>> <jan.filip...@trivago.com
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks for the remarks. hope I didn't miss any.
>>>>>>>>> Not even sure if it makes sense to introduce A and B or just stick
>>>>>>>>> with
>>>>>>>>> "this ktable", "other ktable"
>>>>>>>>>
>>>>>>>>> Thank you
>>>>>>>>> Jan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 27.10.2017 06:58, Ted Yu wrote:
>>>>>>>>>
>>>>>>>>> Do you mind addressing my previous comments ?
>>>>>>>>>
>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
>>>>>>>>>> DISCUSS+KIP+213+Support+non+key+joining+in+KTable
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <
>>>>>>>>>> jan.filip...@trivago.com
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello everyone,
>>>>>>>>>>
>>>>>>>>>> this is the new discussion thread after the ID-clash.
>>>>>>>>>>>
>>>>>>>>>>> Best
>>>>>>>>>>> Jan
>>>>>>>>>>>
>>>>>>>>>>> __
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hello Kafka-users,
>>>>>>>>>>>
>>>>>>>>>>> I want to continue with the development of KAFKA-3705, which
>>>>>>>>>>> allows the Streams DSL to perform KTable-KTable joins when the
>>>>>>>>>>> KTables have a one-to-many relationship.
>>>>>>>>>>> To make sure we cover the requirements of as many users as
>>>>>>>>>>> possible and have a good solution afterwards, I invite everyone
>>>>>>>>>>> to read through the KIP I put together and discuss it here in
>>>>>>>>>>> this thread.
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-3705
>>>>>>>>>>> https://github.com/apache/kafka/pull/3720
>>>>>>>>>>>
>>>>>>>>>>> I think a public discussion and vote on a solution is exactly
>>>>>>>>>>> what is needed to bring this feature into kafka-streams. I am
>>>>>>>>>>> looking forward to everyone's opinion!
>>>>>>>>>>>
>>>>>>>>>>> Please keep the discussion on the mailing list rather than
>>>>>>>>>>> commenting on the wiki (wiki discussions get unwieldy fast).
>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>>> Jan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>
>


Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-06 Thread Jan Filipiak

I created an example table on the wiki page.
Can you quickly check if that would be a good format?
I tried to do it roughly like the unit tests, but with the information of
what state is there _AFTER_ processing happened.
I made the first two columns exclusive even though they in fact run in
parallel, but the joining task serializes the effects.

Best Jan

On 06.11.2017 21:20, Jan Filipiak wrote:
Will do! I need to do it carefully. One mistake in this detailed approach
and the confusion is complete ;)

Hope I can deliver this week.

Best Jan

On 06.11.2017 17:21, Matthias J. Sax wrote:

Jan,

thanks a lot for this KIP. I did an initial pass over it, but feel a
little lost. Maybe I need to read it more carefully, but atm it's not
clear to me at all what algorithm you propose.

I think it would be super helpful, to do an example with concrete data
that show how records are stored, what the different value mappers
extract, and what is written into repartitioning topics.



-Matthias


On 11/5/17 2:09 AM, Jan Filipiak wrote:

Hi Guozhang

I hope the wiki page looks better now. I put a little more effort into the
diagram. Still not ideal, but I think it serves its purpose.



On 02.11.2017 01:17, Guozhang Wang wrote:

Thanks for the KIP writeup Jan. I made a first pass and here are some quick
comments:


1. Could we use K0 / V0 and K1 / V1 etc., since K0 and KO are a bit harder to
differentiate when reading?

2. I think you missed the key type in the intrusive approach example code
snippet regarding "KTable  oneToManyJoin"? Should that be

KTable<CombinedKey<K,KO>, V0> oneToManyJoin

3. Some of the arrows in your algorithm section's diagrams seem reversed.

4. In the first step of the algorithm, "Materialize B first", that happens
in the "Repartition by A's key" block, right? If yes, could you clarify it
in the block?

5. "skip old if A's key didn't change": hmm, not sure if we can skip it.
What if other fields (neither A's key nor B's key) change? Suppose you have
an aggregation after the join; we still need to subtract the old value from
the aggregation, right?

6. In the block of "Materialize B", I think from your description we are
actually materializing both A and B, right? If yes, could you update the
diagram?

7. This is a meta question: "in the sink, only use A's key to determine
partition". I think we had the discussion a long time ago that if we are
sending the old and new entries of the pair to different partitions, their
ordering may get reversed later when reading from the join operator (i.e.
the "Materialize B" block in your diagram). How did you address that with
this proposal?

8. "B records with a 'null' A-key value would be silently dropped": where
are we dropping them? Do we drop them at the first sub-topology (i.e. the
"Repartition by A's key" block)?

Guozhang





On Wed, Nov 1, 2017 at 12:18 PM, Jan Filipiak 
<jan.filip...@trivago.com>

wrote:


Hi thanks for the feedback

On 01.11.2017 12:58, Damian Guy wrote:


Hi Jan, Thanks for the KIP!

In both alternatives the API will need to use the `Joined` class 
rather
than than passing in `Serde`s. Also, as with all other joins etc, 
there

probably should be an overload that doesn't require any `Serdes`.

Will check again how current API looks. I remember loosing the 
argument

with this IQ overloads things.
Didn't expect something to have happend already so I just copied from
the
PR. Will update.
Will also add the overload.

It isn't clear to me what `joinPrefixFaker` is doing? In the 
comment it

says "returning an outputKey that when serialized only produces a
prefix
of
the output key which is the same serializing K" So why not just use
"K" ?


The faker in fact returns K wich can be serialized by the Key Serde
in the
rocks. But it needs to only contain A's key and it needs to be a 
strict

prefix
byte[] of all K with this A's key. We gonna seek there with an
RocksIterator and continue to read as long as the "faked key" 
serialized

form is a prefix
This is easy todo for Avro + Protobuf +  custom Serdes and Hadoop
Writables. Its a nightmare for JSON serdes.




Thanks,
Damian


On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:

I think if you explain what A and B are in the beginning, it makes
sense

to
use them since readers would know who they reference.

Cheers

On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak
<jan.filip...@trivago.com
wrote:



Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick
with
"this ktable", "other ktable"

Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <
jan.filip...@trivago.co

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-06 Thread Jan Filipiak
Will do! I need to do it carefully. One mistake in this detailed approach
and the confusion is complete ;)

Hope I can deliver this week.

Best Jan

On 06.11.2017 17:21, Matthias J. Sax wrote:

Jan,

thanks a lot for this KIP. I did an initial pass over it, but feel a
little lost. Maybe I need to read it more carefully, but atm it's not
clear to me at all what algorithm you propose.

I think it would be super helpful, to do an example with concrete data
that show how records are stored, what the different value mappers
extract, and what is written into repartitioning topics.



-Matthias


On 11/5/17 2:09 AM, Jan Filipiak wrote:

Hi Guozhang

I hope the wiki page looks better now. I put a little more effort into the
diagram. Still not ideal, but I think it serves its purpose.



On 02.11.2017 01:17, Guozhang Wang wrote:

Thanks for the KIP writeup Jan. I made a first pass and here are some quick
comments:


1. Could we use K0 / V0 and K1 / V1 etc., since K0 and KO are a bit harder to
differentiate when reading?

2. I think you missed the key type in the intrusive approach example code
snippet regarding "KTable  oneToManyJoin"? Should that be

KTable<CombinedKey<K,KO>, V0> oneToManyJoin

3. Some of the arrows in your algorithm section's diagrams seem reversed.

4. In the first step of the algorithm, "Materialize B first", that happens
in the "Repartition by A's key" block, right? If yes, could you clarify it
in the block?

5. "skip old if A's key didn't change": hmm, not sure if we can skip it.
What if other fields (neither A's key nor B's key) change? Suppose you have
an aggregation after the join; we still need to subtract the old value from
the aggregation, right?

6. In the block of "Materialize B", I think from your description we are
actually materializing both A and B, right? If yes, could you update the
diagram?

7. This is a meta question: "in the sink, only use A's key to determine
partition". I think we had the discussion a long time ago that if we are
sending the old and new entries of the pair to different partitions, their
ordering may get reversed later when reading from the join operator (i.e.
the "Materialize B" block in your diagram). How did you address that with
this proposal?

8. "B records with a 'null' A-key value would be silently dropped": where
are we dropping them? Do we drop them at the first sub-topology (i.e. the
"Repartition by A's key" block)?

Guozhang





On Wed, Nov 1, 2017 at 12:18 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:


Hi thanks for the feedback

On 01.11.2017 12:58, Damian Guy wrote:


Hi Jan, Thanks for the KIP!

In both alternatives the API will need to use the `Joined` class rather
than than passing in `Serde`s. Also, as with all other joins etc, there
probably should be an overload that doesn't require any `Serdes`.


Will check again how current API looks. I remember loosing the argument
with this IQ overloads things.
Didn't expect something to have happend already so I just copied from
the
PR. Will update.
Will also add the overload.


It isn't clear to me what `joinPrefixFaker` is doing? In the comment it
says "returning an outputKey that when serialized only produces a
prefix
of
the output key which is the same serializing K" So why not just use
"K" ?


The faker in fact returns K wich can be serialized by the Key Serde
in the
rocks. But it needs to only contain A's key and it needs to be a strict
prefix
byte[] of all K with this A's key. We gonna seek there with an
RocksIterator and continue to read as long as the "faked key" serialized
form is a prefix
This is easy todo for Avro + Protobuf +  custom Serdes and Hadoop
Writables. Its a nightmare for JSON serdes.




Thanks,
Damian


On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:

I think if you explain what A and B are in the beginning, it makes
sense

to
use them since readers would know who they reference.

Cheers

On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak
<jan.filip...@trivago.com
wrote:



Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick
with
"this ktable", "other ktable"

Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <
jan.filip...@trivago.com
wrote:

Hello everyone,


this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows
the
Streams DSL to perform KTableKTable-Joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible
and
have a good solution afterwards I invite everyone to r

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-06 Thread Matthias J. Sax
Jan,

thanks a lot for this KIP. I did an initial pass over it, but feel a
little lost. Maybe I need to read it more carefully, but atm it's not
clear to me at all what algorithm you propose.

I think it would be super helpful to do an example with concrete data
that shows how records are stored, what the different value mappers
extract, and what is written into repartitioning topics.
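
As a rough illustration of the kind of concrete-data example asked for here, the following is a minimal sketch in plain Java, not the KIP's implementation. Everything in it is an assumption made for illustration: B's value is taken to be a string of the form "aKey:payload", `extractAKey` stands in for the value mapper, `Repartitioned` stands in for a record on the repartition topic, and the partition is chosen with `String.hashCode` rather than Kafka's real partitioner. It only shows the three ideas quoted elsewhere in this thread: extract A's key from B's value, partition by A's key only, and drop B records that carry no A key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RepartitionSketch {

    // Hypothetical stand-in for a record written to the repartition topic.
    static final class Repartitioned {
        final String aKey;    // extracted from B's value; decides the partition
        final String bKey;    // B's original key; second half of the combined key
        final String bValue;
        final int partition;

        Repartitioned(String aKey, String bKey, String bValue, int partition) {
            this.aKey = aKey;
            this.bKey = bKey;
            this.bValue = bValue;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "(" + aKey + "," + bKey + ") -> " + bValue + " @p" + partition;
        }
    }

    // Hypothetical value mapper: B's value is "aKey:payload"; no aKey means null.
    static String extractAKey(String bValue) {
        int i = bValue.indexOf(':');
        return i <= 0 ? null : bValue.substring(0, i);
    }

    // Only A's key picks the partition, so all B records for one A land together.
    // String.hashCode is an assumption here, not Kafka's actual partitioning hash.
    static int partitionFor(String aKey, int numPartitions) {
        return Math.floorMod(aKey.hashCode(), numPartitions);
    }

    static List<Repartitioned> repartition(Map<String, String> bTable, int numPartitions) {
        List<Repartitioned> out = new ArrayList<>();
        for (Map.Entry<String, String> e : bTable.entrySet()) {
            String aKey = extractAKey(e.getValue());
            if (aKey == null) {
                continue; // B records with a null A key are dropped in this sketch
            }
            out.add(new Repartitioned(aKey, e.getKey(), e.getValue(),
                    partitionFor(aKey, numPartitions)));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> b = new LinkedHashMap<>();
        b.put("b1", "a1:foo");
        b.put("b2", "a1:bar");
        b.put("b3", ":orphan"); // no A key, will be dropped
        b.put("b4", "a2:baz");
        for (Repartitioned r : repartition(b, 4)) {
            System.out.println(r);
        }
    }
}
```

In this sketch b1 and b2 always share a partition because both reference a1, while b3 never reaches the repartition step.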



-Matthias


On 11/5/17 2:09 AM, Jan Filipiak wrote:
> Hi Guozhang
> 
> I hope the wiki page looks better now. I put a little more effort into the
> diagram. Still not ideal, but I think it serves its purpose.
> 
> 
> 
> On 02.11.2017 01:17, Guozhang Wang wrote:
>> Thanks for the KIP writeup Jan. I made a first pass and here are some
>> quick
>> comments:
>>
>>
>> 1. Could we use K0 / V0 and K1 / V1 etc since K0 and KO are a bit
>> harder to
>> differentiate when reading.
>>
>> 2. I think you missed the key type in the intrusive approach example code
>> snippet regarding "KTable  oneToManyJoin"? Should that be
>>
>> KTable<CombinedKey<K,KO>, V0> oneToManyJoin
>>
>> 3. Some of the arrows in your algorithm section's diagrams seems
>> reversed.
>>
>> 4. In the first step of the algorithm, "Materialize B first", that
>> happens
>> in the "Repartition by A's key" block right? If yes, could you clarify it
>> in the block?
>>
>> 5. "skip old if A's key didn't change": hmm, not sure if we can skip it.
>> What if other fields (neither A's key or B's key) changes? Suppose you
>> have
>> an aggregation after the join, we still need to subtract the old value
>> from
>> the aggregation right?
>>
>> 6. In the block of "Materialize B", I think from your description we are
>> actually materializing both A and B right? If yes could you update the
>> diagram?
>>
>> 7. This is a meta question: "in the sink, only use A's key to determine
>> partition" I think we had the discussion long time ago, that if we are
>> sending the old and new entries of the pair to different partitions,
>> their
>> ordering may get reversed later when reading from the join operator (i.e.
>> the "Materialize B" block in your diagram). How did you address that with
>> this proposal?
>>
>> 8. "B records with a 'null' A-key value would be silently dropped" Where
>> are we dropping it, do we drop it at the first sub-topology (i.e the
>> "Repartition by A's key" block)?
>>
>> Guozhang
>>
>>
>>
>>
>>
>> On Wed, Nov 1, 2017 at 12:18 PM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>>> Hi thanks for the feedback
>>>
>>> On 01.11.2017 12:58, Damian Guy wrote:
>>>
>>>> Hi Jan, Thanks for the KIP!
>>>>
>>>> In both alternatives the API will need to use the `Joined` class rather
>>>> than than passing in `Serde`s. Also, as with all other joins etc, there
>>>> probably should be an overload that doesn't require any `Serdes`.
>>>>
>>> Will check again how current API looks. I remember loosing the argument
>>> with this IQ overloads things.
>>> Didn't expect something to have happend already so I just copied from
>>> the
>>> PR. Will update.
>>> Will also add the overload.
>>>
>>>> It isn't clear to me what `joinPrefixFaker` is doing? In the comment it
>>>> says "returning an outputKey that when serialized only produces a
>>>> prefix
>>>> of
>>>> the output key which is the same serializing K" So why not just use
>>>> "K" ?
>>>>
>>> The faker in fact returns K wich can be serialized by the Key Serde
>>> in the
>>> rocks. But it needs to only contain A's key and it needs to be a strict
>>> prefix
>>> byte[] of all K with this A's key. We gonna seek there with an
>>> RocksIterator and continue to read as long as the "faked key" serialized
>>> form is a prefix
>>> This is easy todo for Avro + Protobuf +  custom Serdes and Hadoop
>>> Writables. Its a nightmare for JSON serdes.
>>>
>>>
>>>
>>>> Thanks,
>>>> Damian
>>>>
>>>>
>>>> On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>> I think if you explain what A and B are in the beginning, it makes
>>>> sense
>>>>> to
>>>>> use them since reade

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-04 Thread Jan Filipiak

Hi Guozhang

I hope the wiki page looks better now. I put a little more effort into the
diagram. Still not ideal, but I think it serves its purpose.




On 02.11.2017 01:17, Guozhang Wang wrote:

Thanks for the KIP writeup Jan. I made a first pass and here are some quick
comments:


1. Could we use K0 / V0 and K1 / V1 etc since K0 and KO are a bit harder to
differentiate when reading.

2. I think you missed the key type in the intrusive approach example code
snippet regarding "KTable  oneToManyJoin"? Should that be

KTable<CombinedKey<K,KO>, V0> oneToManyJoin

3. Some of the arrows in your algorithm section's diagrams seems reversed.

4. In the first step of the algorithm, "Materialize B first", that happens
in the "Repartition by A's key" block right? If yes, could you clarify it
in the block?

5. "skip old if A's key didn't change": hmm, not sure if we can skip it.
What if other fields (neither A's key or B's key) changes? Suppose you have
an aggregation after the join, we still need to subtract the old value from
the aggregation right?

6. In the block of "Materialize B", I think from your description we are
actually materializing both A and B right? If yes could you update the
diagram?

7. This is a meta question: "in the sink, only use A's key to determine
partition" I think we had the discussion long time ago, that if we are
sending the old and new entries of the pair to different partitions, their
ordering may get reversed later when reading from the join operator (i.e.
the "Materialize B" block in your diagram). How did you address that with
this proposal?

8. "B records with a 'null' A-key value would be silently dropped" Where
are we dropping it, do we drop it at the first sub-topology (i.e the
"Repartition by A's key" block)?

Guozhang





On Wed, Nov 1, 2017 at 12:18 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:


Hi thanks for the feedback

On 01.11.2017 12:58, Damian Guy wrote:


Hi Jan, Thanks for the KIP!

In both alternatives the API will need to use the `Joined` class rather
than than passing in `Serde`s. Also, as with all other joins etc, there
probably should be an overload that doesn't require any `Serdes`.


Will check again how current API looks. I remember loosing the argument
with this IQ overloads things.
Didn't expect something to have happend already so I just copied from the
PR. Will update.
Will also add the overload.


It isn't clear to me what `joinPrefixFaker` is doing? In the comment it
says "returning an outputKey that when serialized only produces a prefix
of
the output key which is the same serializing K" So why not just use "K" ?


The faker in fact returns K wich can be serialized by the Key Serde in the
rocks. But it needs to only contain A's key and it needs to be a strict
prefix
byte[] of all K with this A's key. We gonna seek there with an
RocksIterator and continue to read as long as the "faked key" serialized
form is a prefix
This is easy todo for Avro + Protobuf +  custom Serdes and Hadoop
Writables. Its a nightmare for JSON serdes.




Thanks,
Damian


On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:

I think if you explain what A and B are in the beginning, it makes sense

to
use them since readers would know who they reference.

Cheers

On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak <jan.filip...@trivago.com
wrote:



Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick with
"this ktable", "other ktable"

Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <
jan.filip...@trivago.com
wrote:

Hello everyone,


this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the
Streams DSL to perform KTable-KTable joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards, I invite everyone to read through the
KIP I put together and discuss it here in this thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward to
everyone's opinion!

Please keep the discussion on the mailing list rather than commenting on
the wiki (wiki discussions get unwieldy fast).

Best
Jan













Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-01 Thread Jan Filipiak
read as long as the "faked key" serialized
form is a prefix.
This is easy to do for Avro + Protobuf + custom Serdes and Hadoop
Writables. It's a nightmare for JSON serdes.




Thanks,
Damian


On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:

I think if you explain what A and B are in the beginning, it makes sense

to
use them since readers would know who they reference.

Cheers

On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak <jan.filip...@trivago.com
wrote:



Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick with
"this ktable", "other ktable"

Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <
jan.filip...@trivago.com
wrote:

Hello everyone,


this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the
Streams DSL to perform KTable-KTable joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards, I invite everyone to read through the
KIP I put together and discuss it here in this thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward to
everyone's opinion!

Please keep the discussion on the mailing list rather than commenting on
the wiki (wiki discussions get unwieldy fast).

Best
Jan













Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-01 Thread Guozhang Wang
Thanks for the KIP writeup Jan. I made a first pass and here are some quick
comments:


1. Could we use K0 / V0 and K1 / V1 etc., since K0 and KO are a bit harder to
differentiate when reading?

2. I think you missed the key type in the intrusive approach example code
snippet regarding "KTable  oneToManyJoin"? Should that be

KTable<CombinedKey<K,KO>, V0> oneToManyJoin

3. Some of the arrows in your algorithm section's diagrams seem reversed.

4. In the first step of the algorithm, "Materialize B first", that happens
in the "Repartition by A's key" block, right? If yes, could you clarify it
in the block?

5. "skip old if A's key didn't change": hmm, not sure if we can skip it.
What if other fields (neither A's key nor B's key) change? Suppose you have
an aggregation after the join; we still need to subtract the old value from
the aggregation, right?

6. In the block of "Materialize B", I think from your description we are
actually materializing both A and B, right? If yes, could you update the
diagram?

7. This is a meta question: "in the sink, only use A's key to determine
partition". I think we had the discussion a long time ago that if we are
sending the old and new entries of the pair to different partitions, their
ordering may get reversed later when reading from the join operator (i.e.
the "Materialize B" block in your diagram). How did you address that with
this proposal?

8. "B records with a 'null' A-key value would be silently dropped": where
are we dropping them? Do we drop them at the first sub-topology (i.e. the
"Repartition by A's key" block)?

Guozhang
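
To make the concern in point 5 concrete, here is a minimal sketch of a downstream aggregation that depends on receiving the old value. It is plain Java rather than Kafka Streams code; `SubtractOldSketch` and `onChange` are hypothetical names, and the running sum simply stands in for any aggregate with an adder and a subtractor.

```java
import java.util.HashMap;
import java.util.Map;

final class SubtractOldSketch {

    // Running sum per A key, fed by (oldValue, newValue) change pairs from the join.
    private final Map<String, Long> sums = new HashMap<>();

    void onChange(String aKey, Long oldValue, Long newValue) {
        long sum = sums.getOrDefault(aKey, 0L);
        if (oldValue != null) {
            sum -= oldValue; // subtractor: undo the previous contribution
        }
        if (newValue != null) {
            sum += newValue; // adder: apply the new contribution
        }
        sums.put(aKey, sum);
    }

    long sum(String aKey) {
        return sums.getOrDefault(aKey, 0L);
    }

    public static void main(String[] args) {
        // Old value forwarded: a non-key field changes from 5 to 8.
        SubtractOldSketch withOld = new SubtractOldSketch();
        withOld.onChange("a1", null, 5L);
        withOld.onChange("a1", 5L, 8L);
        System.out.println(withOld.sum("a1")); // prints 8

        // Old value skipped because A's key did not change: the sum drifts.
        SubtractOldSketch skipped = new SubtractOldSketch();
        skipped.onChange("a1", null, 5L);
        skipped.onChange("a1", null, 8L);
        System.out.println(skipped.sum("a1")); // prints 13
    }
}
```

The second run shows why skipping the old value is only safe if nothing downstream needs to retract it.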





On Wed, Nov 1, 2017 at 12:18 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hi thanks for the feedback
>
> On 01.11.2017 12:58, Damian Guy wrote:
>
>> Hi Jan, Thanks for the KIP!
>>
>> In both alternatives the API will need to use the `Joined` class rather
>> than than passing in `Serde`s. Also, as with all other joins etc, there
>> probably should be an overload that doesn't require any `Serdes`.
>>
> Will check again how current API looks. I remember loosing the argument
> with this IQ overloads things.
> Didn't expect something to have happend already so I just copied from the
> PR. Will update.
> Will also add the overload.
>
>>
>> It isn't clear to me what `joinPrefixFaker` is doing? In the comment it
>> says "returning an outputKey that when serialized only produces a prefix
>> of
>> the output key which is the same serializing K" So why not just use "K" ?
>>
> The faker in fact returns K wich can be serialized by the Key Serde in the
> rocks. But it needs to only contain A's key and it needs to be a strict
> prefix
> byte[] of all K with this A's key. We gonna seek there with an
> RocksIterator and continue to read as long as the "faked key" serialized
> form is a prefix
> This is easy todo for Avro + Protobuf +  custom Serdes and Hadoop
> Writables. Its a nightmare for JSON serdes.
>
>
>
>> Thanks,
>> Damian
>>
>>
>> On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> I think if you explain what A and B are in the beginning, it makes sense
>>> to
>>> use them since readers would know who they reference.
>>>
>>> Cheers
>>>
>>> On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak <jan.filip...@trivago.com
>>> >
>>> wrote:
>>>
>>>
>>>> Thanks for the remarks. hope I didn't miss any.
>>>> Not even sure if it makes sense to introduce A and B or just stick with
>>>> "this ktable", "other ktable"
>>>>
>>>> Thank you
>>>> Jan
>>>>
>>>>
>>>> On 27.10.2017 06:58, Ted Yu wrote:
>>>>
>>>> Do you mind addressing my previous comments ?
>>>>>
>>>>> http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
>>>>> DISCUSS+KIP+213+Support+non+key+joining+in+KTable
>>>>>
>>>>> On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <
>>>>> jan.filip...@trivago.com
>>>>> wrote:
>>>>>
>>>>> Hello everyone,
>>>>>
>>>>>> this is the new discussion thread after the ID-clash.
>>>>>>
>>>>>> Best
>>>>>> Jan
>>>>>>
>>>>>> __
>>>>>>
>>>>>>
>>>>>> Hello Kafka-users,
>>>>>>
>>>>>> I want to continue with the development of KAFKA-3705, which allows
>>

Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-01 Thread Jan Filipiak

Hi thanks for the feedback

On 01.11.2017 12:58, Damian Guy wrote:

Hi Jan, Thanks for the KIP!

In both alternatives the API will need to use the `Joined` class rather
than passing in `Serde`s. Also, as with all other joins etc, there
probably should be an overload that doesn't require any `Serdes`.

Will check again how the current API looks. I remember losing the argument
about these IQ overloads.
Didn't expect something to have happened already, so I just copied from
the PR. Will update.

Will also add the overload.


It isn't clear to me what `joinPrefixFaker` is doing? In the comment it
says "returning an outputKey that when serialized only produces a prefix of
the output key which is the same serializing K" So why not just use "K" ?

The faker in fact returns a K which can be serialized by the key Serde in
the RocksDB store. But it needs to contain only A's key, and it needs to be
a strict prefix byte[] of all K with this A's key. We are going to seek
there with a RocksIterator and continue to read as long as the "faked key"
serialized form is a prefix.
This is easy to do for Avro + Protobuf + custom Serdes and Hadoop
Writables. It's a nightmare for JSON serdes.
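
The prefix requirement can be sketched without RocksDB. The example below is an assumption-laden illustration, not the code from the PR: a `TreeMap` with an unsigned bytewise comparator stands in for the store, `tailMap` stands in for seeking the iterator, and the combined key uses a hypothetical layout of a 4-byte length of A's key, then A's key, then B's key. The length prefix is what keeps "a1" from matching "a10".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PrefixScanSketch {

    // Unsigned lexicographic order, comparable to a bytewise key comparator.
    static int compareBytes(byte[] x, byte[] y) {
        int n = Math.min(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(x[i] & 0xff, y[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(x.length, y.length);
    }

    // Hypothetical layout: [4-byte length of A's key][A's key][B's key].
    static byte[] combinedKey(String aKey, String bKey) {
        byte[] a = aKey.getBytes(StandardCharsets.UTF_8);
        byte[] b = bKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + a.length + b.length)
                .putInt(a.length).put(a).put(b).array();
    }

    // The "faked" key: only A's part, a prefix of every combined key for that A.
    static byte[] prefixFor(String aKey) {
        return combinedKey(aKey, "");
    }

    static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // Seek to the prefix, then read for as long as the prefix still matches.
    static List<String> scan(TreeMap<byte[], String> store, byte[] prefix) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<byte[], String> e : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(e.getKey(), prefix)) {
                break;
            }
            hits.add(e.getValue());
        }
        return hits;
    }

    static TreeMap<byte[], String> sampleStore() {
        TreeMap<byte[], String> store = new TreeMap<>(PrefixScanSketch::compareBytes);
        store.put(combinedKey("a1", "b1"), "v1");
        store.put(combinedKey("a1", "b2"), "v2");
        store.put(combinedKey("a10", "b3"), "v3");
        store.put(combinedKey("a2", "b4"), "v4");
        return store;
    }

    public static void main(String[] args) {
        TreeMap<byte[], String> store = sampleStore();
        System.out.println(scan(store, prefixFor("a1")));  // prints [v1, v2]
        System.out.println(scan(store, prefixFor("a10"))); // prints [v3]
    }
}
```

A binary format with a fixed or length-delimited layout makes such a prefix easy to produce. A JSON serializer typically closes the object after the last field, so serializing only A's key does not yield a byte prefix of the full key, which is presumably the nightmare mentioned above.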




Thanks,
Damian


On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:


I think if you explain what A and B are in the beginning, it makes sense to
use them since readers would know who they reference.

Cheers

On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:



Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick with
"this ktable", "other ktable"

Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:


Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+
DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <jan.filip...@trivago.com
wrote:

Hello everyone,

this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the
Streams DSL to perform KTable-KTable joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards, I invite everyone to read through the
KIP I put together and discuss it here in this thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward to
everyone's opinion!

Please keep the discussion on the mailing list rather than commenting on
the wiki (wiki discussions get unwieldy fast).

Best
Jan











Re: Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-11-01 Thread Damian Guy
Hi Jan, Thanks for the KIP!

In both alternatives the API will need to use the `Joined` class rather
than passing in `Serde`s. Also, as with all other joins etc, there
probably should be an overload that doesn't require any `Serdes`.

It isn't clear to me what `joinPrefixFaker` is doing? In the comment it
says "returning an outputKey that when serialized only produces a prefix of
the output key which is the same serializing K" So why not just use "K" ?

Thanks,
Damian


On Fri, 27 Oct 2017 at 10:27 Ted Yu <yuzhih...@gmail.com> wrote:

> I think if you explain what A and B are in the beginning, it makes sense to
> use them since readers would know who they reference.
>
> Cheers
>
> On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
> >
> >
> > Thanks for the remarks. hope I didn't miss any.
> > Not even sure if it makes sense to introduce A and B or just stick with
> > "this ktable", "other ktable"
> >
> > Thank you
> > Jan
> >
> >
> > On 27.10.2017 06:58, Ted Yu wrote:
> >
> >> Do you mind addressing my previous comments ?
> >>
> >> http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+DISCUSS+KIP+213+Support+non+key+joining+in+KTable
> >>
> >> On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <jan.filip...@trivago.com>
> >> wrote:
> >>
> >> Hello everyone,
> >>>
> >>> this is the new discussion thread after the ID-clash.
> >>>
> >>> Best
> >>> Jan
> >>>
> >>> __
> >>>
> >>>
> >>> Hello Kafka-users,
> >>>
> >>> I want to continue with the development of KAFKA-3705, which allows the
> >>> Streams DSL to perform KTableKTable-Joins when the KTables have a
> >>> one-to-many relationship.
> >>> To make sure we cover the requirements of as many users as possible and
> >>> have a good solution afterwards I invite everyone to read through the
> >>> KIP I put together and discuss it here in this Thread.
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> >>> https://issues.apache.org/jira/browse/KAFKA-3705
> >>> https://github.com/apache/kafka/pull/3720
> >>>
> >>> I think a public discussion and vote on a solution is exactly what is
> >>> needed to bring this feature into kafka-streams. I am looking forward to
> >>> everyone's opinion!
> >>>
> >>> Please keep the discussion on the mailing list rather than commenting on
> >>> the wiki (wiki discussions get unwieldy fast).
> >>>
> >>> Best
> >>> Jan
> >>>
> >>>
> >>>
> >>>
> >
> >
> >
>


Re: Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-27 Thread Ted Yu
I think if you explain what A and B are in the beginning, it makes sense to
use them since readers would know who they reference.

Cheers

On Thu, Oct 26, 2017 at 11:04 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

>
>
> Thanks for the remarks. hope I didn't miss any.
> Not even sure if it makes sense to introduce A and B or just stick with
> "this ktable", "other ktable"
>
> Thank you
> Jan
>
>
> On 27.10.2017 06:58, Ted Yu wrote:
>
>> Do you mind addressing my previous comments ?
>>
>> http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+DISCUSS+KIP+213+Support+non+key+joining+in+KTable
>>
>> On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <jan.filip...@trivago.com>
>> wrote:
>>
>> Hello everyone,
>>>
>>> this is the new discussion thread after the ID-clash.
>>>
>>> Best
>>> Jan
>>>
>>> __
>>>
>>>
>>> Hello Kafka-users,
>>>
>>> I want to continue with the development of KAFKA-3705, which allows the
>>> Streams DSL to perform KTableKTable-Joins when the KTables have a
>>> one-to-many relationship.
>>> To make sure we cover the requirements of as many users as possible and
>>> have a good solution afterwards I invite everyone to read through the
>>> KIP I put together and discuss it here in this Thread.
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>>> https://issues.apache.org/jira/browse/KAFKA-3705
>>> https://github.com/apache/kafka/pull/3720
>>>
>>> I think a public discussion and vote on a solution is exactly what is
>>> needed to bring this feature into kafka-streams. I am looking forward to
>>> everyone's opinion!
>>>
>>> Please keep the discussion on the mailing list rather than commenting on
>>> the wiki (wiki discussions get unwieldy fast).
>>>
>>> Best
>>> Jan
>>>
>>>
>>>
>>>
>
>
>


Fwd: Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-27 Thread Jan Filipiak

Sorry,

so used to the users mailing list...


Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick with
"this ktable", "other ktable"

Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:


Hello everyone,

this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the
Streams DSL to perform KTableKTable-Joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards I invite everyone to read through the KIP I
put together and discuss it here in this Thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward to
everyone's opinion!

Please keep the discussion on the mailing list rather than commenting on
the wiki (wiki discussions get unwieldy fast).

Best
Jan









Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-26 Thread Ted Yu
Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> Hello everyone,
>
> this is the new discussion thread after the ID-clash.
>
> Best
> Jan
>
> __
>
>
> Hello Kafka-users,
>
> I want to continue with the development of KAFKA-3705, which allows the
> Streams DSL to perform KTableKTable-Joins when the KTables have a
> one-to-many relationship.
> To make sure we cover the requirements of as many users as possible and
> have a good solution afterwards I invite everyone to read through the KIP I
> put together and discuss it here in this Thread.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> https://issues.apache.org/jira/browse/KAFKA-3705
> https://github.com/apache/kafka/pull/3720
>
> I think a public discussion and vote on a solution is exactly what is
> needed to bring this feature into kafka-streams. I am looking forward to
> everyone's opinion!
>
> Please keep the discussion on the mailing list rather than commenting on
> the wiki (wiki discussions get unwieldy fast).
>
> Best
> Jan
>
>
>


Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-25 Thread Onur Karaman
Done. I changed my KIP to KIP-214. So this KIP doesn't need to change.

On Wed, Oct 25, 2017 at 10:33 PM, Onur Karaman wrote:

> Looks like Jan technically made his KIP wiki page first so I'll just
> change my KIP number.
>
> On Wed, Oct 25, 2017 at 4:59 PM, Matthias J. Sax 
> wrote:
>
>> Thanks a lot for the KIP. Can we please move the discussion to the dev
>> list?
>>
>> Thus, after fixing the KIP collision, just start a new DISCUSS thread.
>>
>> Thx.
>>
>>
>> -Matthias
>>
>> On 10/25/17 4:20 PM, Ted Yu wrote:
>> > Have you seen the email a moment ago from Onur which uses the same KIP
>> > number ?
>> >
>> > Looks like there was a race condition in modifying the wiki.
>> >
>> > Please consider bumping the KIP number.
>> >
>> > Thanks
>> >
>> > On Wed, Oct 25, 2017 at 4:14 PM, Jan Filipiak
>> > wrote:
>> >
>> >> Hello Kafka-users,
>> >>
>> >> I want to continue with the development of KAFKA-3705, which allows the
>> >> Streams DSL to perform KTableKTable-Joins when the KTables have a
>> >> one-to-many relationship.
>> >> To make sure we cover the requirements of as many users as possible and
>> >> have a good solution afterwards I invite everyone to read through the
>> >> KIP I put together and
>> >> discuss it here in this Thread.
>> >>
>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>> >> https://issues.apache.org/jira/browse/KAFKA-3705
>> >> https://github.com/apache/kafka/pull/3720
>> >>
>> >> I think a public discussion and vote on a solution is exactly what is
>> >> needed to bring this feature into kafka-streams. I am looking forward to
>> >> everyone's opinion!
>> >>
>> >> Please keep the discussion on the mailing list rather than commenting on
>> >> the wiki (wiki discussions get unwieldy fast).
>> >>
>> >> Best
>> >> Jan
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >
>>
>>
>


Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-25 Thread Onur Karaman
Looks like Jan technically made his KIP wiki page first so I'll just change
my KIP number.

On Wed, Oct 25, 2017 at 4:59 PM, Matthias J. Sax 
wrote:

> Thanks a lot for the KIP. Can we please move the discussion to the dev
> list?
>
> Thus, after fixing the KIP collision, just start a new DISCUSS thread.
>
> Thx.
>
>
> -Matthias
>
> On 10/25/17 4:20 PM, Ted Yu wrote:
> > Have you seen the email a moment ago from Onur which uses the same KIP
> > number ?
> >
> > Looks like there was a race condition in modifying the wiki.
> >
> > Please consider bumping the KIP number.
> >
> > Thanks
> >
> > On Wed, Oct 25, 2017 at 4:14 PM, Jan Filipiak 
> > wrote:
> >
> >> Hello Kafka-users,
> >>
> >> I want to continue with the development of KAFKA-3705, which allows the
> >> Streams DSL to perform KTableKTable-Joins when the KTables have a
> >> one-to-many relationship.
> >> To make sure we cover the requirements of as many users as possible and
> >> have a good solution afterwards I invite everyone to read through the
> >> KIP I put together and
> >> discuss it here in this Thread.
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
> >> https://issues.apache.org/jira/browse/KAFKA-3705
> >> https://github.com/apache/kafka/pull/3720
> >>
> >> I think a public discussion and vote on a solution is exactly what is
> >> needed to bring this feature into kafka-streams. I am looking forward to
> >> everyone's opinion!
> >>
> >> Please keep the discussion on the mailing list rather than commenting on
> >> the wiki (wiki discussions get unwieldy fast).
> >>
> >> Best
> >> Jan
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >
>
>


Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-25 Thread Matthias J. Sax
Thanks a lot for the KIP. Can we please move the discussion to the dev list?

Thus, after fixing the KIP collision, just start a new DISCUSS thread.

Thx.


-Matthias

On 10/25/17 4:20 PM, Ted Yu wrote:
> Have you seen the email a moment ago from Onur which uses the same KIP
> number ?
> 
> Looks like there was a race condition in modifying the wiki.
> 
> Please consider bumping the KIP number.
> 
> Thanks
> 
> On Wed, Oct 25, 2017 at 4:14 PM, Jan Filipiak 
> wrote:
> 
>> Hello Kafka-users,
>>
>> I want to continue with the development of KAFKA-3705, which allows the
>> Streams DSL to perform KTableKTable-Joins when the KTables have a
>> one-to-many relationship.
>> To make sure we cover the requirements of as many users as possible and
>> have a good solution afterwards I invite everyone to read through the KIP I
>> put together and
>> discuss it here in this Thread.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>> https://issues.apache.org/jira/browse/KAFKA-3705
>> https://github.com/apache/kafka/pull/3720
>>
>> I think a public discussion and vote on a solution is exactly what is
>> needed to bring this feature into kafka-streams. I am looking forward to
>> everyone's opinion!
>>
>> Please keep the discussion on the mailing list rather than commenting on
>> the wiki (wiki discussions get unwieldy fast).
>>
>> Best
>> Jan
>>
>>
>>
>>
>>
>>
>>
> 


