HyperLogLog is worth a mention, but only if you don't mind a small, tunable error (typically around 1%): the sketch stays a few kilobytes no matter how many distinct values it sees, so the state, and therefore the checkpoints, stop growing.
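
A rough sketch of how it could slot into the pipeline quoted below, replacing the Set fold with a sketch. Untested; it assumes stream-lib's HyperLogLog (com.clearspring.analytics) is on the classpath, but any serializable implementation with offer()/cardinality() would do:

---
import com.clearspring.analytics.stream.cardinality.HyperLogLog

val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("kafka.json2",
    new SimpleStringSchema(), properties))
  .map(parseJson(_))
  .timeWindowAll(Time.of(10, TimeUnit.DAYS))
  .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
  // fold into a fixed-size sketch instead of a Set[String];
  // log2m = 14 keeps about 10KB of state for ~0.8% standard error
  .fold(new HyperLogLog(14)) { (hll, value) => hll.offer(value); hll }
  // cardinality() returns a Long; toInt keeps the sink signature below
  .map { hll => (System.currentTimeMillis(), hll.cardinality().toInt) }
---

With that, the state (and every checkpoint) stays constant-size regardless of how many distinct values arrive.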
On 4/7/16, 8:41 AM, "Hironori Ogibayashi" <ogibaya...@gmail.com> wrote:

> I tried RocksDB, but the result was almost the same.
>
> I used the following code and put 2.6 million distinct records into Kafka.
> After processing all records, the state on HDFS grew to about 250MB and
> the checkpoint took almost 5sec. Processing throughput was:
> FsStateBackend -> 8000 msg/sec, RocksDBStateBackend -> 9000 msg/sec
>
> ---
> env.setStateBackend(new
>   RocksDBStateBackend("hdfs://<hdfs_host>:8020/apps/flink/checkpoints"))
>
> val stream = env
>   .addSource(new FlinkKafkaConsumer09[String]("kafka.json2",
>     new SimpleStringSchema(), properties))
>   .map(parseJson(_))
>   .timeWindowAll(Time.of(10, TimeUnit.DAYS))
>   .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>   // count distinct values
>   .fold(Set[String]()) { (r, i) => r + i }
>   .map { x => (System.currentTimeMillis(), x.size) }
>   .addSink(new ElasticsearchSink(config, transports,
>     new IndexRequestBuilder[Tuple2[Long, Int]] {
>       override def createIndexRequest(element: Tuple2[Long, Int],
>           ctx: RuntimeContext): IndexRequest = {
>         val json = new HashMap[String, AnyRef]
>         json.put("@timestamp", new Timestamp(element._1))
>         json.put("count", element._2: java.lang.Integer)
>         Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
>       }
>     }))
> ---
>
> I guess this is because I used a non-keyed stream, so I had one state
> record with one big value (all the distinct values).
> I think copying the whole 250MB (or more) file to HDFS on every checkpoint
> will be heavy, so I will try storing the distinct values in an external
> datastore (e.g. Redis).
> Also, once incremental snapshots are implemented, I want to try them.
>
> Regards,
> Hironori
>
> 2016-04-05 21:40 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>:
>> Aljoscha,
>>
>> Thank you for your quick response.
>> Yes, I am using the FsStateBackend, so I will try the RocksDB backend.
>>
>> Regards,
>> Hironori
>>
>> 2016-04-05 21:23 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>>> Hi,
>>> I guess you are using the FsStateBackend, is that correct? You could
>>> try the RocksDB state backend:
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend
>>>
>>> With this, throughput will be lower, but the per-checkpoint overhead
>>> should also be lower. In addition, most of the file copying needed for
>>> a checkpoint is done while data processing keeps running (asynchronous
>>> snapshots).
>>>
>>> As for incremental snapshots: I'm afraid this feature is not
>>> implemented yet, but we're working on it.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi <ogibaya...@gmail.com>
>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am trying to implement a windowed distinct count on a stream. In
>>>> this case, the state has to hold all distinct values in the window,
>>>> so it can become large.
>>>>
>>>> In my test, once the state size reached about 400MB, checkpointing
>>>> took 40sec and consumed most of the TaskManager's CPU.
>>>> Is there a good way to handle this situation?
>>>>
>>>> The Flink documentation mentions incremental snapshots, and I am
>>>> interested in them, but I could not find how to enable them. (Not
>>>> implemented yet?)
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
>>>>
>>>> Regards,
>>>> Hironori
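
On the non-keyed guess above: that matches what I'd expect; a non-keyed fold keeps the whole Set as a single state value, so every checkpoint rewrites it in full. If you need the exact count, one option is to key by a hash of the value, which splits that one 250MB record into many small, disjoint per-group Sets (and parallelizes the work). A rough, untested sketch, reusing the names from the snippet above; the 128 groups are an arbitrary choice:

---
val partialCounts = env
  .addSource(new FlinkKafkaConsumer09[String]("kafka.json2",
    new SimpleStringSchema(), properties))
  .map(parseJson(_))
  // hash-partition the values: each value always lands in the same
  // group, so the per-group Sets are disjoint and their sizes sum
  // to the global distinct count
  .keyBy(value => math.abs(value.hashCode) % 128)
  .timeWindow(Time.of(10, TimeUnit.DAYS))
  .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
  .fold(Set[String]()) { (set, value) => set + value }
  .map(set => set.size)
---

With the RocksDB backend, the groups are then checkpointed as separate keyed-state entries instead of one large blob. Summing the partial sizes into the global count still needs a final aggregation, which I've left out here.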