Hi, I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer keeps processing the same messages over and over again. It looks like the data is not getting committed for some reason. This is not the case when I'm testing the consumer with a few hundred messages.
I'm using the Kafka high-level consumer client in Java, with a consumer group running as many threads as there are partitions. Here's the polling loop:

while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            ConsumerRecords<String, TextAnalysisRequest> records =
                    consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                if (record.value() != null) {
                    TextAnalysisRequest textAnalysisObj = record.value();
                    if (textAnalysisObj != null) {
                        PostProcessRequest req = new PostProcessRequest();
                        req.setRequest(this.getRequest(textAnalysisObj));
                        PreProcessorUtil.submitPostProcessRequest(req, config);
                    }
                }
            }
        } else {
            Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}

Here are the consumer configuration parameters I'm setting explicitly; the rest are defaults:

consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=180000
consumer.poll.records=2147483647
consumer.request.timeout=181000

Here's the complete consumer config:

metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 180000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

My test topic has 8 partitions with a replication factor of 2. The log retention period in server.properties is set to 168 hours:

log.retention.hours=168
log.roll.hours=168

For additional context, below are a sketch of how the consumer is constructed, the thread setup, and a manual-commit variant I'm wondering about.
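For reference, here's roughly how the consumer gets constructed from the properties above. This is a simplified sketch: the topic name is a placeholder, and TextAnalysisRequest / KryoObjectSerializer are classes from our own codebase.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerFactory {

    // Builds a consumer with the same settings as in the config dump above.
    public static KafkaConsumer<String, TextAnalysisRequest> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
        props.put("group.id", "full_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "180000");
        props.put("request.timeout.ms", "181000");
        props.put("max.poll.records", String.valueOf(Integer.MAX_VALUE));
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");

        KafkaConsumer<String, TextAnalysisRequest> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("crawler_documents")); // topic name is a placeholder
        return consumer;
    }
}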
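The worker threads are started along these lines (again a sketch; FullConsumerWorker is a stand-in name for the class that owns one consumer instance and runs the poll loop above):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupLauncher {
    public static void main(String[] args) {
        int numPartitions = 8; // matches the topic's partition count
        ExecutorService pool = Executors.newFixedThreadPool(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            // Each worker creates its own KafkaConsumer, since consumer
            // instances are not thread-safe, and runs the poll loop above.
            pool.submit(new FullConsumerWorker());
        }
    }
}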
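One thing I'm wondering about: would switching off auto-commit and committing manually after each batch make a difference? A minimal sketch of what I have in mind (same loop as above, but with enable.auto.commit=false; the 1000 ms poll timeout is a placeholder):

while (true) {
    ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(1000);
    for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
        TextAnalysisRequest textAnalysisObj = record.value();
        if (textAnalysisObj != null) {
            PostProcessRequest req = new PostProcessRequest();
            req.setRequest(this.getRequest(textAnalysisObj));
            PreProcessorUtil.submitPostProcessRequest(req, config);
        }
    }
    if (records.count() > 0) {
        consumer.commitSync(); // commit offsets only after the batch has been handed off
    }
}

Not sure what I'm missing here. Any pointers will be appreciated.

-Thanks, Shamik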