Hi Joel, I'd be happy to try it, but am a bit concerned about porting any other 0.8 api changes to get everything working (I'd rather not expend the effort unless there's a stable version I can port to). Or should I just be able to drop latest trunk (or 0.8.x) code in place without any changes?
Also, is there a ready bundled download beyond 0.7.2, or do I need download sources and build everything locally? Jason On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <jjkosh...@gmail.com> wrote: > Can you try the latest from trunk? This might be related to > https://issues.apache.org/jira/browse/KAFKA-550 which did not make it into > 0.7.2 > > Thanks, > > Joel > > > On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <j...@squareup.com> wrote: > > > (I suspect this a bug, but thought I'd check with the group before > filing) > > > > If I create consumer streams using a topic filter, and request more > threads > > than are actually allocated (based on the current dynamic topic event > > watcher, etc.), the unused threads don't get the shutdown message, when > the > > connector is shut down. > > > > Specifically, if I have code that looks like: > > > > Whitelist topicRegex = new Whitelist("^metrics\\..*$"); > > List<KafkaStream<Message>> streams = > > consumerConnector.createMessageStreamsByFilter(topicRegex, > > consumerThreads); > > > > ExecutorService executor = > > Executors.newFixedThreadPool(consumerThreads); > > > > for (final KafkaStream<Message> stream : streams) { > > executor.submit(new Runnable() { > > @Override public void run() { > > for (MessageAndMetadata<Message> msgAndMetadata : stream) { > > // do some processing > > } > > } > > }; > > } > > > > And the number of consumerThreads is say, 20, and only 1 stream ends up > > receiving messages, then only that 1 stream's thread gets the > > ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream > > iterator gets notified to exit. > > > > Looking at ZookeeperConsumerConnector.scala, it looks like the > > 'topicThreadIdAndQueues' list does not contain entries for all the > > threadId's, depending on the current state of rebalancing, and thus, the > > method, sendShutdownToAllQueues() doesn't actually do what it's intended > to > > do. > > > > The result, is that it's not possible to cleanly shutdown a consumer. > > > > I am using 0.7.2. > > > > Jason > > >