Hi

I have this strange problem: 4 task managers each with one task slot, attaching 
to the same Kafka topic which has 10 partitions.

When I post a single message to the Kafka topic it seems that all 4 consumers 
fetch the message and start processing (confirmed by TM logs).

If I run kafka-consumer-groups.sh  --describe --group TopicConsumers it says 
that only one message was posted to a single partition. Next message would 
generally go to another partition.

In addition, while the Flink jobs are running on the message, I start two 
kafka-console-consumer.sh and each would get only one message, as expected.

On start each of the Flink TM would post something that to me reads as if it 
would read from all partitions:

2017-11-17 15:03:38,688 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Got 10 
partitions from these topics: [TopicToConsume]
2017-11-17 15:03:38,689 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09  - Consumer is 
going to read the following topics (with number of partitions): TopicToConsume 
(10), 
2017-11-17 15:03:38,689 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer 
subtask 0 will start reading the following 10 partitions from the committed 
group offsets in Kafka: [KafkaTopicPartition{topic='TopicToConsume', 
partition=8}, KafkaTopicPartition{topic='TopicToConsume', partition=9}, 
KafkaTopicPartition{topic='TopicToConsume', partition=6}, 
KafkaTopicPartition{topic='TopicToConsume', partition=7}, 
KafkaTopicPartition{topic='TopicToConsume', partition=4}, 
KafkaTopicPartition{topic='TopicToConsume', partition=5}, 
KafkaTopicPartition{topic='TopicToConsume', partition=2}, 
KafkaTopicPartition{topic='TopicToConsume', partition=3}, 
KafkaTopicPartition{topic='TopicToConsume', partition=0}, 
KafkaTopicPartition{topic='TopicToConsume', partition=1}]
2017-11-17 15:03:38,699 INFO  org.apache.kafka.clients.consumer.ConsumerConfig  
            - ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest



Any hints?


Reply via email to