Thanks so much for taking a look. An FK-table-table join is an inner
join, which implies there would be no Person entities without associated
Folders. Unfortunately, that's not the case. That led me to an
attempt at re-keying the Folder topic by each of the three possible
foreign keys in order to be able to left join Persons against each of
the three re-keyed KTables to build an eventual Person aggregation
containing all possible Folders associated in any way.
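
To make the intended result concrete, here is a minimal plain-Java sketch
of the left-join semantics I am after. It deliberately does not use Kafka
Streams; the class name PersonFolderSketch and the helper names are made up
for illustration, and only the two foreign keys named in this thread
(customerId, billingAddressId) are modelled:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; class and method names are hypothetical.
class PersonFolderSketch {

    record Person(long id, String firstname, String lastname) {}

    // customerId and billingAddressId are the two FKs named in this thread;
    // either may be null.
    record Folder(long id, String folderNumber, Long customerId, Long billingAddressId) {}

    record AggregatedPerson(long id, List<String> folderNumbers) {}

    // Left-join semantics: every Person yields a result, even without Folders.
    static Map<Long, AggregatedPerson> aggregate(List<Person> persons, List<Folder> folders) {
        Map<Long, AggregatedPerson> result = new LinkedHashMap<>();
        for (Person p : persons) {
            result.put(p.id(), new AggregatedPerson(p.id(), new ArrayList<>()));
        }
        for (Folder f : folders) {
            addIfPresent(result, f.customerId(), f.folderNumber());
            addIfPresent(result, f.billingAddressId(), f.folderNumber());
        }
        return result;
    }

    private static void addIfPresent(Map<Long, AggregatedPerson> result,
                                     Long personId, String folderNumber) {
        if (personId == null) {
            return;
        }
        AggregatedPerson agg = result.get(personId);
        // Skip dangling FKs and avoid listing a folder twice for the same person.
        if (agg != null && !agg.folderNumbers().contains(folderNumber)) {
            agg.folderNumbers().add(folderNumber);
        }
    }
}
```

A Person without any Folder still shows up with an empty folderNumbers
list, which is exactly the case an inner join would drop.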

Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key operations is not allowed.

Best wishes,
Karsten

On Thu, Feb 1, 2024 at 02:16, Matthias J. Sax <mj...@apache.org> wrote:
>
> Thanks for the details. This does make sense.
>
> So it seems you can read all topics as tables (i.e., builder.table("topic")
> -- no need to do `builder.stream().toTable()`).
>
> And you can use the built-in FK-table-table join, and aggregate the result:
>
> KTable result =
>    folderTable
>        .join(personTable, (folderId, folder) -> folder.customerId, ...)
>        .groupBy((...) -> (personId, ...))
>        .aggregate(...);
> result.toStream().to("resultTopic");
>
> Note the FK extractor `(folderId, folder) -> folder.customerId` that
> tells the join to use `customerId` from the `folderTable` to look up the
> person from `personTable`.
>
> Think of `folderTable` as fact-table and `personTable` as dimension table.
>
>
> KS will take care of everything else under the hood automatically.
>
>
> -Matthias
>
> On 1/30/24 11:25 AM, Karsten Stöckmann wrote:
> > Matthias, thanks for getting back on this. I'll try to illustrate my
> > intent with an example as I'm not yet fully familiar with Kafka
> > (Streams) and its idioms...
> >
> > Assume classes Person and Folder:
> >
> > class Person {
> >    Long id;
> >    String firstname;
> >    String lastname;
> >    // some content
> > }
> >
> > class Folder {
> >    Long id;
> >    String folderNumber;
> >    // some other content
> >    Long customerId; // FK, points to Person.id
> >    Long billingAddressId; // FK, also points to Person.id
> > }
> >
> > Thus both foreign keys of Folder point to Person entities, yet with
> > different semantics. They're not composite keys but act independently.
> >
> > Now assume I want to build an aggregate Person object containing
> > Folder.folderNumber of all folders associated with a Person entity,
> > regardless of whether it acts as a customer or billing address. My
> > (naive) idea was to build re-keyed KTables by Folder.customerId and
> > Folder.billingAddressId and then join / aggregate them with the
> > Person KTable in order to build something like this:
> >
> > class AggregatedPerson {
> >    Long id;
> >    List<String> folderNumbers; // or even List<Folder>
> >    // ...
> > }
> >
> > (The latter is supposed to be written to an output topic in order to
> > serve as input for Solr or ElasticSearch.)
> >
> > Does this even make sense?
> >
> >
> >> If you read the topic as a KTable, you cannot repartition because it
> >> violates the contract. A KTable must be partitioned by its primary key,
> >> i.e., the ID field, and thus the DSL does not offer you a repartition option.
> >
> > So re-key means repartition? ATM the partition size of all input
> > topics is 1 as per Kafka UI, as I've specified no extra configuration
> > for them.
> >
> > Best wishes,
> > Karsten
> >
> > On Tue, Jan 30, 2024 at 20:03, Matthias J. Sax
> > <mj...@apache.org> wrote:
> >>
> >>>> Both fk1 and fk2 point to the PK of another entity (not shown for
> >>>> brevity, of no relevance to the question).
> >>
> >> Are these two independent FKs, or one two-column FK?
> >>
> >>
> >>> Ingesting the topic into a Kafka Streams application, how can I re-key
> >>> the resulting KTable<Long, A> by both fk1 and fk2?
> >>
> >> If you read the topic as a KTable, you cannot repartition because it
> >> violates the contract. A KTable must be partitioned by its primary key,
> >> i.e., the ID field, and thus the DSL does not offer you a repartition option.
> >>
> >> You could read the topic as KStream though, and provide a custom
> >> `StreamPartitioner` for a `repartition()` operation. However, this is
> >> also "dangerous" because for a KStream it's also assumed that it's
> >> partitioned by its key, and you might break downstream DSL operators
> >> with such a violation of the "contract".
> >>
> >> Looking into your solution:
> >>
> >>> .toTable()
> >>>      .groupBy(
> >>>          (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>          Grouped.with(...))
> >>
> >> This will set fk1 as the key, which does not seem to align with your previous
> >> comment that the key should stay the ID? (Same for fk2.)
> >>
> >> Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's
> >> unclear what you are actually trying to do to begin with? It sounds like it's
> >> overall a self-join of the input topic on fk1 and fk2?
> >>
> >>
> >> -Matthias
> >>
> >> On 1/28/24 2:24 AM, Karsten Stöckmann wrote:
> >>> Hi all,
> >>>
> >>> just stumbled upon another Kafka Streams issue that keeps me busy
> >>> these days.
> >>>
> >>> Assume a (simplified) class A like this:
> >>>
> >>> class A {
> >>>       private Long id;
> >>>       private String someContent;
> >>>       private Long fk1;
> >>>       private Long fk2;
> >>>       // Getters and setters accordingly
> >>> }
> >>>
> >>> Both fk1 and fk2 point to the PK of another entity (not shown for
> >>> brevity, of no relevance to the question).
> >>>
> >>> Now assume a Kafka topic built from instances of class A, keyed by its
> >>> id (see above).
> >>>
> >>> Ingesting the topic into a Kafka Streams application, how can I re-key
> >>> the resulting KTable<Long, A> by both fk1 and fk2? Note that the
> >>> resulting key should not be changed or turned into some kind of
> >>> composite key as it is used in later join operations.
> >>>
> >>> My (naive) solution involves creating two KTables from the input
> >>> stream, re-keying them by fk1 and fk2 accordingly and then outer
> >>> joining both resulting (re-keyed) KTables.
> >>>
> >>> KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));
> >>>
> >>> KTable<Long, A> rekeyedByFk1 = in
> >>>       .toTable()
> >>>       .groupBy(
> >>>           (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>           Grouped.with(...))
> >>>       .aggregate(
> >>>           Aggregate::new,
> >>>           (key, value, aggregate) -> aggregate.add(value),
> >>>           (key, value, aggregate) -> aggregate.remove(value),
> >>>           Materialized.with(...));
> >>>
> >>> KTable<Long, A> rekeyedByFk2 = in
> >>>       .toTable()
> >>>       .groupBy(
> >>>           (key, value) -> KeyValue.pair(value.fk2(), value),
> >>>           Grouped.with(...))
> >>>       .aggregate(
> >>>           ... same as above
> >>>       );
> >>>
> >>> KTable<Long, A> joined = rekeyedByFk1
> >>>       .outerJoin(
> >>>           rekeyedByFk2,
> >>>           <value joiner>)
> >>>       .groupBy(KeyValue::pair, Grouped.with(...))
> >>>       .aggregate(...);
> >>>
> >>> <value joiner> would integrate the (already pre-joined) Aggregates so
> >>> as to avoid duplicates.
> >>>
> >>> Does this seem like a viable solution, or are there better / simpler /
> >>> more efficient implementations?
> >>>
> >>> Best wishes,
> >>> Karsten
