Re: Question about Checkpoint Storage (RocksDB)

2016-07-26 Thread Ufuk Celebi
On Tue, Jul 26, 2016 at 2:15 PM, Sameer W  wrote:
> 1. Calling clear() on the KV state is only possible for snapshots right? Do
> you control that for checkpoints too.

What do you mean with snapshots vs. checkpoints exactly?

> 2. Assuming that the user has no control over the checkpoint process outside
> of controlling the checkpoint interval , when is the RocksDB cleared of the
> operator state for checkpoints after they are long past. It seems like there
> are only two checkpoints that are really necessary to maintain, the current
> one and the previous one for restore. Does Flink clean up checkpoints on a
> timer? When it does clean up checkpoints does it also clean up the state
> backend (I am assuming they are different).

Yes, here: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html

By default, only one completed checkpoint is kept.

> 3. The pre-aggregating windows was very helpful as the WindowFunction is now
> passed the pre-aggregated state. For windows, are the Reduce and Fold
> functions called on each element event before the window is triggered. I can
> see how that would work where the pre-compute is done per element but the
> actual output is emitted only when the window is fired. But that is only
> possible if there are no Evictors defined on the window? Also how are the
> elements fed to the Reduce/Fold function. Is it like MapReduce where even if
> you are using a Iterator, in reality all the values for a key are not
> buffered into memory? Which ties back to how is RocksDB is used to store a
> large window state before it is triggered. If my elements are accumulating
> in a window (serving a ReduceFunction) does it spill to disk (RocksDB?) when
> a threshold size is reached?

- The function is called before adding the element to the window KV state
- Yes, only possible if no evictors are defined
- The window reduce function is applied directly on the elements of
the stream and then update the KvState instance (e.g. update RocksDB)
- Operations with RocksDB always touch RocksDB, which takes care of
spilling etc.


Re: Question about Checkpoint Storage (RocksDB)

2016-07-26 Thread Sameer W
Thanks Ufuk,

That was very helpful. But that raised a few more questions :-):

1. Calling clear() on the KV state is only possible for snapshots right? Do
you control that for checkpoints too.

2. Assuming that the user has no control over the checkpoint process
outside of controlling the checkpoint interval , when is the RocksDB
cleared of the operator state for checkpoints after they are long past. It
seems like there are only two checkpoints that are really necessary to
maintain, the current one and the previous one for restore. Does Flink
clean up checkpoints on a timer? When it does clean up checkpoints does it
also clean up the state backend (I am assuming they are different).

3. The pre-aggregating windows was very helpful as the WindowFunction is
now passed the pre-aggregated state. For windows, are the Reduce and Fold
functions called on each element event before the window is triggered. I
can see how that would work where the pre-compute is done per element but
the actual output is emitted only when the window is fired. But that is
only possible if there are no Evictors defined on the window? Also how are
the elements fed to the Reduce/Fold function. Is it like MapReduce where
even if you are using a Iterator, in reality all the values for a key are
not buffered into memory? Which ties back to how is RocksDB is used to
store a large window state before it is triggered. If my elements are
accumulating in a window (serving a ReduceFunction) does it spill to disk
(RocksDB?) when a threshold size is reached?

Thanks,
Sameer



On Tue, Jul 26, 2016 at 7:29 AM, Ufuk Celebi  wrote:

> On Mon, Jul 25, 2016 at 8:50 PM, Sameer W  wrote:
> > The question is, if using really long windows (in hours) if the state of
> the
> > window gets very large over time, would size of the RocksDB get larger?
> > Would replication to HDFS start causing performance bottlenecks? Also
> would
> > this need a constant (at checkpoint interval?), read from RocksDB, add
> more
> > window elements and write to RocksDB.
>
> Yes. The size of the RocksDB instance is directly correlated with the
> number of K/V state pairs you store. You can remove state by calling
> `clear()` on the KvState instance.
>
> All state updates go directly to RocksDB and snapshots copy the DB
> files (semi-async mode, current default) or iterate-and-copy all K/V
> pairs (fully-async mode). No records are deleted automatically after
> snapshots.
>
> Snapshotting large RocksDB instances will cause some slow down, but
> you can trade this cost off by adjusting the checkpointing interval.
> There are plans to do the snapshots in an incremental fashion in order
> to lower the costs for this, but there is no design doc available for
> it at this point.
>
> > Outside of the read costs, is there a risk to having very long windows
> when
> > you know you could collect a lot of elements in them. Instead is it
> safer to
> > perform aggregations on top of aggregations or use your own custom remote
> > store like HBase to persist larger state per record and use windows only
> to
> > store the keys in HBase. I mention HBase because of its support for
> column
> > qualifiers allow elements to be added to the same key in multiple ordered
> > column qualifiers. Reading can also be throttled in batches of column
> > qualifiers allowing for the better memory consumption. Is this approach
> used
> > in practice?
>
> RocksDB works quite well for large stateful jobs. If possible for your
> use case, I would still recommend work with pre-aggregating window
> functions (
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions
> )
> or pre-aggregating the data. The I/O costs will correlate with the
> state size, but there is "no risk" in the sense of that it will still
> work as expected.
>
> What you describe with HBase could work, but I'm not aware of someone
> doing this. Furhtermore, depending on your use case, it can cause
> problems in failure scenarios, because you might need to keep HBase
> and Flink state in sync.
>


