Yep, that was exactly the issue.

Thanks for the help!

On Wed, May 3, 2017 at 2:44 PM, Stefan Richter <s.rich...@data-artisans.com>
wrote:

> Ok, given the info that you are using ListState (which uses RocksDB’s
> merge() internally) this is probably a case of this problem:
> https://github.com/facebook/rocksdb/issues/1988
>
> We provide a custom version of RocksDB with Flink 1.2.1 (where we fixed
> the slow merge operations) until we can upgrade to a newer version of
> RocksDB. So updating to 1.2.1 should fix the slowdown you observe.
>
> Am 03.05.2017 um 19:10 schrieb Jason Brelloch <jb.bc....@gmail.com>:
>
> So looking through the logs I found these lines (repeated same test again
> with a rocksDB backend, took 5m55s):
>
> 2017-05-03 12:52:24,131 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>     - Triggering checkpoint 2 @ 1493830344131
> 2017-05-03 12:52:24,132 INFO  org.apache.flink.core.fs.FileSystem
>                   - Created new CloseableRegistry org.apache.flink.core.fs.
> SafetyNetCloseableRegistry@56ff02da for Async calls on Source: CIC Json
> Event Source -> Map -> Filter (1/1)
> 2017-05-03 12:52:24,132 INFO  org.apache.flink.core.fs.FileSystem
>                   - Created new CloseableRegistry org.apache.flink.core.fs.
> SafetyNetCloseableRegistry@116cba7c for Async calls on Source: Custom
> Source -> Filter -> Map -> CIC Control Source (1/1)
> 2017-05-03 12:52:24,134 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
>  - Asynchronous RocksDB snapshot (File Stream Factory @
> file:/home/jason/flink/checkpoint/b10bfbd4911c2d37dc2d610f4951c28a,
> synchronous part) in thread Thread[Sequence Function -> Sink: Unnamed
> (1/1),5,Flink Task Threads] took 0 ms.
> 2017-05-03 12:52:24,142 INFO  org.apache.flink.core.fs.FileSystem
>                   - Ensuring all FileSystem streams are closed for Async
> calls on Source: CIC Json Event Source -> Map -> Filter (1/1)
> 2017-05-03 12:52:24,142 INFO  org.apache.flink.core.fs.FileSystem
>                   - Ensuring all FileSystem streams are closed for Async
> calls on Source: Custom Source -> Filter -> Map -> CIC Control Source (1/1)
> 2017-05-03 12:58:19,167 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend
>  - Asynchronous RocksDB snapshot (File Stream Factory @
> file:/home/jason/flink/checkpoint/b10bfbd4911c2d37dc2d610f4951c28a,
> asynchronous part) in thread Thread[pool-4-thread-2,5,Flink Task Threads]
> took 355032 ms.
> 2017-05-03 12:58:19,170 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
>     - Completed checkpoint 2 (27543402 bytes in 355037 ms).
>
> Which I think means the async save to rocksDB took up most of time, and
> not the serializing.  We had some serialization slowdown when we were using
> an ever growing ValueState object, but switching to a ListState seems to
> have resolved that, so I am not sure that that is the issue.
>
> On Wed, May 3, 2017 at 12:05 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Sorry, just saw that your question was actually mainly about
>> checkpointing, but it can still be related to my previous answer. I assume
>> the checkpointing time is the time that is reported in the web interface?
>> This would be the end-to-end runtime of the checkpoint which does not
>> really tell you how much time is spend on writing the state itself, but you
>> can find this exact detail in the logging; you can grep for lines that
>> start with "Asynchronous RocksDB snapshot“. The background is that
>> end-to-end also includes the time the checkpoint barrier needs to travel to
>> the operator. If there is a lot of backpressure and a lot of network
>> buffers, this can take a while. Still, the reason for the backpressure
>> could still be in the way you access RocksDB, as it seems you are
>> de/serializing every time you update an ever-growing value under a single
>> key. I can see that accesses under this conditions could become very slow
>> eventually, but could remain fast on the FSBackend for the reason from my
>> first answer.
>>
>> Am 03.05.2017 um 17:54 schrieb Stefan Richter <
>> s.rich...@data-artisans.com>:
>>
>> Hi,
>>
>> typically, I would expect that the bottleneck with the RocksDB backend is
>> not RocksDB itself, but your TypeSerializers. I suggest to first run a
>> profiler/sampling attached to the process and check if the problematic
>> methods are in serialization or the actual accesses to RocksDB. The RocksDB
>> backend has to go through de/serialize roundtrips on every single state
>> access, while the FSBackend works on heap objects immediately. For
>> checkpoints, the RocksDB backend can write bytes directly whereas the
>> FSBackend has to use the serializers to get from objects to bytes, so their
>> actions w.r.t. how serializers are used are kind of inverted between
>> operation and checkpointing. For Flink 1.3 we also will introduce
>> incremental checkpoints on RocksDB that piggyback on the SST files. Flink
>> 1.2 is writing checkpoints and savepoints fully and in a custom format.
>>
>> Best,
>> Stefan
>>
>> Am 03.05.2017 um 16:46 schrieb Jason Brelloch <jb.bc....@gmail.com>:
>>
>> Hey all,
>>
>> I am looking for some advice on tuning rocksDB for better performance in
>> Flink 1.2.  I created a pretty simple job with a single kafka source and
>> one flatmap function that just stores 50000 events in a single key of
>> managed keyed state and then drops everything else, to test checkpoint
>> performance.  Using a basic FsStateBackend configured as:
>>
>> val backend = new FsStateBackend("file:///home/jason/flink/checkpoint")
>> env.setStateBackend(backend)
>>
>> With about 30MB of state we see the checkpoints completing in 151ms.
>> Using a RocksDBStateBackend configured as:
>>
>> val backend = new RocksDBStateBackend("file:///h
>> ome/jason/flink/checkpoint")
>> backend.setDbStoragePath("file:///home/jason/flink/rocksdb")
>> backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>> env.setStateBackend(backend)
>>
>> Running the same test the checkpoint takes 3 minutes 42 seconds.
>>
>> I expect it to be slower, but that seems excessive.  I am also a little
>> confused as to when rocksDB and flink decide to write to disk, because
>> watching the database the .sst file wasn't created until significantly
>> after the checkpoint was completed, and the state had not changed.  Is
>> there anything I can do to increase the speed of the checkpoints, or
>> anywhere I can look to debug the issue?  (Nothing seems out of the ordinary
>> in the flink logs or rocksDB logs)
>>
>> Thanks!
>>
>> --
>> *Jason Brelloch* | Product Developer
>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>> <http://www.bettercloud.com/>
>> Subscribe to the BetterCloud Monitor
>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>>  -
>> Get IT delivered to your inbox
>>
>>
>>
>>
>
>
> --
> *Jason Brelloch* | Product Developer
> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
> <http://www.bettercloud.com/>
> Subscribe to the BetterCloud Monitor
> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>  -
> Get IT delivered to your inbox
>
>
>


-- 
*Jason Brelloch* | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
<http://www.bettercloud.com/>
Subscribe to the BetterCloud Monitor
<https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
-
Get IT delivered to your inbox

Reply via email to