[
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16535084#comment-16535084
]
Sean McKibben commented on KAFKA-4113:
--------------------------------------
I think the sad reality of Kafka Streams' behavior right now is even worse than
is being portrayed in this ticket. Let's say I have a Kafka cluster with 2
topics, "events" and "users" and I want to produce a joined "enriched_events"
topic using Kafka Streams KStream-KTable join. Let's also say that my Kafka
cluster currently has 1 billion user records in the users topic, and 100
million event records in the events topic, with about 100k new events coming in
per second. Let's further say that the timestamps of all messages in the
"users" topic are earlier than any of the timestamps in the "events" topic (and
of course both topics are keyed by user id and partitioned identically).
What I would expect from my new Kafka Streams app is that I could build a
KTable from "users", do a leftJoin with the events stream, and see
events+users messages flow right into "enriched_events" after waiting for the
users KTable to populate the RocksDB store on each instance of my app.
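For concreteness, the topology I'm describing is roughly the following. This is only a sketch: the topic names come from the scenario above, but the String serdes and the joiner function are illustrative assumptions.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

// Sketch of the enrichment topology described above (serdes/joiner are
// illustrative; String values stand in for real event and user payloads).
public class EnrichmentTopology {
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> users =
            builder.table("users", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> events =
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));
        // leftJoin so events without a matching user record still pass through;
        // this is exactly how un-enriched events leak out on a first run,
        // before the users KTable has been populated.
        events.leftJoin(users, (event, user) -> event + "|user=" + user)
              .to("enriched_events", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```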
Unfortunately, what actually happens is that the app quickly processes the 100
million events while slowly populating the RocksDB instance, so enriched_events
receives almost no enrichment from corresponding values in my KTable,
regardless of any timestamp management.
Only after the app has burned through the 100-million-event backlog that
existed when it started, and has continued processing 100k events per second
for a long time, will the local RocksDB store be mostly populated and a
reasonable number of successful joins start flowing into "enriched_events".
The only time I've seen behavior remotely similar to the best effort described
here is when I restart a Kafka Streams app with the same application ID after
it has been running in steady state for a long time. In that case, though it
is difficult to see exactly what is going on, there appears to be some degree
of KTable preloading occurring.
The only workable solution I've found that avoids "enriched_events" being
filled with a bunch of un-enriched events is to create a new topic,
"events_controlled", to use as my Kafka Streams KStream and keep it completely
empty, then start the app, manually watch the app's lag on the "users" topic
until it reaches 0, and only then start a separate application that copies
messages from "events" to "events_controlled". This is a pretty high-touch
solution and far from ideal in any scenario.
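The "watch the lag until it reaches 0" step boils down to comparing each "users" partition's log-end offset against the app's committed offset. A minimal sketch of that gate logic, assuming the two offset maps have already been fetched (in practice via the Kafka AdminClient's listOffsets and listConsumerGroupOffsets); the class and method names here are hypothetical:

```java
import java.util.Map;

// Hypothetical helper for the lag gate described above. The offset maps
// (partition -> offset) would come from the Kafka AdminClient in practice.
final class KTableCatchUp {

    // Total remaining lag across all partitions of the "users" topic.
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long lag = 0L;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long consumed = committed.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0L, e.getValue() - consumed);
        }
        return lag;
    }

    // Gate: only start copying "events" -> "events_controlled" once this is true.
    static boolean caughtUp(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        return totalLag(endOffsets, committed) == 0L;
    }
}
```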
I really don't think the data model of a compacted topic is sufficient to
reasonably even attempt to provide "join-at-the-time" semantics as described in
this ticket. If you need historical joining, use a database that keeps a
history for each record. If you're using a compacted Kafka topic as the KTable
you're joining to, then your source of truth is, by definition, only intended
to contain the latest value for each key in your dataset. This is very much at
odds with the "best effort" timestamp alignment strategy that is your only
option for KStream-KTable match semantics, and which doesn't even appear to
make any effort during a first run to preload anything into the KTable.
> Allow KTable bootstrap
> ----------------------
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
> Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the
> data. Only after this topic has been read completely and the KTable is
> ready should the application start processing. This implies that on startup
> the current partition end offsets must be fetched and stored, and once the
> KTable has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to the KTable arrives for a
> certain time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without
> reading any other topics until we see one record with timestamp 1000.
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)