Joe,

If you force kill a consumer, its registration in ZK may not go away until the ZK session timeout has passed. This means that a new consumer may not be able to own a partition and therefore may not get events immediately. Not sure if this is exactly your problem, though.
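For reference, the timeout Jun mentions is the consumer-side ZooKeeper session timeout. In a 0.7-era consumer.properties it would look roughly like the fragment below (property names are illustrative for that era and should be checked against the Kafka version you are running):

```properties
# Illustrative consumer.properties fragment -- verify property names
# against your Kafka version.
zk.connect=localhost:2181
groupid=apophis
# How long ZK waits before expiring a dead session (ms).  A force-killed
# consumer's ephemeral registration, and thus its partition ownership,
# can linger for up to this long.
zk.sessiontimeout.ms=6000
```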
Jun

On Fri, Dec 2, 2011 at 11:12 AM, Joe Stein <crypt...@gmail.com> wrote:
> Thanks all for the help.
>
> I found a hack: if I put a Thread.sleep(1000) after I call my consumer, then
> all goes well.
>
> I was able to reproduce this using the "ConsumerShell" exactly as it is,
> when I take the code, call it from another class with another main function,
> and change main to a new function.
>
> So if I run ConsumerShell it is fine; if I bundle it into another app and
> change main to a function, then (without sleeping) it errors in this way,
> because the ZKConsumerThread has some race condition where the latch is not
> set before my program exits...
>
> As I keep digging around I keep seeing a lot of overlap in what I am doing,
> and some inconsistencies (like between "ConsumerShell" and "ConsoleConsumer")
> with what I could/should be doing.
>
> I think the Scala client should handle these situations in a consistent way
> (I see no client yet) and then refactor ConsumerShell and ConsoleConsumer to
> work using the new "ClientConsumer".
>
> I am not sure how much can be done for a ClientConsumer, and where the
> implementation of the API bleeds into the implementation of an app, but I
> think it is something that can be done. It could be as simple as moving 90%
> of the ConsumerShell code into a ClientConsumer class, moving the latch
> around, and then, voila, a Scala client (and throwing it into examples,
> etc.). I need to think about it some more.
>
> On Fri, Dec 2, 2011 at 12:07 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > >> "ConsumerConfig" and Predef.Map(topic -> partitions) are
> > >> exactly the same for both consumers
> >
> > Digging deeper, what are the contents of this map?
> > How many brokers do you have?
> > How many partitions do you have for each of the topics that you've listed
> > above?
> >
> > If I understand correctly, you start only one consumer process at a time,
> > right?
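The Thread.sleep(1000) workaround above suggests the embedding program exits before the consumer thread has counted down its latch. A less fragile pattern is to block the caller on the latch itself rather than guessing a sleep interval. The sketch below shows the idea with illustrative names only; it is not the actual ConsumerShell/ZKConsumerThread code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: block the caller on a latch instead of Thread.sleep(1000).
class LatchedWorker {
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicInteger consumed = new AtomicInteger();

    void start(final int messages) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < messages; i++) {
                    consumed.incrementAndGet(); // stand-in for iterating the message stream
                }
                done.countDown(); // signal the caller that consumption finished
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Caller blocks here until the worker is done (or the timeout expires),
    // so the JVM cannot exit while the consumer thread is mid-setup.
    boolean awaitDone(long timeoutMs) throws InterruptedException {
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    int consumed() { return consumed.get(); }
}
```

Blocking on await() means the program cannot exit while the worker is still mid-setup, which is exactly the race the sleep was papering over.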
> >
> > Thanks,
> > Neha
> >
> > On Fri, Dec 2, 2011 at 8:48 AM, Joe Stein <crypt...@gmail.com> wrote:
> >
> > > >> 1. What is the difference between "Old code" and "New code"? Have
> > > >> you changed any code in the latter case?
> > >
> > > The code is exactly the same except: I changed the name of the package,
> > > class, and function that is called, and put that class into a new jar
> > > file, calling the newly named function in the new package; everything
> > > else contained is the same exact code.
> > >
> > > >> 2. What is the {topic, consumer threads} map given to both your
> > > >> consumers?
> > >
> > > I am not sure I understand what you mean by "consumer threads" as far
> > > as the map goes, but I am passing the exact same topic and
> > > consumer.properties to both consumers, so my "ConsumerConfig" and
> > > Predef.Map(topic -> partitions) are exactly the same for both
> > > consumers.
> > >
> > > On Fri, Dec 2, 2011 at 11:21 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > >
> > > > Joe,
> > > >
> > > > A couple of questions -
> > > >
> > > > 1. What is the difference between "Old code" and "New code"? Have
> > > > you changed any code in the latter case?
> > > > 2. What is the {topic, consumer threads} map given to both your
> > > > consumers?
> > > >
> > > > >> but ... if I then start back up the code that registers
> > > > >> as apophis_starscream.site1.medialets.com-1322840963912-9c58256d,
> > > > >> it picks back up where it left off
> > > >
> > > > When you start a new consumer, it will automatically get a unique id
> > > > assigned. However, that doesn't mean it will not pick up from where
> > > > the previous consumer thread left off.
> > > > If both your consumer thread ids "own" the same partitions, the 2nd
> > > > one should always start from the offset committed by the previous
> > > > consumer thread.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Fri, Dec 2, 2011 at 8:06 AM, Joe Stein <crypt...@gmail.com> wrote:
> > > >
> > > > > Thanks Jun, yup, that is how I understand it. I just figured out
> > > > > that the error is a red herring for my problem, since it is caught
> > > > > and logged properly in ZookeeperConsumerConnector.commitOffsets.
> > > > >
> > > > > It seems that the problem I am running into is this (any help is
> > > > > appreciated):
> > > > >
> > > > > one code base consumer =
> > > > > apophis_starscream.site1.medialets.com-1322840963912-9c58256d (from
> > > > > the logs)
> > > > >
> > > > > So when I kill that consumer (on purpose) and start another
> > > > > consumer (new code to pick up where the old code left off) in its
> > > > > place, it seems to be a different consumer (though I want it to
> > > > > start back where the other one stopped).
> > > > >
> > > > > new code =
> > > > > apophis_starscream.site1.medialets.com-1322840984781-5d58d658 (from
> > > > > the logs) says "stopping fetcher FetchRunnable-0 to host
> > > > > starscream Received 0 messages"
> > > > >
> > > > > But ... if I then start back up the code that registers
> > > > > as apophis_starscream.site1.medialets.com-1322840963912-9c58256d,
> > > > > it picks back up where it left off.
> > > > >
> > > > > I am using the exact same consumer.properties for both apps (the
> > > > > point here is a code rewrite where my new code is not getting
> > > > > messages but my old code is, and I want to pick up where the old
> > > > > code was consuming messages).
> > > > >
> > > > > So I guess the question is: how do I make my new consumer code
> > > > > "trick" kafka/zk into thinking it is the old one so it can pick
> > > > > back up where the old one left off? What else have folks done in
> > > > > this regard, or how should/could I handle this?
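Neha's point about unique consumer ids versus committed offsets can be sketched with a toy model (an illustration only, not the real ZK layout or API): offsets are committed per (group, topic, partition), never per consumer id, so a brand-new consumer that joins with the same group id resumes from the old consumer's committed offset. No "tricking" of the id should be needed as long as the group id in consumer.properties stays the same.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of offset tracking: committed offsets are keyed by
// (group, topic, partition), never by the consumer id, so any new
// consumer in the same group resumes where the old one stopped.
class ToyOffsetStore {
    private final Map<String, Long> committed = new HashMap<String, Long>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    void commit(String group, String topic, int partition, long offset) {
        committed.put(key(group, topic, partition), offset);
    }

    long fetch(String group, String topic, int partition) {
        Long o = committed.get(key(group, topic, partition));
        return o == null ? 0L : o; // no committed offset yet: start from 0
    }
}
```

In this model the old consumer (...-9c58256d) and the new one (...-5d58d658) read and write the very same entry as long as they share a group id, which is why identical consumer.properties should already give resume-where-it-left-off behavior.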
> > > > >
> > > > > On Fri, Dec 2, 2011 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > Joe,
> > > > > >
> > > > > > It seems that the offset commit thread somehow gets interrupted
> > > > > > while trying to write the offset to ZK. The consumer should
> > > > > > still be able to consume in this case, though. The offset commit
> > > > > > thread runs in the background and is decoupled from the
> > > > > > consumption logic.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Fri, Dec 2, 2011 at 12:01 AM, Joe Stein <crypt...@gmail.com> wrote:
> > > > > >
> > > > > > > Has anyone else gotten this error? I get it 100% of the time,
> > > > > > > depending on how I run my consumer.
> > > > > > >
> > > > > > > What is weird is that when I use the
> > > > > > > kafka/bin/kafka-consumer-shell.sh from trunk, the error shows
> > > > > > > up BUT the consumer keeps consuming the messages.
> > > > > > >
> > > > > > > If I take the ConsumerShell code and put it into my own
> > > > > > > program without any modification, the error comes up and
> > > > > > > everything is halted.
> > > > > > >
> > > > > > > Not sure if this is known or what, but let me know, thanks!
> > > > > > >
> > > > > > > 2011-12-02 01:59:37,797 WARN Kafka-consumer-autocommit-0 kafka.consumer.ZookeeperConsumerConnector - exception during commitOffsets
> > > > > > > org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
> > > > > > > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
> > > > > > > at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> > > > > > > at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> > > > > > > at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:102)
> > > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:237)
> > > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:234)
> > > > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> > > > > > > at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> > > > > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> > > > > > > at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:592)
> > > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:234)
> > > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:232)
> > > > > > > at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > > > > > > at scala.collection.immutable.List.foreach(List.scala:45)
> > > > > > > at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:232)
> > > > > > > at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:220)
> > > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:100)
> > > > > > > at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
> > > > > > > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> > > > > > > at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> > > > > > > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> > > > > > > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> > > > > > > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> > > > > > > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> > > > > > > at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > > > > > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > > > > > > at java.lang.Thread.run(Thread.java:619)
> > > > > > > Caused by: java.lang.InterruptedException
> > > > > > > at java.lang.Object.wait(Native Method)
> > > > > > > at java.lang.Object.wait(Object.java:485)
> > > > > > > at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1317)
> > > > > > > at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1036)
> > > > > > > at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> > > > > > > at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> > > > > > > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> > > > > > >
> > > > > > > /*
> > > > > > > Joe Stein
> > > > > > > http://www.linkedin.com/in/charmalloc
> > > > > > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > > > > */
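The trace above is the autocommit path Jun describes: commitOffsets runs on a background scheduler, decoupled from the consumption loop, and tearing the JVM down interrupts any in-flight ZK write. A minimal sketch of that pattern, with illustrative names rather than the real ZookeeperConsumerConnector internals:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a background autocommit task, decoupled from consumption.
class AutoCommitter {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger commits = new AtomicInteger();

    void start(long intervalMs) {
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                commits.incrementAndGet(); // stand-in for commitOffsets()
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // shutdownNow() interrupts an in-flight commit -- which is how a JVM
    // tearing down early can surface as an InterruptedException (wrapped
    // as ZkInterruptedException in the trace above).
    void shutdown() {
        scheduler.shutdownNow();
    }

    int commitCount() { return commits.get(); }
}
```

Because the committer is an independent background task, interrupting it only fails that commit attempt; consumption itself can continue, which matches Jun's observation that the WARN is survivable.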