Hi,
   I've recently started using Kafka to read documents coming through a web 
crawler. What I'm noticing is that when I'm dealing with a few million documents, the 
consumer keeps processing the same messages over and over again. It looks like the 
offsets are not getting committed for some reason. This is not the case when I'm 
testing the consumer with a few hundred messages.

I'm using the Kafka high-level consumer client in Java. I'm running a consumer 
group with as many threads as there are partitions, so each thread is dedicated 
to a single partition. Here's a code snippet for polling data:
while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            ConsumerRecords<String, TextAnalysisRequest> records =
                    consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                if (record.value() != null) {
                    TextAnalysisRequest textAnalysisObj = record.value();
                    if (textAnalysisObj != null) {
                        PostProcessRequest req = new PostProcessRequest();
                        req.setRequest(this.getRequest(textAnalysisObj));
                        PreProcessorUtil.submitPostProcessRequest(req, config);
                    }
                }
            }
        } else {
            Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
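For context, this is roughly how the per-partition worker threads are started. It's 
only a sketch, not the actual code: the topic name and the consumerProps()/runPollLoop() 
helpers are placeholders for my real classes.

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch only: one KafkaConsumer per thread, all joining the same consumer group,
// so the group coordinator assigns each of the 8 partitions to a different worker.
ExecutorService pool = Executors.newFixedThreadPool(8);   // 8 = number of partitions
for (int i = 0; i < 8; i++) {
    pool.submit(() -> {
        KafkaConsumer<String, TextAnalysisRequest> consumer =
                new KafkaConsumer<>(consumerProps());                  // consumerProps() = placeholder for my config
        consumer.subscribe(Collections.singletonList("crawler_docs")); // topic name is a placeholder
        runPollLoop(consumer);                                         // the while(true) loop shown above
    });
}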
Here are the Kafka consumer configuration parameters I'm setting; the rest are 
left at their default values.
consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=180000
consumer.poll.records=2147483647
consumer.request.timeout=181000
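Those are my own wrapper property names; they map onto the standard consumer 
properties roughly like this (a sketch of the mapping, using the values from the 
complete config dump below):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch of how the wrapper keys above translate to the standard consumer properties.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost1:9092,kafkahost2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "full_group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");        // consumer.auto.commit
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");   // consumer.auto.commit.interval
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "180000");      // consumer.session.timeout
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2147483647");    // consumer.poll.records
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "181000");      // consumer.request.timeout
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");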

Here's the complete consumer config:

metric.reporters =
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092,kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 180000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2,TLSv1.1,TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
My sample Kafka topic has 8 partitions with a replication factor of 2.
The log retention period in server.properties is set to 168 hours:
log.retention.hours=168
log.roll.hours=168
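For completeness, the topic was created along these lines (a sketch using the 
AdminClient API; the topic name is a placeholder, and on older clients the 
kafka-topics script does the same thing):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Sketch: create the topic with 8 partitions and replication factor 2.
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost1:9092,kafkahost2:9092");
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("crawler_docs", 8, (short) 2);   // topic name is a placeholder
    admin.createTopics(Collections.singletonList(topic)).all().get();
}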
Not sure what I'm missing here. Any pointers would be appreciated.
