Ok, So one variation on this which does not appear to be working correctly on trunk, is if I start up a consumer connector, but then never send any messages to it, and then shut it down, it's consumer threads never get notified of the shut down.
It seems there's a dependency on initially receiving at least one message and processing it, before the topicThreadIdAndQueues object gets initialized. Shall I file this one? Jason On Mon, Nov 5, 2012 at 8:01 PM, Jason Rosenberg <j...@squareup.com> wrote: > Interestingly, I just checked out the latest sources, and did the build, > and it produced jars for 0.7.0! What's that about? > > Anyway, it does indeed look like this issue is indeed fixed with the > latest trunk. > > Will there be a released version of this, prior to 0.8? Or will there be > a beta for 0.8 upcoming? > > Thanks, > > Jason > > > On Mon, Nov 5, 2012 at 7:35 PM, Jason Rosenberg <j...@squareup.com> wrote: > >> Hi Joel, >> >> I'd be happy to try it, but am a bit concerned about porting any other >> 0.8 api changes to get everything working (I'd rather not expend the effort >> unless there's a stable version I can port to). Or should I just be able >> to drop latest trunk (or 0.8.x) code in place without any changes? >> >> Also, is there a ready bundled download beyond 0.7.2, or do I need >> download sources and build everything locally? >> >> Jason >> >> >> On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <jjkosh...@gmail.com> wrote: >> >>> Can you try the latest from trunk? This might be related to >>> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it >>> into >>> 0.7.2 >>> >>> Thanks, >>> >>> Joel >>> >>> >>> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <j...@squareup.com> >>> wrote: >>> >>> > (I suspect this a bug, but thought I'd check with the group before >>> filing) >>> > >>> > If I create consumer streams using a topic filter, and request more >>> threads >>> > than are actually allocated (based on the current dynamic topic event >>> > watcher, etc.), the unused threads don't get the shutdown message, >>> when the >>> > connector is shut down. >>> > >>> > Specifically, if I have code that looks like: >>> > >>> > Whitelist topicRegex = new Whitelist("^metrics\\..*$"); >>> > List<KafkaStream<Message>> streams = >>> > consumerConnector.createMessageStreamsByFilter(topicRegex, >>> > consumerThreads); >>> > >>> > ExecutorService executor = >>> > Executors.newFixedThreadPool(consumerThreads); >>> > >>> > for (final KafkaStream<Message> stream : streams) { >>> > executor.submit(new Runnable() { >>> > @Override public void run() { >>> > for (MessageAndMetadata<Message> msgAndMetadata : >>> stream) { >>> > // do some processing >>> > } >>> > } >>> > }; >>> > } >>> > >>> > And the number of consumerThreads is say, 20, and only 1 stream ends up >>> > receiving messages, then only that 1 stream's thread gets the >>> > ZookeeperConsumerConnector.shutdownCommand, which is how the >>> KafkaStream >>> > iterator gets notified to exit. >>> > >>> > Looking at ZookeeperConsumerConnector.scala, it looks like the >>> > 'topicThreadIdAndQueues' list does not contain entries for all the >>> > threadId's, depending on the current state of rebalancing, and thus, >>> the >>> > method, sendShutdownToAllQueues() doesn't actually do what it's >>> intended to >>> > do. >>> > >>> > The result, is that it's not possible to cleanly shutdown a consumer. >>> > >>> > I am using 0.7.2. >>> > >>> > Jason >>> > >>> >> >> >