Hi,

  I've recently started using Kafka to read documents coming through a web
crawler. What I'm noticing is that when I'm dealing with a few million
documents, the consumer keeps processing the same messages over and over
again. It looks like the offsets are not getting committed for some reason.
This is not the case when I test the consumer with a few hundred messages.

I'm using the Kafka high-level consumer client in Java. The consumer group
runs on a number of threads equal to the number of partitions (the thread
setup is sketched right after the poll loop). Here's the code snippet for
polling data:

while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            ConsumerRecords<String, TextAnalysisRequest> records =
                    consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                if (record.value() != null) {
                    TextAnalysisRequest textAnalysisObj = record.value();
                    if (textAnalysisObj != null) {
                        PostProcessRequest req = new PostProcessRequest();
                        req.setRequest(this.getRequest(textAnalysisObj));
                        PreProcessorUtil.submitPostProcessRequest(req, config);
                    }
                }
            }
        } else {
            Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
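
In case it helps, this is roughly how the worker threads are spun up. It's a
simplified sketch: the topic name and the buildConsumerProperties() /
runPollLoop() helpers are illustrative stand-ins for my actual code, not the
real names.

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.KafkaConsumer;

// Simplified sketch: one thread per partition, each with its own KafkaConsumer
// instance (consumers are not shared across threads), all in the same group.
int numPartitions = 8;
ExecutorService executor = Executors.newFixedThreadPool(numPartitions);
for (int i = 0; i < numPartitions; i++) {
    executor.submit(() -> {
        KafkaConsumer<String, TextAnalysisRequest> consumer =
                new KafkaConsumer<>(buildConsumerProperties()); // config sketched further below
        consumer.subscribe(Collections.singletonList("preprocessor_topic")); // illustrative topic name
        runPollLoop(consumer); // stands in for the while(true) loop shown above
    });
}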

Here are the Kafka consumer configuration parameters I'm setting (the rest
are default values); how they map onto the standard consumer configs is
sketched after the list.

consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=180000
consumer.poll.records=2147483647
consumer.request.timeout=181000
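
Those application properties translate into the standard consumer configs
roughly like this. It's a simplified sketch of what buildConsumerProperties()
in the earlier snippet stands for; in my actual code the values are read from
the properties file rather than hard-coded.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Simplified sketch mapping my application properties to the Kafka consumer configs.
static Properties buildConsumerProperties() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost1:9092,kafkahost2:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "full_group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");       // consumer.auto.commit
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");  // consumer.auto.commit.interval
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "180000");     // consumer.session.timeout
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2147483647");   // consumer.poll.records
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "181000");     // consumer.request.timeout
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");
    return props;
}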


Here's the complete consumer config:

metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 180000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest

My sample Kafka topic has 8 partitions with a replication factor of 2.

The log retention period in server.properties is set to 168 hours:

log.retention.hours=168
log.roll.hours=168

Not sure what I'm missing here. Any pointers would be appreciated.

-Thanks,
Shamik
