Looks like you may be mixing the new producer with old producer configs. See the new config documentation here: http://kafka.apache.org/documentation.html#newproducerconfigs. You will likely want to set the "batch.size" and "linger.ms" to achieve your goal.
Thanks, Grant On Wed, Oct 14, 2015 at 1:29 PM, prateek arora <prateek.arora...@gmail.com> wrote: > Hi > > Thanks for help . > > but same behavior even after changing batch.size > > I have changes batch.size value to 33554432. > props.put("batch.size","33554432"); > > > > On Wed, Oct 14, 2015 at 11:09 AM, Zakee <kzak...@netzero.net> wrote: > > > Hi Prateek, > > > > Looks like you are using default batch.size which is ~16K and it forces > > the send of messages immediately as your single message is larger than > > that. Try using larger batch.size. > > > > Thanks > > Zakee > > > > > > > > > On Oct 14, 2015, at 10:29 AM, prateek arora < > prateek.arora...@gmail.com> > > wrote: > > > > > > Hi > > > > > > I want to create async producer so i can buffer messages in queue and > > send > > > after every 5 sec . > > > > > > my kafka version is 0.8.2.0. > > > > > > and i am using kafka-clients 0.8.2.0 to create kafka producer in java. > > > > > > > > > below is my sample code : > > > > > > package com.intel.labs.ive.cloud.testKafkaProducerJ; > > > > > > import java.nio.charset.Charset; > > > import java.util.HashMap; > > > > > > import java.util.Map; > > > > > > import org.apache.kafka.clients.producer.KafkaProducer; > > > import org.apache.kafka.clients.producer.Producer; > > > import org.apache.kafka.clients.producer.ProducerConfig; > > > import org.apache.kafka.clients.producer.ProducerRecord; > > > import org.apache.kafka.common.Metric; > > > import org.apache.kafka.common.MetricName; > > > import org.apache.kafka.common.serialization.Serializer; > > > import org.apache.kafka.common.serialization.StringSerializer; > > > import org.apache.kafka.common.serialization.ByteArraySerializer; > > > > > > import java.nio.file.DirectoryStream; > > > import java.nio.file.Files; > > > import java.nio.file.Path; > > > import java.nio.file.Paths; > > > > > > public class TestKafkaProducer { > > > > > > Map<String, Object> props = new HashMap<String, Object>(); > > > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > > metadataBroker); > > > props.put("producer.type", "async"); > > > props.put("queue.buffering.max.ms", "5000"); > > > > > > Serializer<String> keySerializer = new StringSerializer(); > > > Serializer<byte[]> valueSerializer = new ByteArraySerializer(); > > > > > > producer = new KafkaProducer<String, byte[]>(props, > keySerializer, > > > valueSerializer); > > > > > > ProducerRecord<String, byte[]> imageRecord; > > > > > > while ( true ) { > > > imageRecord = new ProducerRecord<String, byte[]>(topicName, > > > recordKey,imageBytes); > > > > > > producer.send(imageRecord); > > > } > > > } > > > > > > size of my message is around 77K > > > > > > but its work like a synchronous producer , send every message to broker > > . > > > not buffering a message in to queue and send after 5 sec > > > > > > > > > please help to find out a solution. > > > > > > > > > Regards > > > Prateek > > > > ____________________________________________________________ > > A Balance Transfer Card With An Outrageously Long Intro Rate And No > > Balance Transfer Fees That Can Save You Thousands > > http://thirdpartyoffers.netzero.net/TGL3231/561e9a75a77071a74763fst04vuc > -- Grant Henke Software Engineer | Cloudera gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke