I tried RocksDB, but the result was almost the same. I used the following code and put 2.6 million distinct records into Kafka. After processing all of the records, the state on HDFS grew to about 250MB and each checkpoint took almost 5 seconds. Processing throughput was 8,000 msg/sec with the FsStateBackend and 9,000 msg/sec with the RocksDBStateBackend.
---
env.setStateBackend(new RocksDBStateBackend("hdfs://<hdfs_host>:8020/apps/flink/checkpoints"))

val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("kafka.json2", new SimpleStringSchema(), properties))
  .map(parseJson(_))
  .timeWindowAll(Time.of(10, TimeUnit.DAYS))
  .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
  // count distinct values
  .fold(Set[String]()) { (r, i) => r + i }
  .map { x => (System.currentTimeMillis(), x.size) }
  .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Int]] {
    override def createIndexRequest(element: Tuple2[Long, Int], ctx: RuntimeContext): IndexRequest = {
      val json = new HashMap[String, AnyRef]
      json.put("@timestamp", new Timestamp(element._1))
      json.put("count", element._2: java.lang.Integer)
      Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
    }
  }))
---

I guess this is because I used a non-keyed stream, so I had a single state record with one big value (all the distinct values). Copying the whole 250MB (or more) file to HDFS on every checkpoint will be heavy, so I will try storing the distinct values in an external datastore (e.g. Redis); rough sketches of a keyed version and a Redis version follow after the quoted thread below. Also, once incremental snapshots are implemented, I would like to try them.

Regards,
Hironori

2016-04-05 21:40 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>:
> Aljoscha,
>
> Thank you for your quick response.
> Yes, I am using the FsStateBackend, so I will try the RocksDB backend.
>
> Regards,
> Hironori
>
> 2016-04-05 21:23 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> Hi,
>> I guess you are using the FsStateBackend, is that correct? You could try
>> using the RocksDB state backend:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend
>>
>> With this, throughput will be lower, but the per-checkpoint overhead could
>> also be lower. In addition, most of the file copying necessary for the
>> checkpoint will be done while data processing keeps running (asynchronous
>> snapshot).
>>
>> As for incremental snapshots: I'm afraid this feature is not implemented
>> yet, but we are working on it.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi <ogibaya...@gmail.com>
>> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to implement a windowed distinct count on a stream. In this
>>> case, the state has to hold all distinct values in the window, so it can
>>> become large.
>>>
>>> In my test, when the state size reached about 400MB, checkpointing took
>>> 40 seconds and consumed most of the TaskManager's CPU.
>>> Is there a good way to handle this situation?
>>>
>>> The Flink documentation mentions incremental snapshots, and I am
>>> interested in them, but I could not find how to enable them. (Not
>>> implemented yet?)
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
>>>
>>> Regards,
>>> Hironori
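
For reference, here is a minimal sketch of the keyed alternative mentioned above. It assumes the same env, properties, parseJson, and MyContinuousProcessingTimeTrigger as the job above; the bucket count of 128 is an arbitrary choice for illustration, not something tested in this thread.

---
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Key the stream by a hash bucket so the distinct-value set is split into
// 128 smaller keyed states instead of one large non-keyed state record.
val partialCounts = env
  .addSource(new FlinkKafkaConsumer09[String]("kafka.json2", new SimpleStringSchema(), properties))
  .map(parseJson(_))
  .keyBy(value => (value.hashCode & Int.MaxValue) % 128) // same value -> same bucket
  .timeWindow(Time.of(10, TimeUnit.DAYS))
  .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
  // carry the bucket id along with the per-bucket distinct set
  .fold((0, Set[String]())) { (acc, value) =>
    ((value.hashCode & Int.MaxValue) % 128, acc._2 + value)
  }
  .map { case (bucket, seen) => (bucket, seen.size) } // (bucket, partial count)
---

Because hash bucketing sends every occurrence of a value to the same bucket, the per-bucket distinct counts add up to the global distinct count, so the latest (bucket, count) pairs can be combined in a downstream operator or in the dashboard.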
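
And a rough sketch of the Redis idea, keeping the distinct values in a Redis SET via the Jedis client so that Flink checkpoints stay small; the host, port, and key name are placeholders.

---
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import redis.clients.jedis.Jedis

// Keeps the distinct-value set in Redis instead of Flink state, so the
// checkpoint size no longer grows with the number of distinct values.
class RedisDistinctCount(host: String, port: Int, key: String)
    extends RichMapFunction[String, Long] {

  @transient private var jedis: Jedis = _

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis(host, port) // one connection per parallel subtask
  }

  override def map(value: String): Long = {
    jedis.sadd(key, value)          // SADD ignores duplicates
    jedis.scard(key).longValue()    // current number of distinct values
  }

  override def close(): Unit = {
    if (jedis != null) jedis.close()
  }
}

// usage: stream.map(parseJson(_)).map(new RedisDistinctCount("redis-host", 6379, "distinct"))
---

If exact counts are not required, Redis's HyperLogLog commands (PFADD/PFCOUNT) would also bound the memory used on the Redis side.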