Orel Shai created KAFKA-6553:
--------------------------------

             Summary: Consumer consumed committed messages
                 Key: KAFKA-6553
                 URL: https://issues.apache.org/jira/browse/KAFKA-6553
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.10.2.0
            Reporter: Orel Shai
Hi,

We're using the Kafka consumer client 0.10.2.0 (running against a 0.10.0 broker) with the following configuration:

{code:java}
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 16 * 1024);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
{code}

So as you can see, we're using auto-commit. The consumer API version that we're using has a dedicated thread for auto-commit, so an auto-commit happens every second, which means we also get a heartbeat every second. For some reason we're receiving the same messages many times.
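If I read the logs below correctly, the duplicates line up with the offset resets: after a rebalance, each partition is rewound to its last committed offset, so everything processed since the last auto-commit comes back again. A minimal sketch of that arithmetic, using offsets from our logs and values from the ConsumerConfig dump (the class and helper names here are mine, not part of the Kafka API):

```java
// Hypothetical helpers (not Kafka API) illustrating the numbers from the
// logs and the ConsumerConfig dump below.
public class DuplicateWindow {

    // After a reset to the last committed offset, every record between that
    // offset and the position the consumer had already reached is redelivered.
    static long redelivered(long committedOffset, long position) {
        return Math.max(0, position - committedOffset);
    }

    // Given max.poll.interval.ms and max.poll.records, the average processing
    // budget per record before the consumer risks being evicted from the group.
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // Partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 in the log:
        // reset to committed offset 352171 while the position was already
        // 352205, so those 34 records are consumed a second time.
        System.out.println(redelivered(352171, 352205)); // 34

        // max.poll.interval.ms = 300000 and max.poll.records = 100 (from the
        // ConsumerConfig dump) leave roughly 3000 ms of processing per record.
        System.out.println(perRecordBudgetMs(300_000, 100)); // 3000
    }
}
```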
While looking at our logs I can see the following:

{code:java}
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-15 to the committed offset 352878
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-3 to the committed offset 352458
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-19 to the committed offset 353775
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 to the committed offset 352171
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-7 to the committed offset 352995
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-27 to the committed offset 352531
2018-02-11 10:56:24,655 DEBUG [ThreadPoolTaskExecutor-2] Resetting offset for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 to the committed offset 351893
2018-02-11 10:56:24,656 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-23 at offset 352171 since the current position is 352205
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for misc.ha.UpdateNodeGroup.UpdateNodeTopic-11 at offset 351893 since the current position is 351929
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-26 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-17 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-29 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-5 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-8 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-20 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-2 since it is no longer fetchable
2018-02-11 10:56:24,657 DEBUG [ThreadPoolTaskExecutor-2] Ignoring fetched records for partition misc.ha.UpdateNodeGroup.UpdateNodeTopic-14 since it is no longer fetchable
{code}

Consumer connection log:

{code:java}
2018-02-12 08:18:13,506 DEBUG [DefaultThreadPool-9] Starting the Kafka consumer
2018-02-12 08:18:13,507 INFO [DefaultThreadPool-9] ConsumerConfig values:
        auto.commit.interval.ms = 1000
        auto.offset.reset = latest
        bootstrap.servers = [list of servers]
        check.crcs = true
        client.id = 2cd03a2b-f040-4f7f-b20c-ce3fe5efbe00
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = UpdateNode.snbrepo.new
        heartbeat.interval.ms = 23333
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 16384
        max.poll.interval.ms = 300000
        max.poll.records = 100
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RoundRobinAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 100000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 70000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = <proprietary deserializer>
{code}

Do you know what might be causing this?

Also, processing a message may take longer than the request timeout. If we're doing auto-commit, does that count as a heartbeat? Will there be a rebalance in such cases?

Thanks!
Orel

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)