Thanks so much for taking a look. An FK-table-table join is an inner join, which implies there would be no Person entities without associated Folders. Unfortunately, that's not the case. That led me to attempt re-keying the Folder topic by each of the three possible foreign keys, so that I could left join Persons against each of the three re-keyed KTables and build an eventual Person aggregation containing all Folders associated in any way.
Reading the topic as a table directly did not work out, as that crashed the application; apparently reading the topic as a KTable and then using it for three independent re-key operations is not allowed.

Best wishes,
Karsten

On Thu, Feb 1, 2024 at 02:16, Matthias J. Sax <mj...@apache.org> wrote:
>
> Thanks for the details. This does make sense.
>
> So it seems you can read all topics as tables (ie, builder.table("topic")
> -- no need to do `builder.stream().toTable()`).
>
> And you can use the built-in FK-table-table join, and aggregate the result:
>
>   KTable result =
>     folderTable
>       .join(personTable, (folderId, folder) -> folder.customerId, ...)
>       .groupBy((...) -> (personId, ...))
>       .aggregate(...);
>   result.toStream().to("resultTopic");
>
> Note the fk-extractor `(folderId, folder) -> folder.customerId` that
> tells the join to use `customerId` from the `folderTable` to look up the
> person from `personTable`.
>
> Think of `folderTable` as the fact table and `personTable` as the dimension table.
>
> KS will take care of everything else under the hood automatically.
>
>
> -Matthias
>
> On 1/30/24 11:25 AM, Karsten Stöckmann wrote:
> > Matthias, thanks for getting back on this. I'll try to illustrate my
> > intent with an example as I'm not yet fully familiar with Kafka
> > (Streams) and its idioms...
> >
> > Assume classes Person and Folder:
> >
> > class Person {
> >   Long id;
> >   String firstname;
> >   String lastname;
> >   // some content
> > }
> >
> > class Folder {
> >   Long id;
> >   String folderNumber;
> >   // some other content
> >   Long customerId;       // FK, points to Person.id
> >   Long billingAddressId; // FK, also points to Person.id
> > }
> >
> > Thus both foreign keys of Folder point to Person entities, yet with
> > different semantics. They're not composite keys but act independently.
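As an aside, the relational effect of the FK join plus aggregation sketched above can be modeled in plain Java for illustration -- no Kafka involved; the record shapes follow the Person/Folder example in this thread, and the table contents are made up:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model of the FK-table-table join plus aggregation: join each
// Folder to its Person via customerId (the fk-extractor), then group the
// joined rows by the person's primary key and collect folder numbers.
public class FkJoinModel {
    record Person(Long id, String firstname, String lastname) {}
    record Folder(Long id, String folderNumber, Long customerId) {}

    // Returns personId -> list of folderNumbers, the shape of the
    // aggregated result table.
    static Map<Long, List<String>> joinAndAggregate(
            Map<Long, Person> personTable, Map<Long, Folder> folderTable) {
        return folderTable.values().stream()
            // inner-join semantics: drop folders whose FK has no match
            .filter(f -> personTable.containsKey(f.customerId()))
            // group by the FK target (the person's primary key)
            .collect(Collectors.groupingBy(
                Folder::customerId,
                Collectors.mapping(Folder::folderNumber, Collectors.toList())));
    }

    public static void main(String[] args) {
        Map<Long, Person> persons = Map.of(
            1L, new Person(1L, "Ada", "Lovelace"),
            2L, new Person(2L, "Alan", "Turing"));
        Map<Long, Folder> folders = Map.of(
            10L, new Folder(10L, "F-10", 1L),
            12L, new Folder(12L, "F-12", 2L));
        System.out.println(joinAndAggregate(persons, folders));
    }
}
```

Note the filter step: this is exactly the inner-join behavior discussed at the top of the thread -- a Person with no Folders never shows up in the result.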
> >
> > Now assume I want to build an aggregate Person object containing
> > Folder.folderNumber of all folders associated with a Person entity,
> > regardless of whether it acts as customer or billing address. My
> > (naive) idea was to build re-keyed KTables by Folder.customerId and
> > Folder.billingAddressId and then join / aggregate them with the
> > Person KTable in order to build something like this:
> >
> > class AggregatedPerson {
> >   Long id;
> >   List<String> folderNumbers; // or even List<Folder>
> >   // ...
> > }
> >
> > (The latter is supposed to be written to an output topic in order to
> > serve as input for Solr or ElasticSearch.)
> >
> > Does this even make sense?
> >
> >
> >> If you read the topic as a KTable, you cannot repartition because it
> >> violates the contract. A KTable must be partitioned by its primary key,
> >> ie, the ID field, and thus the DSL does not offer you a repartition option.
> >
> > So re-key means repartition? ATM the partition count of all input
> > topics is 1 as per Kafka UI, as I've specified no extra configuration
> > for them.
> >
> > Best wishes,
> > Karsten
> >
> > On Tue, Jan 30, 2024 at 20:03, Matthias J. Sax
> > <mj...@apache.org> wrote:
> >>
> >>>> Both fk1 and fk2 point to the PK of another entity (not shown for
> >>>> brevity, of no relevance to the question).
> >>
> >> Is this two independent FKs, or one two-column FK?
> >>
> >>
> >>> Ingesting the topic into a Kafka Streams application, how can I re-key
> >>> the resulting KTable<Long, A> by both fk1 and fk2?
> >>
> >> If you read the topic as a KTable, you cannot repartition because it
> >> violates the contract. A KTable must be partitioned by its primary key,
> >> ie, the ID field, and thus the DSL does not offer you a repartition option.
> >>
> >> You could read the topic as a KStream though, and provide a custom
> >> `StreamPartitioner` for a `repartition()` operation.
> >> However, this is
> >> also "dangerous" because for a KStream it's also assumed that it's
> >> partitioned by its key, and you might break downstream DSL operators
> >> with such a violation of the "contract".
> >>
> >> Looking into your solution:
> >>
> >>>   .toTable()
> >>>   .groupBy(
> >>>     (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>     Grouped.with(...))
> >>
> >> This will set fk1 as the key, which seems not to align with your previous
> >> comment that the key should stay the ID? (Same for fk2.)
> >>
> >> Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's
> >> unclear what you actually try to do to begin with? It sounds like it's
> >> overall a self-join of the input topic on fk1 and fk2?
> >>
> >>
> >> -Matthias
> >>
> >> On 1/28/24 2:24 AM, Karsten Stöckmann wrote:
> >>> Hi all,
> >>>
> >>> just stumbled upon another Kafka Streams issue that keeps me busy these
> >>> days.
> >>>
> >>> Assume a (simplified) class A like this:
> >>>
> >>> class A {
> >>>   private Long id;
> >>>   private String someContent;
> >>>   private Long fk1;
> >>>   private Long fk2;
> >>>   // Getters and setters accordingly
> >>> }
> >>>
> >>> Both fk1 and fk2 point to the PK of another entity (not shown for
> >>> brevity, of no relevance to the question).
> >>>
> >>> Now assume a Kafka topic built from instances of class A, keyed by its
> >>> id (see above).
> >>>
> >>> Ingesting the topic into a Kafka Streams application, how can I re-key
> >>> the resulting KTable<Long, A> by both fk1 and fk2? Note that the
> >>> resulting key should not be changed or turned into some kind of
> >>> composite key as it is used in later join operations.
> >>>
> >>> My (naive) solution involves creating two KTables from the input
> >>> stream, re-keying them by fk1 and fk2 accordingly and then outer
> >>> joining both resulting (re-keyed) KTables.
> >>>
> >>> KStream<Long, A> in = streamsBuilder.stream(topic, Consumed.with(...));
> >>>
> >>> KTable<Long, A> rekeyedByFk1 = in
> >>>   .toTable()
> >>>   .groupBy(
> >>>     (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>     Grouped.with(...))
> >>>   .aggregate(
> >>>     Aggregate::new,
> >>>     (key, value, aggregate) -> aggregate.add(value),
> >>>     (key, value, aggregate) -> aggregate.remove(value),
> >>>     Materialized.with(...));
> >>>
> >>> KTable<Long, A> rekeyedByFk2 = in
> >>>   .toTable()
> >>>   .groupBy(
> >>>     (key, value) -> KeyValue.pair(value.fk2(), value),
> >>>     Grouped.with(...))
> >>>   .aggregate(
> >>>     ... same as above
> >>>   );
> >>>
> >>> KTable<Long, A> joined = rekeyedByFk1
> >>>   .outerJoin(
> >>>     rekeyedByFk2,
> >>>     <value joiner>)
> >>>   .groupBy(KeyValue::pair, Grouped.with(...))
> >>>   .aggregate(...);
> >>>
> >>> <value joiner> would integrate the (already pre-joined) Aggregates so
> >>> as to avoid duplicates.
> >>>
> >>> Does this seem like a viable solution, or are there better / simpler /
> >>> more efficient implementations?
> >>>
> >>> Best wishes,
> >>> Karsten
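For completeness, the result Karsten describes in the newest message at the top -- left-join semantics, so Persons without Folders survive, collecting the folders that reference a person via either FK -- can also be modeled in plain Java. This is only a sketch of the intended relational outcome, not a Kafka Streams topology; names follow the Person/Folder example, and the sample data is made up:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the desired left-join aggregation: every Person
// appears in the result, with the folder numbers of all Folders that
// reference it via customerId OR billingAddressId.
public class MultiFkAggregation {
    record Person(Long id, String firstname, String lastname) {}
    record Folder(Long id, String folderNumber, Long customerId, Long billingAddressId) {}

    static Map<Long, List<String>> aggregate(
            Collection<Person> persons, Collection<Folder> folders) {
        Map<Long, List<String>> result = new HashMap<>();
        // Left-join semantics: seed every person, even those without folders.
        for (Person p : persons) result.put(p.id(), new ArrayList<>());
        for (Folder f : folders) {
            // A folder may point to the same person via both FKs;
            // the contains() check deduplicates in that case.
            for (Long fk : new Long[] { f.customerId(), f.billingAddressId() }) {
                List<String> acc = result.get(fk);
                if (acc != null && !acc.contains(f.folderNumber())) {
                    acc.add(f.folderNumber());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Person> persons = List.of(
            new Person(1L, "Ada", "Lovelace"),
            new Person(2L, "Alan", "Turing"),
            new Person(3L, "Grace", "Hopper"));
        List<Folder> folders = List.of(
            new Folder(10L, "F-10", 1L, 2L),  // customer Ada, billing Alan
            new Folder(11L, "F-11", 1L, 1L)); // both FKs point to Ada
        // Person 3 still appears (with an empty list) despite having no
        // folders -- the case the plain inner join would drop.
        System.out.println(aggregate(persons, folders));
    }
}
```

This is the behavior the re-key-per-FK plus left-join idea is after; how best to express it in the Streams DSL is exactly the open question of the thread.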