HyperLogLog is worth a mention, but only if you don't mind some inaccuracy.
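To make the trade-off concrete, here is a minimal HyperLogLog sketch in plain Scala. It keeps per-window state at a few kilobytes (2^p small registers) instead of a growing Set of every value. The parameter p, the MurmurHash3 choice, and the error figure are my assumptions, not anything from this thread or Flink; in the job below it would stand in for the `Set[String]` accumulated in the fold.

```scala
import scala.util.hashing.MurmurHash3

// Minimal HyperLogLog: 2^p registers, each holding the maximum "rank"
// (position of the first 1-bit in the hash remainder) routed to it.
class HyperLogLog(p: Int = 14) {
  private val m = 1 << p
  private val registers = new Array[Int](m)
  private val alpha = 0.7213 / (1.0 + 1.079 / m) // bias correction, valid for m >= 128

  def add(value: String): Unit = {
    val h = MurmurHash3.stringHash(value)
    val idx = h >>> (32 - p)   // first p bits pick the register
    val rest = h << p          // remaining (32 - p) bits
    val rank =
      if (rest == 0) (32 - p) + 1
      else Integer.numberOfLeadingZeros(rest) + 1
    if (rank > registers(idx)) registers(idx) = rank
  }

  // Raw HLL estimate; production implementations add small/large-range corrections.
  def estimate: Double = {
    val z = registers.map(r => math.pow(2.0, -r)).sum
    alpha * m * m / z
  }
}

val hll = new HyperLogLog()
(1 to 100000).foreach(i => hll.add(s"record-$i"))
// Typical relative error for p = 14 is about 1.04 / sqrt(2^14), under 1%
val est = hll.estimate
```

With p = 14 the state is 16384 registers regardless of how many distinct values arrive, which is why checkpoints stay cheap.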



On 4/7/16, 8:41 AM, "Hironori Ogibayashi" <ogibaya...@gmail.com> wrote:

>I tried RocksDB, but the result was almost the same.
>
>I used the following code and put 2.6 million distinct records into Kafka.
>After processing all the records, the state on HDFS grew to about 250MB,
>and the time needed for each checkpoint was almost 5 sec. Processing
>throughput was: FsStateBackend -> 8000 msg/sec, RocksDBStateBackend ->
>9000 msg/sec.
>
>---
>    env.setStateBackend(new RocksDBStateBackend(
>      "hdfs://<hdfs_host>:8020/apps/flink/checkpoints"))
>
>    val stream = env
>      .addSource(new FlinkKafkaConsumer09[String]("kafka.json2",
>        new SimpleStringSchema(), properties))
>      .map(parseJson(_))
>      .timeWindowAll(Time.of(10, TimeUnit.DAYS))
>      .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>      // count distinct values
>      .fold(Set[String]()) { (r, i) => r + i }
>      .map { x => (System.currentTimeMillis(), x.size) }
>      .addSink(new ElasticsearchSink(config, transports,
>        new IndexRequestBuilder[Tuple2[Long, Int]] {
>          override def createIndexRequest(element: Tuple2[Long, Int],
>              ctx: RuntimeContext): IndexRequest = {
>            val json = new HashMap[String, AnyRef]
>            json.put("@timestamp", new Timestamp(element._1))
>            json.put("count", element._2: java.lang.Integer)
>            Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
>          }
>        }))
>---
>
>I guess this is because I used a non-keyed stream, so I had one state
>record with a big value (all the distinct values).
>I think copying the whole 250MB (or more) file to HDFS on every
>checkpoint will be heavy, so I will try storing the distinct values
>in an external datastore (e.g. Redis).
>Also, when incremental snapshots get implemented, I want to try them.
>
>Regards,
>Hironori
>
>2016-04-05 21:40 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>:
>> Aljoscha,
>>
>> Thank you for your quick response.
>> Yes, I am using FsStateBackend, so I will try RocksDB backend.
>>
>> Regards,
>> Hironori
>>
>> 2016-04-05 21:23 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>>> Hi,
>>> I guess you are using the FsStateBackend, is that correct? You could try
>>> using the RocksDB state backend:
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend
>>>
>>> With this, throughput will be somewhat lower, but the per-checkpoint
>>> overhead should also be lower. In addition, most of the file copying
>>> needed for a checkpoint is done while data processing keeps running
>>> (asynchronous snapshots).
>>>
>>> As for incremental snapshots: I'm afraid this feature is not yet
>>> implemented, but we're working on it.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi <ogibaya...@gmail.com>
>>> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am trying to implement a windowed distinct count on a stream. In this
>>>> case, the state has to hold all distinct values in the window, so it
>>>> can become large.
>>>>
>>>> In my test, once the state size reached about 400MB, checkpointing took
>>>> 40 sec and consumed most of the TaskManager's CPU.
>>>> Is there any good way to handle this situation?
>>>>
>>>> The Flink documentation mentions incremental snapshots, and I am
>>>> interested in them, but I could not find how to enable them. (Not
>>>> implemented yet?)
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
>>>>
>>>> Regards,
>>>> Hironori
>
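On the single-big-state point raised above: keying the stream by a hash of the value would split the distinct set across parallel subtasks, so no one state record holds everything. This is only a toy sketch of the sharding idea in plain Scala (the shard count and the `shardOf` helper are mine, not Flink API); because the shards partition the value space, summing the per-shard sizes recovers the global distinct count.

```scala
// In Flink this would be keyBy(v => shardOf(v)) with a Set per key;
// here plain Scala collections stand in for the keyed state.
val shards = 8
def shardOf(v: String): Int = math.floorMod(v.hashCode, shards)

val values = (1 to 10000).map(i => s"user-${i % 2600}") // 2600 distinct values
val shardedState: Map[Int, Set[String]] =
  values.groupBy(shardOf).map { case (k, vs) => k -> vs.toSet }

// Shards are disjoint, so per-shard sizes sum to the global distinct count.
val distinctCount = shardedState.values.map(_.size).sum
```

Each keyed state then holds only its shard, which is what lets RocksDB snapshot many small pieces instead of one 250MB value.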
