Hi,

Yes, the proposed solution is doable completely in the DSL (the only real constraint is to not have a parent key with lots of children), except for the lateral view,
which is a pretty easy thing to do in the PAPI.

Our own implementation is a mix: it reuses DSL interfaces but uses reflection against KTableImpl to drop down to the PAPI. That is probably one limiting factor in why I am not that eager to share it publicly, because it's kind of ugly. The development at the moment (removing many features from the PAPI) is very worrisome for me, so I should get moving on getting upstream support.

Regarding the output key: we force the user to pick a combined key, parent+child_id. This works out pretty nicely, as you get access to the partition information in the partitioner, also in the delete cases. On the receiving side you can use just a regular KTableSource to materialize and have the parent key as a prefix automatically. It will also do the naturally correct thing if you update parent_id in the child table. Upstream support would also be helpful because the state stores are changelogged even though we could use the intermediate topic for state store high availability.
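
To make the combined-key idea a bit more concrete, here is a minimal plain-Java sketch. It is not our implementation and uses no Kafka Streams classes; the class name, the `|` separator and the use of `hashCode()` for partitioning are assumptions made purely for illustration (Kafka's default partitioner hashes the serialized key bytes instead). It shows two properties: the partition is derived from the parent part only, so it can be computed even when the value is gone (a delete), and in a sorted store all children of a parent sit next to each other under the parent prefix.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java illustration only; names and separator are hypothetical.
class CombinedKeySketch {

    // Assumed separator; this sketch assumes ids never contain it.
    static final char SEP = '|';

    // Build the combined output key parent+child_id.
    static String combinedKey(String parentId, String childId) {
        return parentId + SEP + childId;
    }

    // Recover the parent part from a combined key.
    static String parentOf(String combinedKey) {
        return combinedKey.substring(0, combinedKey.indexOf(SEP));
    }

    // Partition on the parent part only. Because the parent id is in the key,
    // this also works for deletes, where the value is null.
    static int partitionFor(String combinedKey, int numPartitions) {
        return Math.floorMod(parentOf(combinedKey).hashCode(), numPartitions);
    }

    // Prefix scan: all entries whose key starts with parentId + SEP.
    static SortedMap<String, String> childrenOf(TreeMap<String, String> store,
                                                String parentId) {
        String from = parentId + SEP;               // inclusive lower bound
        String to = parentId + (char) (SEP + 1);    // exclusive upper bound
        return store.subMap(from, to);
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put(combinedKey("p1", "c1"), "first");
        store.put(combinedKey("p1", "c2"), "second");
        store.put(combinedKey("p2", "c3"), "third");

        // Moving c2 from p1 to p2: delete under the old key, insert under the new.
        store.remove(combinedKey("p1", "c2"));
        store.put(combinedKey("p2", "c2"), "second");

        System.out.println(childrenOf(store, "p1"));
        System.out.println(childrenOf(store, "p2"));
    }
}
```

Changing parent_id in this picture is just a delete under the old combined key plus an insert under the new one, which is why the update case falls out naturally.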

Best Jan

On 21.02.2017 20:15, Guozhang Wang wrote:
Jan,

Sure, I would love to hear what you did for non-key joins. Last time we
chatted there were discussions on the ordering issue, that we HAVE TO
augment the join result stream keys as a combo of both, which may not be
elegant when used in the DSL.

For your proposed solution, it seems you did not do that on the DSL but at
the PAPI layer, right?

Guozhang

On Tue, Feb 21, 2017 at 6:05 AM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

Just a little note here:

if you can take all rows of the "children" table for each key into memory,
you can get away with using group_by to make a list of them. With this
aggregation the join is straightforward, and you can use a lateral view
later to get to the same result. For this you could use the current DSL to
a greater extent.
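
As a rough picture of the three steps (group, join, lateral view), here is a plain-Java sketch over in-memory maps. It uses no Kafka Streams API; the class, the `Child` type and the output row format are hypothetical and only there to show the shape of the data at each step.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the idea; no Kafka Streams classes are used.
class LateralViewJoinSketch {

    // Hypothetical child row: its own id, the parent it points to, a payload.
    static final class Child {
        final String childId;
        final String parentId;
        final String payload;

        Child(String childId, String parentId, String payload) {
            this.childId = childId;
            this.parentId = parentId;
            this.payload = payload;
        }
    }

    // Step 1: the group_by aggregation, one list of children per parent key.
    static Map<String, List<Child>> groupByParent(List<Child> children) {
        Map<String, List<Child>> grouped = new LinkedHashMap<>();
        for (Child c : children) {
            grouped.computeIfAbsent(c.parentId, k -> new ArrayList<>()).add(c);
        }
        return grouped;
    }

    // Steps 2 and 3: an ordinary key-to-key join with the parent table,
    // then the lateral view that expands each list into one row per child.
    static List<String> joinAndExplode(Map<String, String> parents,
                                       Map<String, List<Child>> grouped) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, List<Child>> e : grouped.entrySet()) {
            String parentValue = parents.get(e.getKey());
            if (parentValue == null) {
                continue; // inner join: children without a parent are dropped
            }
            for (Child c : e.getValue()) {
                rows.add(e.getKey() + "|" + c.childId + " => "
                        + parentValue + " + " + c.payload);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, String> parents = new LinkedHashMap<>();
        parents.put("p1", "parent-one");
        List<Child> children = new ArrayList<>();
        children.add(new Child("c1", "p1", "first"));
        children.add(new Child("c2", "p1", "second"));
        children.add(new Child("c3", "p9", "orphan"));
        for (String row : joinAndExplode(parents, groupByParent(children))) {
            System.out.println(row);
        }
    }
}
```

The whole list for a parent is held as one value, which is exactly why this only works when all children of a key fit into memory.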

Best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:

I've read that JIRA (although I don't understand every single thing), and
I got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

Hi Frank,
As far as I know the design in that wiki has been superseded by the
Global KTables design, which is now coming in 0.10.2. Hence the JIRAs that
are mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno

On 21 Feb 2017, at 07:35, Frank Lyaruu <flya...@gmail.com> wrote:
Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins

Under the "Implementation Details" there is one line I don't know how to
do:


    1. First of all, we will repartition this KTable's stream, by key
    computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
    the same key. The co-partition topic is partitioned on the new key,
    but the message key and value are unchanged, and log compaction is
    turned off.


How do I do that? I've been unable to find any documentation. I've looked
at the StreamPartitionAssignor, which seems relevant, but I could use some
help. Does anyone have an example?
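
For reference, the core of the quoted step can be pictured with a small plain-Java sketch: the partition is chosen from mapper(K, V) → K1 while the record's own key and value are left untouched. This is only an illustration and not the Kafka Streams API; the class name, the "parentId:payload" value layout and the use of `hashCode()` as the hash are assumptions made for the example.

```java
import java.util.function.BiFunction;

// Plain-Java illustration only; not a Kafka Streams partitioner.
class MapperPartitionSketch {

    // Choose the partition from mapper(K, V) -> K1 rather than from the
    // record key. The record itself (key and value) is not modified.
    static <K, V, K1> int partition(K key, V value,
                                    BiFunction<K, V, K1> mapper,
                                    int numPartitions) {
        K1 joinKey = mapper.apply(key, value);
        return Math.floorMod(joinKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Assumed value layout "parentId:payload"; the mapper extracts parentId.
        BiFunction<String, String, String> mapper =
                (childId, row) -> row.substring(0, row.indexOf(':'));

        int a = partition("c1", "p1:first", mapper, 8);
        int b = partition("c2", "p1:second", mapper, 8);

        // Different message keys, same mapped key, hence the same partition.
        System.out.println(a == b);
    }
}
```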

regards, Frank



