Hi All,

Iam using kafka consumer in our application integrated with
Camel(Camel-Kafka). We consume the messages and send the data to the server
for processing.

I have defined a topic "XYZ" defined with 30 partitions and I have assigned
15 as consumer count on each consumer node (total 2 instances).

*CamelConsumer Configuration*

kafka.consumersCount=15
kafka.consumerStreams=15

I see from the logs that when the consumer starts, there are 15 consumer
threads (lets say on 1 node), which is good as configured.

INFO  Camel (camel-1) thread #2 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 0 to topic XYZ
INFO  Camel (camel-1) thread #3 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 1 to topic XYZ
INFO  Camel (camel-1) thread #4 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 2 to topic XYZ
INFO  Camel (camel-1) thread #5 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 3 to topic XYZ
INFO  Camel (camel-1) thread #6 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 4 to topic XYZ
INFO  Camel (camel-1) thread #7 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 5 to topic XYZ
INFO  Camel (camel-1) thread #8 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 6 to topic XYZ
INFO  Camel (camel-1) thread #9 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 7 to topic XYZ
INFO  Camel (camel-1) thread #10 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 8 to topic XYZ
INFO  Camel (camel-1) thread #11 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 9 to topic XYZ
INFO  Camel (camel-1) thread #12 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 10 to topic XYZ
INFO  Camel (camel-1) thread #13 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 11 to topic XYZ
INFO  Camel (camel-1) thread #14 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 12 to topic XYZ
INFO  Camel (camel-1) thread #15 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 13 to topic XYZ
INFO  Camel (camel-1) thread #16 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 14 to topic XYZ

If server stops responding due to network issue or any other scenario when
the server is unavailable. All the kafka consumers starts unsubscribing
which is again an expected behavior (so far good)

*Note:* We have also defined the Camel ThrottlingExceptionRoutePolicy
1. Is invoked when the server is unavailable.
2. Performs a health check call on the server periodically.

Once the server is back and available, i see that not all 15 consumer
threads are active, but only 1.
>From the logs below, I observe that its getting subscribed and unsubscribed
one by one and finally the application runs with only a single consumer
count.

INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 0 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefinedauto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ListOfDefinedServers]
check.crcs = trueclient.id = connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800fetch.max.wait.ms = 500
fetch.min.bytes = 1group.id = XYZ-GroupId-12345heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000
max.poll.records = 500metadata.max.age.ms = 5000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFOmetrics.sample.window.ms = 30000
partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536reconnect.backoff.max.ms =
1000reconnect.backoff.ms = 50request.timeout.ms =
40000retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = HTTPS
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = cert.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = cert.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
INFO  Camel (camel-1) thread #17 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 0 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 1 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #18 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 1 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 2 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #19 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 2 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 3 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #20 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 3 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 4 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #21 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 4 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 5 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #22 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 5 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 6 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #23 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 6 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 7 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #24 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 7 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 8 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #25 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 8 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 9 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #26 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 9 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 10 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #27 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 10 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 11 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #28 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 11 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 12 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #29 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 12 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 13 to topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined
INFO  Camel (camel-1) thread #30 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing
XYZ-Thread 13 from topic XYZ
INFO  [kafka.clients.consumer.ConsumerConfig(logAll:238)]
ConsumerConfig values: Prints all the consumer config values
defined/undefined

INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing
XYZ-Thread 14 to topic XYZ

INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group
coordinator servername (id: 2147482644 rack: null)
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking previously
assigned partitions []
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:336)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined
group with generation 1
INFO  Camel (camel-1) thread #31 - KafkaConsumer[XYZ]
[clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer
clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly
assigned partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27,
XYZ-21, XYZ-23, XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7,
XYZ-16, XYZ-18, XYZ-12, XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0,
XYZ-2, XYZ-29, XYZ-8, XYZ-10, XYZ-4, XYZ-6]

Any idea why is camel not running with the defined consumer count, rather
runs with only a single consumer. This affects the processing speed of the
consumer application when the load is more.

Any help or suggestion what is going wrong here ?

I expect 15 consumer threads running after the failure which is as defined
in the camel kafka config.

I have also tried this behavior with the latest version of camel kafka
2.24.2, still the same issue.

-Regards

Srikant Mantha

Reply via email to