Thanks all for the help.

I found a hack: if I put a Thread.sleep(1000) after I call my consumer, then
all goes well.

I was able to reproduce this using ConsumerShell exactly as it is: I take
the code, turn its main into an ordinary function, and call that function
from the main of another class.

So if I run ConsumerShell directly, it is fine. If I bundle it into another
app and change main to a function, then (without sleeping) it errors in
this way, because ZKConsumerThread has some race condition where the latch
is not set before my program exits...
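A fixed Thread.sleep(1000) only hides the race. A more deterministic option is for the calling function to wait until the consumer threads have actually finished before it returns. The sketch below illustrates that idea under stated assumptions and is not ConsumerShell's actual code: each stream is modeled as a plain Iterator[String], and DrainThread, JoinExample, and runAndWait are hypothetical names.

```scala
// Hypothetical stand-in for a ZKConsumerThread-style worker: it drains one
// stream (modeled here as a plain Iterator[String]) and counts what it saw.
final class DrainThread(stream: Iterator[String]) extends Thread {
  var count = 0
  override def run(): Unit = stream.foreach(_ => count += 1)
}

object JoinExample {
  // Start one thread per stream, then block until every run() has returned.
  // Thread.join establishes happens-before, so reading count afterwards is safe.
  def runAndWait(streams: Seq[Iterator[String]]): Int = {
    val threads = streams.map(s => new DrainThread(s))
    threads.foreach(_.start())
    threads.foreach(_.join())
    threads.map(_.count).sum
  }

  def main(args: Array[String]): Unit = {
    val total = runAndWait(Seq(Iterator("a", "b"), Iterator("c")))
    println("Received " + total + " messages")
  }
}
```

With a real message stream the iterator normally blocks waiting for more data, so a join like this only returns once each stream ends; in a long-running app the wait belongs wherever the app decides to stop consuming.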

As I keep digging around, I keep seeing a lot of overlap with what I am
doing, and some inconsistencies (like between ConsumerShell and
ConsoleConsumer) in what I could/should be doing.

I think the Scala client should handle these situations in a consistent way
(I see no client yet), and then ConsumerShell and ConsoleConsumer should be
refactored to work using the new "ClientConsumer".

I'm not sure how much can go into a ClientConsumer, or where the
implementation of the API bleeds into the implementation of an app, but I
think it is something that can be done. It could be as simple as moving 90%
of the ConsumerShell code into a ClientConsumer class, moving the latch
around, and then voila, a Scala client (and throw it into examples, etc.).
I need to think about it some more.
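One way to picture "moving the latch around" is a class that owns both the consumer threads and the completion latch, so an app embedding it never sleeps or touches the latch itself. This is a minimal sketch, assuming streams are modeled as plain Iterator[String] and messages are handed to a caller-supplied function; ClientConsumer here is the hypothetical class proposed above, and its start/awaitShutdown methods are invented for illustration, not an existing Kafka API.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicLong

// Hypothetical ClientConsumer: owns the worker threads and the latch.
// Streams are plain Iterator[String] stand-ins for real message streams.
class ClientConsumer(streams: Seq[Iterator[String]], handler: String => Unit) {
  private val finished = new CountDownLatch(streams.size)
  private val consumed = new AtomicLong(0L)

  private val threads: Seq[Thread] = streams.zipWithIndex.map { case (stream, i) =>
    new Thread("client-consumer-" + i) {
      override def run(): Unit =
        try {
          stream.foreach { message =>
            handler(message)
            consumed.incrementAndGet()
          }
        } finally finished.countDown() // released even if the handler throws
    }
  }

  def start(): this.type = { threads.foreach(_.start()); this }

  // Blocks the caller until every stream has been drained, then returns the total.
  def awaitShutdown(): Long = { finished.await(); consumed.get }
}

object ClientConsumerExample {
  def main(args: Array[String]): Unit = {
    val client = new ClientConsumer(
      Seq(Iterator("m1", "m2"), Iterator("m3")),
      m => println("consumed: " + m))
    println("Received " + client.start().awaitShutdown() + " messages")
  }
}
```

Keeping the latch private means ConsumerShell, ConsoleConsumer, or any other app would share the same waiting logic instead of each reimplementing it.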

On Fri, Dec 2, 2011 at 12:07 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> >> "ConsumerConfig" and Predef.Map(topic -> partitions) are
> >> exactly the same for both consumers
>
> Digging deeper, what are the contents of this map?
> How many brokers do you have?
> How many partitions do you have for each of the topics that you've listed
> above?
>
> If I understand correctly, you start only one consumer process at a time,
> right?
>
> Thanks,
> Neha
>
> On Fri, Dec 2, 2011 at 8:48 AM, Joe Stein <crypt...@gmail.com> wrote:
>
> > >> 1. What is the difference between "Old code" and "New code"? Have you
> > >> changed any code in the latter case?
> >
> > The code is exactly the same except that I changed the names of the
> > package, the class, and the function that is called, and put that class
> > into a new jar file that calls the renamed function in the new package.
> > Everything else is exactly the same code.
> >
> > >> 2. What is the {topic, consumer threads} map given to both your
> > >> consumers?
> >
> > I'm not sure I understand what you mean by "consumer threads" as far as
> > the map goes, but I am passing the exact same topic and
> > consumer.properties to both consumers, so my "ConsumerConfig" and
> > Predef.Map(topic -> partitions) are exactly the same for both consumers.
> >
> > On Fri, Dec 2, 2011 at 11:21 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > Joe,
> > >
> > > A couple of questions -
> > >
> > > 1. What is the difference between "Old code" and "New code"? Have you
> > > changed any code in the latter case?
> > > 2. What is the {topic, consumer threads} map given to both your
> > > consumers?
> > >
> > > >> but ... if I then start back up the code that registers
> > > >> as apophis_starscream.site1.medialets.com-1322840963912-9c58256d it
> > > >> picks back up where it left off
> > >
> > > When you start a new consumer, it automatically gets a unique id
> > > assigned. However, that doesn't mean it will not pick up from where the
> > > previous consumer thread left off.
> > > If both of your consumer thread ids "own" the same partitions, the second
> > > one should always start from the offset committed by the previous
> > > consumer thread.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Fri, Dec 2, 2011 at 8:06 AM, Joe Stein <crypt...@gmail.com> wrote:
> > >
> > > > Thanks Jun, yup, that is how I understand it. I just figured out that
> > > > the error is a red herring for my problem, since it is caught and
> > > > logged properly in ZookeeperConsumerConnector.commitOffsets.
> > > >
> > > > It seems that the problem I am running into is this (any help is
> > > > appreciated):
> > > >
> > > > One code base's consumer =
> > > > apophis_starscream.site1.medialets.com-1322840963912-9c58256d (from the
> > > > logs)
> > > >
> > > > So when I kill that consumer (on purpose) and start another consumer in
> > > > its place (new code, to pick up where the old code left off), it seems
> > > > to be a different consumer (though I want it to start back where the
> > > > other one stopped).
> > > >
> > > > New code =
> > > > apophis_starscream.site1.medialets.com-1322840984781-5d58d658
> > > > (from the logs) says "stopping fetcher FetchRunnable-0 to host
> > > > starscream Received 0 messages"
> > > >
> > > > But if I then start back up the code that registers
> > > > as apophis_starscream.site1.medialets.com-1322840963912-9c58256d, it
> > > > picks back up where it left off.
> > > >
> > > > I am using the exact same consumer.properties for both apps (the point
> > > > here is a code rewrite: my new code is not getting messages but my old
> > > > code is, and I want to pick up where the old code was consuming
> > > > messages).
> > > >
> > > > So I guess the question is: how do I make my new consumer code "trick"
> > > > Kafka/ZK into thinking it is the old one, so it can pick back up where
> > > > the old one left off? Or what else have folks done in this regard, or
> > > > how should/could I handle this?
> > > >
> > > > On Fri, Dec 2, 2011 at 10:39 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Joe,
> > > > >
> > > > > It seems that the offset commit thread somehow gets interrupted while
> > > > > trying to write the offset to ZK. The consumer should still be able to
> > > > > consume in this case though. The offset commit thread runs in the
> > > > > background and is decoupled from the consumption logic.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Dec 2, 2011 at 12:01 AM, Joe Stein <crypt...@gmail.com> wrote:
> > > > >
> > > > > > Has anyone else gotten this error? I get it 100% of the time,
> > > > > > depending on how I run my consumer.
> > > > > >
> > > > > > What is weird is that when I use kafka/bin/kafka-consumer-shell.sh
> > > > > > from trunk, the error shows up BUT the consumer keeps producing the
> > > > > > messages.
> > > > > >
> > > > > > If I take the ConsumerShell code and put it into my own program
> > > > > > without any modification, the error comes up and everything is
> > > > > > halted.
> > > > > >
> > > > > > Not sure if this is known or not, but let me know. Thanks!
> > > > > >
> > > > > > 2011-12-02 01:59:37,797 WARN Kafka-consumer-autocommit-0 kafka.consumer.ZookeeperConsumerConnector - exception during commitOffsets
> > > > > > org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
> > > > > > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
> > > > > > at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
> > > > > > at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
> > > > > > at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:102)
> > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:237)
> > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2$$anonfun$apply$6.apply(ZookeeperConsumerConnector.scala:234)
> > > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:660)
> > > > > > at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
> > > > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
> > > > > > at scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:592)
> > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:234)
> > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$2.apply(ZookeeperConsumerConnector.scala:232)
> > > > > > at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
> > > > > > at scala.collection.immutable.List.foreach(List.scala:45)
> > > > > > at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:232)
> > > > > > at kafka.consumer.ZookeeperConsumerConnector.autoCommit(ZookeeperConsumerConnector.scala:220)
> > > > > > at kafka.consumer.ZookeeperConsumerConnector$$anonfun$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:100)
> > > > > > at kafka.utils.Utils$$anon$2.run(Utils.scala:58)
> > > > > > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> > > > > > at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> > > > > > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> > > > > > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> > > > > > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
> > > > > > at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
> > > > > > at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> > > > > > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > > > > > at java.lang.Thread.run(Thread.java:619)
> > > > > > Caused by: java.lang.InterruptedException
> > > > > > at java.lang.Object.wait(Native Method)
> > > > > > at java.lang.Object.wait(Object.java:485)
> > > > > > at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1317)
> > > > > > at org.apache.zookeeper.ZooKeeper.setData(ZooKeeper.java:1036)
> > > > > > at org.I0Itec.zkclient.ZkConnection.writeData(ZkConnection.java:111)
> > > > > > at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:813)
> > > > > > at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> > > > > >
> > > > > >
> > > > > > /*
> > > > > > Joe Stein
> > > > > > http://www.linkedin.com/in/charmalloc
> > > > > > Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > > > */
> > > > > >
> > > > >
> > > >
> > > >
> > >
> >
> >
>

