Hi, I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer processes the same messages over and over again. It looks like the offsets are not getting committed for some reason. This is not the case when I test the consumer with a few hundred messages.
I'm using the Kafka high-level consumer client in Java. The consumer group runs on a number of threads equal to the number of partitions, so each thread is dedicated to one partition (a simplified sketch of the thread setup is at the end of this post). Here's the code snippet for polling data:

    while (true) {
        try {
            if (consumerDao.canPollTopic()) {
                ConsumerRecords<String, TextAnalysisRequest> records =
                        consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
                for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                    TextAnalysisRequest textAnalysisObj = record.value();
                    if (textAnalysisObj != null) {
                        PostProcessRequest req = new PostProcessRequest();
                        req.setRequest(this.getRequest(textAnalysisObj));
                        PreProcessorUtil.submitPostProcessRequest(req, config);
                    }
                }
            } else {
                Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
            }
        } catch (Exception ex) {
            LOGGER.error("Error in Full Consumer group worker", ex);
        }
    }

Here are the Kafka consumer configuration parameters I'm setting; the rest are default values:

    consumer.auto.commit = true
    consumer.auto.commit.interval = 1000
    consumer.session.timeout = 180000
    consumer.poll.records = 2147483647
    consumer.request.timeout = 181000

Here's the complete consumer config:

    metric.reporters = 
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = true
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = 
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 181000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
    group.id = full_group
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 180000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

My sample Kafka topic has 8 partitions with a replication factor of 2. The log retention period in server.properties is set to 168 hours:

    log.retention.hours=168
    log.roll.hours=168

Not sure what I'm missing here. Any pointers will be appreciated.
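For extra context, here is a simplified sketch of how the worker threads are spawned, one per partition. The class name, topic name, and the plain String value deserializer are placeholders rather than my exact code; the real setup uses the Kryo deserializer and config shown above. Each thread gets its own KafkaConsumer instance, since the consumer client is not thread-safe.

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerGroupLauncher {

        private static final int NUM_THREADS = 8;                 // one thread per partition
        private static final String TOPIC = "preprocessor-topic"; // placeholder topic name

        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(NUM_THREADS);
            for (int i = 0; i < NUM_THREADS; i++) {
                pool.submit(ConsumerGroupLauncher::runWorker);
            }
        }

        private static void runWorker() {
            // Each worker owns its own KafkaConsumer; instances are never shared
            // between threads because the client is not thread-safe.
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildProps())) {
                consumer.subscribe(Collections.singletonList(TOPIC));
                while (true) {
                    // poll timeout in ms; the real code uses the configured KAFKA_POLL_COUNT
                    ConsumerRecords<String, String> records = consumer.poll(1000L);
                    for (ConsumerRecord<String, String> record : records) {
                        // hand off to post-processing, as in the loop above
                        System.out.printf("partition=%d offset=%d%n",
                                record.partition(), record.offset());
                    }
                }
            }
        }

        private static Properties buildProps() {
            // Same non-default settings as in the config dump above
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
            props.put("group.id", "full_group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "180000");
            props.put("request.timeout.ms", "181000");
            props.put("max.poll.records", String.valueOf(Integer.MAX_VALUE));
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            return props;
        }
    }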
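Since the value.deserializer above is a custom class, this is roughly its shape. The Kryo wiring here is a simplified sketch, not the exact implementation, and it assumes TextAnalysisRequest is a Kryo-serializable POJO:

    import java.util.Map;
    import org.apache.kafka.common.serialization.Deserializer;
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;

    public class KryoObjectSerializer implements Deserializer<TextAnalysisRequest> {

        // Kryo instances are not thread-safe; one per deserializer works here
        // because each consumer thread constructs its own deserializer.
        private final Kryo kryo = new Kryo();

        public KryoObjectSerializer() {
            kryo.register(TextAnalysisRequest.class);
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no-op
        }

        @Override
        public TextAnalysisRequest deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try (Input input = new Input(data)) {
                return kryo.readObject(input, TextAnalysisRequest.class);
            }
        }

        @Override
        public void close() {
            // no-op
        }
    }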