Re: MultiThreaded HLConsumer Exits before events are all consumed
What I found was 2 problems. 1. The producer wasn't passing in a partition key, so not all partitions were getting data. 2. After fixing the producer, I could see all threads getting data consistently then the shutdown method was clearly killing the threads. I have removed the shutdown,and with the producer changes sending in a key, this looks like it is running correctly now. Thanks! On Wed, Apr 29, 2015 at 10:59 PM, tao xiao xiaotao...@gmail.com wrote: The log suggests that the shutdown method were still called Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753 Last Shutdown via example.shutDown called! 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down Please ensure no consumer.shutdown(); and executor.shutdown(); are called during the course of your program On Thu, Apr 30, 2015 at 2:23 AM, christopher palm cpa...@gmail.com wrote: Commenting out Example shutdown did not seem to make a difference, I added the print statement below to highlight the fact. The other threads still shut down, and only one thread lives on, eventually that dies after a few minutes as well Could this be that the producer default partitioner is isn't balancing data across all partitions? Thanks, Chris Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753 Last Shutdown via example.shutDown called! 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread. Shutting down Thread: 2 Shutting down Thread: 1 Shutting down Thread: 3 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.99021500035|40.6636611 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009 On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote: example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. remove this line the consumer threads will run forever On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote: Hi All, I am trying to get a multi threaded HL consumer working against a 2 broker Kafka cluster with a 4 partition 2 replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic.(I excluded the while loop in the paste below) What I see is the threads eventually all exit, even thought the producer is still sending events into the topic. My understanding is that the consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events at they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris *T**opic Configuration* Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 *Producer Code:* Properties props = new Properties();
Re: MultiThreaded HLConsumer Exits before events are all consumed
Commenting out Example shutdown did not seem to make a difference, I added the print statement below to highlight the fact. The other threads still shut down, and only one thread lives on, eventually that dies after a few minutes as well Could this be that the producer default partitioner is isn't balancing data across all partitions? Thanks, Chris Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753 Last Shutdown via example.shutDown called! 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread. Shutting down Thread: 2 Shutting down Thread: 1 Shutting down Thread: 3 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.99021500035|40.6636611 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009 On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote: example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. remove this line the consumer threads will run forever On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote: Hi All, I am trying to get a multi threaded HL consumer working against a 2 broker Kafka cluster with a 4 partition 2 replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic.(I excluded the while loop in the paste below) What I see is the threads eventually all exit, even thought the producer is still sending events into the topic. My understanding is that the consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events at they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris *T**opic Configuration* Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 *Producer Code:* Properties props = new Properties(); props.put(metadata.broker.list, args[0]); props.put(zk.connect, args[1]); props.put(serializer.class, kafka.serializer.StringEncoder); props.put(request.required.acks, 1); String TOPIC = args[2]; ProducerConfig config = new ProducerConfig(props); ProducerString, String producer = new ProducerString, String( config); finalEvent = new Timestamp(new Date().getTime()) + | + truckIds[0] + | + driverIds[0] + | + events[random .nextInt(evtCnt)] + | + getLatLong(arrayroute17[i]); try { KeyedMessageString, String data = new KeyedMessageString, String(TOPIC, finalEvent); LOG.info(Sending Messge #: + routeName[0] + : + i +, msg: + finalEvent); producer.send(data); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } *Consumer Code:* public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public
MultiThreaded HLConsumer Exits before events are all consumed
Hi All, I am trying to get a multi threaded HL consumer working against a 2 broker Kafka cluster with a 4 partition 2 replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic.(I excluded the while loop in the paste below) What I see is the threads eventually all exit, even thought the producer is still sending events into the topic. My understanding is that the consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events at they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris *T**opic Configuration* Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 *Producer Code:* Properties props = new Properties(); props.put(metadata.broker.list, args[0]); props.put(zk.connect, args[1]); props.put(serializer.class, kafka.serializer.StringEncoder); props.put(request.required.acks, 1); String TOPIC = args[2]; ProducerConfig config = new ProducerConfig(props); ProducerString, String producer = new ProducerString, String( config); finalEvent = new Timestamp(new Date().getTime()) + | + truckIds[0] + | + driverIds[0] + | + events[random .nextInt(evtCnt)] + | + getLatLong(arrayroute17[i]); try { KeyedMessageString, String data = new KeyedMessageString, String(TOPIC, finalEvent); LOG.info(Sending Messge #: + routeName[0] + : + i +, msg: + finalEvent); producer.send(data); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } *Consumer Code:* public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIteratorbyte[], byte[] it = m_stream.iterator(); while (it.hasNext()){ System.out.println(Thread + m_threadNumber + : + new String(it.next().message())); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Shutting down Thread: + m_threadNumber); } } public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println(Timed out waiting for consumer threads to shut down, exiting uncleanly); } } catch (InterruptedException e) { System.out.println(Interrupted during shutdown, exiting uncleanly); } } public void run(int a_numThreads) { MapString, Integer topicCountMap = new HashMapString, Integer(); topicCountMap.put(topic, new Integer(a_numThreads)); MapString, ListKafkaStreambyte[], byte[] consumerMap = consumer.createMessageStreams(topicCountMap); ListKafkaStreambyte[], byte[] streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(a_numThreads); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put(zookeeper.connect, a_zookeeper); props.put(group.id, a_groupId); props.put(zookeeper.session.timeout.ms, 400); props.put(zookeeper.sync.time.ms, 200); props.put(auto.commit.interval.ms, 1000); props.put(consumer.timeout.ms, -1); return new ConsumerConfig(props); } public static void main(String[] args) { String zooKeeper = args[0];
Re: MultiThreaded HLConsumer Exits before events are all consumed
The log suggests that the shutdown method were still called Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753 Last Shutdown via example.shutDown called! 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down Please ensure no consumer.shutdown(); and executor.shutdown(); are called during the course of your program On Thu, Apr 30, 2015 at 2:23 AM, christopher palm cpa...@gmail.com wrote: Commenting out Example shutdown did not seem to make a difference, I added the print statement below to highlight the fact. The other threads still shut down, and only one thread lives on, eventually that dies after a few minutes as well Could this be that the producer default partitioner is isn't balancing data across all partitions? Thanks, Chris Thread 0: 2015-04-29 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753 Last Shutdown via example.shutDown called! 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:, ZKConsumerConnector shutting down 15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka scheduler 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping leader finder thread 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutting down 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Stopped 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: -leader-finder-thread], Shutdown completed 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1430330968420] Stopping all fetchers 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-consumergroup], Shutting down 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Stopped 15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-], Shutdown completed 15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-] All connections stopped 15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event thread. Shutting down Thread: 2 Shutting down Thread: 1 Shutting down Thread: 3 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector: [consumergroup], ZKConsumerConnector shut down completed Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail distance|-73.99021500035|40.6636611 15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector: [consumergroup], stopping watcher executor thread for consumer consumergroup Thread 0: 2015-04-29 12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009 On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote: example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. remove this line the consumer threads will run forever On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote: Hi All, I am trying to get a multi threaded HL consumer working against a 2 broker Kafka cluster with a 4 partition 2 replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic.(I excluded the while loop in the paste below) What I see is the threads eventually all exit, even thought the producer is still sending events into the topic. My understanding is that the consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events at they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris *T**opic Configuration* Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 *Producer Code:* Properties props = new Properties(); props.put(metadata.broker.list, args[0]); props.put(zk.connect, args[1]); props.put(serializer.class, kafka.serializer.StringEncoder); props.put(request.required.acks, 1); String TOPIC = args[2]; ProducerConfig config = new ProducerConfig(props); ProducerString, String producer = new ProducerString, String( config); finalEvent = new Timestamp(new Date().getTime()) + | + truckIds[0] + | + driverIds[0] + | + events[random .nextInt(evtCnt)] +
Re: MultiThreaded HLConsumer Exits before events are all consumed
example.shutdown(); in ConsumerGroupExample closes all consumer connections to Kafka. remove this line the consumer threads will run forever On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com wrote: Hi All, I am trying to get a multi threaded HL consumer working against a 2 broker Kafka cluster with a 4 partition 2 replica topic. The consumer code is set to run with 4 threads, one for each partition. The producer code uses the default partitioner and loops indefinitely feeding events into the topic.(I excluded the while loop in the paste below) What I see is the threads eventually all exit, even thought the producer is still sending events into the topic. My understanding is that the consumer thread per partition is the correct setup. Any ideas why this code doesn't continue to consume events at they are pushed to the topic? I suspect I am configuring something wrong here, but am not sure what. Thanks, Chris *T**opic Configuration* Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2 Configs: Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr: 1,2 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2 *Producer Code:* Properties props = new Properties(); props.put(metadata.broker.list, args[0]); props.put(zk.connect, args[1]); props.put(serializer.class, kafka.serializer.StringEncoder); props.put(request.required.acks, 1); String TOPIC = args[2]; ProducerConfig config = new ProducerConfig(props); ProducerString, String producer = new ProducerString, String( config); finalEvent = new Timestamp(new Date().getTime()) + | + truckIds[0] + | + driverIds[0] + | + events[random .nextInt(evtCnt)] + | + getLatLong(arrayroute17[i]); try { KeyedMessageString, String data = new KeyedMessageString, String(TOPIC, finalEvent); LOG.info(Sending Messge #: + routeName[0] + : + i +, msg: + finalEvent); producer.send(data); Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } *Consumer Code:* public class ConsumerTest implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerTest(KafkaStream a_stream, int a_threadNumber) { m_threadNumber = a_threadNumber; m_stream = a_stream; } public void run() { ConsumerIteratorbyte[], byte[] it = m_stream.iterator(); while (it.hasNext()){ System.out.println(Thread + m_threadNumber + : + new String(it.next().message())); try { Thread.sleep(1000); }catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Shutting down Thread: + m_threadNumber); } } public class ConsumerGroupExample { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig(a_zookeeper, a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println(Timed out waiting for consumer threads to shut down, exiting uncleanly); } } catch (InterruptedException e) { System.out.println(Interrupted during shutdown, exiting uncleanly); } } public void run(int a_numThreads) { MapString, Integer topicCountMap = new HashMapString, Integer(); topicCountMap.put(topic, new Integer(a_numThreads)); MapString, ListKafkaStreambyte[], byte[] consumerMap = consumer.createMessageStreams(topicCountMap); ListKafkaStreambyte[], byte[] streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(a_numThreads); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerTest(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put(zookeeper.connect, a_zookeeper); props.put(group.id, a_groupId);