Update: Sending compressed events with console producer works: kafka-console-producer.bat --broker-list localhost:9092 --sync --topic topic1 --compress
I am working on Windows 7. On Fri, Aug 30, 2013 at 8:40 AM, Lu Xuechao <lux...@gmail.com> wrote: > After I sent 1,000 compressed events, I saw these messages in broker's log > files: > > in kafka-request.log > > [2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name: > TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: > topic1 (kafka.network.RequestChannel$) > [2013-08-30 08:38:18,718] TRACE Completed request:Name: > TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: > topic1 from client > /127.0.0.1:64238;totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1 > (kafka.request.logger) > > > in server.log > > [2013-08-30 08:38:18,759] INFO Closing socket connection to /127.0.0.1. > (kafka.network.Processor) > > > any ideas? Thanks. > > > On Thu, Aug 29, 2013 at 10:28 PM, Jun Rao <jun...@gmail.com> wrote: > >> Did you see any error in the producer log? Did the broker receive the >> produce request (you can look at the request log in the broker)? >> >> Thanks, >> >> Jun >> >> >> On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <lux...@gmail.com> wrote: >> >> > Let me post my test code here. I could see producer.send(data); returned >> > with no error. >> > >> > public class TestProducer extends Thread { >> > private final Producer<String, String> producer; >> > >> > private final int m_events; >> > private final int m_threadNumber; >> > >> > private static String msg = StringUtils.rightPad("", 1000, '*'); >> > >> > public TestProducer(int threadNumber, int events) { >> > m_threadNumber = threadNumber; >> > m_events = events; >> > >> > Properties props = new Properties(); >> > props.put("serializer.class", >> KafkaProperties.p_serializer_class); >> > props.put("metadata.broker.list", >> > KafkaProperties.p_metadata_broker_list); >> > props.put("partitioner.class", >> > KafkaProperties.p_partitioner_class); >> > props.put("queue.enqueue.timeout.ms", >> > KafkaProperties.p_queue_enqueue_timeout); >> > props.put("request.required.acks", >> > KafkaProperties.p_request_required_acks); >> > props.put("producer.type", KafkaProperties.p_producer_type); >> > >> > props.put("batch.num.messages", KafkaProperties.p_batch_num); >> > >> > props.put("compression.codec", >> > KafkaProperties.p_compression_codec); >> > >> > ProducerConfig config = new ProducerConfig(props); >> > producer = new Producer<String, String>(config); >> > } >> > >> > @Override >> > public void run() { >> > long start; >> > long num = 0; >> > System.out.println(new Date() + " - Message sent thread " + >> > m_threadNumber + " started."); >> > while (true) { >> > start = System.currentTimeMillis(); >> > String messageStr = new String(num + "_" + start); >> > KeyedMessage<String, String> data = new KeyedMessage<String, >> > String>(KafkaProperties.topic, messageStr, >> > start + "_" + msg); >> > producer.send(data); >> > num++; >> > if (num == m_events) { >> > break; >> > } >> > } >> > producer.close(); >> > System.out.println(new Date() + " - Message sent thread " + >> > m_threadNumber + " end. " + num >> > + " messages sent."); >> > } >> > } >> > >> > >> > public interface KafkaProperties { >> > final static String zookeeper_connect = "127.0.0.1:2181"; >> > final static String group_id = "group1"; >> > final static String topic = "topic1"; >> > >> > final static String p_serializer_class = >> > "kafka.serializer.StringEncoder"; >> > final static String p_metadata_broker_list = "127.0.0.1:9092"; >> > final static String p_partitioner_class = >> > "kafka.producer.DefaultPartitioner"; >> > >> > final static String p_queue_enqueue_timeout = "-1"; >> > final static String p_request_required_acks = "1"; >> > final static String p_producer_type = "async"; >> > final static String p_batch_num = "100"; >> > final static String p_compression_codec = "1"; >> > final static String p_message_send_retries = "3"; >> > final static String p_retry_backoff_ms = "200"; >> > final static String p_topic_metadata_refresh = "600000"; >> > } >> > >> > >> > On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote: >> > >> > > Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion to >> > set >> > > request.required.acks=1 and got the same result. No error message >> seen in >> > > broker logs, the size of the partition files were after sending >> 1,000,000 >> > > events, the size of each event was 1KB : >> > > >> > > 00000000000000000000.index 10240 KB >> > > 00000000000000000000.log 0KB >> > > >> > > The broker configurations: >> > > >> > > num.partitions=5 >> > > log.flush.interval.messages=20000 >> > > log.flush.interval.ms=5000 >> > > >> > > log.flush.scheduler.interval.ms=1000 >> > > log.retention.hours=1 >> > > log.segment.bytes=1073741824 >> > > log.cleanup.interval.mins=30 >> > > >> > > queued.max.requests=16 >> > > fetch.purgatory.purge.interval.requests=100 >> > > producer.purgatory.purge.interval.requests=100 >> > > >> > > It works if I change the code to props.put("compression.codec", "0"); >> > > >> > > thanks, >> > > xlu >> > > >> > > On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com> >> > wrote: >> > > >> > >> I assume this is kafka 0.8, right? Are there any corresponding >> errors in >> > >> the broker logs? With the configuration below, I don't think any >> errors >> > >> will be reported back to the producer. >> > >> >> > >> You could also try setting erquest.required.acks=1 to see if errors >> are >> > >> reported back to the client. >> > >> >> > >> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote: >> > >> >> > >> >Hi , >> > >> > >> > >> >I am trying to enable gzip compression for my events. But after I >> > >> switched >> > >> >compression.codec to "1" I found the produced events were even not >> be >> > >> >persisted to disk log file. Of course, the consumer could not >> receive >> > any >> > >> >compressed events. I sent 10,000 or more events but the broker's log >> > file >> > >> >not changed. Seems no events were actually send to broker? Below is >> my >> > >> >producer's code: >> > >> > >> > >> > Properties props = new Properties(); >> > >> > props.put("serializer.class", >> > "kafka.serializer.StringEncoder"); >> > >> > props.put("metadata.broker.list", "127.0.0.1:9092"); >> > >> > props.put("partitioner.class", >> > >> >"kafka.producer.DefaultPartitioner"); >> > >> > props.put("queue.enqueue.timeout.ms", "-1"); >> > >> > props.put("request.required.acks", "0"); >> > >> > props.put("producer.type", "async"); >> > >> > >> > >> > props.put("batch.num.messages", "100"); >> > >> > >> > >> > props.put("compression.codec", "1"); >> > >> > >> > >> > ProducerConfig config = new ProducerConfig(props); >> > >> > producer = new Producer<String, String>(config); >> > >> > >> > >> > KeyedMessage<String, String> data = new KeyedMessage<String, >> > >> >String>("topic1", messageStr, msg); >> > >> > producer.send(data); >> > >> > >> > >> > >> > >> >If I comment out this line of code : props.put("compression.codec", >> > "1"); >> > >> >then everything works fine. Did I miss something? >> > >> > >> > >> >thanks, >> > >> >xlu >> > >> >> > >> >> > > >> > >> > >