I am writing an app that needs to distribute incoming messages to multiple 
topics and have those topics read by multiple threads per process in the 
following way:

First, one process writes messages into many different topics (there is a 
mapping function that defines which message goes to which topic).
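The post does not show the mapping function. As a minimal sketch, assume it hashes a message key into a fixed number of shards per language. The `TopicMapper` class, the `shardsPerLanguage` parameter and the "v1-<language>-<shard>" naming are hypothetical; the naming is borrowed only from the topic "v1-farsi-0" that appears in the log below.

```java
/**
 * Hypothetical message-to-topic mapping. The real mapping function is not shown in
 * the post; topic names of the form "v1-<language>-<shard>" are an assumption taken
 * from the "v1-farsi-0" topic in the consumer log.
 */
final class TopicMapper {
    private final int shardsPerLanguage;

    TopicMapper(int shardsPerLanguage) {
        if (shardsPerLanguage <= 0) {
            throw new IllegalArgumentException("shardsPerLanguage must be positive");
        }
        this.shardsPerLanguage = shardsPerLanguage;
    }

    /** The same key always maps to the same topic, so related messages stay together. */
    String topicFor(String language, String messageKey) {
        // floorMod keeps the shard non-negative even when hashCode() is negative.
        int shard = Math.floorMod(messageKey.hashCode(), shardsPerLanguage);
        return "v1-" + language + "-" + shard;
    }

    public static void main(String[] args) {
        TopicMapper mapper = new TopicMapper(4);
        for (String key : new String[] {"msg-1", "msg-2", "msg-3"}) {
            System.out.println(key + " -> " + mapper.topicFor("farsi", key));
        }
    }
}
```

A deterministic mapping like this means the producer never needs to remember where a message went; any process can recompute the topic from the key.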

Next, a bank of machines is each running a consumer process.  Each consumer 
process reads a subset of the topics.  Each topic gets read by two different 
machines (for redundancy), but the exact mix of topics and machines may vary 
and in principle could be adjusted on the fly (although in my test, the consumer 
process simply spins up its topics one at a time until it is reading from all 
the topics it cares about).
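A minimal sketch of one static plan that satisfies "each topic gets read by two different machines": topics are laid out round-robin, and each topic goes to one machine and to that machine's neighbor in the list. The round-robin scheme, the `TopicAssignment` class and the machine names are hypothetical; the post does not say how topics are actually assigned, and adjusting the mix on the fly would need more than this (for example, recomputing the plan when the machine list changes).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical static plan: every topic is read by exactly two distinct machines. */
final class TopicAssignment {
    static Map<String, List<String>> assign(List<String> topics, List<String> machines) {
        if (machines.size() < 2) {
            throw new IllegalArgumentException("need at least two machines for redundancy");
        }
        if (new HashSet<>(machines).size() != machines.size()) {
            throw new IllegalArgumentException("machine names must be distinct");
        }
        // machine -> topics it should read, in machine order
        Map<String, List<String>> plan = new LinkedHashMap<>();
        for (String machine : machines) {
            plan.put(machine, new ArrayList<>());
        }
        for (int i = 0; i < topics.size(); i++) {
            String topic = topics.get(i);
            // Primary and backup are adjacent in the ring, so they are always different machines.
            String primary = machines.get(i % machines.size());
            String backup = machines.get((i + 1) % machines.size());
            plan.get(primary).add(topic);
            plan.get(backup).add(topic);
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("v1-farsi-0", "v1-farsi-1", "v1-farsi-2");
        List<String> machines = Arrays.asList("host-a", "host-b", "host-c");
        System.out.println(assign(topics, machines));
    }
}
```

With three topics and three machines, every machine ends up reading two topics and every topic has two readers.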

I am creating a separate ConsumerConnector for each topic, then creating a 
separate map of KafkaStreams from each ConsumerConnector, selecting the stream 
for that topic, and starting an iterator on it.
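The threading shape of that setup (one independent reader per topic, each on its own thread, each blocking on its own source) can be modeled without Kafka. In this sketch a `BlockingQueue` stands in for the KafkaStream iterator. `PerTopicReaders` and its methods are hypothetical and are not the Kafka consumer API. The sketch does not model ZooKeeper registration, group ids or rebalancing, so it cannot reproduce the problem described below; it only shows why iterators for different topics can overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Stand-in for one connector + one stream + one iterator per topic (not the Kafka API). */
final class PerTopicReaders {
    private final Map<String, BlockingQueue<String>> sources = new ConcurrentHashMap<>();
    private final List<Thread> threads = new ArrayList<>();
    final List<String> consumed = new CopyOnWriteArrayList<>();

    /** Starts an independent reader for one topic; readers for different topics overlap freely. */
    void start(String topic) {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        sources.put(topic, source);
        Thread reader = new Thread(() -> {
            try {
                while (true) {
                    String msg = source.take(); // blocks until data arrives, like a stream iterator
                    consumed.add(topic + ":" + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted by shutdown(): stop reading
            }
        }, "reader-" + topic);
        reader.setDaemon(true);
        threads.add(reader);
        reader.start();
    }

    /** Simulates the producer process writing one message to a topic that has a reader. */
    void publish(String topic, String msg) {
        BlockingQueue<String> source = sources.get(topic);
        if (source == null) {
            throw new IllegalStateException("no reader started for " + topic);
        }
        source.add(msg);
    }

    /** Polls until at least count messages were consumed or the timeout expires. */
    boolean awaitConsumed(int count, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (consumed.size() < count) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Interrupts every reader thread and waits briefly for each one to exit. */
    void shutdown() {
        for (Thread t : threads) {
            t.interrupt();
        }
        for (Thread t : threads) {
            try {
                t.join(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) {
        PerTopicReaders readers = new PerTopicReaders();
        readers.start("v1-farsi-0"); // topics spun up one at a time
        readers.start("v1-farsi-1");
        readers.publish("v1-farsi-0", "hello");
        readers.publish("v1-farsi-1", "world");
        System.out.println(readers.awaitConsumed(2, 5000) + " " + readers.consumed);
        readers.shutdown();
    }
}
```

In this model, every message published after the readers start is consumed. The log excerpt below shows the real consumers behaving differently.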

Do I need to manage a single, shared ConsumerConnector instead?  My consumer 
configs define a groupid of 'katta_group', but I'm not sure what impact that has.

It appears that each iterator starts up and drains whatever messages were in 
the topic when the iterator started.  My logs show that the iterators even 
overlap: one iterator keeps reading messages while a later iterator, for a 
different topic, starts reading as well.

However, once all topics are caught up and the consumer process quiesces (all 
iterators presumably waiting for input), if I then run the message-generation 
process to push more data into the topics, I get a burst of log entries in the 
consumer like the following:


2012-11-27 13:16:06,489 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 begin 
rebalancing consumer 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 try #0

2012-11-27 13:16:06,507 INFO consumer.Fetcher - Cleared all relevant queues for 
this fetcher

2012-11-27 13:16:06,507 INFO consumer.ConsumerIterator - Clearing the current 
data chunk for this consumer iterator

2012-11-27 13:16:06,507 INFO consumer.Fetcher - Cleared the data chunks in all 
the consumer message iterators

2012-11-27 13:16:06,507 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Committing all 
offsets after clearing the fetcher queues

2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Releasing 
partition ownership

2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Consumer 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 rebalancing the 
following partitions: List(0-0) for topic v1-farsi-0 with consumers: 
List(katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63-0)

2012-11-27 13:16:06,508 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63-0 attempting to 
claim partition 0-0

2012-11-27 13:16:06,537 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63-0 successfully 
owned partition 0-0 for topic v1-farsi-0

2012-11-27 13:16:06,537 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Updating the 
cache

2012-11-27 13:16:06,538 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 Consumer 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 selected 
partitions : v1-farsi-0:0-0: fetched offset = 0: consumed offset = 0

2012-11-27 13:16:06,544 INFO consumer.ZookeeperConsumerConnector - 
katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 end rebalancing 
consumer katta_group_lnx-rjervis.visibletech.net-1354050936806-78f6ef63 try #0

2012-11-27 13:16:06,544 INFO consumer.FetcherRunnable - FetchRunnable-0 start 
fetching topic: v1-farsi-0 part: 0 offset: 0 from 192.168.19.203:9092

But no messages are read.  About 500 new messages appear to have been written 
to a variety of topics.  If I repeat the experiment and push another batch of 
messages, I get the same burst of rebalancing log entries (but naming a 
different topic from the one above).  At least one message appears to have been 
written to the topic named in the rebalancing log entries, but no log entries 
indicate that the iterator for that topic did anything.

Any ideas?
Bob Jervis | Senior Architect

Visible Technologies <http://www.visibletechnologies.com/>