Can you try the latest from trunk? This might be related to https://issues.apache.org/jira/browse/KAFKA-550, which did not make it into 0.7.2.
Thanks,

Joel

On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <j...@squareup.com> wrote:
> (I suspect this is a bug, but thought I'd check with the group before filing)
>
> If I create consumer streams using a topic filter, and request more threads
> than are actually allocated (based on the current dynamic topic event
> watcher, etc.), the unused threads don't get the shutdown message when the
> connector is shut down.
>
> Specifically, if I have code that looks like:
>
>     Whitelist topicRegex = new Whitelist("^metrics\\..*$");
>     List<KafkaStream<Message>> streams =
>         consumerConnector.createMessageStreamsByFilter(topicRegex, consumerThreads);
>
>     ExecutorService executor = Executors.newFixedThreadPool(consumerThreads);
>
>     for (final KafkaStream<Message> stream : streams) {
>       executor.submit(new Runnable() {
>         @Override
>         public void run() {
>           for (MessageAndMetadata<Message> msgAndMetadata : stream) {
>             // do some processing
>           }
>         }
>       });
>     }
>
> and consumerThreads is, say, 20, but only 1 stream ends up receiving
> messages, then only that 1 stream's thread gets the
> ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream
> iterator gets notified to exit.
>
> Looking at ZookeeperConsumerConnector.scala, it appears that the
> 'topicThreadIdAndQueues' list does not contain entries for all the
> threadIds, depending on the current state of rebalancing, and thus the
> method sendShutdownToAllQueues() doesn't actually do what it's intended
> to do.
>
> The result is that it's not possible to cleanly shut down a consumer.
>
> I am using 0.7.2.
>
> Jason
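
[Editor's note: for readers stuck on 0.7.2 until the trunk fix is available, one commonly used workaround is to set consumer.timeout.ms in the consumer config, so that an idle stream iterator throws ConsumerTimeoutException instead of blocking forever, and have each worker thread re-check a shutdown flag. The sketch below reuses the streams, executor, and consumerConnector from Jason's snippet; the shuttingDown flag, the timeout value, and the 30-second wait are illustrative choices, not from the thread.]

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import kafka.consumer.ConsumerTimeoutException;

    // Assumes "consumer.timeout.ms" was set in the ConsumerConfig props, e.g.
    //   props.put("consumer.timeout.ms", "1000");
    // so an idle iterator throws ConsumerTimeoutException rather than blocking.
    final AtomicBoolean shuttingDown = new AtomicBoolean(false);

    for (final KafkaStream<Message> stream : streams) {
      executor.submit(new Runnable() {
        @Override
        public void run() {
          while (!shuttingDown.get()) {
            try {
              for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                // do some processing
              }
              return;  // iterator ended normally (shutdown command was received)
            } catch (ConsumerTimeoutException e) {
              // No message within consumer.timeout.ms; loop and re-check the flag.
            }
          }
        }
      });
    }

    // On shutdown: flip the flag first so idle threads exit on their next
    // timeout, then shut down the connector and drain the executor.
    shuttingDown.set(true);
    consumerConnector.shutdown();
    executor.shutdown();
    try {
      executor.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

This doesn't address the missing entries in topicThreadIdAndQueues; it only keeps the idle threads from hanging the shutdown until the fix lands.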