The 0.7.0 seems to be a versioning bug on our side. We expect an 0.8 beta coming up soon.
Thanks, Neha On Mon, Nov 5, 2012 at 8:01 PM, Jason Rosenberg <j...@squareup.com> wrote: > Interestingly, I just checked out the latest sources, and did the build, > and it produced jars for 0.7.0! What's that about? > > Anyway, it does indeed look like this issue is indeed fixed with the latest > trunk. > > Will there be a released version of this, prior to 0.8? Or will there be a > beta for 0.8 upcoming? > > Thanks, > > Jason > > On Mon, Nov 5, 2012 at 7:35 PM, Jason Rosenberg <j...@squareup.com> wrote: > >> Hi Joel, >> >> I'd be happy to try it, but am a bit concerned about porting any other 0.8 >> api changes to get everything working (I'd rather not expend the effort >> unless there's a stable version I can port to). Or should I just be able >> to drop latest trunk (or 0.8.x) code in place without any changes? >> >> Also, is there a ready bundled download beyond 0.7.2, or do I need >> download sources and build everything locally? >> >> Jason >> >> >> On Mon, Nov 5, 2012 at 7:10 PM, Joel Koshy <jjkosh...@gmail.com> wrote: >> >>> Can you try the latest from trunk? This might be related to >>> https://issues.apache.org/jira/browse/KAFKA-550 which did not make it >>> into >>> 0.7.2 >>> >>> Thanks, >>> >>> Joel >>> >>> >>> On Mon, Nov 5, 2012 at 4:34 PM, Jason Rosenberg <j...@squareup.com> wrote: >>> >>> > (I suspect this a bug, but thought I'd check with the group before >>> filing) >>> > >>> > If I create consumer streams using a topic filter, and request more >>> threads >>> > than are actually allocated (based on the current dynamic topic event >>> > watcher, etc.), the unused threads don't get the shutdown message, when >>> the >>> > connector is shut down. >>> > >>> > Specifically, if I have code that looks like: >>> > >>> > Whitelist topicRegex = new Whitelist("^metrics\\..*$"); >>> > List<KafkaStream<Message>> streams = >>> > consumerConnector.createMessageStreamsByFilter(topicRegex, >>> > consumerThreads); >>> > >>> > ExecutorService executor = >>> > Executors.newFixedThreadPool(consumerThreads); >>> > >>> > for (final KafkaStream<Message> stream : streams) { >>> > executor.submit(new Runnable() { >>> > @Override public void run() { >>> > for (MessageAndMetadata<Message> msgAndMetadata : stream) >>> { >>> > // do some processing >>> > } >>> > } >>> > }; >>> > } >>> > >>> > And the number of consumerThreads is say, 20, and only 1 stream ends up >>> > receiving messages, then only that 1 stream's thread gets the >>> > ZookeeperConsumerConnector.shutdownCommand, which is how the KafkaStream >>> > iterator gets notified to exit. >>> > >>> > Looking at ZookeeperConsumerConnector.scala, it looks like the >>> > 'topicThreadIdAndQueues' list does not contain entries for all the >>> > threadId's, depending on the current state of rebalancing, and thus, the >>> > method, sendShutdownToAllQueues() doesn't actually do what it's >>> intended to >>> > do. >>> > >>> > The result, is that it's not possible to cleanly shutdown a consumer. >>> > >>> > I am using 0.7.2. >>> > >>> > Jason >>> > >>> >> >>