Matthias, thank you for getting back on this. - I'll just let the
application run for now. We are trying to build a CDC system
leveraging Kafka in order to feed aggregated data into an indexing
solution like Elasticsearch or Opensearch. Maybe after all the Kafka
Streams application will be sufficient, but: it has to catch up
millions of records of existing data (our software has been running
for about 10 years now, piling up around 80M customers). Once that's
finished, I guess performance will be alright as it is not a
high-throughput system.

Anyway, thanks so much for your help.

Best wishes

Am Mi., 7. Feb. 2024 um 20:05 Uhr schrieb Matthias J. Sax <>:
> Using the DSL, this sounds about right.
> I am not worried about the complexity -- KS can handle it, and it's not
> uncommon to end up with such topologies.
> You might be able to cut down on complexity by not using the DSL, but
> the Processor API. It gives you more control, and thus you might be able
> to optimize the overall topology.
> Maybe inspect the details of `TopologyDescription` to spot
> inefficiencies of the DSL generated Topology that might give you an idea
> how much you could optimize using Processor API (to estimate if it would
> be worth the effort).
> It's hard to tell w/o knowing the details. It could also be just an
> inherently complex problem, and the DSL program is already as efficient
> as it gets...
> Of course, there might also be ways to play with configs to cut down on
> latency to some extend, if e2e latency is your main concern. Again, I
> don't know the use case: for many case, sub-second latency is actually
> sufficient.
> HTH.
> -Matthias
> On 2/7/24 7:41 AM, Karsten Stöckmann wrote:
> > Sorry for being late with the response - I've been quite busy working
> > on our Streams application lately.
> >
> > That leads me back to my initial question. The Folder class contains
> > multiple fields with FK pointing to the Person table, all of them with
> > different semantics (customer, billing address, etc). So in order to
> > find _all_ folders related to a particular person regardless of its
> > role, I guess I need to
> >
> > a) re-key the folder table on each person FK independently and then
> > b) outer join the result tables.
> >
> > The entire topology is insanely complex, I've got around 10 tables
> > with different levels of nesting (e.g. folder -- 1:n --> dependency a
> > -- 1:n --> dependency b) that all need to be aggregated and in the end
> > re-keyed to person IDs in order to build an aggregate person. There
> > are 14 sub topologies... - measuring the e2e latency shows values
> > around 600ms which seems rather high to me. Does that sound crazy? ;)
> >
> > Best wishes
> > Karsten
> >
> > Am Do., 1. Feb. 2024 um 19:02 Uhr schrieb Matthias J. Sax 
> > <>:
> >>
> >> I see. You need to ensure that you get _all_ Person.
> >>
> >> For this case, I guess you are right. You would need to first aggregate
> >> the folder per person:
> >>
> >> KTable allPersonFolders =
> >>       folder.groupBy((...) -> (folder.customerId, ...))
> >>             .aggregate(...)
> >>
> >> And in a second step, do a left join:
> >>
> >> result = personTable.leftJoin(allPersonFolders,...)
> >>
> >>
> >>> Reading the topic as a table directly did not work out as that crashed
> >>> the application; apparently reading the topic as a KTable and then
> >>> using that for three independent re-key-operations is not allowed.
> >>
> >> Not sure if I can follow. What do you mean by "crashed". -- For tables,
> >> there is no `selectKey()` nor  a `repartition()` as explained in my
> >> previous reply. However, doing a `table.groupBy(...)` will set a new key
> >> and repartition the data to your needs.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/1/24 1:12 AM, Karsten Stöckmann wrote:
> >>> Thanks so much for taking a look. An FK-table-table join is an inner
> >>> join which implies there would be no Person entites without associated
> >>> Folders. Unfortunately, that's not the case. That lead me to an
> >>> attempt of re-keying the Folder topic by each of the three possible
> >>> foreign keys in order to be able to left join Persons against each of
> >>> the three re-keyed KTables to build an eventual Person aggregation
> >>> containing all possible Folders associated in any way.
> >>>
> >>> Reading the topic as a table directly did not work out as that crashed
> >>> the application; apparently reading the topic as a KTable and then
> >>> using that for three independent re-key-operations is not allowed.
> >>>
> >>> Best wishes,
> >>> Karsten
> >>>
> >>> Am Do., 1. Feb. 2024 um 02:16 Uhr schrieb Matthias J. Sax 
> >>> <>:
> >>>>
> >>>> Thanks for the details. This does make sense.
> >>>>
> >>>> So it seems you can read all topic as table (ie, builder.table("topic")
> >>>> -- no need to so ``).
> >>>>
> >>>> And you can use the built-in FK-table-table join, and aggregate the 
> >>>> result:
> >>>>
> >>>> KTable result =
> >>>>      folderTable
> >>>>          .join(personTable, (folderId, folder) -> folder.customerId, ...)
> >>>>          .groupBy((...) -> (personId, ...))
> >>>>          .aggregate(...);
> >>>> result.toStream().to("resultTopic");
> >>>>
> >>>> Note the fk-extractor `(folderId, folder) -> folder.customerId` that
> >>>> tells the join to use `customerId` from the `folderTable` to lookup the
> >>>> person from personTable.
> >>>>
> >>>> Think of `folderTable` as fact-table and `personTable` as dimension 
> >>>> table.
> >>>>
> >>>>
> >>>> KS will take care of everything else under the hood automatically.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 1/30/24 11:25 AM, Karsten Stöckmann wrote:
> >>>>> Matthias, thanks for getting back on this. I'll try to illustrate my
> >>>>> intent with an example as I'm not yet fully familiar with Kafka
> >>>>> (Streams) and its idioms...
> >>>>>
> >>>>> Assume classes Person and Folder:
> >>>>>
> >>>>> class Person {
> >>>>>      Long id;
> >>>>>      String firstname;
> >>>>>      String lastname;
> >>>>>      // some content
> >>>>> }
> >>>>>
> >>>>> class Folder {
> >>>>>      Long id;
> >>>>>      String folderNumber;
> >>>>>      // some other content
> >>>>>      Long customerId; // FK, points to
> >>>>>      Long billingAddressId; // FK, also points to
> >>>>> }
> >>>>>
> >>>>> Thus both foreign keys of Folder point to Person entities, yet with
> >>>>> different semantics. They're not composite keys but act independently.
> >>>>>
> >>>>> Now assume I want to build an aggregate Person object containing
> >>>>> Folder.folderNumber of all folders associated with a Person entity,
> >>>>> regardless whether it acts as a customer or billing address. My
> >>>>> (naive) idea was to build re-keyed KTables by Folder.customerId and
> >>>>> Folder.billingAddressId and then joining / aggregating them with the
> >>>>> Person KTable in order to build something like this:
> >>>>>
> >>>>> class AggregatedPerson {
> >>>>>      Long id;
> >>>>>      List<String> folderNumbers; // or even List<Folder>
> >>>>>      // ...
> >>>>> }
> >>>>>
> >>>>> (The latter supposed to be written to an output topic in order to
> >>>>> serve as input for Solr or ElasticSearch.)
> >>>>>
> >>>>> Does this even make sense?
> >>>>>
> >>>>>
> >>>>>> If you read the topic a KTable, you cannot repartition because it
> >>>>>> violates the contract. A KTable must be partitioned by it's primary 
> >>>>>> key,
> >>>>>> ie, the ID field, and thus the DSL does not offer you a repartition 
> >>>>>> option.
> >>>>>
> >>>>> So re-key means repartition? ATM the partition size of all input
> >>>>> topics is 1 as per Kafka UI, as I've specified no extra configuration
> >>>>> for them.
> >>>>>
> >>>>> Best wishes,
> >>>>> Karsten
> >>>>>
> >>>>> Am Di., 30. Jan. 2024 um 20:03 Uhr schrieb Matthias J. Sax 
> >>>>> <>:
> >>>>>>
> >>>>>>>> Both fk1 and fk2 point to the PK of another entity (not shown for
> >>>>>>>> brevity, of no relevance to the question).
> >>>>>>
> >>>>>> It this two independent FK, or one two-column FK?
> >>>>>>
> >>>>>>
> >>>>>>> Ingesting the topic into a Kafka Streams application, how can I re-key
> >>>>>>> the resulting KTable<Long, A> by both fk1 and fk2?
> >>>>>>
> >>>>>> If you read the topic a KTable, you cannot repartition because it
> >>>>>> violates the contract. A KTable must be partitioned by it's primary 
> >>>>>> key,
> >>>>>> ie, the ID field, and thus the DSL does not offer you a repartition 
> >>>>>> option.
> >>>>>>
> >>>>>> You could read the topic as KStream though, and provide a custom
> >>>>>> `StreamPartitioner` for a `repartition()` operation. However, this is
> >>>>>> also "dangerous" because for a KStream it's also assumed that it's
> >>>>>> partitioned by it's key, and you might break downstream DSL operators
> >>>>>> with such a violation of the "contract".
> >>>>>>
> >>>>>> Looking into your solution:
> >>>>>>
> >>>>>>> .toTable()
> >>>>>>>        .groupBy(
> >>>>>>>            (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>>>>>            Grouped.with(...))
> >>>>>>
> >>>>>> This will set fk1 as key, what seems not to align with you previous
> >>>>>> comment about the key should stay the ID? (Same for f2k).
> >>>>>>
> >>>>>> Your last step seems to join fk1-fk2 -- is this on purpose? I guess 
> >>>>>> it's
> >>>>>> unclear what you try to actually do to begin with? It sound like it's
> >>>>>> overall a self-join of the input topic on fk1 and fk2 ?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 1/28/24 2:24 AM, Karsten Stöckmann wrote:
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> just stumbled upon another Kafka Streams issue that keeps me busy 
> >>>>>>> these days.
> >>>>>>>
> >>>>>>> Assume a (simplified) class A like this:
> >>>>>>>
> >>>>>>> class A {
> >>>>>>>         private Long id;
> >>>>>>>         private String someContent;
> >>>>>>>         private Long fk1;
> >>>>>>>         private Long fk2;
> >>>>>>>         // Getters and setters accordingly
> >>>>>>> }
> >>>>>>>
> >>>>>>> Both fk1 and fk2 point to the PK of another entity (not shown for
> >>>>>>> brevity, of no relevance to the question).
> >>>>>>>
> >>>>>>> Now assume a Kafka topic built from instances of class A, keyed by its
> >>>>>>> id (see above).
> >>>>>>>
> >>>>>>> Ingesting the topic into a Kafka Streams application, how can I re-key
> >>>>>>> the resulting KTable<Long, A> by both fk1 and fk2? Note that the
> >>>>>>> resulting key should not be changed or turned into some kind of
> >>>>>>> composite key as it is used in later join operations.
> >>>>>>>
> >>>>>>> My (naive) solution involves creating two KTables from the input
> >>>>>>> stream, re-keying them by fk1 and fk2 accordingly and then outer
> >>>>>>> joining both resulting (re-keyed) KTables.
> >>>>>>>
> >>>>>>> KStream<Long, A> in =, 
> >>>>>>> Consumed.with(...));
> >>>>>>>
> >>>>>>> KTable<Long, A> rekeyedByFk1 = in
> >>>>>>>         .toTable()
> >>>>>>>         .groupBy(
> >>>>>>>             (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>>>>>             Grouped.with(...))
> >>>>>>>         .aggregate(
> >>>>>>>             Aggregate::new,
> >>>>>>>             (key, value, aggregate) -> aggregate.add(value),
> >>>>>>>             (key, value, aggregate) -> aggregate.remove(value),
> >>>>>>>             Materialized.with(...));
> >>>>>>>
> >>>>>>> KTable<Long, a> rekeyedByFk2 = in
> >>>>>>>         .toTable()
> >>>>>>>         .groupBy(
> >>>>>>>             (key, value) -> KeyValue.pair(value.fk2(), value),
> >>>>>>>             Grouped.with(...))
> >>>>>>>         .aggregate(
> >>>>>>>             ... same as above
> >>>>>>>         );
> >>>>>>>
> >>>>>>> KTable<Long, A> joined = rekeyedByFk1
> >>>>>>>         .outerJoin(
> >>>>>>>             rekeyedByFk2,
> >>>>>>>             <value joiner>)
> >>>>>>>           .groupBy(KeyValue::pair, Grouped.with(...))
> >>>>>>>         .aggregate(...);
> >>>>>>>
> >>>>>>> <value joiner> would integrate the (already pre-joined) Aggregates as
> >>>>>>> to avoid duplicates.
> >>>>>>>
> >>>>>>> Does this seem like a viable solution, or are there better / simpler /
> >>>>>>> more efficient implementations?
> >>>>>>>
> >>>>>>> Best wishes,
> >>>>>>> Karsten

Reply via email to