Re: Flink streaming throughput
Milinda,

Thanks. I will try.

Regards,
Hironori

2016/03/16 1:31 "Milinda Pathirage" <mpath...@umail.iu.edu>:
> Hi Hironori,
>
> [1] and [2] describe the process of measuring Kafka performance. I think
> the perf test code is under the org.apache.kafka.tools package in 0.9, so you
> may have to change the commands in [2] to reflect that.
>
> Thanks
> Milinda
>
> [1] https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> [2] https://gist.github.com/jkreps/c7ddb4041ef62a900e6c
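For reference, a minimal standalone consumer can also answer the earlier question of how many messages can be read in one minute, without the perf-test scripts. This is only a sketch against the Kafka 0.9 consumer API; the broker address, group id, and poll loop are assumptions, and only the topic name ("kafka.dummy") is taken from the original post.

---
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object ConsumeRate {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")   // assumed broker address
    props.put("group.id", "throughput-check")        // throwaway consumer group
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")       // start from the earliest offset

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(List("kafka.dummy").asJava)

    // Poll for one minute and count how many records come back.
    val start = System.currentTimeMillis()
    var count = 0L
    while (System.currentTimeMillis() - start < 60000) {
      count += consumer.poll(1000).count()
    }
    consumer.close()
    println(s"consumed $count msgs in 60s = ${count / 60} msg/sec")
  }
}
---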
Re: Flink streaming throughput
Robert,

Thank you for your response.
I would like to try kafka-console-consumer, but I have no idea how to
measure the consuming throughput. Is there a standard way to do it?
I will also try running the Kafka broker on physical servers.

Regarding the version, I have upgraded to Flink 1.0.0 and replaced
FlinkKafkaConsumer082 with FlinkKafkaConsumer09, but did not see
any difference in performance.

Regards,
Hironori

2016-03-11 23:25 GMT+09:00 Robert Metzger <rmetz...@apache.org>:
> Hi Hironori,
>
> can you try with the kafka-console-consumer how many messages you can read
> in one minute?
> Maybe the broker's disk I/O is limited because everything is running in
> virtual machines (potentially sharing one hard disk?)
> I'm also not sure if running a Kafka 0.8 consumer against a 0.9 broker is
> working as expected.
>
> Our Kafka 0.8 consumer has been tested in environments where it is reading
> at more than 100 MB/s from a broker.
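The source swap mentioned above (FlinkKafkaConsumer082 to FlinkKafkaConsumer09 on Flink 1.0.0) roughly looks like the sketch below. It assumes the flink-connector-kafka-0.9 dependency is on the classpath and reuses the env and SimpleStringSchema from the original job; the broker address and group id are placeholders.

---
// Properties for the 0.9 connector: unlike the 0.8 connector it talks to the
// brokers directly, so no ZooKeeper quorum is needed here.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "broker1:9092")  // assumed broker address
properties.setProperty("group.id", "flink-bench")            // assumed group id

val stream = env
  .addSource(new FlinkKafkaConsumer09[String](
    "kafka.dummy", new SimpleStringSchema(), properties))
---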
Re: Flink streaming throughput
Aljoscha,

Thank you for your response.

I tried the case with no JSON parsing and no sink (DiscardingSink). The
throughput was 8,228 msg/sec, slightly better than the JSON + Elasticsearch case.
I also tried using socketTextStream instead of FlinkKafkaConsumer; in
that case, the result was 60,000 msg/sec with just 40% Flink TaskManager
CPU usage (the socket server was the bottleneck).
That was amazing, although Flink's fault tolerance feature is not
available with socketTextStream.

Regards,
Hironori

2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> Another interesting test would be a combination of 3) and 2), i.e. no JSON
> parsing and no sink. This would show what the raw throughput can be before
> being slowed down by writing to Elasticsearch.
>
> Also, .print() is not feasible for production since it just prints every
> element to the stdout log on the TaskManagers, which itself can cause quite a
> slowdown. You could try:
>
> datastream.addSink(new DiscardingSink())
>
> which is a dummy sink that does nothing.
>
> Cheers,
> Aljoscha
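The "no JSON parsing, no sink" test described above boils down to something like the following sketch, reusing the topic and properties from the original post; the socket variant is shown as a comment with an assumed host and port.

---
// Raw source throughput test: read from Kafka, skip JSON parsing, discard everything.
val raw = env
  .addSource(new FlinkKafkaConsumer082[String](
    "kafka.dummy", new SimpleStringSchema(), properties))
  .addSink(new DiscardingSink[String])

// The socketTextStream variant replaces the Kafka source with:
//   env.socketTextStream("loghost", 9999)   // host and port are assumptions
---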
Re: Flink streaming throughput
Stephan,

Sorry for the delay in my response.
I tried the 3 cases you suggested.

This time, I set parallelism to 1 for simplicity.

0) Base performance (same as the first e-mail): 1,480 msg/sec
1) Disable checkpointing: almost the same as 0)
2) No ES sink, just print(): 1,510 msg/sec
3) JSON to TSV: 8,000 msg/sec

So, as you can see, the bottleneck was JSON parsing. I also want to
try eliminating Kafka to see if there is room to improve performance.
(Currently, I am using FlinkKafkaConsumer082 with Kafka 0.9; I think I
should try Flink 1.0 and FlinkKafkaConsumer09.)
Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
Flink's scalability and fault tolerance.
Thank you for your advice.

Regards,
Hironori Ogibayashi
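One common way to attack the JSON-parsing bottleneck identified above, without switching the input format to TSV, is to replace scala.util.parsing.json.JSON with a faster parser such as Jackson. This was not tried in the thread; the sketch below assumes a jackson-databind dependency, a DataStream[String] named stream coming from the Kafka source, and the same "uri" field layout as the original job.

---
import com.fasterxml.jackson.databind.ObjectMapper

// ObjectMapper is expensive to create but reusable and thread-safe for reads,
// so keep a single shared instance instead of building one per record.
object Json {
  val mapper = new ObjectMapper()
}

val uriCounts = stream
  .map { line =>
    // Extract only the "uri" field instead of materializing a full Map[String, AnyRef].
    (Json.mapper.readTree(line).path("uri").asText(""), 1)
  }
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
---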
Re: Flink streaming throughput
Stephan,

Thank you for your quick response.
I will try and post the result later.

Regards,
Hironori

2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> Hi!
>
> I would try and dig bit by bit into what the bottleneck is:
>
> 1) Disable the checkpointing, see what difference that makes
> 2) Use a dummy sink (discarding) rather than Elasticsearch, to see if that
> is limiting
> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive and
> easily dominate the entire pipeline.
>
> Greetings,
> Stephan
Flink streaming throughput
Hello,

I started evaluating Flink and tried a simple performance test.
The result was only about 4,000 messages/sec with 300% CPU usage. I
think this is quite low, and I am wondering whether it is a reasonable result.
If someone could check it, that would be great.

Here are the details:

[servers]
- 3 Kafka brokers with 3 partitions
- 3 Flink TaskManagers + 1 JobManager
- 1 Elasticsearch
All of them are separate VMs with 8 vCPU and 8GB memory.

[test case]
The application counts access logs by URI within a 1-minute window and
sends the results to Elasticsearch. The actual code is below.
I used the '-p 3' option with the flink run command, so the task was
distributed across the 3 TaskManagers.
In the test, I sent about 5,000 logs/sec to Kafka.

[result]
- From the Elasticsearch records, the total access count across all URIs was
about 260,000/min = 4,300/sec. This is the overall throughput.
- Kafka consumer lag kept growing.
- The CPU usage of each TaskManager machine was about 13-14%. From the top
command output, the Flink java process was using 100% (one full CPU).

So I thought the bottleneck was the CPU used by the Flink tasks.

Here is the application code.
---
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000)
...
val stream = env
  // read raw JSON access-log lines from Kafka
  .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
  // parse each line into a Map using the Scala standard-library JSON parser
  .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
  // extract the URI field and pair it with a count of 1
  .map{ x => x.get("uri") match {
    case Some(y) => (y.asInstanceOf[String], 1)
    case None => ("", 1)
  }}
  // count per URI over 1-minute windows
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
  .map{ x => (System.currentTimeMillis(), x) }
  // index each (timestamp, (uri, count)) record into Elasticsearch
  .addSink(new ElasticsearchSink(config, transports,
    new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
      override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]],
          ctx: RuntimeContext): IndexRequest = {
        val json = new HashMap[String, AnyRef]
        json.put("@timestamp", new Timestamp(element._1))
        json.put("uri", element._2._1)
        json.put("count", element._2._2: java.lang.Integer)
        println("SENDING: " + element)
        Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
      }
  }))
---

Regards,
Hironori Ogibayashi
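For reference, the "JSON to TSV" case reported earlier in the thread (8,000 msg/sec at parallelism 1) amounts to replacing the two parsing maps above with a plain split. This is only a sketch, assuming a tab-separated log format in which the URI is the second field; the actual TSV layout was not given in the thread.

---
val uriCounts = env
  .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
  // split the tab-separated line and take the assumed URI column
  .map { line =>
    val fields = line.split('\t')
    (if (fields.length > 1) fields(1) else "", 1)
  }
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
---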