Matthias, thank you for getting back to me on this. I'll just let the application run for now. We are trying to build a CDC (change data capture) system on top of Kafka in order to feed aggregated data into an indexing solution such as Elasticsearch or OpenSearch. The Kafka Streams application may well be sufficient after all, but it first has to catch up on millions of records of existing data (our software has been running for about 10 years now and has accumulated around 80M customers). Once that's finished, I guess performance will be fine, as it is not a high-throughput system.
Anyway, thanks so much for your help.

Best wishes
Karsten

On Wed, Feb 7, 2024 at 20:05, Matthias J. Sax <mj...@apache.org> wrote:
>
> Using the DSL, this sounds about right.
>
> I am not worried about the complexity -- KS can handle it, and it's not
> uncommon to end up with such topologies.
>
> You might be able to cut down on complexity by using the Processor API
> instead of the DSL. It gives you more control, and thus you might be able
> to optimize the overall topology.
>
> Maybe inspect the details of `TopologyDescription` to spot
> inefficiencies of the DSL-generated topology that might give you an idea
> how much you could optimize using the Processor API (to estimate if it
> would be worth the effort).
>
> It's hard to tell w/o knowing the details. It could also be just an
> inherently complex problem, and the DSL program is already as efficient
> as it gets...
>
> Of course, there might also be ways to play with configs to cut down on
> latency to some extent, if e2e latency is your main concern. Again, I
> don't know the use case: for many cases, sub-second latency is actually
> sufficient.
>
> HTH.
>
> -Matthias
>
> On 2/7/24 7:41 AM, Karsten Stöckmann wrote:
> > Sorry for being late with the response - I've been quite busy working
> > on our Streams application lately.
> >
> > That leads me back to my initial question. The Folder class contains
> > multiple fields with FKs pointing to the Person table, all of them with
> > different semantics (customer, billing address, etc.). So in order to
> > find _all_ folders related to a particular person regardless of its
> > role, I guess I need to
> >
> > a) re-key the folder table on each person FK independently and then
> > b) outer join the result tables.
> >
> > The entire topology is insanely complex. I've got around 10 tables
> > with different levels of nesting (e.g. folder -- 1:n --> dependency a
> > -- 1:n --> dependency b) that all need to be aggregated and in the end
> > re-keyed to person IDs in order to build an aggregate person. There
> > are 14 sub-topologies, and measuring the e2e latency shows values
> > around 600 ms, which seems rather high to me. Does that sound crazy? ;)
> >
> > Best wishes
> > Karsten
> >
> > On Thu, Feb 1, 2024 at 19:02, Matthias J. Sax
> > <mj...@apache.org> wrote:
> >>
> >> I see. You need to ensure that you get _all_ Persons.
> >>
> >> For this case, I guess you are right. You would need to first aggregate
> >> the folders per person:
> >>
> >> KTable allPersonFolders =
> >>     folder.groupBy((...) -> (folder.customerId, ...))
> >>           .aggregate(...)
> >>
> >> And in a second step, do a left join:
> >>
> >> result = personTable.leftJoin(allPersonFolders,...)
> >>
> >>
> >>> Reading the topic as a table directly did not work out as that crashed
> >>> the application; apparently reading the topic as a KTable and then
> >>> using that for three independent re-key operations is not allowed.
> >>
> >> Not sure if I can follow. What do you mean by "crashed"? -- For tables,
> >> there is no `selectKey()` nor a `repartition()` as explained in my
> >> previous reply. However, doing a `table.groupBy(...)` will set a new key
> >> and repartition the data to your needs.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/1/24 1:12 AM, Karsten Stöckmann wrote:
> >>> Thanks so much for taking a look. An FK-table-table join is an inner
> >>> join, which implies there would be no Person entities without associated
> >>> Folders. Unfortunately, that's not the case. That led me to an
> >>> attempt at re-keying the Folder topic by each of the three possible
> >>> foreign keys in order to be able to left join Persons against each of
> >>> the three re-keyed KTables to build an eventual Person aggregation
> >>> containing all possible Folders associated in any way.
> >>>
> >>> Reading the topic as a table directly did not work out as that crashed
> >>> the application; apparently reading the topic as a KTable and then
> >>> using that for three independent re-key operations is not allowed.
> >>>
> >>> Best wishes,
> >>> Karsten
> >>>
> >>> On Thu, Feb 1, 2024 at 02:16, Matthias J. Sax
> >>> <mj...@apache.org> wrote:
> >>>>
> >>>> Thanks for the details. This does make sense.
> >>>>
> >>>> So it seems you can read all topics as tables (ie, builder.table("topic")
> >>>> -- no need to do `builder.stream().toTable()`).
> >>>>
> >>>> And you can use the built-in FK-table-table join, and aggregate the
> >>>> result:
> >>>>
> >>>> KTable result =
> >>>>     folderTable
> >>>>         .join(personTable, (folderId, folder) -> folder.customerId, ...)
> >>>>         .groupBy((...) -> (personId, ...))
> >>>>         .aggregate(...);
> >>>> result.toStream().to("resultTopic");
> >>>>
> >>>> Note the fk-extractor `(folderId, folder) -> folder.customerId` that
> >>>> tells the join to use `customerId` from the `folderTable` to look up the
> >>>> person from personTable.
> >>>>
> >>>> Think of `folderTable` as the fact table and `personTable` as the
> >>>> dimension table.
> >>>>
> >>>>
> >>>> KS will take care of everything else under the hood automatically.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 1/30/24 11:25 AM, Karsten Stöckmann wrote:
> >>>>> Matthias, thanks for getting back on this. I'll try to illustrate my
> >>>>> intent with an example as I'm not yet fully familiar with Kafka
> >>>>> (Streams) and its idioms...
> >>>>>
> >>>>> Assume classes Person and Folder:
> >>>>>
> >>>>> class Person {
> >>>>>     Long id;
> >>>>>     String firstname;
> >>>>>     String lastname;
> >>>>>     // some content
> >>>>> }
> >>>>>
> >>>>> class Folder {
> >>>>>     Long id;
> >>>>>     String folderNumber;
> >>>>>     // some other content
> >>>>>     Long customerId; // FK, points to Person.id
> >>>>>     Long billingAddressId; // FK, also points to Person.id
> >>>>> }
> >>>>>
> >>>>> Thus both foreign keys of Folder point to Person entities, yet with
> >>>>> different semantics. They're not composite keys but act independently.
> >>>>>
> >>>>> Now assume I want to build an aggregate Person object containing
> >>>>> Folder.folderNumber of all folders associated with a Person entity,
> >>>>> regardless of whether it acts as a customer or billing address. My
> >>>>> (naive) idea was to build re-keyed KTables by Folder.customerId and
> >>>>> Folder.billingAddressId and then join / aggregate them with the
> >>>>> Person KTable in order to build something like this:
> >>>>>
> >>>>> class AggregatedPerson {
> >>>>>     Long id;
> >>>>>     List<String> folderNumbers; // or even List<Folder>
> >>>>>     // ...
> >>>>> }
> >>>>>
> >>>>> (The latter is supposed to be written to an output topic in order to
> >>>>> serve as input for Solr or Elasticsearch.)
> >>>>>
> >>>>> Does this even make sense?
> >>>>>
> >>>>>
> >>>>>> If you read the topic as a KTable, you cannot repartition because it
> >>>>>> violates the contract. A KTable must be partitioned by its primary
> >>>>>> key, ie, the ID field, and thus the DSL does not offer you a
> >>>>>> repartition option.
> >>>>>
> >>>>> So re-key means repartition? ATM the partition count of all input
> >>>>> topics is 1 as per Kafka UI, as I've specified no extra configuration
> >>>>> for them.
> >>>>>
> >>>>> Best wishes,
> >>>>> Karsten
> >>>>>
> >>>>> On Tue, Jan 30, 2024 at 20:03, Matthias J. Sax
> >>>>> <mj...@apache.org> wrote:
> >>>>>>
> >>>>>>>> Both fk1 and fk2 point to the PK of another entity (not shown for
> >>>>>>>> brevity, of no relevance to the question).
> >>>>>>
> >>>>>> Are these two independent FKs, or one two-column FK?
> >>>>>>
> >>>>>>
> >>>>>>> Ingesting the topic into a Kafka Streams application, how can I re-key
> >>>>>>> the resulting KTable<Long, A> by both fk1 and fk2?
> >>>>>>
> >>>>>> If you read the topic as a KTable, you cannot repartition because it
> >>>>>> violates the contract. A KTable must be partitioned by its primary
> >>>>>> key, ie, the ID field, and thus the DSL does not offer you a
> >>>>>> repartition option.
> >>>>>>
> >>>>>> You could read the topic as a KStream though, and provide a custom
> >>>>>> `StreamPartitioner` for a `repartition()` operation. However, this is
> >>>>>> also "dangerous" because for a KStream it's also assumed that it's
> >>>>>> partitioned by its key, and you might break downstream DSL operators
> >>>>>> with such a violation of the "contract".
> >>>>>>
> >>>>>> Looking into your solution:
> >>>>>>
> >>>>>>>     .toTable()
> >>>>>>>     .groupBy(
> >>>>>>>         (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>>>>>         Grouped.with(...))
> >>>>>>
> >>>>>> This will set fk1 as the key, which seems not to align with your
> >>>>>> previous comment that the key should stay the ID? (Same for fk2.)
> >>>>>>
> >>>>>> Your last step seems to join fk1-fk2 -- is this on purpose? I guess
> >>>>>> it's unclear what you are actually trying to do to begin with. It
> >>>>>> sounds like it's overall a self-join of the input topic on fk1 and fk2?
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 1/28/24 2:24 AM, Karsten Stöckmann wrote:
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> just stumbled upon another Kafka Streams issue that keeps me busy
> >>>>>>> these days.
> >>>>>>>
> >>>>>>> Assume a (simplified) class A like this:
> >>>>>>>
> >>>>>>> class A {
> >>>>>>>     private Long id;
> >>>>>>>     private String someContent;
> >>>>>>>     private Long fk1;
> >>>>>>>     private Long fk2;
> >>>>>>>     // Getters and setters accordingly
> >>>>>>> }
> >>>>>>>
> >>>>>>> Both fk1 and fk2 point to the PK of another entity (not shown for
> >>>>>>> brevity, of no relevance to the question).
> >>>>>>>
> >>>>>>> Now assume a Kafka topic built from instances of class A, keyed by its
> >>>>>>> id (see above).
> >>>>>>>
> >>>>>>> Ingesting the topic into a Kafka Streams application, how can I re-key
> >>>>>>> the resulting KTable<Long, A> by both fk1 and fk2? Note that the
> >>>>>>> resulting key should not be changed or turned into some kind of
> >>>>>>> composite key as it is used in later join operations.
> >>>>>>>
> >>>>>>> My (naive) solution involves creating two KTables from the input
> >>>>>>> stream, re-keying them by fk1 and fk2 respectively and then outer
> >>>>>>> joining both resulting (re-keyed) KTables.
> >>>>>>>
> >>>>>>> KStream<Long, A> in = streamsBuilder.stream(topic,
> >>>>>>>     Consumed.with(...));
> >>>>>>>
> >>>>>>> // aggregate() with Aggregate::new yields a table of Aggregate values
> >>>>>>> KTable<Long, Aggregate> rekeyedByFk1 = in
> >>>>>>>     .toTable()
> >>>>>>>     .groupBy(
> >>>>>>>         (key, value) -> KeyValue.pair(value.fk1(), value),
> >>>>>>>         Grouped.with(...))
> >>>>>>>     .aggregate(
> >>>>>>>         Aggregate::new,
> >>>>>>>         (key, value, aggregate) -> aggregate.add(value),
> >>>>>>>         (key, value, aggregate) -> aggregate.remove(value),
> >>>>>>>         Materialized.with(...));
> >>>>>>>
> >>>>>>> KTable<Long, Aggregate> rekeyedByFk2 = in
> >>>>>>>     .toTable()
> >>>>>>>     .groupBy(
> >>>>>>>         (key, value) -> KeyValue.pair(value.fk2(), value),
> >>>>>>>         Grouped.with(...))
> >>>>>>>     .aggregate(
> >>>>>>>         ... same as above
> >>>>>>>     );
> >>>>>>>
> >>>>>>> KTable<Long, A> joined = rekeyedByFk1
> >>>>>>>     .outerJoin(
> >>>>>>>         rekeyedByFk2,
> >>>>>>>         <value joiner>)
> >>>>>>>     .groupBy(KeyValue::pair, Grouped.with(...))
> >>>>>>>     .aggregate(...);
> >>>>>>>
> >>>>>>> <value joiner> would integrate the (already pre-joined) Aggregates so
> >>>>>>> as to avoid duplicates.
> >>>>>>>
> >>>>>>> Does this seem like a viable solution, or are there better / simpler /
> >>>>>>> more efficient implementations?
> >>>>>>>
> >>>>>>> Best wishes,
> >>>>>>> Karsten
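
To make the intended result of the thread concrete, here is a minimal plain-Java sketch of steps a) and b) (re-key the folders by each person FK, then merge the results), followed by the left join against the person IDs so that persons without any folder are kept. It deliberately does not use Kafka Streams and only models the data flow with in-memory maps. The class name PersonFolderSketch, the helpers groupByFk and aggregate, and the sample data are assumptions made for illustration and do not appear in the original messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical in-memory model of the aggregation discussed in the thread.
final class PersonFolderSketch {

    // Simplified stand-in for the Folder entity; both FKs point to Person.id.
    static final class Folder {
        final long id;
        final String folderNumber;
        final Long customerId;       // may be null
        final Long billingAddressId; // may be null

        Folder(long id, String folderNumber, Long customerId, Long billingAddressId) {
            this.id = id;
            this.folderNumber = folderNumber;
            this.customerId = customerId;
            this.billingAddressId = billingAddressId;
        }
    }

    // Step a): "re-key" the folders by one FK and collect folder numbers per person ID.
    static Map<Long, Set<String>> groupByFk(List<Folder> folders, Function<Folder, Long> fk) {
        Map<Long, Set<String>> grouped = new LinkedHashMap<>();
        for (Folder folder : folders) {
            Long personId = fk.apply(folder);
            if (personId == null) {
                continue; // folder has no person in this role
            }
            grouped.computeIfAbsent(personId, k -> new LinkedHashSet<>()).add(folder.folderNumber);
        }
        return grouped;
    }

    // Step b) plus left join: merge both groupings per person, dropping duplicates,
    // and keep every person ID even if no folder references it.
    static Map<Long, List<String>> aggregate(List<Long> personIds, List<Folder> folders) {
        Map<Long, Set<String>> byCustomer = groupByFk(folders, f -> f.customerId);
        Map<Long, Set<String>> byBilling = groupByFk(folders, f -> f.billingAddressId);

        Map<Long, List<String>> result = new LinkedHashMap<>();
        for (Long personId : personIds) {
            Set<String> merged = new LinkedHashSet<>();
            merged.addAll(byCustomer.getOrDefault(personId, Collections.<String>emptySet()));
            merged.addAll(byBilling.getOrDefault(personId, Collections.<String>emptySet()));
            result.put(personId, new ArrayList<>(merged));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Folder> folders = Arrays.asList(
                new Folder(1L, "F-1", 10L, 11L),
                new Folder(2L, "F-2", 10L, 10L),  // same person in both roles
                new Folder(3L, "F-3", 11L, null));
        // Person 12 has no folders and still appears, with an empty list.
        System.out.println(aggregate(Arrays.asList(10L, 11L, 12L), folders));
    }
}
```

In an actual topology, each groupByFk call would roughly correspond to a `table.groupBy(...).aggregate(...)` step and the final loop to `personTable.leftJoin(...)`, as Matthias describes above. The sketch says nothing about latency or the number of sub-topologies.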