[ https://issues.apache.org/jira/browse/KAFKA-9986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118850#comment-17118850 ]

John Roesler commented on KAFKA-9986:
-------------------------------------

Hi [~nizhikov],

Thanks for getting involved in this! It's something I've been wanting to see 
progress on for at least a year, so I'm really happy to see that you're 
interested in it, too. I know multiple people who would dearly love to see this 
feature in Streams.

One high-level request: can we call these things "*Snapshots*", not 
"Checkpoints", to avoid confusion with the existing "checkpoint" concept in 
Streams?
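To be concrete about the potential for confusion: Streams already writes a 
per-task .checkpoint file that maps each changelog partition to the offset the 
local state reflects. From memory, it looks roughly like this (first line is 
the format version, second is the entry count, then one "topic partition 
offset" line per changelog partition; topic names made up):

{code}
0
2
my-app-store-changelog 0 1234
my-app-store-changelog 1 5678
{code}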

I'm not sure if you have already done it, but I'd highly recommend that you 
look at two sources of related work:
 # Other stateful data processing systems. Bulk processing systems may not be 
that relevant, and some of Streams's unique features may make other stream 
processing systems less relevant as well, but it's still worth taking the time 
to understand how systems that _do_ allow remote snapshotting actually manage 
those snapshots.
 # Other distributed databases. This may seem like a strange statement, but if 
you squint at it, you'll see that state stores in Streams _are_ distributed 
databases, following the primary/replica pattern, which use the changelog both 
for replication and for durability. This means that understanding the best 
snapshot/restore mechanisms from the distributed database world will be deeply 
helpful in developing a good design for Streams.

Incidentally, we did similar Related Work research while designing KIP-441 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams]), 
for very similar reasons. You might want to start by looking at the systems on 
our list.

 

One thing to bear in mind while you look at related systems is that Streams has 
one feature that renders it unique in both related domains: the changelog 
itself. The changelog topic represents a linearized history of the state store, 
with the offsets representing unique points in the history of the store. The 
changelog is stored durably, external to Streams, and it's compacted with 
infinite retention.

This has a few important implications:
 * We do not need snapshot/restore to recover from disasters. Typically, this 
mechanism is used in data systems to recover from the loss of too many nodes in 
the cluster. However, Streams is already resilient to the loss of the entire 
cluster. For us, snapshot/restore is purely an optimization: copying filesystem 
objects to and from remote blob storage is likely to be faster than replaying 
the changelog for very large stores.
 * We do not need any special algorithms to keep coherent snapshots. As long as 
we store the snapshot along with the "checkpoint" information (the changelog 
topic, partition, and offset), we can replay any subsequent state updates from 
the changelog and return the system to _exactly_ where it left off processing 
its inputs (see the sketch just after this list). Other systems need to 
implement some form of the Chandy-Lamport distributed snapshot algorithm in 
order to capture coherent snapshots, but we get it essentially "for free".
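To make that second point concrete, here is a minimal sketch of the metadata 
we would pin to each snapshot so that restore can replay the changelog tail. 
All names are made up; this is not an API proposal:

{code:java}
// Hypothetical snapshot metadata; none of these names exist in Streams today.
// Storing the changelog position next to the store files is what makes the
// snapshot coherent: restore copies the files back, then replays the
// changelog from changelogOffset + 1 to catch up.
public final class SnapshotMetadata {
    private final String snapshotId;     // unique id for the snapshot, e.g. a UUID
    private final String changelogTopic; // e.g. "my-app-store-changelog"
    private final int partition;
    private final long changelogOffset;  // store contents reflect the changelog up to here

    public SnapshotMetadata(final String snapshotId,
                            final String changelogTopic,
                            final int partition,
                            final long changelogOffset) {
        this.snapshotId = snapshotId;
        this.changelogTopic = changelogTopic;
        this.partition = partition;
        this.changelogOffset = changelogOffset;
    }
}
{code}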

 

Another important differentiating factor, specifically as you look at 
distributed databases for comparison, is that Streams is more like a distributed 
database _engine_ than "just" a distributed database. Most databases have 
exactly one storage format. For example, Cassandra stores its data in SSTables, 
Elasticsearch uses Lucene indices, etc. Those systems can craft their 
snapshot/restore mechanisms in full knowledge of the storage format.

On the other hand, Streams allows you to plug in multiple, custom storage 
formats. For an efficient snapshot/restore, my impression is that you really 
need to deal with the low-level format. For example, if we just iterate over a 
whole RocksDB store to copy it into a flat file for every snapshot, it's going 
to be _way_ slower and more bloated than if we directly copy around SST files, 
copying only the ones that changed since previous snapshots.
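For illustration, the RocksDB JNI bindings that Streams already ships expose a 
Checkpoint API that hard-links the immutable SST files into a target directory 
instead of copying their contents, which is exactly the kind of low-level hook 
I mean. A minimal sketch (directory handling and error recovery omitted):

{code:java}
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Sketch only: because SST files are immutable, a checkpoint is mostly
// hard links, and an incremental upload to remote storage can skip any
// SST file that is already present from an earlier snapshot.
public final class RocksDBSnapshotSketch {
    public static void snapshotTo(final RocksDB db, final String snapshotDir)
            throws RocksDBException {
        try (Checkpoint checkpoint = Checkpoint.create(db)) {
            // createCheckpoint fails if snapshotDir already exists
            checkpoint.createCheckpoint(snapshotDir);
        }
    }
}
{code}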

It seems like we would need to design the system with two components, then. One 
is a way to keep track of the metadata: which snapshots are stored where, what 
offset they're at, etc. The other is actually performing the snapshot (and 
recovery), which would have a separate implementation for each store type. So, 
we'd have one for in-memory stores and another for rocksdb stores, and if 
people provide custom stores, they should also be able to implement the 
snapshotting logic for them.
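In code, the second component might reduce to something like the following, 
with one implementation per store type. Again, purely hypothetical naming, 
just to show the shape:

{code:java}
import java.io.IOException;
import java.net.URI;

// Hypothetical hook, not an actual API proposal. Streams would ship
// implementations for in-memory and RocksDB stores; custom store authors
// could implement it alongside their StateStore.
public interface StateStoreSnapshotter {

    /**
     * Writes the store's current contents to the given location and returns
     * the changelog offset the snapshot corresponds to.
     */
    long snapshot(URI location) throws IOException;

    /**
     * Loads a snapshot from the given location into the store and returns
     * the changelog offset to resume replay from.
     */
    long restore(URI location) throws IOException;
}
{code}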

 

Needless to say, the whole scope of this is quite large, and I think a good 
approach would be to get a general idea of how we want to structure the whole 
system, and then just implement a part of it. For example, (if you agree with 
the two-part design I described in the last paragraph), maybe the initial
implementation of the snapshot metadata component would only handle local 
filesystem locations, and maybe we would only implement snapshot/restore for 
in-memory stores. Maybe we wouldn't even bother with incremental snapshots for 
the first version of in-memory store snapshotting. As we expand the 
implementation to cover more of the full scope, we might discover that the 
design needs to be modified, which is perfectly fine.

 

I hope this helps!

-John

> Checkpointing API for State Stores
> ----------------------------------
>
>                 Key: KAFKA-9986
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9986
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Nikolay Izhikov
>            Priority: Major
>              Labels: need-kip, streams
>
> The parent ticket is KAFKA-3184.
> The goal of this ticket is to provide a general checkpointing API for state 
> stores in Streams (not only for in-memory but also for persistent stores), 
> where the checkpoint location can be either local disks or remote storage. 
> Design scope is primarily on:
>   # the API design for both checkpointing as well as loading checkpoints into 
> the local state stores
>   # the mechanism of the checkpointing, e.g. whether it should be async, 
> whether it should be executed on separate threads, etc. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
