Hi all
I'm seeing a very low message consumption rate from Kafka in several of our
jobs.
In order to find the bottleneck, I created a very simple pipeline that reads
string messages from Kafka and just prints them.
The pipeline runs on a Flink cluster with the following setup:
1 task manager, 3 slots, parallelism set to 3.

Here is the pipeline:
PCollection<KV<String, String>> readFromKafka = pipeline.apply(
    "readFromKafka",
    KafkaTransform.readStrFromKafka(
        pipelineUtil.getBootstrapServers(), topic_name, consumer_group));

readFromKafka
    // Drop the Kafka keys and keep only the message values.
    .apply("Get message contents", Values.<String>create())
    // Log each message and pass it through unchanged.
    .apply("Log messages", MapElements.into(TypeDescriptor.of(String.class))
        .via(message -> {
            log.atInfo().log("Received: {}", message);
            return message;
        }));
The Kafka consumer is built as follows:
return KafkaIO.<String, String>read()
    .withBootstrapServers(bootstrapServers)
    .withTopic(topic)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withConsumerConfigUpdates(ImmutableMap.of(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
        ConsumerConfig.GROUP_ID_CONFIG, consumerGroup))
    .withoutMetadata();
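For reference, here is a minimal plain-Java sketch of the consumer property map passed to withConsumerConfigUpdates, using java.util.Map.of (Java 9+) instead of Guava's ImmutableMap. The class and method names are hypothetical, chosen only for illustration; "group.id" is the string value of ConsumerConfig.GROUP_ID_CONFIG. With "auto.offset.reset" set to "earliest", a consumer group that has no valid committed offset for a partition starts reading from the beginning of that partition.

```java
import java.util.Map;

// Hypothetical plain-Java mirror of the config map passed to
// withConsumerConfigUpdates; not part of Beam or Kafka.
final class ConsumerConfigSketch {

    static Map<String, Object> consumerConfigUpdates(String consumerGroup) {
        return Map.of(
                // Used only when the group has no valid committed offset:
                // start from the beginning of the partition.
                "auto.offset.reset", "earliest",
                // Same key as ConsumerConfig.GROUP_ID_CONFIG.
                "group.id", consumerGroup);
    }

    public static void main(String[] args) {
        // "my-group" is a hypothetical group name.
        Map<String, Object> cfg = consumerConfigUpdates("my-group");
        System.out.println(cfg.get("group.id"));          // my-group
        System.out.println(cfg.get("auto.offset.reset")); // earliest
    }
}
```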
According to the metrics, there are 3 threads reading from Kafka, but each
one reads only about 56 records per second (roughly 168 records per second
in total). In my opinion this is a very low rate.
I am not sure I understand this behaviour.
I have checked CPU and memory usage, and both look OK.
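To make the figures above concrete: three readers at about 56 records/s each give roughly 3 x 56 = 168 records/s in aggregate. The sketch below is a hypothetical helper (RecordRateMeter is not part of Beam, Flink or Kafka) that computes an average rate from a record count and an elapsed time, which is one way to cross-check a reported per-thread rate. Timestamps are passed in explicitly, in nanoseconds, so the arithmetic is deterministic.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of any library: counts records and reports
// the average rate since the meter was started.
final class RecordRateMeter {
    private final long startNanos;
    private final AtomicLong count = new AtomicLong();

    RecordRateMeter(long startNanos) {
        this.startNanos = startNanos;
    }

    // Call once per consumed record; safe to call from several threads.
    void mark() {
        count.incrementAndGet();
    }

    // Average records per second between the start time and nowNanos.
    double ratePerSecond(long nowNanos) {
        double elapsedSeconds = (nowNanos - startNanos) / 1_000_000_000.0;
        return elapsedSeconds <= 0 ? 0.0 : count.get() / elapsedSeconds;
    }

    // Aggregate rate of several readers, e.g. 3 readers x 56 rec/s.
    static double totalRate(double... perReaderRates) {
        double sum = 0.0;
        for (double rate : perReaderRates) {
            sum += rate;
        }
        return sum;
    }

    public static void main(String[] args) {
        RecordRateMeter meter = new RecordRateMeter(0L);
        for (int i = 0; i < 560; i++) {
            meter.mark();                                   // 560 records ...
        }
        System.out.println(meter.ratePerSecond(10_000_000_000L)); // ... in 10 s: 56.0
        System.out.println(totalRate(56, 56, 56));         // 168.0
    }
}
```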
Any ideas would be really appreciated.
Thanks a lot,
Sigalit