Kafka consumer picking up the same message multiple times

2016-09-27 Thread Shamik Bandopadhyay
Hi,

  I've recently started using Kafka to read documents coming through a web
crawler. What I'm noticing is that, when I'm dealing with a few million
documents, the consumer keeps processing the same messages over and over
again. It looks like the offsets are not getting committed for some reason.
This is not the case when I test the consumer with a few hundred messages.

I'm using the Kafka high-level consumer client code in Java, with a consumer
group running a number of threads equal to the number of partitions. Here's a
code snippet for polling data.

while (true) {
    try {
        if (consumerDao.canPollTopic()) {
            ConsumerRecords<String, TextAnalysisRequest> records =
                consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                if (record.value() != null) {
                    TextAnalysisRequest textAnalysisObj = record.value();
                    if (textAnalysisObj != null) {
                        PostProcessRequest req = new PostProcessRequest();
                        req.setRequest(this.getRequest(textAnalysisObj));
                        PreProcessorUtil.submitPostProcessRequest(req, config);
                    }
                }
            }
        } else {
            Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}
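
For context, here's a stripped-down sketch of how the worker threads are set
up: one KafkaConsumer per thread, so the thread count matches the 8
partitions. The class name, the topic name and the buildConsumerProps() helper
are placeholders for this mail (the real code pulls everything from our config
framework), and it assumes TextAnalysisRequest and the Kryo deserializer are
on the classpath.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FullConsumerGroupRunner {

    private static final int NUM_THREADS = 8; // one thread per partition

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        for (int i = 0; i < NUM_THREADS; i++) {
            executor.submit(() -> {
                // KafkaConsumer is not thread-safe, so each worker owns its own instance
                KafkaConsumer<String, TextAnalysisRequest> consumer =
                        new KafkaConsumer<>(buildConsumerProps());
                consumer.subscribe(Collections.singletonList("crawl_documents"));
                // each worker then runs the poll loop shown above
            });
        }
    }

    // Placeholder: in the real code these come from our config framework.
    private static Properties buildConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
        props.put("group.id", "full_group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");
        return props;
    }
}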

Here are the Kafka consumer configuration parameters I'm setting; the rest are
default values.

consumer.auto.commit=true
consumer.auto.commit.interval=1000
consumer.session.timeout=18
consumer.poll.records=2147483647
consumer.request.timeout=181000
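
(The consumer.* keys above are our own application-level names; they just map
onto the standard consumer client properties. A rough sketch of that mapping,
using the values from the full config dump below:)

import java.util.Properties;

public class ConsumerPropsMapping {
    // How our application-level keys map onto the Kafka consumer properties.
    static Properties commitAndPollProps() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "true");       // consumer.auto.commit
        props.put("auto.commit.interval.ms", "1000");  // consumer.auto.commit.interval
        props.put("max.poll.records", "2147483647");   // consumer.poll.records
        props.put("request.timeout.ms", "181000");     // consumer.request.timeout
        // consumer.session.timeout maps to session.timeout.ms
        return props;
    }
}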


Here's the complete consumer config:

metric.reporters = []
metadata.max.age.ms = 30
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 181000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 6
connections.max.idle.ms = 54
session.timeout.ms = 18
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
auto.offset.reset = latest

My sample Kafka topic has 8 partitions with a replication factor of 2.
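
(In case it's relevant, a topic with that layout would be created along these
lines; the ZooKeeper address and topic name here are placeholders:)

bin/kafka-topics.sh --create --zookeeper zkhost:2181 --topic crawl_documents \
  --partitions 8 --replication-factor 2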

The log retention period in server.properties is set to 168 hours.

log.retention.hours=168
log.roll.hours=168

I'm not sure what I'm missing here. Any pointers would be appreciated.

-Thanks,
Shamik


Re: Problem consuming message using custom serializer

2016-09-09 Thread Shamik Bandopadhyay
Anyone?

On Tue, Sep 6, 2016 at 4:21 PM, Shamik Bandopadhyay <sham...@gmail.com>
wrote:

> Hi,
>
>   I'm trying to send a Java object using the Kryo object serializer. The
> producer is able to send the payload to the queue, but I'm having issues
> reading the data in the consumer. I'm using a consumer group via KafkaStream.
> The consumer code is based on the example in the Kafka documentation.
> Here's the consumer code and the corresponding error:
>
> public void run(int a_numThreads) {
>     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>     topicCountMap.put(topic, new Integer(a_numThreads));
>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>         consumer.createMessageStreams(topicCountMap);
>     List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
>     executor = Executors.newFixedThreadPool(a_numThreads);
>     int threadNumber = 0;
>     for (final KafkaStream stream : streams) {
>         executor.submit(new ConsumerGroupSerializerObject(stream, threadNumber));
>         threadNumber++;
>     }
> }
>
> Inside ConsumerGroupSerializerObject's run method,
>
> private KafkaStream m_stream;
>
> public void run() {
>     ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>     ByteArrayInputStream in = null;
>     ObjectInputStream is = null;
>     while (it.hasNext()) {
>         try {
>             in = new ByteArrayInputStream(it.next().message());
>             is = new ObjectInputStream(in);
>             TextAnalysisRequest req = (TextAnalysisRequest) is.readObject();
>         } catch (ClassNotFoundException ex) {
>             ex.printStackTrace();
>         } catch (IOException ex) {
>             ex.printStackTrace();
>         } finally {
>             try {
>                 in.close();
>                 is.close();
>             } catch (IOException ex) {
>                 ex.printStackTrace();
>             }
>         }
>     }
> }
>
> I'm getting an exception at the following line:
>
> is = new ObjectInputStream(in);
>
> java.io.StreamCorruptedException: invalid stream header: 01746573
> at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
> at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
> at com.test.kafka.consumer.ConsumerGroupSerializerObject.run(ConsumerGroupSerializerObject.java:43)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Here are the properties:
>
> Properties props = new Properties();
> props.put("zookeeper.connect", a_zookeeper);
> props.put("group.id", a_groupId);
> props.put("zookeeper.session.timeout.ms", "400");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
> props.put("auto.offset.reset", "smallest");
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", KryoReadingSerializer.class.getName());
>
> I'm new to Kafka, so I'm not entirely sure whether this is the right approach
> to consuming messages with a custom serializer. Moreover, since I'm using
> KafkaStream, could that be an issue as well?
>
> Any pointers will be appreciated.
>
> Thanks,
> Shamik
>


Problem consuming message using custom serializer

2016-09-06 Thread Shamik Bandopadhyay
Hi,

  I'm trying to send a Java object using the Kryo object serializer. The
producer is able to send the payload to the queue, but I'm having issues
reading the data in the consumer. I'm using a consumer group via KafkaStream.
The consumer code is based on the example in the Kafka documentation.
Here's the consumer code and the corresponding error:

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    executor = Executors.newFixedThreadPool(a_numThreads);
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerGroupSerializerObject(stream, threadNumber));
        threadNumber++;
    }
}

Inside ConsumerGroupSerializerObject's run method,

private KafkaStream m_stream;

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    ByteArrayInputStream in = null;
    ObjectInputStream is = null;
    while (it.hasNext()) {
        try {
            in = new ByteArrayInputStream(it.next().message());
            is = new ObjectInputStream(in);
            TextAnalysisRequest req = (TextAnalysisRequest) is.readObject();
        } catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            try {
                in.close();
                is.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

I'm getting an exception at the following line:

is = new ObjectInputStream(in);

java.io.StreamCorruptedException: invalid stream header: 01746573
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at com.test.kafka.consumer.ConsumerGroupSerializerObject.run(ConsumerGroupSerializerObject.java:43)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Here are the properties:

Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", KryoReadingSerializer.class.getName());
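
For reference, a minimal Kryo-backed value decoder for the old consumer API
would look roughly like the sketch below; this is illustrative, not my actual
KryoReadingSerializer (the class name and registration details are made up).
With the high-level consumer, decoders get passed programmatically to
createMessageStreams(topicCountMap, keyDecoder, valueDecoder) rather than
being picked up from the "value.deserializer" property.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import kafka.serializer.Decoder;

// Illustrative sketch of a Kryo-backed value decoder for the old consumer API.
public class KryoTextAnalysisDecoder implements Decoder<TextAnalysisRequest> {

    @Override
    public TextAnalysisRequest fromBytes(byte[] bytes) {
        Kryo kryo = new Kryo();                   // Kryo instances are not thread-safe
        kryo.register(TextAnalysisRequest.class); // must match the producer-side registration
        try (Input input = new Input(bytes)) {
            return kryo.readObject(input, TextAnalysisRequest.class);
        }
    }
}

That sketch assumes the producer wrote the value with a matching
kryo.writeObject() call.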

I'm new to Kafka, so I'm not entirely sure whether this is the right approach
to consuming messages with a custom serializer. Moreover, since I'm using
KafkaStream, could that be an issue as well?

Any pointers will be appreciated.

Thanks,
Shamik