Re: Question about Checkpoint Storage (RocksDB)

2016-07-26 Thread Ufuk Celebi
On Mon, Jul 25, 2016 at 8:50 PM, Sameer W  wrote:
> The question is, if using really long windows (in hours) if the state of the
> window gets very large over time, would size of the RocksDB get larger?
> Would replication to HDFS start causing performance bottlenecks? Also would
> this need a constant (at checkpoint interval?), read from RocksDB, add more
> window elements and write to RocksDB.

Yes. The size of the RocksDB instance is directly correlated with the
number of K/V state pairs you store. You can remove state by calling
`clear()` on the KvState instance.

All state updates go directly to RocksDB and snapshots copy the DB
files (semi-async mode, current default) or iterate-and-copy all K/V
pairs (fully-async mode). No records are deleted automatically after
snapshots.

Snapshotting large RocksDB instances will cause some slow down, but
you can trade this cost off by adjusting the checkpointing interval.
There are plans to do the snapshots in an incremental fashion in order
to lower the costs for this, but there is no design doc available for
it at this point.

> Outside of the read costs, is there a risk to having very long windows when
> you know you could collect a lot of elements in them. Instead is it safer to
> perform aggregations on top of aggregations or use your own custom remote
> store like HBase to persist larger state per record and use windows only to
> store the keys in HBase. I mention HBase because of its support for column
> qualifiers allow elements to be added to the same key in multiple ordered
> column qualifiers. Reading can also be throttled in batches of column
> qualifiers allowing for the better memory consumption. Is this approach used
> in practice?

RocksDB works quite well for large stateful jobs. If possible for your
use case, I would still recommend work with pre-aggregating window
functions 
(https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#window-functions)
or pre-aggregating the data. The I/O costs will correlate with the
state size, but there is "no risk" in the sense of that it will still
work as expected.

What you describe with HBase could work, but I'm not aware of someone
doing this. Furhtermore, depending on your use case, it can cause
problems in failure scenarios, because you might need to keep HBase
and Flink state in sync.


Question about Checkpoint Storage (RocksDB)

2016-07-25 Thread Sameer W
Hi,

My understanding about the RocksDB state backend is as follows:

When using a RocksDB state backend, it the checkpoints are backed up
locally (to the TaskManager) using the backup feature of RocksDB by taking
snapshots from RocksDB which are consistent read-only views on the RockDB
database. Each checkpoint is backed up on the task manager node and this
checkpoint is asynchronously backed up to the remote HDFS location.  When
each checkpoint is committed, the records are deleted from RocksDB,
allowing RocksDb data folders to remain small. This in turn allows each
snapshot to be relatively small. If the Task node goes away due to failure,
I assume the RocksDB database is restored from the checkpoints from the
remote HDFS. Since each checkpoint state is relatively small, the
restoration time from HDFS for the RocksDB database on the new task node is
relatively small.

The question is, if using really long windows (in hours) if the state of
the window gets very large over time, would size of the RocksDB get larger?
Would replication to HDFS start causing performance bottlenecks? Also would
this need a constant (at checkpoint interval?), read from RocksDB, add more
window elements and write to RocksDB.

Outside of the read costs, is there a risk to having very long windows when
you know you could collect a lot of elements in them. Instead is it safer
to perform aggregations on top of aggregations or use your own custom
remote store like HBase to persist larger state per record and use windows
only to store the keys in HBase. I mention HBase because of its support for
column qualifiers allow elements to be added to the same key in multiple
ordered column qualifiers. Reading can also be throttled in batches of
column qualifiers allowing for the better memory consumption. Is this
approach used in practice?

Thanks,
Sameer