I filed this issue to address the need for parallelism when consuming multiple single-partition topics selected with a topic filter: https://issues.apache.org/jira/browse/KAFKA-1072
On Thu, Oct 3, 2013 at 10:56 AM, Jason Rosenberg <j...@squareup.com> wrote:

Ah, so this is exposed directly in the simple consumer (but not the high-level one)?

Jason

On Thu, Oct 3, 2013 at 10:25 AM, Jun Rao <jun...@gmail.com> wrote:

See https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

Thanks,
Jun

On Wed, Oct 2, 2013 at 9:42 PM, Jason Rosenberg <j...@squareup.com> wrote:

Jun,

Thanks, can you point me to the client code for issuing a metadata request?

Jason

On Thu, Oct 3, 2013 at 12:24 AM, Jun Rao <jun...@gmail.com> wrote:

It's fixable. Since we plan to rewrite the consumer client code in the near future, it could be considered at that point.

If you issue a metadata request with an empty topic list, you will get back the metadata of all topics.

Thanks,
Jun

On Wed, Oct 2, 2013 at 1:28 PM, Jason Rosenberg <j...@squareup.com> wrote:

How hard would it be to fix this issue, where we have a topic filter that matches multiple topics, so that the load is distributed over multiple threads and over multiple consumers? For some reason I had thought this issue was fixed in 0.8, but I guess not?

I am currently using a single partition for multiple topics. I worry that it ultimately won't scale to only ever have one thread on one consumer doing all the work. We could move to multiple partitions, but for ordering reasons in some use cases this is not always ideal.

Perhaps I can come up with some sort of dynamic topic sniffer, and have it evenly divide the available topics between the available consumers (and threads per consumer).
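Jason's "dynamic topic sniffer" idea — fetch the full topic list (e.g. via a metadata request with an empty topic list, as Jun describes) and divide it evenly across consumers — could be sketched as a pure assignment function. This is only an illustration; the class and method names are hypothetical, and in practice the topic list would come from the metadata response:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: round-robin a topic list across consumer ids.
// Sorting first gives a deterministic order, so every consumer running
// the same logic over the same topic list agrees on the assignment.
public class TopicAssigner {
    public static Map<String, List<String>> assign(List<String> topics, List<String> consumers) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<String>());
        }
        List<String> sorted = new ArrayList<>(topics);
        Collections.sort(sorted);
        for (int i = 0; i < sorted.size(); i++) {
            // round-robin: topic i goes to consumer (i mod #consumers)
            assignment.get(consumers.get(i % consumers.size())).add(sorted.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<String>> a = assign(
            java.util.Arrays.asList("t1", "t2", "t3", "t4", "t5"),
            java.util.Arrays.asList("consumer-0", "consumer-1"));
        System.out.println(a); // consumer-0 gets 3 topics, consumer-1 gets 2
    }
}
```

Each consumer could rerun this whenever the sniffed topic list changes; handling consumer failures would still need extra coordination, which is the auto-failover gap discussed later in the thread.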
Is there a simple API within the Kafka client code for getting the list of topics?

Jason

On Fri, Aug 30, 2013 at 11:41 PM, Jun Rao <jun...@gmail.com> wrote:

It seems to me option 1) is easier. Option 2) has the same issue as option 1), since you have to manage the different whitelists.

A more general solution is probably to change the consumer distribution model to divide partitions across topics. That way, one can create as many streams as the total # of partitions across all topics. We can look into that in the future.

Thanks,
Jun

On Fri, Aug 30, 2013 at 8:24 AM, Rajasekar Elango <rela...@salesforce.com> wrote:

Yeah. The actual bottleneck is the number of topics that match the topic filter: the number of streams is shared between all the topics it's consuming from. I thought about the following ideas to work around this (I am basically referring to the mirrormaker consumer in the examples).

Option 1) Instead of running one mirrormaker process with topic filter ".+", we can start multiple mirrormaker processes, each with a topic filter matching a subset of topics (e.g. mirrormaker1 => whitelist topic1.*, mirrormaker2 => whitelist topic2.*, etc.). But this adds some operational overhead to start and manage multiple processes on the host.

Option 2) Modify the mirrormaker code to support a list of whitelist filters, and have it create message streams for each filter (call createMessageStreamsByFilter for each filter).
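For Option 1 to work, the per-process whitelists must be mutually exclusive: every topic should match exactly one filter, so no topic is consumed twice or dropped. A small sketch of such a sanity check (topic and filter names are illustrative, not from the thread):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Check that a set of whitelist regexes is mutually exclusive and
// exhaustive over a topic list: each topic matches exactly one filter.
public class WhitelistCheck {
    public static boolean mutuallyExclusive(List<String> topics, List<String> filters) {
        for (String topic : topics) {
            int matches = 0;
            for (String f : filters) {
                if (Pattern.matches(f, topic)) {
                    matches++;
                }
            }
            if (matches != 1) {
                return false; // overlapping filters, or an uncovered topic
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("topic1.orders", "topic1.users", "topic2.logs");
        // topic1.* and topic2.* split these topics cleanly:
        System.out.println(mutuallyExclusive(topics, Arrays.asList("topic1.*", "topic2.*")));
        // ...whereas ".+" overlaps with topic1.*, so this set is not exclusive:
        System.out.println(mutuallyExclusive(topics, Arrays.asList(".+", "topic1.*")));
    }
}
```

Running a check like this before starting the mirrormaker processes catches the filter-management mistake Jun warns about below.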
What would be your recommendation? If adding this feature to mirrormaker is worthwhile for Kafka, we can do option 2.

Thanks,
Raja.

On Fri, Aug 30, 2013 at 10:34 AM, Jun Rao <jun...@gmail.com> wrote:

Right, but if you set #partitions in each topic to 16, you can use a total of 16 streams.

Thanks,
Jun

On Thu, Aug 29, 2013 at 9:08 PM, Rajasekar Elango <rela...@salesforce.com> wrote:

With option 1) I can't really use 8 streams in each consumer; if I do, only one consumer seems to be doing all the work. So I had to use a total of 8 streams, with 4 for each consumer.

On Fri, Aug 30, 2013 at 12:01 AM, Jun Rao <jun...@gmail.com> wrote:

The drawback of 2), as you said, is no auto failover. I was suggesting that you use 16 partitions. Then you can use option 1) with 8 streams in each consumer.
Thanks,
Jun

On Thu, Aug 29, 2013 at 8:51 PM, Rajasekar Elango <rela...@salesforce.com> wrote:

Hi Jun,

If you read my previous posts: based on the current rebalancing logic, if we consume from a topic filter, the consumer can't actively use all streams. Can you provide your recommendation of option 1 vs option 2 in my previous post?

Thanks,
Raja.

On Thu, Aug 29, 2013 at 11:42 PM, Jun Rao <jun...@gmail.com> wrote:

You can always use more partitions to get more parallelism in the consumers.

Thanks,
Jun

On Thu, Aug 29, 2013 at 12:44 PM, Rajasekar Elango <rela...@salesforce.com> wrote:

So what is the best way to load balance multiple consumers consuming from a topic filter?

Let's say we have 4 topics with 8 partitions each, and 2 consumers.

Option 1) To load balance the consumers, we can set num.streams=4 so that both consumers split the 8 partitions, but then we can only use half of the consumer streams.

Option 2) Configure mutually exclusive topic filter regexes such that 2 topics match consumer1 and 2 topics match consumer2. Now we can set num.streams=8 and fully utilize the consumer streams. I believe this will improve performance, but if a consumer dies, we will not get any data from the topics assigned to that consumer.

What would be your recommendation?

Thanks,
Raja.

On Thu, Aug 29, 2013 at 12:42 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> 2) When I started mirrormaker with num.streams=16, looks like 16 consumer threads were created, but only 8 are showing up as active owners in the consumer offset tracker, and all topics/partitions are distributed between 8 consumer threads.

This is because currently the consumer rebalancing process of assigning partitions to consumer streams is at a per-topic level.
Unless you have at least one topic with 16 partitions, the remaining 8 threads will not do any work. This is not ideal, and we want to look into a better rebalancing algorithm. Though it is a big change, and we'd prefer doing it as part of the consumer client rewrite.

Thanks,
Neha

On Thu, Aug 29, 2013 at 8:03 AM, Rajasekar Elango <rela...@salesforce.com> wrote:

So my understanding is that the number of active streams a consumer can utilize is the number of partitions in a topic. This is fine if we consume from a specific topic. But if we consume from a TopicFilter, I thought the consumer should be able to utilize (number of topics that match the filter * number of partitions per topic).
But it looks like the number of streams a consumer can use is limited to just the number of partitions in a topic, even though it's consuming from multiple topics.

Here is what I observed with 1 mirrormaker consuming from whitelist '.+'.

The whitelist matches 5 topics, and each topic has 8 partitions. I used the consumer offset checker to look at the owner of each topic/partition.

1) When I started mirrormaker with num.streams=8, all topics/partitions were distributed between 8 consumer threads.

2) When I started mirrormaker with num.streams=16, it looks like 16 consumer threads were created, but only 8 are showing up as active owners in the consumer offset tracker, and all topics/partitions are distributed between 8 consumer threads.
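The observation above — 16 streams created but only 8 ever active — follows from assigning each topic's partitions to stream ids independently, topic by topic. A simplified simulation of that per-topic assignment (a sketch of the idea, not Kafka's actual rebalancing code):

```java
import java.util.HashSet;
import java.util.Set;

// Simulate per-topic assignment: each topic's partitions are spread over
// the stream ids independently, so a stream id >= partitionsPerTopic
// never receives work, no matter how many topics the filter matches.
public class RebalanceSim {
    public static int activeStreams(int numTopics, int partitionsPerTopic, int numStreams) {
        Set<Integer> active = new HashSet<>();
        for (int t = 0; t < numTopics; t++) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                // within one topic, partition p maps to stream (p mod numStreams)
                active.add(p % numStreams);
            }
        }
        return active.size();
    }

    public static void main(String[] args) {
        // 5 topics x 8 partitions, num.streams=16: only 8 streams ever own a partition.
        System.out.println(activeStreams(5, 8, 16)); // prints 8
        // With num.streams=8, all 8 streams are busy.
        System.out.println(activeStreams(5, 8, 8));  // prints 8
    }
}
```

The model matches both observations in the thread: raising num.streams past the per-topic partition count adds idle threads, which is exactly why Jun suggests raising the partition count instead.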
So this could be a bottleneck for consumers: although we partitioned the topic, if we are consuming from a topic filter we can't utilize much of the parallelism from num.streams. Am I missing something? Is there a way to make consumers/mirrormakers utilize a larger number of active streams?

--
Thanks,
Raja.