Re: Flink streaming throughput

2016-03-15
Milinda,

Thanks. I will try.

Regards,
Hironori
2016/03/16 1:31 "Milinda Pathirage" <mpath...@umail.iu.edu>:

> Hi Hironori,
>
> [1] and [2] describe the process of measuring Kafka performance. I think
> the perf test code is under the org.apache.kafka.tools package in 0.9, so you
> may have to change the commands in [2] to reflect that.
>
> Thanks
> Milinda
>
> [1]
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> [2] https://gist.github.com/jkreps/c7ddb4041ef62a900e6c
>
> On Tue, Mar 15, 2016 at 11:35 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>
>> Robert,
>>
>> Thank you for your response.
>> I would like to try kafka-console-consumer, but I have no idea
>> how to measure the consuming throughput.
>> Is there any standard way?
>> I would also try a Kafka broker on physical servers.
>>
>> Regarding the version, I have upgraded to Flink 1.0.0 and replaced
>> FlinkKafkaConsumer082 with FlinkKafkaConsumer09, but did not see
>> any difference in performance.
>>
>> Regards,
>> Hironori
>>
>>
>>
>> 2016-03-11 23:25 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
>> > Hi Hironori,
>> >
>> > Can you check with the kafka-console-consumer how many messages you can
>> > read in one minute?
>> > Maybe the broker's disk I/O is limited because everything is running in
>> > virtual machines (potentially sharing one hard disk?).
>> > I'm also not sure if running a Kafka 0.8 consumer against a 0.9 broker is
>> > working as expected.
>> >
>> > Our Kafka 0.8 consumer has been tested in environments where it's reading
>> > at more than 100 MB/s from a broker.
>> >
>> >
>> > On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <ogibaya...@gmail.com>
>> wrote:
>> >>
>> >> Aljoscha,
>> >>
>> >> Thank you for your response.
>> >>
>> >> I tried the case with no JSON parsing and no sink (DiscardingSink). The
>> >> throughput was 8,228 msg/sec,
>> >> slightly better than the JSON + Elasticsearch case.
>> >> I also tried using socketTextStream instead of FlinkKafkaConsumer; in
>> >> that case, the result was
>> >> 60,000 msg/sec with just 40% Flink TaskManager CPU usage (the socket
>> >> server was the bottleneck).
>> >> That was amazing, although Flink's fault tolerance feature is not
>> >> available with socketTextStream.
>> >>
>> >> Regards,
>> >> Hironori
>> >>
>> >> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> >> > Hi,
>> >> > Another interesting test would be a combination of 3) and 2), i.e. no
>> >> > JSON parsing and no sink. This would show what the raw throughput can be
>> >> > before being slowed down by writing to Elasticsearch.
>> >> >
>> >> > .print() is also not feasible for production since it just prints
>> >> > every element to the stdout log on the TaskManagers, which itself can cause
>> >> > quite a slowdown. You could try:
>> >> >
>> >> > datastream.addSink(new DiscardingSink())
>> >> >
>> >> > which is a dummy sink that does nothing.
>> >> >
>> >> > Cheers,
>> >> > Aljoscha
>> >> >> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>> >> >>
>> >> >> Stephan,
>> >> >>
>> >> >> Sorry for the delay in my response.
>> >> >> I tried the 3 cases you suggested.
>> >> >>
>> >> >> This time, I set parallelism to 1 for simplicity.
>> >> >>
>> >> >> 0) base performance (same as the first e-mail): 1,480msg/sec
>> >> >> 1) Disable checkpointing : almost same as 0)
>> >> >> 2) No ES sink. just print() : 1,510msg/sec
>> >> >> 3) JSON to TSV : 8,000msg/sec
>> >> >>
>> >> >> So, as you can see, the bottleneck was JSON parsing. I also want to
>> >> >> try eliminating Kafka to see
>> >> >> if there is room to improve performance. (Currently, I am using
>> >> >> FlinkKafkaConsumer082 with Kafka 0.9;
>> >> >> I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
>> >> >> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
>> >> >> Flink's scalability and fault tolerance.

Re: Flink streaming throughput

2016-03-15
Robert,

Thank you for your response.
I would like to try kafka-console-consumer, but I have no idea
how to measure the consuming throughput.
Is there any standard way?
I would also try a Kafka broker on physical servers.
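
One rough way to get that number outside Flink is a bare consumer loop against the same topic. Below is a minimal Scala sketch using the Kafka 0.9 consumer API; the broker address, topic, and group id are placeholders. Kafka's bundled consumer perf-test script reports a similar number without writing any code.
---
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object ConsumeRateCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("group.id", "throughput-check")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("kafka.dummy"))

    // drain the topic for one minute and report the rate
    var count = 0L
    val start = System.currentTimeMillis()
    while (System.currentTimeMillis() - start < 60000) {
      count += consumer.poll(1000).count()
    }
    consumer.close()
    println(s"consumed $count messages in 60s = ${count / 60} msg/sec")
  }
}
---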

Regarding the version, I have upgraded to Flink 1.0.0 and replaced
FlinkKafkaConsumer082 with FlinkKafkaConsumer09, but did not see
any difference in performance.
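
For reference, the only change on the job side is the source constructor. A sketch of the 0.9 connector wiring, assuming the flink-connector-kafka-0.9 dependency; env and properties here are the ones from the job code quoted later in the thread.
---
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// same topic and consumer properties as with FlinkKafkaConsumer082
val stream = env.addSource(
  new FlinkKafkaConsumer09[String]("kafka.dummy", new SimpleStringSchema(), properties))
---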

Regards,
Hironori



2016-03-11 23:25 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
> Hi Hironori,
>
> Can you check with the kafka-console-consumer how many messages you can read
> in one minute?
> Maybe the broker's disk I/O is limited because everything is running in
> virtual machines (potentially sharing one hard disk?).
> I'm also not sure if running a Kafka 0.8 consumer against a 0.9 broker is
> working as expected.
>
> Our Kafka 0.8 consumer has been tested in environments where it's reading
> at more than 100 MB/s from a broker.
>
>
> On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>
>> Aljoscha,
>>
>> Thank you for your response.
>>
>> I tried the case with no JSON parsing and no sink (DiscardingSink). The
>> throughput was 8,228 msg/sec,
>> slightly better than the JSON + Elasticsearch case.
>> I also tried using socketTextStream instead of FlinkKafkaConsumer; in
>> that case, the result was
>> 60,000 msg/sec with just 40% Flink TaskManager CPU usage (the socket
>> server was the bottleneck).
>> That was amazing, although Flink's fault tolerance feature is not
>> available with socketTextStream.
>>
>> Regards,
>> Hironori
>>
>> 2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
>> > Hi,
>> > Another interesting test would be a combination of 3) and 2), i.e. no
>> > JSON parsing and no sink. This would show what the raw throughput can be
>> > before being slowed down by writing to Elasticsearch.
>> >
>> > .print() is also not feasible for production since it just prints
>> > every element to the stdout log on the TaskManagers, which itself can cause
>> > quite a slowdown. You could try:
>> >
>> > datastream.addSink(new DiscardingSink())
>> >
>> > which is a dummy sink that does nothing.
>> >
>> > Cheers,
>> > Aljoscha
>> >> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>> >>
>> >> Stephan,
>> >>
>> >> Sorry for the delay in my response.
>> >> I tried the 3 cases you suggested.
>> >>
>> >> This time, I set parallelism to 1 for simplicity.
>> >>
>> >> 0) base performance (same as the first e-mail): 1,480msg/sec
>> >> 1) Disable checkpointing : almost same as 0)
>> >> 2) No ES sink. just print() : 1,510msg/sec
>> >> 3) JSON to TSV : 8,000msg/sec
>> >>
>> >> So, as you can see, the bottleneck was JSON parsing. I also want to
>> >> try eliminating Kafka to see
>> >> if there is room to improve performance. (Currently, I am using
>> >> FlinkKafkaConsumer082 with Kafka 0.9;
>> >> I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
>> >> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
>> >> Flink's scalability and fault tolerance.
>> >> Thank you for your advice.
>> >>
>> >> Regards,
>> >> Hironori Ogibayashi
>> >>
>> >> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
>> >>> Stephan,
>> >>>
>> >>> Thank you for your quick response.
>> >>> I will try and post the result later.
>> >>>
>> >>> Regards,
>> >>> Hironori
>> >>>
>> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> >>>> Hi!
>> >>>>
>> >>>> I would try and dig bit by bit into what the bottleneck is:
>> >>>>
>> >>>> 1) Disable the checkpointing, see what difference that makes
>> >>>> 2) Use a dummy sink (discarding) rather than Elasticsearch, to see if that
>> >>>> is limiting
>> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
>> >>>> easily dominate the entire pipeline.
>> >>>>
>> >>>> Greetings,
>> >>>> Stephan
>> >>>>
>> >>>>
> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:

Re: Flink streaming throughput

2016-03-11
Aljoscha,

Thank you for your response.

I tried the case with no JSON parsing and no sink (DiscardingSink). The
throughput was 8,228 msg/sec,
slightly better than the JSON + Elasticsearch case.
I also tried using socketTextStream instead of FlinkKafkaConsumer; in
that case, the result was
60,000 msg/sec with just 40% Flink TaskManager CPU usage (the socket
server was the bottleneck).
That was amazing, although Flink's fault tolerance feature is not
available with socketTextStream.
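
For reference, roughly what those two variations look like in code. This is a sketch rather than the exact job behind the numbers above; the host and port are placeholders.
---
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.DiscardingSink

val env = StreamExecutionEnvironment.getExecutionEnvironment

// source swap: read lines from a plain socket instead of Kafka (no fault tolerance),
// and discard the results instead of writing them to Elasticsearch
env.socketTextStream("loggen-host", 9999)
  .map { line => (line, 1) }                    // stand-in for the real parsing
  .addSink(new DiscardingSink[(String, Int)])

env.execute("throughput-probe")
---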

Regards,
Hironori

2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> Another interesting test would be a combination of 3) and 2), i.e. no JSON
> parsing and no sink. This would show what the raw throughput can be before
> being slowed down by writing to Elasticsearch.
>
> .print() is also not feasible for production since it just prints every
> element to the stdout log on the TaskManagers, which itself can cause quite a 
> slowdown. You could try:
>
> datastream.addSink(new DiscardingSink())
>
> which is a dummy sink that does nothing.
>
> Cheers,
> Aljoscha
>> On 08 Mar 2016, at 13:31, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>
>> Stephan,
>>
>> Sorry for the delay in my response.
>> I tried the 3 cases you suggested.
>>
>> This time, I set parallelism to 1 for simplicity.
>>
>> 0) base performance (same as the first e-mail): 1,480msg/sec
>> 1) Disable checkpointing : almost same as 0)
>> 2) No ES sink. just print() : 1,510msg/sec
>> 3) JSON to TSV : 8,000msg/sec
>>
>> So, as you can see, the bottleneck was JSON parsing. I also want to
>> try eliminating Kafka to see
>> if there is room to improve performance. (Currently, I am using
>> FlinkKafkaConsumer082 with Kafka 0.9;
>> I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
>> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
>> Flink's scalability and fault tolerance.
>> Thank you for your advice.
>>
>> Regards,
>> Hironori Ogibayashi
>>
>> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
>>> Stephan,
>>>
>>> Thank you for your quick response.
>>> I will try and post the result later.
>>>
>>> Regards,
>>> Hironori
>>>
>>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>>>> Hi!
>>>>
>>>> I would try and dig bit by bit into what the bottleneck is:
>>>>
>>>> 1) Disable the checkpointing, see what difference that makes
>>>> 2) Use a dummy sink (discarding) rather than Elasticsearch, to see if that
>>>> is limiting
>>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
>>>> easily dominate the entire pipeline.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I started evaluating Flink and tried a simple performance test.
>>>>> The result was just about 4,000 messages/sec with 300% CPU usage. I
>>>>> think this is quite low and am wondering whether it is a reasonable result.
>>>>> If someone could check it, that would be great.
>>>>>
>>>>> Here is the detail:
>>>>>
>>>>> [servers]
>>>>> - 3 Kafka brokers with 3 partitions
>>>>> - 3 Flink TaskManagers + 1 JobManager
>>>>> - 1 Elasticsearch
>>>>> All of them are separate VMs with 8 vCPUs and 8 GB of memory.
>>>>>
>>>>> [test case]
>>>>> The application counts access logs by URI within a 1-minute window and
>>>>> sends the result to Elasticsearch. The actual code is below.
>>>>> I used the '-p 3' option with the flink run command, so the task was distributed
>>>>> across 3 TaskManagers.
>>>>> In the test, I sent about 5,000 logs/sec to Kafka.
>>>>>
>>>>> [result]
>>>>> - From the Elasticsearch records, the total access count for all URIs was
>>>>> about 260,000/min = 4,300/sec. This is the overall throughput.
>>>>> - The Kafka consumer lag kept growing.
>>>>> - The CPU usage of each TaskManager machine was about 13-14%. From the top
>>>>> command output, the Flink java process was using 100% (1 CPU fully used).
>>>>>
>>>>> So I thought the bottleneck here was the CPU used by the Flink tasks.
>>>>>
>>>>> Here is the application code.

Re: Flink streaming throughput

2016-03-08
Stephan,

Sorry for the delay in my response.
I tried the 3 cases you suggested.

This time, I set parallelism to 1 for simplicity.

0) base performance (same as the first e-mail): 1,480msg/sec
1) Disable checkpointing : almost same as 0)
2) No ES sink. just print() : 1,510msg/sec
3) JSON to TSV : 8,000msg/sec

So, as you can see, the bottleneck was JSON parsing. I also want to
try eliminating Kafka to see
if there is room to improve performance. (Currently, I am using
FlinkKafkaConsumer082 with Kafka 0.9;
I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
Flink's scalability and fault tolerance.
Thank you for your advice.
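
If the JSON parsing has to stay, a lighter parser is another option. Below is a minimal sketch of the uri-extraction step using Jackson instead of scala.util.parsing.json; the jackson-databind dependency is an assumption and was not used in this thread.
---
import com.fasterxml.jackson.databind.ObjectMapper

object UriExtractor {
  private val mapper = new ObjectMapper()        // thread-safe for readTree after setup

  def uriOf(json: String): String =
    mapper.readTree(json).path("uri").asText("") // "" when the field is missing
}

// in the job, the two map steps become:
//   .map { json => (UriExtractor.uriOf(json), 1) }
---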

Regards,
Hironori Ogibayashi

2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <ogibaya...@gmail.com>:
> Stephan,
>
> Thank you for your quick response.
> I will try and post the result later.
>
> Regards,
> Hironori
>
> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
>> Hi!
>>
>> I would try and dig bit by bit into what the bottleneck is:
>>
>>  1) Disable the checkpointing, see what difference that makes
>>  2) Use a dummy sink (discarding) rather than Elasticsearch, to see if that
>> is limiting
>>  3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
>> easily dominate the entire pipeline.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>>
>>> Hello,
>>>
>>> I started evaluating Flink and tried a simple performance test.
>>> The result was just about 4,000 messages/sec with 300% CPU usage. I
>>> think this is quite low and am wondering whether it is a reasonable result.
>>> If someone could check it, that would be great.
>>>
>>> Here is the detail:
>>>
>>> [servers]
>>> - 3 Kafka brokers with 3 partitions
>>> - 3 Flink TaskManagers + 1 JobManager
>>> - 1 Elasticsearch
>>> All of them are separate VMs with 8 vCPUs and 8 GB of memory.
>>>
>>> [test case]
>>> The application counts access logs by URI within a 1-minute window and
>>> sends the result to Elasticsearch. The actual code is below.
>>> I used the '-p 3' option with the flink run command, so the task was distributed
>>> across 3 TaskManagers.
>>> In the test, I sent about 5,000 logs/sec to Kafka.
>>>
>>> [result]
>>> - From the Elasticsearch records, the total access count for all URIs was
>>> about 260,000/min = 4,300/sec. This is the overall throughput.
>>> - The Kafka consumer lag kept growing.
>>> - The CPU usage of each TaskManager machine was about 13-14%. From the top
>>> command output, the Flink java process was using 100% (1 CPU fully used).
>>>
>>> So I thought the bottleneck here was the CPU used by the Flink tasks.
>>>
>>> Here is the application code.
>>> ---
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.enableCheckpointing(1000)
>>> ...
>>> val stream = env
>>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
>>> SimpleStringSchema(), properties))
>>>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String,
>>> AnyRef]] }
>>>   .map{ x => x.get("uri") match {
>>> case Some(y) => (y.asInstanceOf[String],1)
>>> case None => ("", 1)
>>>   }}
>>>   .keyBy(0)
>>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>>   .sum(1)
>>>   .map{ x => (System.currentTimeMillis(), x)}
>>>   .addSink(new ElasticsearchSink(config, transports, new
>>> IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
>>> override def createIndexRequest(element: Tuple2[Long,
>>> Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
>>>   val json = new HashMap[String, AnyRef]
>>>   json.put("@timestamp", new Timestamp(element._1))
>>>   json.put("uri", element._2._1)
>>>   json.put("count", element._2._2: java.lang.Integer)
>>>   println("SENDING: " + element)
>>>
>>> Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>>> }
>>>   }))
>>> ---
>>>
>>> Regards,
>>> Hironori Ogibayashi
>>
>>


Re: Flink streaming throughput

2016-02-26
Stephan,

Thank you for your quick response.
I will try and post the result later.

Regards,
Hironori

2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> Hi!
>
> I would try and dig bit by bit into what the bottleneck is:
>
>  1) Disable the checkpointing, see what difference that makes
>  2) Use a dummy sink (discarding) rather than Elasticsearch, to see if that
> is limiting
>  3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
> easily dominate the entire pipeline.
>
> Greetings,
> Stephan
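
Applied to the job quoted below, steps 1) and 2) amount to not calling enableCheckpointing and ending the pipeline in a no-op sink. A rough sketch; the DiscardingSink import is an assumption, and everything else mirrors the original code.
---
import org.apache.flink.streaming.api.functions.sink.DiscardingSink

val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.enableCheckpointing(1000)                 // step 1): left disabled for the test

val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
  .map { json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
  .map { x => (x.getOrElse("uri", "").asInstanceOf[String], 1) }
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
  .addSink(new DiscardingSink[(String, Int)])    // step 2): no-op sink instead of Elasticsearch
---
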
>
>
> On Fri, Feb 26, 2016 at 11:23 AM, おぎばやしひろのり <ogibaya...@gmail.com> wrote:
>>
>> Hello,
>>
>> I started evaluating Flink and tried a simple performance test.
>> The result was just about 4,000 messages/sec with 300% CPU usage. I
>> think this is quite low and am wondering whether it is a reasonable result.
>> If someone could check it, that would be great.
>>
>> Here is the detail:
>>
>> [servers]
>> - 3 Kafka brokers with 3 partitions
>> - 3 Flink TaskManagers + 1 JobManager
>> - 1 Elasticsearch
>> All of them are separate VMs with 8 vCPUs and 8 GB of memory.
>>
>> [test case]
>> The application counts access logs by URI within a 1-minute window and
>> sends the result to Elasticsearch. The actual code is below.
>> I used the '-p 3' option with the flink run command, so the task was distributed
>> across 3 TaskManagers.
>> In the test, I sent about 5,000 logs/sec to Kafka.
>>
>> [result]
>> - From the Elasticsearch records, the total access count for all URIs was
>> about 260,000/min = 4,300/sec. This is the overall throughput.
>> - The Kafka consumer lag kept growing.
>> - The CPU usage of each TaskManager machine was about 13-14%. From the top
>> command output, the Flink java process was using 100% (1 CPU fully used).
>>
>> So I thought the bottleneck here was the CPU used by the Flink tasks.
>>
>> Here is the application code.
>> ---
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.enableCheckpointing(1000)
>> ...
>> val stream = env
>>   .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
>> SimpleStringSchema(), properties))
>>   .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String,
>> AnyRef]] }
>>   .map{ x => x.get("uri") match {
>> case Some(y) => (y.asInstanceOf[String],1)
>> case None => ("", 1)
>>   }}
>>   .keyBy(0)
>>   .timeWindow(Time.of(1, TimeUnit.MINUTES))
>>   .sum(1)
>>   .map{ x => (System.currentTimeMillis(), x)}
>>   .addSink(new ElasticsearchSink(config, transports, new
>> IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
>> override def createIndexRequest(element: Tuple2[Long,
>> Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
>>   val json = new HashMap[String, AnyRef]
>>   json.put("@timestamp", new Timestamp(element._1))
>>   json.put("uri", element._2._1)
>>   json.put("count", element._2._2: java.lang.Integer)
>>   println("SENDING: " + element)
>>
>> Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
>> }
>>   }))
>> ---
>>
>> Regards,
>> Hironori Ogibayashi
>
>


Flink streaming throughput

2016-02-26
Hello,

I started evaluating Flink and tried a simple performance test.
The result was just about 4,000 messages/sec with 300% CPU usage. I
think this is quite low and am wondering whether it is a reasonable result.
If someone could check it, that would be great.

Here is the detail:

[servers]
- 3 Kafka brokers with 3 partitions
- 3 Flink TaskManagers + 1 JobManager
- 1 Elasticsearch
All of them are separate VMs with 8 vCPUs and 8 GB of memory.

[test case]
The application counts access logs by URI within a 1-minute window and
sends the result to Elasticsearch. The actual code is below.
I used the '-p 3' option with the flink run command, so the task was distributed
across 3 TaskManagers.
In the test, I sent about 5,000 logs/sec to Kafka.
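
The load generator itself is not shown in this thread. For anyone reproducing the setup, a bare Kafka producer loop along the following lines would do; this is only a sketch with a placeholder broker address and payload, not the tool actually used for the test.
---
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object LogGenerator {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val uris = Array("/index.html", "/login", "/api/items")

    // send 300,000 records (about one minute of traffic at 5,000 logs/sec; pacing omitted)
    for (i <- 0 until 300000) {
      val log = s"""{"uri":"${uris(i % uris.length)}","status":200}"""
      producer.send(new ProducerRecord[String, String]("kafka.dummy", log))
    }
    producer.close()
  }
}
---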

[result]
- From the Elasticsearch records, the total access count for all URIs was
about 260,000/min = 4,300/sec. This is the overall throughput.
- The Kafka consumer lag kept growing.
- The CPU usage of each TaskManager machine was about 13-14%. From the top
command output, the Flink java process was using 100% (1 CPU fully used).

So I thought the bottleneck here was the CPU used by the Flink tasks.

Here is the application code.
---
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000)
...
val stream = env
  .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new
SimpleStringSchema(), properties))
  .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
  .map{ x => x.get("uri") match {
case Some(y) => (y.asInstanceOf[String],1)
case None => ("", 1)
  }}
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
  .map{ x => (System.currentTimeMillis(), x)}
  .addSink(new ElasticsearchSink(config, transports, new
IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]]  {
override def createIndexRequest(element: Tuple2[Long,
Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
  val json = new HashMap[String, AnyRef]
  json.put("@timestamp", new Timestamp(element._1))
  json.put("uri", element._2._1)
  json.put("count", element._2._2: java.lang.Integer)
  println("SENDING: " + element)
  Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
}
  }))
---

Regards,
Hironori Ogibayashi