That would be a bug. Can you file a jira? Thanks,
Joel

On Tue, Nov 6, 2012 at 1:43 PM, Jason Rosenberg <j...@squareup.com> wrote:

> Ok,
>
> One variation on this does not appear to be working correctly on trunk:
> if I start up a consumer connector, never send any messages to it, and
> then shut it down, its consumer threads never get notified of the
> shutdown.
>
> It seems there's a dependency on initially receiving at least one message
> and processing it, before the topicThreadIdAndQueues object gets
> initialized.
>
> Shall I file this one?
>
> Jason
>
>
> On Mon, Nov 5, 2012 at 8:01 PM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > Interestingly, I just checked out the latest sources, and did the build,
> > and it produced jars for 0.7.0! What's that about?
> >
> > Anyway, it does look like this issue is fixed with the latest trunk.
> >
> > Will there be a released version of this, prior to 0.8? Or will there be
> > a beta for 0.8 upcoming?
> >
> > Thanks,
> >
> > Jason
> >
> >
> > On Mon, Nov 5, 2012 at 7:35 PM, Jason Rosenberg <j...@squareup.com>
> > wrote:
> >
> >> Hi Joel,
> >>
> >> I'd be happy to try it, but am a bit concerned about porting any other
> >> 0.8 api changes to get everything working (I'd rather not expend the
> >> effort unless there's a stable version I can port to). Or should I just
> >> be able to drop latest trunk (or 0.8.x) code in place without any
> >> changes?
> >>
> >> Also, is there a ready bundled download beyond 0.7.2, or do I need to
> >> download sources and build everything locally?
> >>
> >> Jason
> >>
> >>
> >> On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >>
> >>> Can you try the latest from trunk?
> >>> This might be related to
> >>> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it
> >>> into 0.7.2
> >>>
> >>> Thanks,
> >>>
> >>> Joel
> >>>
> >>>
> >>> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <j...@squareup.com>
> >>> wrote:
> >>>
> >>> > (I suspect this is a bug, but thought I'd check with the group
> >>> > before filing)
> >>> >
> >>> > If I create consumer streams using a topic filter, and request more
> >>> > threads than are actually allocated (based on the current dynamic
> >>> > topic event watcher, etc.), the unused threads don't get the
> >>> > shutdown message when the connector is shut down.
> >>> >
> >>> > Specifically, if I have code that looks like:
> >>> >
> >>> >   Whitelist topicRegex = new Whitelist("^metrics\\..*$");
> >>> >   List<KafkaStream<Message>> streams =
> >>> >       consumerConnector.createMessageStreamsByFilter(topicRegex,
> >>> >           consumerThreads);
> >>> >
> >>> >   ExecutorService executor =
> >>> >       Executors.newFixedThreadPool(consumerThreads);
> >>> >
> >>> >   for (final KafkaStream<Message> stream : streams) {
> >>> >     executor.submit(new Runnable() {
> >>> >       @Override public void run() {
> >>> >         for (MessageAndMetadata<Message> msgAndMetadata : stream) {
> >>> >           // do some processing
> >>> >         }
> >>> >       }
> >>> >     });
> >>> >   }
> >>> >
> >>> > And the number of consumerThreads is, say, 20, and only 1 stream
> >>> > ends up receiving messages, then only that 1 stream's thread gets
> >>> > the ZookeeperConsumerConnector.shutdownCommand, which is how the
> >>> > KafkaStream iterator gets notified to exit.
> >>> >
> >>> > Looking at ZookeeperConsumerConnector.scala, it looks like the
> >>> > 'topicThreadIdAndQueues' list does not contain entries for all the
> >>> > threadIds, depending on the current state of rebalancing, and thus
> >>> > the method sendShutdownToAllQueues() doesn't actually do what it's
> >>> > intended to do.
> >>> >
> >>> > The result is that it's not possible to cleanly shut down a
> >>> > consumer.
> >>> >
> >>> > I am using 0.7.2.
> >>> >
> >>> > Jason
> >>> >
> >>>
> >>
> >
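
For anyone following the thread, the failure mode described above can be reproduced in miniature without Kafka. The sketch below is an illustration only and is not Kafka's code: the class `ShutdownSentinelSketch`, the `SHUTDOWN` marker, the `registry` map, and the method `exitedAfterShutdown` are all hypothetical names. It assumes each consumer thread blocks on its own queue and exits only when it reads a sentinel, and that the shutdown routine sends the sentinel only to queues that were registered. Consumers whose queues were never registered stay blocked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ShutdownSentinelSketch {
    // Hypothetical shutdown marker; a stand-in, not Kafka's actual object.
    static final String SHUTDOWN = "__shutdown__";

    /**
     * Starts `threads` consumers, each blocked on its own queue, but registers
     * only the first `registered` queues. Sends the sentinel to the registered
     * queues and returns how many consumers exited within `waitMillis`.
     */
    static int exitedAfterShutdown(int threads, int registered, long waitMillis)
            throws InterruptedException {
        Map<Integer, BlockingQueue<String>> registry = new ConcurrentHashMap<>();
        CountDownLatch exited = new CountDownLatch(threads);
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < threads; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            if (i < registered) {
                registry.put(i, queue); // only these queues are known at shutdown
            }
            Thread worker = new Thread(() -> {
                try {
                    // Block until the sentinel arrives; real messages would be
                    // processed inside the loop body.
                    while (!SHUTDOWN.equals(queue.take())) {
                        // do some processing
                    }
                    exited.countDown(); // clean exit via the sentinel
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // forced exit, not counted
                }
            });
            worker.setDaemon(true);
            worker.start();
            workers.add(worker);
        }

        // "Send shutdown to all queues" only reaches the registered ones.
        for (BlockingQueue<String> queue : registry.values()) {
            queue.put(SHUTDOWN);
        }

        exited.await(waitMillis, TimeUnit.MILLISECONDS);
        int cleanExits = threads - (int) exited.getCount();

        // Clean up the consumers that never saw the sentinel.
        for (Thread worker : workers) {
            worker.interrupt();
        }
        return cleanExits;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 of 20 registered:  " + exitedAfterShutdown(20, 1, 300));
        System.out.println("20 of 20 registered: " + exitedAfterShutdown(20, 20, 2000));
    }
}
```

With 20 consumers and a single registered queue, only that one consumer exits through the sentinel. The rest have to be interrupted, which is roughly the fallback a caller would need (for example `ExecutorService.shutdownNow()`) if the connector's own shutdown does not reach every stream.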