Kafka consumer picking up the same message multiple times
Hi,

I've recently started using Kafka to read documents coming through a web crawler. What I'm noticing is that when I'm dealing with a few million documents, the consumer processes the same messages over and over again. It looks like the offsets are not getting committed for some reason. This is not the case when I test the consumer with a few hundred messages.

I'm using the Kafka high-level consumer client code in Java, with a consumer group running as many threads as there are partitions. Here's the code snippet for polling data:

    while (true) {
        try {
            if (consumerDao.canPollTopic()) {
                ConsumerRecords<String, TextAnalysisRequest> records =
                        consumer.poll(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_COUNT));
                for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                    if (record.value() != null) {
                        TextAnalysisRequest textAnalysisObj = record.value();
                        if (textAnalysisObj != null) {
                            PostProcessRequest req = new PostProcessRequest();
                            req.setRequest(this.getRequest(textAnalysisObj));
                            PreProcessorUtil.submitPostProcessRequest(req, config);
                        }
                    }
                }
            } else {
                Thread.sleep(this.config.getPropertyAsIneger(IPreProcessorConstant.KAFKA_POLL_SLEEP));
            }
        } catch (Exception ex) {
            LOGGER.error("Error in Full Consumer group worker", ex);
        }
    }

Here are the Kafka consumer configuration parameters I'm setting; the rest are default values:

    consumer.auto.commit=true
    consumer.auto.commit.interval=1000
    consumer.session.timeout=180000
    consumer.poll.records=2147483647
    consumer.request.timeout=181000

Here's the complete consumer config:

    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = true
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 181000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 1000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class com.test.preprocessor.consumer.serializer.KryoObjectSerializer
    group.id = full_group
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 180000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

My test topic has 8 partitions with a replication factor of 2. The log retention period in server.properties is set to 168 hours:

    log.retention.hours=168
    log.roll.hours=168

Not sure what I'm missing here. Any pointers will be appreciated.

-Thanks,
Shamik
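The pattern above (duplicates only at high volume, with auto-commit on) usually means the consumer is falling out of the group: with max.poll.records = 2147483647 a single poll() can return an enormous batch, and in clients of that era (0.9/0.10.0) heartbeats are only sent from inside poll(), so if processing the batch takes longer than session.timeout.ms the coordinator evicts the consumer, the auto-commit never lands, and the partitions are replayed from the last committed offset after the rebalance. Below is a minimal sketch of the usual remedy: bound the batch size and commit manually after processing. It reuses TextAnalysisRequest and the Kryo deserializer class from the post above; the topic name and the 500-record batch size are illustrative, not from the original.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BoundedCommitConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
            props.put("group.id", "full_group");
            // Turn off auto-commit; offsets are committed below, only after
            // every record in the batch has actually been processed.
            props.put("enable.auto.commit", "false");
            // Keep each poll() batch small enough to finish well inside
            // session.timeout.ms (500 is an illustrative value).
            props.put("max.poll.records", "500");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "com.test.preprocessor.consumer.serializer.KryoObjectSerializer");

            KafkaConsumer<String, TextAnalysisRequest> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("crawl_documents")); // placeholder topic name
            try {
                while (true) {
                    ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
                    for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                        process(record.value()); // per-record work goes here
                    }
                    // Mark the batch consumed only once it has been fully handled.
                    consumer.commitSync();
                }
            } finally {
                consumer.close();
            }
        }

        // TextAnalysisRequest comes from the poster's codebase.
        private static void process(TextAnalysisRequest request) { /* ... */ }
    }

If per-record processing is slow, the other half of the fix is simply lowering max.poll.records; even with auto-commit left on, the commit that happens inside poll() then has a chance to run before the session times out.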
Re: Problem consuming message using custom serializer
Anyone?

On Tue, Sep 6, 2016 at 4:21 PM, Shamik Bandopadhyay <sham...@gmail.com> wrote:
> [...]
Problem consuming message using custom serializer
Hi,

I'm trying to send a Java object using a Kryo object serializer. The producer is able to send the payload to the queue, but I'm having issues reading the data in the consumer. I'm using a consumer group based on KafkaStream; the consumer code follows the example in the Kafka documentation. Here's the consumer code and the corresponding error:

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(a_numThreads);
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerGroupSerializerObject(stream, threadNumber));
            threadNumber++;
        }
    }

Inside ConsumerGroupSerializerObject's run method:

    private KafkaStream m_stream;

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        ByteArrayInputStream in = null;
        ObjectInputStream is = null;
        while (it.hasNext()) {
            try {
                in = new ByteArrayInputStream(it.next().message());
                is = new ObjectInputStream(in);
                TextAnalysisRequest req = (TextAnalysisRequest) is.readObject();
            } catch (ClassNotFoundException ex) {
                ex.printStackTrace();
            } catch (IOException ex) {
                ex.printStackTrace();
            } finally {
                try {
                    in.close();
                    is.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

I'm getting an exception at the following line:

    is = new ObjectInputStream(in);

    java.io.StreamCorruptedException: invalid stream header: 01746573
        at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
        at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
        at com.test.kafka.consumer.ConsumerGroupSerializerObject.run(ConsumerGroupSerializerObject.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Here are the properties:

    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "smallest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", KryoReadingSerializer.class.getName());

I'm new to Kafka, so I'm not entirely sure whether this is the right approach to consuming messages with a custom serializer. Moreover, I'm using KafkaStream; could that be an issue as well?

Any pointers will be appreciated.

Thanks,
Shamik
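For anyone hitting the same trace: ObjectInputStream can only read streams written by Java's own ObjectOutputStream, which begin with the magic header 0xACED0005. The bytes here start with 01746573, i.e. Kryo output, so the constructor rejects the stream before any object is read; Kryo bytes have to be read back with Kryo. Note also that key.deserializer and value.deserializer are new-consumer (bootstrap.servers) settings; the old zookeeper.connect high-level consumer ignores them and hands each KafkaStream raw byte[]. Below is a minimal sketch of decoding in the worker thread. It assumes the producer wrote the payload with kryo.writeObject(output, obj) (if it used writeClassAndObject, read it back with readClassAndObject); the class name KryoConsumerWorker is illustrative.

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;

    public class KryoConsumerWorker implements Runnable {

        private final KafkaStream<byte[], byte[]> stream;

        public KryoConsumerWorker(KafkaStream<byte[], byte[]> stream) {
            this.stream = stream;
        }

        @Override
        public void run() {
            // Kryo instances are not thread-safe: create one per consumer thread.
            Kryo kryo = new Kryo();
            kryo.register(TextAnalysisRequest.class); // the poster's payload type
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                byte[] payload = it.next().message();
                try (Input input = new Input(payload)) {
                    // Must mirror the producer: writeObject() pairs with readObject().
                    TextAnalysisRequest req = kryo.readObject(input, TextAnalysisRequest.class);
                    // ... hand req off for processing ...
                }
            }
        }
    }

Alternatively, the decoding can be pushed into the consumer itself by passing kafka.serializer.Decoder implementations to the two-decoder overload of createMessageStreams; either way, the old consumer will not apply the value.deserializer property on its own.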