Re: having problem with 0.8 gzip compression

2013-07-11 Thread Scott Wang
Joel,

Would you mind pointing me to how I can enable the trace logs in
the producer and broker?

Thanks,
Scott
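
For reference, Kafka 0.8 logs through log4j 1.x, so one way to get trace output is a log4j.properties file that sets the "kafka" logger to TRACE. The following is only a sketch under that assumption: the class name TraceLogConfig, the appender name "stdout" and the output file name are made up for illustration, and the code uses nothing but the JDK to write the file.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: writes a log4j 1.x properties file with Kafka at TRACE.
class TraceLogConfig {

    // Builds the log4j settings: root logger at INFO, everything under "kafka" at TRACE.
    static Properties traceSettings() {
        Properties p = new Properties();
        p.setProperty("log4j.rootLogger", "INFO, stdout");
        p.setProperty("log4j.appender.stdout", "org.apache.log4j.ConsoleAppender");
        p.setProperty("log4j.appender.stdout.layout", "org.apache.log4j.PatternLayout");
        p.setProperty("log4j.appender.stdout.layout.ConversionPattern", "[%d] %p %m (%c)%n");
        p.setProperty("log4j.logger.kafka", "TRACE");
        return p;
    }

    // Stores the settings in java.util.Properties format, which log4j 1.x can read.
    static void write(Path target) throws IOException {
        try (Writer out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            traceSettings().store(out, "Trace logging for Kafka debugging");
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = Paths.get(args.length > 0 ? args[0] : "log4j.properties");
        write(target);
        System.out.println("Wrote " + target.toAbsolutePath());
    }
}
```

The producer JVM can then be started with -Dlog4j.configuration=file:/path/to/log4j.properties. On the broker side, the same log4j.logger.kafka=TRACE line would go into whatever log4j configuration the broker start script already uses.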



Re: having problem with 0.8 gzip compression

2013-07-11 Thread Scott Wang
OK, the problem is solved. I think it was because some of the jar files
that I was using were old: I had been building the producer and consumer
in my 0.7 environment and had only swapped out the Kafka jar file. I have
now created a whole new environment and pulled in all the jar files from
0.8, and that seems to have solved my 0.8 gzip problem. Thank you for all
the help.
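
A quick way to check for this kind of stale-jar mix-up is to ask the JVM where a class was actually loaded from. The sketch below is illustrative only: the class name JarOrigin is made up, and the thread does not say which jars were the old ones, so the class names to inspect are left to the command line.

```java
import java.security.CodeSource;

// Hypothetical diagnostic: prints the jar or directory each named class was loaded from.
class JarOrigin {

    // Returns "<class name> <- <location>", or a note when the JVM reports no code source
    // (which is the case for classes loaded by the bootstrap loader, such as java.lang.String).
    static String locate(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return cls.getName() + " <- (bootstrap or unknown source)";
        }
        return cls.getName() + " <- " + src.getLocation();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Each argument is a fully qualified class name to look up on the current classpath.
        for (String name : args) {
            System.out.println(locate(Class.forName(name)));
        }
    }
}
```

Run it with the same classpath as the producer or consumer, for example passing kafka.producer.ProducerConfig as the argument, and compare the printed locations with the jars you expect from the 0.8 build.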


Re: having problem with 0.8 gzip compression

2013-07-10 Thread Scott Wang
Jun,

I did a test this morning and got a very interesting result with your
command. I started by wiping all the log files and cleaning up all the
ZooKeeper data files.

Once I had restarted the server, producer and consumer, I executed your
command, and what I got was an empty log, as follows:

Dumping /Users/scott/Temp/kafka/test-topic-0/.log
Starting offset: 0

One observation: the .index file was getting huge, but
there was nothing in the .log file.

Thanks,
Scott
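
To put numbers on an observation like this, the sizes of the files in a partition directory can be listed directly. The sketch below uses only the JDK; the class name SegmentSizes is made up, and the directory is taken from the command line rather than assumed. One possible explanation, which nothing in this thread confirms, is that the broker preallocates the index file of the active segment, in which case a large .index next to an empty .log would not by itself mean that data was written.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: reports the size in bytes of every regular file in a directory.
class SegmentSizes {

    // Maps file name to size, sorted by name; subdirectories are skipped.
    static Map<String, Long> sizes(Path partitionDir) throws IOException {
        Map<String, Long> result = new TreeMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(partitionDir)) {
            for (Path f : files) {
                if (Files.isRegularFile(f)) {
                    result.put(f.getFileName().toString(), Files.size(f));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: SegmentSizes <partition-dir>");
            return;
        }
        for (Map.Entry<String, Long> e : sizes(Paths.get(args[0])).entrySet()) {
            System.out.println(e.getValue() + " bytes  " + e.getKey());
        }
    }
}
```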




On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao jun...@gmail.com wrote:

 Could you run the following command on one of the log files of your topic
 and attach the output?

 bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
 /tmp/kafka-logs/testtopic-0/.log

 Thanks,

 Jun



Re: having problem with 0.8 gzip compression

2013-07-10 Thread Joel Koshy
Weird - I tried your exact code and it worked for me (although I was
using 0.8 head and not the beta). Can you re-run with trace logs
enabled in your producer and paste that output? Broker logs also if
you can?

Thanks,

Joel


Re: having problem with 0.8 gzip compression

2013-07-09 Thread Scott Wang
Another piece of information, the snappy compression also does not work.

Thanks,
Scott


On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang 
scott.w...@rumbleentertainment.com wrote:

 I just tried it and the messages are still not showing up. Thanks for
 looking into this.

 Thanks,
 Scott


 On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao jun...@gmail.com wrote:

 Could you try starting the consumer first (and enable gzip in the
 producer)?

 Thanks,

 Jun



Re: having problem with 0.8 gzip compression

2013-07-08 Thread Scott Wang
No, I did not start the consumer before the producer. I actually started
the producer first, and nothing showed up in the consumer unless I commented
out this line: props.put("compression.codec", "gzip"). If I commented
out the compression codec, everything just worked.
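
On the start-order question: with the 0.8 high-level consumer, a group that has no committed offset starts from the newest data unless auto.offset.reset is set to "smallest". The sketch below builds such a configuration with plain java.util.Properties. The class name ConsumerProps is made up for illustration; the ZooKeeper address and group id are the ones from the consumer code in this thread.

```java
import java.util.Properties;

// Hypothetical helper: consumer settings that read a topic from its earliest available offset.
class ConsumerProps {

    static Properties fromBeginning(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // Only takes effect when the group has no offset stored yet; the default is "largest".
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = fromBeginning("localhost:2181/kafka_0_8", "test08ConsumerId");
        props.list(System.out);
    }
}
```

The resulting Properties object would be passed to new ConsumerConfig(props) in the same way as in the consumer code quoted below. With this setting, messages produced before the consumer started should still be delivered, which takes start order out of the picture when testing compression.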


On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao jun...@gmail.com wrote:

 Did you start the consumer before the producer? By default, the consumer
 gets only the new data.

 Thanks,

 Jun


 On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang 
 scott.w...@rumbleentertainment.com wrote:

  I am testing with Kafka 0.8 beta and having a problem receiving messages
  in the consumer. There is no error, so does anyone have any insights?
  When I comment out the compression.codec setting, everything works fine.
 
  My producer:
  import java.util.Properties;

  import kafka.javaapi.producer.Producer;
  import kafka.producer.KeyedMessage;
  import kafka.producer.ProducerConfig;

  public class TestKafka08Prod {

      public static void main(String[] args) {

          Producer<Integer, String> producer = null;
          try {
              Properties props = new Properties();
              props.put("metadata.broker.list", "localhost:9092");
              props.put("serializer.class", "kafka.serializer.StringEncoder");
              props.put("producer.type", "sync");
              props.put("request.required.acks", "1");
              props.put("compression.codec", "gzip");
              ProducerConfig config = new ProducerConfig(props);
              producer = new Producer<Integer, String>(config);
              // Send ten test messages, each tagged with its index and a timestamp.
              for (int i = 0; i < 10; i++) {
                  KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(
                          "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
                  producer.send(data);
              }
          } catch (Exception e) {
              System.out.println("Error happened: ");
              e.printStackTrace();
          } finally {
              // Close the producer only if it was actually created.
              if (producer != null) {
                  producer.close();
              }
              System.out.println("End of Sending");
          }

          System.exit(0);
      }
  }
 
 
  My consumer:
 
  import java.net.SocketException;
  import java.net.UnknownHostException;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;

  import kafka.consumer.ConsumerConfig;
  import kafka.consumer.ConsumerIterator;
  import kafka.consumer.KafkaStream;
  import kafka.javaapi.consumer.ConsumerConnector;

  public class TestKafka08Consumer {
      public static void main(String[] args) throws UnknownHostException, SocketException {

          Properties props = new Properties();
          props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
          props.put("group.id", "test08ConsumerId");
          props.put("zk.sessiontimeout.ms", "4000");
          props.put("zk.synctime.ms", "2000");
          props.put("autocommit.interval.ms", "1000");

          ConsumerConfig consumerConfig = new ConsumerConfig(props);

          ConsumerConnector consumerConnector =
                  kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

          // Ask for a single stream for the topic.
          String topic = "test-topic";
          Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
          topicCountMap.put(topic, Integer.valueOf(1));
          Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                  consumerConnector.createMessageStreams(topicCountMap);
          KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

          ConsumerIterator<byte[], byte[]> it = stream.iterator();

          // Print every message received on the stream.
          while (it.hasNext()) {
              try {
                  String fromPlatform = new String(it.next().message());
                  System.out.println("The messages: " + fromPlatform);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
          System.out.println("SystemOut");
      }
  }
 
 
  Thanks