Hello,
I started evaluating Flink and tried a simple performance test.
The result was only about 4000 messages/sec with 300% CPU usage (about
100% on each of the 3 TaskManagers). I think this is quite low, and I am
wondering whether it is a reasonable result. It would be great if
someone could check it.
Here is the detail:
[servers]
- 3 Kafka brokers with 3 partitions
- 3 Flink TaskManager + 1 JobManager
- 1 Elasticsearch
All of them are separate VMs with 8 vCPUs and 8 GB of memory.
[test case]
The application counts access logs by URI within a 1-minute window and
sends the result to Elasticsearch. The actual code is below.
I passed the '-p 3' option to the flink run command, so the tasks were
distributed to the 3 TaskManagers.
In the test, I sent about 5000 logs/sec to Kafka.
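To make the intended result concrete, here is a minimal, Flink-free
Scala sketch of the same computation: a tumbling 1-minute window that
counts events per URI. The names (WindowCountSketch,
countByUriPerMinute) and the (timestamp, uri) input shape are made up
for illustration, and timestamps are passed in explicitly, so this is
not how Flink itself assigns events to windows.

```scala
// Hypothetical, Flink-free sketch: count events per URI
// in tumbling 1-minute windows.
object WindowCountSketch {
  // Assumed input shape: (event time in epoch millis, URI)
  type Event = (Long, String)

  val WindowMillis: Long = 60 * 1000L

  // Returns (window start millis, uri) -> number of events
  def countByUriPerMinute(events: Seq[Event]): Map[(Long, String), Int] =
    events
      .groupBy { case (ts, uri) => (ts - ts % WindowMillis, uri) }
      .map { case (key, evs) => key -> evs.size }

  def main(args: Array[String]): Unit = {
    val events = Seq((0L, "/a"), (1000L, "/a"), (2000L, "/b"), (61000L, "/a"))
    countByUriPerMinute(events).toSeq.sorted.foreach(println)
  }
}
```

Each output entry corresponds to one document the job writes to
Elasticsearch: a URI, its count, and the minute it belongs to.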
[result]
- According to the Elasticsearch records, the total access count for all
URIs was about 260,000/min (about 4300/sec). This is the overall throughput.
- The Kafka consumer lag kept growing.
- The CPU usage of each TaskManager machine was about 13-14%, i.e. roughly
one of its 8 vCPUs. In the top command output, the Flink Java process was
using 100% (one full CPU).
So I thought the bottleneck here was the CPU used by the Flink tasks.
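The numbers above fit together as follows. This is only a
back-of-the-envelope Scala sketch; the object name is hypothetical, and
the 13.5% figure is simply the midpoint of the observed 13-14%.

```scala
// Back-of-the-envelope arithmetic for the test results.
object ResultMath {
  val inputPerSec = 5000.0      // rate sent to Kafka
  val outputPerMin = 260000.0   // total count seen in Elasticsearch per minute
  val outputPerSec: Double = outputPerMin / 60                  // ~4333 msg/sec
  val lagGrowthPerMin: Double = inputPerSec * 60 - outputPerMin // 40,000 records/min

  val vcpusPerVm = 8
  val vmCpuUsage = 0.135        // assumed midpoint of the observed 13-14%
  val busyCoresPerVm: Double = vmCpuUsage * vcpusPerVm          // ~1.08 cores

  def main(args: Array[String]): Unit = {
    println(f"throughput:        $outputPerSec%.0f msg/sec")
    println(f"lag growth:        $lagGrowthPerMin%.0f records/min")
    println(f"busy cores per VM: $busyCoresPerVm%.2f")
  }
}
```

In other words, about 40,000 records per minute accumulate as consumer
lag, and each TaskManager VM keeps roughly one core busy.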
Here is the application code.
---
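// Imports assumed for the Flink 0.10-era Scala API and connectors
import java.sql.Timestamp
import java.util.HashMap
import java.util.concurrent.TimeUnit

import scala.util.parsing.json.JSON

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSink, IndexRequestBuilder}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000) // checkpoint every 1000 ms
...
val stream = env
  // Read raw JSON log lines from the "kafka.dummy" topic
  .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
  // Parse each line into a Map
  .map { json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
  // Turn each record into a (uri, 1) pair; records without "uri" count under ""
  .map { x => x.get("uri") match {
    case Some(y) => (y.asInstanceOf[String], 1)
    case None => ("", 1)
  }}
  // Sum the counts per URI over 1-minute windows
  .keyBy(0)
  .timeWindow(Time.of(1, TimeUnit.MINUTES))
  .sum(1)
  // Attach the time at which each window result is emitted
  .map { x => (System.currentTimeMillis(), x) }
  .addSink(new ElasticsearchSink(config, transports,
    new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
      override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]],
                                      ctx: RuntimeContext): IndexRequest = {
        val json = new HashMap[String, AnyRef]
        json.put("@timestamp", new Timestamp(element._1))
        json.put("uri", element._2._1)
        json.put("count", element._2._2: java.lang.Integer)
        println("SENDING: " + element)
        Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
      }
    }))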
---
Regards,
Hironori Ogibayashi