Yep, that was exactly the issue. Thanks for the help!

On Wed, May 3, 2017 at 2:44 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Ok, given the info that you are using ListState (which uses RocksDB's
> merge() internally), this is probably a case of this problem:
> https://github.com/facebook/rocksdb/issues/1988
>
> We provide a custom version of RocksDB with Flink 1.2.1 (where we fixed
> the slow merge operations) until we can upgrade to a newer version of
> RocksDB. So updating to 1.2.1 should fix the slowdown you observe.
>
> On 03.05.2017 at 19:10, Jason Brelloch <jb.bc....@gmail.com> wrote:
>
> So looking through the logs I found these lines (I repeated the same test
> again with a RocksDB backend; it took 5m55s):
>
> 2017-05-03 12:52:24,131 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1493830344131
> 2017-05-03 12:52:24,132 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@56ff02da for Async calls on Source: CIC Json Event Source -> Map -> Filter (1/1)
> 2017-05-03 12:52:24,132 INFO  org.apache.flink.core.fs.FileSystem - Created new CloseableRegistry org.apache.flink.core.fs.SafetyNetCloseableRegistry@116cba7c for Async calls on Source: Custom Source -> Filter -> Map -> CIC Control Source (1/1)
> 2017-05-03 12:52:24,134 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Asynchronous RocksDB snapshot (File Stream Factory @ file:/home/jason/flink/checkpoint/b10bfbd4911c2d37dc2d610f4951c28a, synchronous part) in thread Thread[Sequence Function -> Sink: Unnamed (1/1),5,Flink Task Threads] took 0 ms.
> 2017-05-03 12:52:24,142 INFO  org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: CIC Json Event Source -> Map -> Filter (1/1)
> 2017-05-03 12:52:24,142 INFO  org.apache.flink.core.fs.FileSystem - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Filter -> Map -> CIC Control Source (1/1)
> 2017-05-03 12:58:19,167 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Asynchronous RocksDB snapshot (File Stream Factory @ file:/home/jason/flink/checkpoint/b10bfbd4911c2d37dc2d610f4951c28a, asynchronous part) in thread Thread[pool-4-thread-2,5,Flink Task Threads] took 355032 ms.
> 2017-05-03 12:58:19,170 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 (27543402 bytes in 355037 ms).
>
> Which I think means the asynchronous save to RocksDB took up most of the
> time, and not the serialization. We had some serialization slowdown when
> we were using an ever-growing ValueState object, but switching to a
> ListState seems to have resolved that, so I am not sure serialization is
> the issue here.
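
For reference, a minimal sketch of the ListState pattern discussed above,
assuming a keyed stream of events; the Event type, the "buffered-events"
state name, and the AccumulatingFlatMap class are illustrative, not the
actual job. Each add() turns into a RocksDB merge() in the RocksDB backend,
which is the operation affected by the linked issue.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Hypothetical event type standing in for the real job's events.
case class Event(key: String, payload: String)

// Buffers events per key in managed keyed ListState and emits nothing,
// similar to the "store 50000 events under a single key" test described
// further down in this thread.
class AccumulatingFlatMap extends RichFlatMapFunction[Event, Event] {

  @transient private var events: ListState[Event] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new ListStateDescriptor[Event]("buffered-events", classOf[Event])
    events = getRuntimeContext.getListState(descriptor)
  }

  override def flatMap(value: Event, out: Collector[Event]): Unit = {
    // add() appends to the list without reading the existing entries back,
    // so the per-record cost stays low even as the state grows.
    events.add(value)
  }
}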

> On Wed, May 3, 2017 at 12:05 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
>> Sorry, just saw that your question was actually mainly about
>> checkpointing, but it can still be related to my previous answer. I assume
>> the checkpointing time is the time that is reported in the web interface?
>> That is the end-to-end runtime of the checkpoint, which does not really
>> tell you how much time is spent on writing the state itself; you can find
>> this exact detail in the logs by grepping for lines that start with
>> "Asynchronous RocksDB snapshot". The background is that end-to-end also
>> includes the time the checkpoint barrier needs to travel to the operator.
>> If there is a lot of backpressure and a lot of network buffers, this can
>> take a while. Still, the reason for the backpressure could be in the way
>> you access RocksDB, as it seems you are de/serializing every time you
>> update an ever-growing value under a single key. I can see that accesses
>> under these conditions could become very slow eventually, but could
>> remain fast on the FsStateBackend for the reason from my first answer.
>>
>> On 03.05.2017 at 17:54, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> typically, I would expect that the bottleneck with the RocksDB backend is
>> not RocksDB itself, but your TypeSerializers. I suggest first running a
>> profiler/sampler attached to the process and checking whether the
>> problematic methods are in serialization or in the actual accesses to
>> RocksDB. The RocksDB backend has to go through de/serialization roundtrips
>> on every single state access, while the FsStateBackend works on heap
>> objects directly. For checkpoints, the RocksDB backend can write bytes
>> directly, whereas the FsStateBackend has to use the serializers to get
>> from objects to bytes, so the way serializers are used is essentially
>> inverted between normal operation and checkpointing. For Flink 1.3 we will
>> also introduce incremental checkpoints on RocksDB that piggyback on the
>> SST files; Flink 1.2 writes checkpoints and savepoints fully and in a
>> custom format.
>>
>> Best,
>> Stefan
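
The incremental checkpoints mentioned above for Flink 1.3 (unreleased at the
time of this thread) can be enabled through an extra constructor flag on the
RocksDB backend; a minimal sketch, assuming the 1.3 API and reusing the
checkpoint path from the original mail:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Flink 1.3+: the second constructor argument enables incremental
// checkpoints, which upload only newly created SST files instead of the
// full state on every checkpoint.
val backend = new RocksDBStateBackend(
  "file:///home/jason/flink/checkpoint",
  true) // enableIncrementalCheckpointing
env.setStateBackend(backend)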

>> On 03.05.2017 at 16:46, Jason Brelloch <jb.bc....@gmail.com> wrote:
>>
>> Hey all,
>>
>> I am looking for some advice on tuning RocksDB for better performance in
>> Flink 1.2. I created a pretty simple job with a single Kafka source and
>> one flatmap function that just stores 50000 events in a single key of
>> managed keyed state and then drops everything else, to test checkpoint
>> performance. Using a basic FsStateBackend configured as:
>>
>> val backend = new FsStateBackend("file:///home/jason/flink/checkpoint")
>> env.setStateBackend(backend)
>>
>> With about 30 MB of state we see the checkpoints completing in 151 ms.
>> Using a RocksDBStateBackend configured as:
>>
>> val backend = new RocksDBStateBackend("file:///home/jason/flink/checkpoint")
>> backend.setDbStoragePath("file:///home/jason/flink/rocksdb")
>> backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>> env.setStateBackend(backend)
>>
>> Running the same test, the checkpoint takes 3 minutes 42 seconds. I expect
>> it to be slower, but that seems excessive. I am also a little confused
>> about when RocksDB and Flink decide to write to disk, because watching the
>> database, the .sst file wasn't created until significantly after the
>> checkpoint was completed, and the state had not changed. Is there anything
>> I can do to increase the speed of the checkpoints, or anywhere I can look
>> to debug the issue? (Nothing seems out of the ordinary in the Flink logs
>> or the RocksDB logs.)
>>
>> Thanks!
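
The fix in this particular thread turned out to be the Flink 1.2.1 RocksDB
merge() patch quoted at the top, but for general tuning beyond the
PredefinedOptions used above, the RocksDB backend also accepts a custom
OptionsFactory. A rough sketch, assuming the Flink 1.2 OptionsFactory
interface; the concrete values are illustrative, not recommendations from
this thread:

import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

val env = StreamExecutionEnvironment.getExecutionEnvironment

val backend = new RocksDBStateBackend("file:///home/jason/flink/checkpoint")
backend.setDbStoragePath("file:///home/jason/flink/rocksdb")
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)

// Fine-tune RocksDB on top of the predefined SSD profile.
backend.setOptions(new OptionsFactory {
  override def createDBOptions(currentOptions: DBOptions): DBOptions =
    currentOptions.setIncreaseParallelism(4) // more background flush/compaction threads

  override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
    currentOptions
      .setWriteBufferSize(64 * 1024 * 1024) // larger memtable before flushing to an SST file
      .setMaxWriteBufferNumber(4)
})

env.setStateBackend(backend)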