Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-30 Thread christopher palm
I found two problems.
1. The producer wasn't passing in a partition key, so not all partitions
were getting data.
2. After fixing the producer, I could see all threads getting data
consistently until the shutdown method was called, which was
clearly killing the threads.
I have removed the shutdown, and with the producer change to send a key,
this looks like it is running correctly now.

Thanks!
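
The first problem can be illustrated with a small sketch. It assumes a hash-style partitioner that maps a non-null key to hash(key) mod partitionCount; the class name KeyPartitionSketch, its helper methods, and the sample keys are hypothetical, and this is plain Java standing in for the producer rather than Kafka's own partitioner code. In the thread's producer, the corresponding change would be to build each message with the three-argument KeyedMessage(topic, key, message) constructor so that it carries a key.

```java
import java.util.Arrays;

class KeyPartitionSketch {
    // Maps a non-null key to a partition index by hashing (assumed scheme).
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Counts how many of the given keys land on each partition.
    static int[] spread(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] truckIds = {"1", "2", "3", "4"}; // hypothetical keys
        // prints [1, 1, 1, 1]
        System.out.println(Arrays.toString(spread(truckIds, 4)));
    }
}
```

When the key varies from message to message (for example a truck or driver id), messages can be spread over all four partitions, so every consumer thread has a chance of receiving data.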


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread christopher palm
Commenting out the Example shutdown did not seem to make a difference. I added
the print statement below to highlight the fact.

The other threads still shut down, and only one thread lives on; eventually
that dies after a few minutes as well.

Could it be that the producer's default partitioner isn't balancing data
across all partitions?

Thanks,
Chris

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
scheduler

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping leader finder thread

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread:
-leader-finder-thread], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-1430330968420] Stopping all fetchers

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-consumergroup], Shutting down

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Stopped

15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
[ConsumerFetcherThread-], Shutdown completed

15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
[ConsumerFetcherManager-] All connections stopped

15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
thread.

Shutting down Thread: 2

Shutting down Thread: 1

Shutting down Thread: 3

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], ZKConsumerConnector shut down completed

Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
distance|-73.99021500035|40.6636611

15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
[consumergroup], stopping watcher executor thread for consumer consumergroup

Thread 0: 2015-04-29
12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009


MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread christopher palm
Hi All,

I am trying to get a multithreaded HL consumer working against a 2-broker
Kafka cluster with a 4-partition, 2-replica topic.

The consumer code is set to run with 4 threads, one for each partition.

The producer code uses the default partitioner and loops indefinitely,
feeding events into the topic. (I excluded the while loop in the paste below.)

What I see is that the threads eventually all exit, even though the producer is
still sending events into the topic.

My understanding is that one consumer thread per partition is the correct
setup.

Any ideas why this code doesn't continue to consume events as they are
pushed to the topic?

I suspect I am configuring something wrong here, but am not sure what.

Thanks,

Chris


*Topic Configuration*

Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

*Producer Code:*

Properties props = new Properties();
props.put("metadata.broker.list", args[0]);
props.put("zk.connect", args[1]);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");

String TOPIC = args[2];
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);

finalEvent = new Timestamp(new Date().getTime()) + "|"
        + truckIds[0] + "|" + driverIds[0] + "|" + events[random.nextInt(evtCnt)]
        + "|" + getLatLong(arrayroute17[i]);

try {
    KeyedMessage<String, String> data =
            new KeyedMessage<String, String>(TOPIC, finalEvent);
    LOG.info("Sending Messge #: " + routeName[0] + ": " + i + ", msg:" + finalEvent);
    producer.send(data);
    Thread.sleep(1000);
} catch (Exception e) {
    e.printStackTrace();
}


*Consumer Code:*

public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId,
            String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(a_numThreads);

        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
            String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("consumer.timeout.ms", "-1");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread tao xiao
The log suggests that the shutdown method was still called:

Thread 0: 2015-04-29
12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

Last Shutdown via example.shutDown called!

15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
ZKConsumerConnector shutting down

Please ensure that neither consumer.shutdown() nor executor.shutdown() is
called while your program is running.
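
One way to follow this advice is to let the worker threads keep the process alive and to call shutdown only when the program is really meant to stop. The sketch below is a minimal illustration in plain Java under that assumption: RunUntilStopped and its methods are hypothetical names, and the latch wait stands in for the blocking it.hasNext() loop of a consumer thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RunUntilStopped {
    private final ExecutorService executor;
    private final CountDownLatch stopRequested = new CountDownLatch(1);
    private final AtomicInteger finished = new AtomicInteger();

    RunUntilStopped(int numThreads) {
        executor = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            executor.submit(() -> {
                try {
                    // Stand-in for a consumer loop that blocks waiting for data.
                    stopRequested.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.incrementAndGet();
            });
        }
    }

    // Number of workers that have left their loop.
    int finishedWorkers() {
        return finished.get();
    }

    // Releases the workers and waits for the pool; call only when stopping for real.
    boolean shutdown() {
        stopRequested.countDown();
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        RunUntilStopped app = new RunUntilStopped(4);
        System.out.println("finished before shutdown: " + app.finishedWorkers());
        boolean clean = app.shutdown();
        System.out.println("clean shutdown: " + clean
                + ", finished: " + app.finishedWorkers());
    }
}
```

In a long-running consumer, the shutdown() call would typically be triggered by an explicit stop signal (for example from a hook registered with Runtime.getRuntime().addShutdownHook) rather than unconditionally after a delay.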


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-29 Thread tao xiao
example.shutdown() in ConsumerGroupExample closes all consumer connections
to Kafka. Remove this line and the consumer threads will run forever.
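
Why the threads exit can be sketched with a stand-in stream. This is an illustration under stated assumptions, not Kafka's implementation: SentinelStream, publish, next, and drain are hypothetical names, and the shutdown marker placed on the queue models the effect that shutting down the connector has on a loop like while (it.hasNext()).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SentinelStream {
    // Unique marker object, compared by identity rather than by value.
    private static final String SHUTDOWN = new String("shutdown-marker");
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void publish(String message) {
        queue.add(message);
    }

    // Signals the reader that no further messages should be consumed.
    void shutdown() {
        queue.add(SHUTDOWN);
    }

    // Blocks until a message or the marker arrives; returns null once shut down.
    String next() {
        try {
            String item = queue.take();
            return item == SHUTDOWN ? null : item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Consumes until shutdown, mirroring the consumer thread's read loop.
    static List<String> drain(SentinelStream stream) {
        List<String> seen = new ArrayList<>();
        String message;
        while ((message = stream.next()) != null) {
            seen.add(message);
        }
        return seen;
    }

    public static void main(String[] args) {
        SentinelStream stream = new SentinelStream();
        stream.publish("event-1");
        stream.publish("event-2");
        stream.shutdown();
        stream.publish("event-3"); // arrives after shutdown and is never read
        // prints [event-1, event-2]
        System.out.println(drain(stream));
    }
}
```

The reader leaves its loop as soon as it sees the marker, even though more events keep arriving, which matches the "Shutting down Thread" lines that appear right after the connector shuts down.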
