Kashyap Ivaturi created KAFKA-7365:
--------------------------------------

             Summary: max.poll.records setting in Kafka Consumer is not working
                 Key: KAFKA-7365
                 URL: https://issues.apache.org/jira/browse/KAFKA-7365
             Project: Kafka
          Issue Type: Bug
          Components: consumer
            Reporter: Kashyap Ivaturi


Hi,

I have a requirement where I consume messages one by one, each message has 
additional processing that I should do and then manually commit the offset.

Things work well most of the times until I get a big bunch of records which 
takes longer time to process and I encounter CommitFailed exception for the 
last set of records even though they were processed. While i'am able to 
reconnect back its picking some messages that I had already processed. I don't 
want this to happen as its creating duplicates in target systems that I 
integrate with while processing the message.

 

I decided that even though there are more messages in the queue , I would like 
to have a control on how many records I can process when polled.

I tried to replicate a scenario where I have started the consumer by setting 
'max.poll.records' to '1' and then pushed 4 messages into the Topic the 
consumer is listening.

I expected that the consumer will only process 1 message because of my 
'max.poll.records' setting but the consumer has processed all the 4 messages in 
single poll. Any idea why did it not consider 'max.poll.records' setting or is 
some other setting overriding this setting?. Appreciate your help or guidance 
in troubleshooting this issue.

Here is the log of my Consumer config when it starts:

 

2018-08-28 08:29:47.873  INFO 91121 --- [           main] 
o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 

[auto.commit.interval.ms|https://auto.commit.interval.ms/] = 5000

auto.offset.reset = earliest

bootstrap.servers = [messaging-rtp3.cisco.com:9093]

check.crcs = true

[client.id|https://client.id/] = 

[connections.max.idle.ms|https://connections.max.idle.ms/] = 540000

enable.auto.commit = false

exclude.internal.topics = true

fetch.max.bytes = 52428800

[fetch.max.wait.ms|https://fetch.max.wait.ms/] = 500

fetch.min.bytes = 1

[group.id|https://group.id/] = empestor

[heartbeat.interval.ms|https://heartbeat.interval.ms/] = 3000

interceptor.classes = null

internal.leave.group.on.close = true

isolation.level = read_uncommitted

key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer

max.partition.fetch.bytes = 1048576

[max.poll.interval.ms|https://max.poll.interval.ms/] = 300000

max.poll.records = 1

[metadata.max.age.ms|https://metadata.max.age.ms/] = 300000

metric.reporters = []

metrics.num.samples = 2

metrics.recording.level = INFO

[metrics.sample.window.ms|https://metrics.sample.window.ms/] = 30000

partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]

receive.buffer.bytes = 65536

[reconnect.backoff.max.ms|https://reconnect.backoff.max.ms/] = 1000

[reconnect.backoff.ms|https://reconnect.backoff.ms/] = 50

[request.timeout.ms|https://request.timeout.ms/] = 40000

[retry.backoff.ms|https://retry.backoff.ms/] = 100

sasl.jaas.config = null

sasl.kerberos.kinit.cmd = /usr/bin/kinit

sasl.kerberos.min.time.before.relogin = 60000

sasl.kerberos.service.name = null

sasl.kerberos.ticket.renew.jitter = 0.05

sasl.kerberos.ticket.renew.window.factor = 0.8

sasl.mechanism = GSSAPI

security.protocol = SSL

send.buffer.bytes = 131072

[session.timeout.ms|https://session.timeout.ms/] = 10000

ssl.cipher.suites = null

ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]

ssl.endpoint.identification.algorithm = null

ssl.key.password = [hidden]

ssl.keymanager.algorithm = SunX509

ssl.keystore.location = 
/kafka/certs/empestor/certificates/kafka.client.empestor.keystore.jks

ssl.keystore.password = [hidden]

ssl.keystore.type = JKS

ssl.protocol = TLS

ssl.provider = null

ssl.secure.random.implementation = null

ssl.trustmanager.algorithm = PKIX

ssl.truststore.location = 
/kafka/certs/empestor/certificates/kafka.client.truststore.jks

ssl.truststore.password = [hidden]

ssl.truststore.type = JKS

value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer

 

2018-08-28 08:29:48.079  INFO 91121 --- [           main] 
o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to