Hi,

I see a delay of about 4 to 5 minutes before the initial rebalance is triggered
when I subscribe using the KafkaConsumer.subscribe(Pattern pattern,
ConsumerRebalanceListener listener) signature.

During that time, none of the subscribers fetch any messages, even though
messages are present in the Kafka topic they subscribe to.

I am using Camel Kafka to subscribe to Kafka topics. I looked into the Camel
Kafka implementation to rule out the possibility that Camel Kafka itself is
causing the issue.

When I switch to KafkaConsumer.subscribe(Collection<String> topics), it works
fine and I see no delay.
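
For reference, here is a minimal sketch of the two subscription styles being
compared. It uses only the JDK so it runs without a broker; the actual
KafkaConsumer calls are shown as comments. The topic and group names are taken
from the log below. The bootstrap address, the regex, and the class and method
names are placeholders, not my real configuration. The sketch also sets
metadata.max.age.ms, which defaults to 300000 ms (5 minutes). As far as I
understand, a pattern subscription is re-evaluated against the topic list when
the consumer refreshes its metadata, so this may be one setting worth
experimenting with. I have not confirmed that it is the cause.

```java
import java.util.Properties;
import java.util.regex.Pattern;

final class PatternSubscribeSketch {

    // Topic and group names as they appear in the broker log.
    static final String TOPIC = "queue.REF_REF_TestInboundOrdered";
    static final String GROUP = "queue.REF_REF_TestInboundOrdered_GROUP";

    // Hypothetical regex; the real pattern may differ.
    static final Pattern TOPIC_PATTERN = Pattern.compile("queue\\.REF_REF_.*");

    // Builds consumer settings with an explicit metadata refresh interval.
    static Properties consumerProps(long metadataMaxAgeMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", GROUP);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Default is 300000 (5 minutes); a lower value is an experiment only.
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }

    // Checks that the regex covers the whole topic name.
    static boolean patternCovers(String topic) {
        return TOPIC_PATTERN.matcher(topic).matches();
    }

    public static void main(String[] args) {
        Properties props = consumerProps(10_000L);
        System.out.println("metadata.max.age.ms = "
                + props.getProperty("metadata.max.age.ms"));
        System.out.println("pattern covers topic: " + patternCovers(TOPIC));

        // With kafka-clients on the classpath, the two variants would be:
        //   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //   consumer.subscribe(TOPIC_PATTERN, listener);            // delayed
        //   consumer.subscribe(Collections.singletonList(TOPIC));   // no delay
    }
}
```

The patternCovers check is only there to rule out a regex that fails to match
the topic name, which would produce a different symptom (no assignment at all
rather than a late one).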

 

I am using Kafka v2.0.1 with Scala v2.12 and Camel Kafka v2.23.x.

Below is a portion of the broker log showing the rebalance delay (note the gap
between 15:52:08 and 15:56:25). Any clue on how to go about identifying the
root cause?

[2019-03-15 15:52:08,143] INFO [Log 
partition=queue.REF_REF_TestInboundOrdered-15, 
dir=/kafka/kafka-logs-58c06252374b] Loading producer state till offset 0 with 
message format version 2 (kafka.log.Log)

[2019-03-15 15:52:08,143] INFO [Log 
partition=queue.REF_REF_TestInboundOrdered-15, 
dir=/kafka/kafka-logs-58c06252374b] Completed load of log with 1 segments, log 
start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)

[2019-03-15 15:52:08,144] INFO Created log for partition 
queue.REF_REF_TestInboundOrdered-15 in /kafka/kafka-logs-58c06252374b with 
properties {compression.type -> producer, message.format.version -> 2.0-IV1, 
file.delete.delay.ms -> 60000, max.message.bytes -> 11534336, 
min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, 
message.downconversion.enable -> true, min.insync.replicas -> 1, 
segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, 
index.interval.bytes -> 4096, unclean.leader.election.enable -> false, 
retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> 
[delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, 
segment.bytes -> 262144000, retention.ms -> 604800000, 
message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes 
-> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)

[2019-03-15 15:52:08,144] INFO [Partition queue.REF_REF_TestInboundOrdered-15 
broker=1001] No checkpointed highwatermark is found for partition 
queue.REF_REF_TestInboundOrdered-15 (kafka.cluster.Partition)

[2019-03-15 15:52:08,144] INFO Replica loaded for partition 
queue.REF_REF_TestInboundOrdered-15 with initial high watermark 0 
(kafka.cluster.Replica)

[2019-03-15 15:52:08,144] INFO [Partition queue.REF_REF_TestInboundOrdered-15 
broker=1001] queue.REF_REF_TestInboundOrdered-15 starts at Leader Epoch 0 from 
offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)

[2019-03-15 15:52:08,149] INFO [ReplicaAlterLogDirsManager on broker 1001] 
Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)

[2019-03-15 15:56:09,739] INFO [GroupMetadataManager brokerId=1001] Removed 0 
expired offsets in 5 milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)

[2019-03-15 15:56:25,030] INFO [GroupCoordinator 1001]: Preparing to rebalance 
group queue.REF_REF_TestInboundOrdered_GROUP with old generation 1 
(__consumer_offsets-23) (kafka.coordinator.group.GroupCoordinator)

[2019-03-15 15:56:25,103] INFO [GroupCoordinator 1001]: Stabilized group 
queue.REF_REF_TestInboundOrdered_GROUP generation 2 (__consumer_offsets-23) 
(kafka.coordinator.group.GroupCoordinator)

[2019-03-15 15:56:25,106] INFO [GroupCoordinator 1001]: Assignment received 
from leader for group queue.REF_REF_TestInboundOrdered_GROUP for generation 2 
(kafka.coordinator.group.GroupCoordinator)

 

Regards

Viswa Ramamoorthy
