[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16534356#comment-16534356
 ] 

Matthias J. Sax commented on KAFKA-4113:
----------------------------------------

I see your point. I still think that the timestamp-based semantics are superior, 
and I am personally in favor of keeping them. However, I agree that a 
time-decoupled table also covers a broad spectrum of use cases and we should 
allow for it – not as a replacement for the current KTable, but as a complement 
IMHO. The behavior and semantics would be similar to GlobalKTables.

Regarding "table backing topic is almost certainly log-compacted which means you 
can't achieve these semantics regardless as these older values are now gone" – 
I agree as far as reprocessing is concerned. However, time synchronization is 
not only important for reprocessing; it provides sound semantics in general. 
Without it, the computation is inherently non-deterministic (which I believe is 
not what most people want).
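To make the difference concrete, here is a minimal, hypothetical sketch in plain 
Java (deliberately not the Kafka Streams API): all updates to one table key are 
kept by update timestamp, and a timestamp-synchronized lookup is contrasted with 
a "latest value" lookup. The class and method names are illustrative only.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch: why timestamp synchronization is deterministic
// while "latest value" lookups depend on how far the table has been read.
public class JoinSemantics {
    // All updates ever made to one table key, ordered by update timestamp.
    static final NavigableMap<Long, String> tableVersions = new TreeMap<>();

    // Timestamp-synchronized lookup: the newest table version whose
    // timestamp is <= the stream record's timestamp. The result depends
    // only on the data, not on when table updates happened to arrive.
    static String synchronizedLookup(long streamTs) {
        Map.Entry<Long, String> e = tableVersions.floorEntry(streamTs);
        return e == null ? null : e.getValue();
    }

    // Decoupled lookup: whatever version happens to be newest right now.
    // For an old stream record this may already be a "future" value.
    static String latestLookup() {
        return tableVersions.isEmpty() ? null : tableVersions.lastEntry().getValue();
    }

    public static void main(String[] args) {
        tableVersions.put(100L, "v1");
        tableVersions.put(200L, "v2");
        // A stream record with timestamp 150 always joins against "v1" ...
        System.out.println(synchronizedLookup(150L)); // v1
        // ... while the decoupled lookup already sees the future value "v2".
        System.out.println(latestLookup()); // v2
    }
}
```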

To fix the reprocessing case, we would need to "protect" the head of the log 
from compaction: i.e., the retention time of the input stream and the 
non-compacted head of the log must be equally long – there is a config 
`min.compaction.lag.ms`, but I am actually not 100% sure if it can be used for 
this purpose. Would need to double check. Maybe [~guozhang] knows?
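For reference, if `min.compaction.lag.ms` does work for this (still to be 
verified), setting it on the table's backing topic would look roughly like the 
following – the topic name and the 7-day value are placeholders, assumed to 
match the retention time of the stream input:

```shell
# Sketch only: keep the head of the compacted table topic un-compacted for
# 7 days (604800000 ms), assumed equal to the stream input's retention time.
# "table-topic" is a placeholder topic name.
kafka-configs.sh --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name table-topic \
  --add-config min.compaction.lag.ms=604800000
```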

Semantically, it is sound that you cannot do reprocessing if you have lost old 
table state – note that reprocessing should ensure that you compute the same 
result (if you don't change the program) as in the original run – if log 
compaction deleted old data, you obviously cannot reprocess it. Using the 
latest KTable data instead would result in joining old stream records with 
"future" table data (future relative to the stream records, of course) and thus 
produce a different, and therefore incorrect, result.

> Allow KTable bootstrap
> ----------------------
>
>                 Key: KAFKA-4113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4113
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Major
>
> On the mailing list, there have been multiple requests about the possibility 
> to "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would mean that on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until one record with timestamp 1000 is seen.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
