Thank you for your suggestion.

Regarding throughput, there was actually a bottleneck in the process
that puts logs into Kafka. When I added more processes, the throughput
increased.

Also, HyperLogLog seems like a good solution in this case. I will try it.
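
For reference, here is a minimal sketch of what the fold could look like
with a HyperLogLog sketch in place of the Set (this assumes the stream-lib
library's com.clearspring.analytics.stream.cardinality.HyperLogLog; the
0.01 relative standard deviation is just an example value):

---
import com.clearspring.analytics.stream.cardinality.HyperLogLog

val stream = env
  .addSource(new FlinkKafkaConsumer09[String]("kafka.json2",
    new SimpleStringSchema(), properties))
  .map(parseJson(_))
  .timeWindowAll(Time.of(10, TimeUnit.DAYS))
  .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
  // fold each value into a fixed-size HLL sketch instead of a growing
  // Set, so the window state stays small regardless of cardinality
  .fold(new HyperLogLog(0.01)) { (hll, value) =>
    hll.offer(value) // hashes the value into the sketch
    hll
  }
  .map { hll => (System.currentTimeMillis(), hll.cardinality()) }
---

The count becomes approximate (roughly 1% error at that setting), but the
state per window drops from all distinct values to a few KB.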

Regards,
Hironori

2016-04-07 17:45 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Ah yes, you're right. With the non-keyed stream it doesn't make a big
> difference because it is all one big state value.
>
> The throughput still seems quite low. Have you tried looking at the
> "back pressure" tab in the Flink dashboard? For this I would suggest
> disabling chaining, so that every operator runs in an isolated task:
>
> env.disableOperatorChaining();
>
> On Thu, 7 Apr 2016 at 05:11 Hironori Ogibayashi <ogibaya...@gmail.com>
> wrote:
>>
>> I tried RocksDB, but the result was almost the same.
>>
>> I used the following code and put 2.6 million distinct records into Kafka.
>> After processing all records, the state on HDFS became about 250MB and
>> the checkpoint took almost 5 sec. Processing throughput was
>> FsStateBackend -> 8,000 msg/sec, RocksDBStateBackend -> 9,000 msg/sec.
>>
>> ---
>>     env.setStateBackend(new RocksDBStateBackend(
>>       "hdfs://<hdfs_host>:8020/apps/flink/checkpoints"))
>>
>>     val stream = env
>>       .addSource(new FlinkKafkaConsumer09[String]("kafka.json2",
>>         new SimpleStringSchema(), properties))
>>       .map(parseJson(_))
>>       .timeWindowAll(Time.of(10, TimeUnit.DAYS))
>>       .trigger(MyContinuousProcessingTimeTrigger.of(Time.seconds(5)))
>>       // count distinct values by folding them into a Set
>>       .fold(Set[String]()) { (r, i) => r + i }
>>       .map { x => (System.currentTimeMillis(), x.size) }
>>       .addSink(new ElasticsearchSink(config, transports,
>>         new IndexRequestBuilder[Tuple2[Long, Int]] {
>>           override def createIndexRequest(element: Tuple2[Long, Int],
>>               ctx: RuntimeContext): IndexRequest = {
>>             val json = new HashMap[String, AnyRef]
>>             json.put("@timestamp", new Timestamp(element._1))
>>             json.put("count", element._2: java.lang.Integer)
>>             Requests.indexRequest.index("dummy3").`type`("my-type").source(json)
>>           }
>>         }))
>> ---
>>
>> I guess this is because I used a non-keyed stream, so I had a single
>> state record with one big value (all the distinct values).
>> I think copying the whole 250MB (or more) file to HDFS on every
>> checkpoint will be heavy, so I will try storing the distinct values
>> in an external datastore (e.g. Redis), as sketched below.
>> Also, when incremental snapshots get implemented, I want to try them.
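>>
>> As a rough sketch, the Redis approach could look like this (assuming
>> the Jedis client; RedisDistinctCount, the host, and the key name are
>> hypothetical):
>>
>> ---
>> import org.apache.flink.api.common.functions.RichMapFunction
>> import org.apache.flink.configuration.Configuration
>> import redis.clients.jedis.Jedis
>>
>> class RedisDistinctCount(host: String, key: String)
>>     extends RichMapFunction[String, (Long, Long)] {
>>   @transient private var jedis: Jedis = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     jedis = new Jedis(host) // one connection per parallel subtask
>>   }
>>
>>   override def map(value: String): (Long, Long) = {
>>     jedis.sadd(key, value) // add to a Redis set; no-op for duplicates
>>     (System.currentTimeMillis(), jedis.scard(key)) // current distinct count
>>   }
>>
>>   override def close(): Unit = {
>>     if (jedis != null) jedis.close()
>>   }
>> }
>> ---
>>
>> This would keep the distinct set out of Flink state entirely, at the
>> cost of a Redis round trip per record; window expiry would have to be
>> handled separately (e.g. one key per window).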
>>
>> Regards,
>> Hironori
>>
>> 2016-04-05 21:40 GMT+09:00 Hironori Ogibayashi <ogibaya...@gmail.com>:
>> > Aljoscha,
>> >
>> > Thank you for your quick response.
>> > Yes, I am using the FsStateBackend, so I will try the RocksDB backend.
>> >
>> > Regards,
>> > Hironori
>> >
>> > 2016-04-05 21:23 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> >> Hi,
>> >> I guess you are using the FsStateBackend, is that correct? You could
>> >> try
>> >> using the RocksDB state backend:
>> >>
>> >> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html#the-rocksdbstatebackend
>> >>
>> >> With this, throughput will be lower, but the per-checkpoint overhead
>> >> could also be lower. In addition, most of the file copying needed for
>> >> the checkpoint is done while data processing keeps running
>> >> (asynchronous snapshots).
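>> >>
>> >> A minimal sketch of the switch (the checkpoint URI is a placeholder):
>> >>
>> >> ---
>> >> // default: state lives on the JVM heap and is snapshotted to the
>> >> // file system on every checkpoint
>> >> // env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))
>> >>
>> >> // RocksDB: state lives on local disk; the snapshot copy to the
>> >> // checkpoint directory happens mostly in the background
>> >> env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
>> >> ---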
>> >>
>> >> As for incremental snapshots: I'm afraid this feature is not yet
>> >> implemented, but we're working on it.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Tue, 5 Apr 2016 at 14:06 Hironori Ogibayashi <ogibaya...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I am trying to implement a windowed distinct count on a stream. In
>> >>> this case, the state has to hold all distinct values in the window,
>> >>> so it can be large.
>> >>>
>> >>> In my test, once the state size reaches about 400MB, checkpointing
>> >>> takes 40 sec and consumes most of the TaskManager's CPU.
>> >>> Is there any good way to handle this situation?
>> >>>
>> >>> The Flink documentation mentions incremental snapshots, and I am
>> >>> interested in them, but I could not find how to enable them. (Not
>> >>> implemented yet?)
>> >>>
>> >>>
>> >>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
>> >>>
>> >>> Regards,
>> >>> Hironori
