Vaibhav,

Those exceptions seem to be caused by improper synchronization between
shutdown and rebalance in Kafka. They shouldn't affect consumption,
though. In general, our consumption model is at-least-once. However, if you
use auto offset commit, Kafka will commit the last consumed offset on
shutdown, so you shouldn't get duplicates. When the consumer does
rebalance, there is a slight chance that you can get one duplicate message
per partition (if compression is used, more duplicates can be introduced).
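For reference, the batch-then-shutdown pattern with an explicit offset commit before shutdown could be sketched roughly as below. This is only a sketch against the high-level consumer API of this era; the topic name, group id, batch size, and exact config property names are illustrative assumptions and may differ between Kafka versions:

```java
// Sketch only: assumes the 0.7-era high-level consumer API
// (kafka.consumer.*, kafka.javaapi.consumer.*). Topic name, group id,
// batch size, and property names here are illustrative, not authoritative.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

public class BatchSpoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");  // ZooKeeper quorum
        props.put("groupid", "TEST");               // consumer group id
        props.put("autocommit.enable", "false");    // commit manually instead
        props.put("consumer.timeout.ms", "1000");   // bound hasNext() blocking

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("events", 1);  // one stream for the hypothetical topic
        Map<String, List<KafkaStream<Message>>> streams =
            connector.createMessageStreams(topicCount);
        ConsumerIterator<Message> it = streams.get("events").get(0).iterator();

        int batchSize = 5000;
        for (int i = 0; i < batchSize && it.hasNext(); i++) {
            Message msg = it.next();  // hand the message off to the topology here
        }

        // Commit the consumed offsets explicitly before shutting down, so the
        // next connector instance resumes where this batch left off.
        connector.commitOffsets();
        connector.shutdown();
    }
}
```

Committing before shutdown (rather than relying on auto commit racing the shutdown path) is the design point here: the offsets for the batch are persisted while the connector is still fully alive.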

Thanks,

Jun

On Fri, Jul 27, 2012 at 11:22 AM, Vaibhav Puranik <vpura...@gmail.com> wrote:

> Hi all,
>
> I have written my own Kafka spout for Storm processing. In this spout I
> open the connection using
> Consumer.createJavaConsumerConnector(consumerConfig),
> iterate over a batch of messages (say 5000), then break the loop and
> call consumerConnector.shutdown(). This process repeats forever.
>
> I am periodically getting the following exceptions:
>
>  ZookeeperConsumerConnector [WARN]
> TEST_ip-00-00-00-00-1343411701915-6f2df44c exception during commitOffsets
> org.I0Itec.zkclient.exception.ZkInterruptedException:
> java.lang.InterruptedException
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:687)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:809)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:103)
>
>
> ZookeeperConsumerConnector [ERROR]
> TEST_ip-00-00-00-00-1343411340656-456ca7c2 error during syncedRebalance
> java.lang.NullPointerException
>     at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>     at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>     at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>     at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>
>
> ZookeeperConsumerConnector [ERROR]
> TEST_ip-00-00-00-00-1343411242019-233e5975 error during syncedRebalance
> java.lang.NullPointerException
>     at kafka.utils.ZkUtils$.getChildrenParentMayNotExist(ZkUtils.scala:181)
>     at kafka.utils.ZkUtils$.getCluster(ZkUtils.scala:202)
>     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:447)
>     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
>
>
> Is this happening because certain threads are running in the
> background and do not complete their work before
> consumerConnector.shutdown() completes? In that case, what's a way to get
> messages in batches without having to keep track of offsets?
>
> Also, I am assuming that if the offsets are not committed, Kafka probably
> won't know the exact location to start from when the next consumer comes
> back online. Is that right? Does that mean that whenever this happens, I may
> get duplicate events?
>
> Regards,
> Vaibhav
> GumGum
>
