Thank you for your suggestion.

Regarding throughput, there was actually a bottleneck in the process that puts logs into Kafka. When I added more processes, the throughput increased.

Also, HyperLogLog seems to be a good solution in this case. I will try it.

Regards,
Hironori

2016-04-07 17:45 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Ah yes, you're right. With the non-keyed stream it doesn't make a big
> difference because it's only one big state value.
>
> The throughput still seems quite low. Have you ever tried looking at the
> "back pressure" tab on the Flink dashboard? For this, I would suggest
> disabling chaining, so that every operator is run in an isolated task:
>
> env.disableOperatorChaining();
>
> On Thu, 7 Apr 2016 at 05:11 Hironori Ogibayashi <ogibaya...@gmail.com>
> wrote:
>>
>> I tried RocksDB, but the result was almost the same.
>>
>> I used the following code and put 2.6 million distinct records into Kafka.
>> After processing all records, the state on HDFS became about 250MB and the
>> checkpoint took almost 5 sec. Processing throughput was 8000 msg/sec with
>> FsStateBackend and 9000 msg/sec with RocksDBStateBackend.
>>
>> ---
>> env.setStateBackend(new RocksDBStateBackend("hdfs://<hdfs_host>:8020/apps/flink/checkpoints"));
>>
>> val stream = env
>>   .addSource(new FlinkKafkaConsumer09[String]("kafka.json2", new SimpleStringSchema(), properties))
>>   .map(parseJson(_))
>>   .timeWindowAll(Time.of(10, TimeUnit.DAYS))
>>   .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>   // count distinct values
>>   .fold(Set[String]()){(r,i) => { r + i}}
>>   .map{x => (System.currentTimeMillis(), x.size)}
>>   .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Int]] {
>>     override def createIndexRequest(element: Tuple2[Long, Int], ctx: RuntimeContext): IndexRequest = {
>>       val json = new HashMap[String, AnyRef]
>>       json.put("@timestamp", new Timestamp(element._1))
>>       json.put("count", element._2: java.lang.Integer)
>>       Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
>>     }
>>   }))
>> ---
>>
>> I guess this is because I used a non-keyed stream, so I had one state
>> record with a big value (all distinct values).
>> I think copying the whole 250MB (or more) file to HDFS on every checkpoint
>> will be heavy, so I will try storing the distinct values in an external
>> datastore (e.g. Redis).
>> Also, I want to try incremental snapshots once they are implemented.
>>
>> Regards,
>> Hironori
>>
>> 2016-04-05 21:40 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>:
>> > Aljoscha,
>> >
>> > Thank you for your quick response.
>> > Yes, I am using FsStateBackend, so I will try the RocksDB backend.
>> >
>> > Regards,
>> > Hironori
>> >
>> > 2016-04-05 21:23 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> >> Hi,
>> >> I guess you are using the FsStateBackend, is that correct? You could try
>> >> using the RocksDB state backend:
>> >>
>> >> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend
>> >>
>> >> With this, throughput will be lower but the overhead per checkpoint could
>> >> be lower. Also, with this, most of the file copying necessary for the
>> >> checkpoint will be done while data processing keeps running (asynchronous
>> >> snapshot).
>> >>
>> >> As to incremental snapshots, I'm afraid this feature is not yet
>> >> implemented, but we're working on it.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi <ogibaya...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I am trying to implement a windowed distinct count on a stream. In this
>> >>> case, the state has to hold all distinct values in the window, so it can
>> >>> become large.
>> >>>
>> >>> In my test, when the state size reaches about 400MB, checkpointing takes
>> >>> 40 sec and uses most of the TaskManager's CPU.
>> >>> Is there a good way to handle this situation?
>> >>>
>> >>> The Flink documentation mentions incremental snapshots, and I am
>> >>> interested in them, but could not find how to enable them (not
>> >>> implemented yet?).
>> >>>
>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
>> >>>
>> >>> Regards,
>> >>> Hironori
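This note concerns the HyperLogLog idea in the reply at the top of this thread. In the quoted test, the `Set[String]` fold accumulator grew to about 250MB for 2.6 million distinct values, which is roughly 100 bytes per value. A HyperLogLog sketch instead keeps a fixed array of 2^p small registers, however many values it sees.

Below is a minimal, dependency-free Scala sketch of the standard HyperLogLog estimator. It uses the raw estimate plus the linear-counting correction for small cardinalities. It is an illustration under stated assumptions, not a Flink or library API. The following are hypothetical choices made for this example:

- the class name `HyperLogLog`
- the precision `p = 14`, which gives 16 KiB of registers
- the FNV-1a/SplitMix64 string hash

```scala
// Minimal HyperLogLog sketch for approximate distinct counting.
// Hypothetical illustration: the class name, precision p, and the
// FNV-1a + SplitMix64 hash are assumptions, not a Flink/library API.
final class HyperLogLog(val p: Int) extends Serializable {
  require(p >= 7 && p <= 16, "precision p must be in [7, 16]")

  private val m: Int = 1 << p                // number of registers
  private val registers = new Array[Byte](m) // fixed size: m bytes

  /** 64-bit FNV-1a over the string's chars, finished with the SplitMix64 mixer. */
  private def hash64(s: String): Long = {
    var h = 0xcbf29ce484222325L
    var i = 0
    while (i < s.length) {
      h ^= s.charAt(i).toLong
      h *= 0x100000001b3L
      i += 1
    }
    h ^= h >>> 30; h *= 0xbf58476d1ce4e5b9L
    h ^= h >>> 27; h *= 0x94d049bb133111ebL
    h ^ (h >>> 31)
  }

  /** Adds one value and returns this sketch, so it can serve as a fold accumulator. */
  def add(value: String): HyperLogLog = {
    val h = hash64(value)
    val idx = (h >>> (64 - p)).toInt // top p bits select the register
    val w = h << p                   // remaining 64 - p bits, moved to the top
    // Rank = 1-based position of the leftmost 1-bit in the remaining bits.
    val rank = math.min(java.lang.Long.numberOfLeadingZeros(w) + 1, 64 - p + 1)
    if (rank > registers(idx)) registers(idx) = rank.toByte
    this
  }

  /** Union of two sketches with the same precision: element-wise max of registers. */
  def merge(other: HyperLogLog): HyperLogLog = {
    require(other.p == p, "cannot merge sketches with different precision")
    val out = new HyperLogLog(p)
    var j = 0
    while (j < m) {
      out.registers(j) = math.max(registers(j), other.registers(j)).toByte
      j += 1
    }
    out
  }

  /** Estimated number of distinct values added so far. */
  def estimate: Long = {
    val alpha = 0.7213 / (1.0 + 1.079 / m)
    val sum = registers.foldLeft(0.0)((acc, r) => acc + math.pow(2.0, -r))
    val raw = alpha * m.toDouble * m / sum
    val zeros = registers.count(_ == 0)
    // Small-range correction (linear counting) from the original HLL paper.
    val e = if (raw <= 2.5 * m && zeros > 0) m * math.log(m.toDouble / zeros) else raw
    math.round(e)
  }
}
```

In the quoted job, the fold step might become `.fold(new HyperLogLog(14))((h, i) => h.add(i))`, followed by `.map{h => (System.currentTimeMillis(), h.estimate)}`. This is untested, and it assumes Flink can serialize the class as a generic (Kryo) type.

The trade-off is an approximate count. The expected relative error is about 1.04/√m, roughly 0.8% at p = 14. In exchange, the checkpointed state stays at a few kilobytes instead of hundreds of megabytes.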