I found two problems:
1. The producer wasn't passing in a partition key, so not all partitions
were getting data.
2. After fixing the producer, I could see all threads getting data
consistently, until the shutdown method was called, which was clearly
killing the threads.
I have removed the shutdown call, and with the producer now sending a key,
this looks like it is running correctly.
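
For anyone who finds this thread later, here is a minimal sketch of why the
key matters. It is plain Java, not Kafka's code: it only mimics the
hash-modulo idea that, as far as I understand, the default partitioner
applies to a non-null key. The class name, the method names and the sample
keys are made up for illustration. In the real producer the key goes in as
the middle argument, e.g. new KeyedMessage<String, String>(TOPIC, key,
finalEvent). My understanding is that without a key the old producer sticks
to one randomly chosen partition until its next metadata refresh, which
would explain why only one thread kept getting data.

```java
import java.util.Arrays;

// Illustrative only: mimics hash-modulo partition selection; not Kafka's code.
class KeyedPartitionSketch {

    // Map a non-null key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so a negative hashCode cannot give a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Count how many of the given keys land on each partition.
    static int[] countPerPartition(String[] keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical keys; a truck id would be a natural choice in my producer.
        String[] keys = {"1", "2", "3", "4"};
        // Prints [1, 1, 1, 1]: each key maps to a different partition.
        System.out.println(Arrays.toString(countPerPartition(keys, 4)));
    }
}
```

The same key always maps to the same partition, so how evenly the data
spreads depends on how varied the keys are.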

Thanks!

On Wed, Apr 29, 2015 at 10:59 PM, tao xiao <xiaotao...@gmail.com> wrote:

> The log suggests that the shutdown method was still called
>
> Thread 0: 2015-04-29
> 12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
>
> Last Shutdown via example.shutDown called!
>
> 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> ZKConsumerConnector shutting down
>
> Please ensure that neither consumer.shutdown(); nor executor.shutdown(); is
> called during the course of your program
>
> On Thu, Apr 30, 2015 at 2:23 AM, christopher palm <cpa...@gmail.com>
> wrote:
>
> > Commenting out the example shutdown did not seem to make a difference. I
> > added the print statement below to highlight the fact.
> >
> > The other threads still shut down, and only one thread lives on;
> > eventually that dies after a few minutes as well.
> >
> > Could it be that the producer's default partitioner isn't balancing
> > data across all partitions?
> >
> > Thanks,
> > Chris
> >
> > Thread 0: 2015-04-29
> > 12:55:54.292|3|13|Normal|-74.189262999999997|41.339009999999753
> >
> > Last Shutdown via example.shutDown called!
> >
> > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
> > ZKConsumerConnector shutting down
> >
> > 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
> > scheduler
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Shutting down
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Stopped
> >
> > 15/04/29 13:09:38 INFO
> consumer.ConsumerFetcherManager$LeaderFinderThread:
> > -leader-finder-thread], Shutdown completed
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-1430330968420] Stopping all fetchers
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-consumergroup], Shutting down
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-], Stopped
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
> > [ConsumerFetcherThread-], Shutdown completed
> >
> > 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
> > [ConsumerFetcherManager-] All connections stopped
> >
> > 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
> > thread.
> >
> > Shutting down Thread: 2
> >
> > Shutting down Thread: 1
> >
> > Shutting down Thread: 3
> >
> > 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
> > [consumergroup], ZKConsumerConnector shut down completed
> >
> > Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
> > distance|-73.990215000000035|40.663669999999911
> >
> > 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
> > [consumergroup], stopping watcher executor thread for consumer
> > consumergroup
> >
> > Thread 0: 2015-04-29
> > 12:55:56.313|1|11|Normal|-79.741653000000042|42.13045800000009
> >
> > On Wed, Apr 29, 2015 at 10:11 AM, tao xiao <xiaotao...@gmail.com> wrote:
> >
> > > example.shutdown(); in ConsumerGroupExample closes all consumer
> > > connections to Kafka. Remove this line and the consumer threads will run
> > > forever.
> > >
> > > On Wed, Apr 29, 2015 at 9:42 PM, christopher palm <cpa...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am trying to get a multi-threaded HL consumer working against a
> > > > 2-broker Kafka cluster with a 4-partition, 2-replica topic.
> > > >
> > > > The consumer code is set to run with 4 threads, one for each partition.
> > > >
> > > > The producer code uses the default partitioner and loops indefinitely,
> > > > feeding events into the topic. (I excluded the while loop in the paste
> > > > below.)
> > > >
> > > > What I see is that the threads eventually all exit, even though the
> > > > producer is still sending events into the topic.
> > > >
> > > > My understanding is that one consumer thread per partition is the
> > > > correct setup.
> > > >
> > > > Any ideas why this code doesn't continue to consume events as they are
> > > > pushed to the topic?
> > > >
> > > > I suspect I am configuring something wrong here, but am not sure what.
> > > >
> > > > Thanks,
> > > >
> > > > Chris
> > > >
> > > >
> > > > *Topic Configuration*
> > > >
> > > > Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs:
> > > >
> > > > Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2
> > > >
> > > > Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
> > > >
> > > > Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2
> > > >
> > > > Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
> > > >
> > > > *Producer Code:*
> > > >
> > > >         Properties props = new Properties();
> > > >         props.put("metadata.broker.list", args[0]);
> > > >         props.put("zk.connect", args[1]);
> > > >         props.put("serializer.class", "kafka.serializer.StringEncoder");
> > > >         props.put("request.required.acks", "1");
> > > >         String TOPIC = args[2];
> > > >         ProducerConfig config = new ProducerConfig(props);
> > > >         Producer<String, String> producer =
> > > >                 new Producer<String, String>(config);
> > > >
> > > >         finalEvent = new Timestamp(new Date().getTime()) + "|"
> > > >                 + truckIds[0] + "|" + driverIds[0] + "|"
> > > >                 + events[random.nextInt(evtCnt)]
> > > >                 + "|" + getLatLong(arrayroute17[i]);
> > > >
> > > >         try {
> > > >             KeyedMessage<String, String> data =
> > > >                     new KeyedMessage<String, String>(TOPIC, finalEvent);
> > > >             LOG.info("Sending Messge #: " + routeName[0] + ": " + i
> > > >                     + ", msg:" + finalEvent);
> > > >             producer.send(data);
> > > >             Thread.sleep(1000);
> > > >         } catch (Exception e) {
> > > >             e.printStackTrace();
> > > >         }
> > > >
> > > >
> > > > *Consumer Code:*
> > > >
> > > > public class ConsumerTest implements Runnable {
> > > >
> > > >     private KafkaStream m_stream;
> > > >     private int m_threadNumber;
> > > >
> > > >     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
> > > >         m_threadNumber = a_threadNumber;
> > > >         m_stream = a_stream;
> > > >     }
> > > >
> > > >     public void run() {
> > > >         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> > > >         while (it.hasNext()) {
> > > >             System.out.println("Thread " + m_threadNumber + ": "
> > > >                     + new String(it.next().message()));
> > > >             try {
> > > >                 Thread.sleep(1000);
> > > >             } catch (InterruptedException e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >         }
> > > >         System.out.println("Shutting down Thread: " + m_threadNumber);
> > > >     }
> > > > }
> > > >
> > > > public class ConsumerGroupExample {
> > > >
> > > >     private final ConsumerConnector consumer;
> > > >     private final String topic;
> > > >     private ExecutorService executor;
> > > >
> > > >     public ConsumerGroupExample(String a_zookeeper, String a_groupId,
> > > >             String a_topic) {
> > > >         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
> > > >                 createConsumerConfig(a_zookeeper, a_groupId));
> > > >         this.topic = a_topic;
> > > >     }
> > > >
> > > >     public void shutdown() {
> > > >         if (consumer != null) consumer.shutdown();
> > > >         if (executor != null) executor.shutdown();
> > > >         try {
> > > >             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
> > > >                 System.out.println("Timed out waiting for consumer threads"
> > > >                         + " to shut down, exiting uncleanly");
> > > >             }
> > > >         } catch (InterruptedException e) {
> > > >             System.out.println("Interrupted during shutdown, exiting"
> > > >                     + " uncleanly");
> > > >         }
> > > >     }
> > > >
> > > >     public void run(int a_numThreads) {
> > > >         Map<String, Integer> topicCountMap =
> > > >                 new HashMap<String, Integer>();
> > > >         topicCountMap.put(topic, new Integer(a_numThreads));
> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > >                 consumer.createMessageStreams(topicCountMap);
> > > >         List<KafkaStream<byte[], byte[]>> streams =
> > > >                 consumerMap.get(topic);
> > > >
> > > >         executor = Executors.newFixedThreadPool(a_numThreads);
> > > >
> > > >         int threadNumber = 0;
> > > >         for (final KafkaStream stream : streams) {
> > > >             executor.submit(new ConsumerTest(stream, threadNumber));
> > > >             threadNumber++;
> > > >         }
> > > >     }
> > > >
> > > >     private static ConsumerConfig createConsumerConfig(
> > > >             String a_zookeeper, String a_groupId) {
> > > >         Properties props = new Properties();
> > > >         props.put("zookeeper.connect", a_zookeeper);
> > > >         props.put("group.id", a_groupId);
> > > >         props.put("zookeeper.session.timeout.ms", "400");
> > > >         props.put("zookeeper.sync.time.ms", "200");
> > > >         props.put("auto.commit.interval.ms", "1000");
> > > >         props.put("consumer.timeout.ms", "-1");
> > > >         return new ConsumerConfig(props);
> > > >     }
> > > >
> > > >     public static void main(String[] args) {
> > > >         String zooKeeper = args[0];
> > > >         String groupId = args[1];
> > > >         String topic = args[2];
> > > >         int threads = Integer.parseInt(args[3]);
> > > >
> > > >         ConsumerGroupExample example =
> > > >                 new ConsumerGroupExample(zooKeeper, groupId, topic);
> > > >         example.run(threads);
> > > >
> > > >         try {
> > > >             Thread.sleep(10000);
> > > >         } catch (InterruptedException ie) {
> > > >         }
> > > >         example.shutdown();
> > > >     }
> > > > }
> > > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > > Tao
> > >
> >
>
>
>
> --
> Regards,
> Tao
>
