Update: Sending compressed events with console producer works:

kafka-console-producer.bat --broker-list localhost:9092 --sync --topic
topic1 --compress

I am working on Windows 7.


On Fri, Aug 30, 2013 at 8:40 AM, Lu Xuechao <lux...@gmail.com> wrote:

> After I sent 1,000 compressed events, I saw these messages in broker's log
> files:
>
> in kafka-request.log
>
> [2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name:
> TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics:
> topic1 (kafka.network.RequestChannel$)
> [2013-08-30 08:38:18,718] TRACE Completed request:Name:
> TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics:
> topic1 from client 
> /127.0.0.1:64238;totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1
> (kafka.request.logger)
>
>
> in server.log
>
> [2013-08-30 08:38:18,759] INFO Closing socket connection to /127.0.0.1.
> (kafka.network.Processor)
>
>
> any ideas?  Thanks.
>
>
> On Thu, Aug 29, 2013 at 10:28 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> Did you see any error in the producer log? Did the broker receive the
>> produce request (you can look at the request log in the broker)?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <lux...@gmail.com> wrote:
>>
>> > Let me post my test code here. I could see producer.send(data); returned
>> > with no error.
>> >
>> > public class TestProducer extends Thread {
>> >     private final Producer<String, String> producer;
>> >
>> >     private final int m_events;
>> >     private final int m_threadNumber;
>> >
>> >     private static String msg = StringUtils.rightPad("", 1000, '*');
>> >
>> >     public TestProducer(int threadNumber, int events) {
>> >         m_threadNumber = threadNumber;
>> >         m_events = events;
>> >
>> >         Properties props = new Properties();
>> >         props.put("serializer.class",
>> KafkaProperties.p_serializer_class);
>> >         props.put("metadata.broker.list",
>> > KafkaProperties.p_metadata_broker_list);
>> >         props.put("partitioner.class",
>> > KafkaProperties.p_partitioner_class);
>> >         props.put("queue.enqueue.timeout.ms",
>> > KafkaProperties.p_queue_enqueue_timeout);
>> >         props.put("request.required.acks",
>> > KafkaProperties.p_request_required_acks);
>> >         props.put("producer.type", KafkaProperties.p_producer_type);
>> >
>> >         props.put("batch.num.messages", KafkaProperties.p_batch_num);
>> >
>> >         props.put("compression.codec",
>> > KafkaProperties.p_compression_codec);
>> >
>> >         ProducerConfig config = new ProducerConfig(props);
>> >         producer = new Producer<String, String>(config);
>> >     }
>> >
>> >     @Override
>> >     public void run() {
>> >         long start;
>> >         long num = 0;
>> >         System.out.println(new Date() + " - Message sent thread " +
>> > m_threadNumber + " started.");
>> >         while (true) {
>> >             start = System.currentTimeMillis();
>> >             String messageStr = new String(num + "_" + start);
>> >             KeyedMessage<String, String> data = new KeyedMessage<String,
>> > String>(KafkaProperties.topic, messageStr,
>> >                     start + "_" + msg);
>> >             producer.send(data);
>> >             num++;
>> >             if (num == m_events) {
>> >                 break;
>> >             }
>> >         }
>> >         producer.close();
>> >         System.out.println(new Date() + " - Message sent thread " +
>> > m_threadNumber + " end.   " + num
>> >                 + " messages sent.");
>> >     }
>> > }
>> >
>> >
>> > public interface KafkaProperties {
>> >     final static String zookeeper_connect = "127.0.0.1:2181";
>> >     final static String group_id = "group1";
>> >     final static String topic = "topic1";
>> >
>> >     final static String p_serializer_class =
>> > "kafka.serializer.StringEncoder";
>> >     final static String p_metadata_broker_list = "127.0.0.1:9092";
>> >     final static String p_partitioner_class =
>> > "kafka.producer.DefaultPartitioner";
>> >
>> >     final static String p_queue_enqueue_timeout = "-1";
>> >     final static String p_request_required_acks = "1";
>> >     final static String p_producer_type = "async";
>> >     final static String p_batch_num = "100";
>> >     final static String p_compression_codec = "1";
>> >     final static String p_message_send_retries = "3";
>> >     final static String p_retry_backoff_ms = "200";
>> >     final static String p_topic_metadata_refresh = "600000";
>> > }
>> >
>> >
>> > On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <lux...@gmail.com> wrote:
>> >
>> > > Thanks Paul. Yes, I am using 0.8 beta1.  I followed your suggestion to
>> > set
>> > > request.required.acks=1 and got the same result. No error message
>> seen in
>> > > broker logs, the size of the partition files were after sending
>> 1,000,000
>> > > events, the size of each event was 1KB :
>> > >
>> > > 00000000000000000000.index  10240 KB
>> > > 00000000000000000000.log  0KB
>> > >
>> > > The broker configurations:
>> > >
>> > > num.partitions=5
>> > > log.flush.interval.messages=20000
>> > > log.flush.interval.ms=5000
>> > >
>> > > log.flush.scheduler.interval.ms=1000
>> > > log.retention.hours=1
>> > > log.segment.bytes=1073741824
>> > > log.cleanup.interval.mins=30
>> > >
>> > > queued.max.requests=16
>> > > fetch.purgatory.purge.interval.requests=100
>> > > producer.purgatory.purge.interval.requests=100
>> > >
>> > > It works if I change the code to props.put("compression.codec", "0");
>> > >
>> > > thanks,
>> > > xlu
>> > >
>> > > On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmack...@adobe.com>
>> > wrote:
>> > >
>> > >> I assume this is kafka 0.8, right? Are there any corresponding
>> errors in
>> > >> the broker logs? With the configuration below, I don't think any
>> errors
>> > >> will be reported back to the producer.
>> > >>
>> > >> You could also try setting erquest.required.acks=1 to see if errors
>> are
>> > >> reported back to the client.
>> > >>
>> > >> On 8/29/13 4:40 AM, "Lu Xuechao" <lux...@gmail.com> wrote:
>> > >>
>> > >> >Hi ,
>> > >> >
>> > >> >I am trying to enable gzip compression for my events. But after I
>> > >> switched
>> > >> >compression.codec to "1" I found the produced events were even not
>> be
>> > >> >persisted to disk log file. Of course, the consumer could not
>> receive
>> > any
>> > >> >compressed events. I sent 10,000 or more events but the broker's log
>> > file
>> > >> >not changed. Seems no events were actually send to broker? Below is
>> my
>> > >> >producer's code:
>> > >> >
>> > >> >        Properties props = new Properties();
>> > >> >        props.put("serializer.class",
>> > "kafka.serializer.StringEncoder");
>> > >> >        props.put("metadata.broker.list", "127.0.0.1:9092");
>> > >> >        props.put("partitioner.class",
>> > >> >"kafka.producer.DefaultPartitioner");
>> > >> >        props.put("queue.enqueue.timeout.ms", "-1");
>> > >> >        props.put("request.required.acks", "0");
>> > >> >        props.put("producer.type", "async");
>> > >> >
>> > >> >        props.put("batch.num.messages", "100");
>> > >> >
>> > >> >        props.put("compression.codec", "1");
>> > >> >
>> > >> >        ProducerConfig config = new ProducerConfig(props);
>> > >> >        producer = new Producer<String, String>(config);
>> > >> >
>> > >> >        KeyedMessage<String, String> data = new KeyedMessage<String,
>> > >> >String>("topic1", messageStr, msg);
>> > >> >        producer.send(data);
>> > >> >
>> > >> >
>> > >> >If I comment out this line of code : props.put("compression.codec",
>> > "1");
>> > >> >then everything works fine. Did I miss something?
>> > >> >
>> > >> >thanks,
>> > >> >xlu
>> > >>
>> > >>
>> > >
>> >
>>
>
>

Reply via email to