Hi All, Iam using kafka consumer in our application integrated with Camel(Camel-Kafka). We consume the messages and send the data to the server for processing.
I have defined a topic "XYZ" defined with 30 partitions and I have assigned 15 as consumer count on each consumer node (total 2 instances). *CamelConsumer Configuration* kafka.consumersCount=15 kafka.consumerStreams=15 I see from the logs that when the consumer starts, there are 15 consumer threads (lets say on 1 node), which is good as configured. INFO Camel (camel-1) thread #2 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ INFO Camel (camel-1) thread #3 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ INFO Camel (camel-1) thread #4 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ INFO Camel (camel-1) thread #5 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ INFO Camel (camel-1) thread #6 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ INFO Camel (camel-1) thread #7 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ INFO Camel (camel-1) thread #8 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ INFO Camel (camel-1) thread #9 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ INFO Camel (camel-1) thread #10 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ INFO Camel (camel-1) thread #11 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ INFO Camel (camel-1) thread #12 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ INFO Camel (camel-1) thread #13 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ INFO Camel (camel-1) thread #14 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ INFO Camel (camel-1) thread #15 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ INFO Camel (camel-1) thread #16 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ If server stops responding due to network issue or any other scenario when the server is unavailable. All the kafka consumers starts unsubscribing which is again an expected behavior (so far good) *Note:* We have also defined the Camel ThrottlingExceptionRoutePolicy 1. Is invoked when the server is unavailable. 2. Performs a health check call on the server periodically. Once the server is back and available, i see that not all 15 consumer threads are active, but only 1. >From the logs below, I observe that its getting subscribed and unsubscribed one by one and finally the application runs with only a single consumer count. INFO Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 0 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefinedauto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [ListOfDefinedServers] check.crcs = trueclient.id = connections.max.idle.ms = 540000 enable.auto.commit = false exclude.internal.topics = true fetch.max.bytes = 52428800fetch.max.wait.ms = 500 fetch.min.bytes = 1group.id = XYZ-GroupId-12345heartbeat.interval.ms = 3000 interceptor.classes = null internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000 max.poll.records = 500metadata.max.age.ms = 5000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFOmetrics.sample.window.ms = 30000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 40000retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = SSL send.buffer.bytes = 131072session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = HTTPS ssl.key.password = [hidden] ssl.keymanager.algorithm = SunX509 ssl.keystore.location = cert.jks ssl.keystore.password = [hidden] ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = cert.jks ssl.truststore.password = [hidden] ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer INFO Camel (camel-1) thread #17 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 0 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 1 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #18 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 1 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 2 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #19 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 2 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 3 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #20 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 3 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 4 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #21 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 4 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 5 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #22 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 5 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 6 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #23 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 6 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 7 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #24 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 7 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 8 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #25 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 8 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 9 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #26 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 9 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 10 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #27 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 10 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 11 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #28 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 11 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 12 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #29 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 12 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 13 to topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #30 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:351)] Unsubscribing XYZ-Thread 13 from topic XYZ INFO [kafka.clients.consumer.ConsumerConfig(logAll:238)] ConsumerConfig values: Prints all the consumer config values defined/undefined INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [camel.component.kafka.KafkaConsumer(doRun:222)] Subscribing XYZ-Thread 14 to topic XYZ INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Discovered group coordinator servername (id: 2147482644 rack: null) INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Revoking previously assigned partitions [] INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:336)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] (Re-)joining group INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.AbstractCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Successfully joined group with generation 1 INFO Camel (camel-1) thread #31 - KafkaConsumer[XYZ] [clients.consumer.internals.ConsumerCoordinator(info:341)] [Consumer clientId=consumer-30, groupId=XYZ-GroupId-12345] Setting newly assigned partitions [XYZ-17, XYZ-19, XYZ-13, XYZ-15, XYZ-25, XYZ-27, XYZ-21, XYZ-23, XYZ-1, XYZ-3, XYZ-28, XYZ-9, XYZ-11, XYZ-5, XYZ-7, XYZ-16, XYZ-18, XYZ-12, XYZ-14, XYZ-24, XYZ-26, XYZ-20, XYZ-22, XYZ-0, XYZ-2, XYZ-29, XYZ-8, XYZ-10, XYZ-4, XYZ-6] Any idea why is camel not running with the defined consumer count, rather runs with only a single consumer. This affects the processing speed of the consumer application when the load is more. Any help or suggestion what is going wrong here ? I expect 15 consumer threads running after the failure which is as defined in the camel kafka config. I have also tried this behavior with the latest version of camel kafka 2.24.2, still the same issue. -Regards Srikant Mantha