Created https://issues.apache.org/jira/browse/KAFKA-2008

On Sat, Mar 7, 2015 at 1:17 AM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hi Tao,
>
> Yes, your understanding is correct. We probably should update the document
> to make it more clear. Could you open a ticket for it?
>
> Jiangjie (Becket) Qin
>
> On 3/6/15, 1:23 AM, "tao xiao" <xiaotao...@gmail.com> wrote:
>
> >Hi team,
> >
> >After reading the source code of AbstractFetcherManager I found out that
> >the usage of num.consumer.fetchers may not match what is described in the
> >Kafka doc. My interpretation of the Kafka doc is that  the number of
> >fetcher threads is controlled by the value of
> > property num.consumer.fetchers. If I set num.consumer.fetchers=4 there
> >are
> >4 fetcher threads in total created after consumer is initialized.
> >
> >But what I found from the source code tells me a different thing. Below
> >code is copied from AbstractFetcherManager
> >
> >private def getFetcherId(topic: String, partitionId: Int) : Int = {
> >
> >    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
> >
> >  }
> >
> >
> >def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition,
> >BrokerAndInitialOffset]) {
> >
> >    mapLock synchronized {
> >
> >      val partitionsPerFetcher = partitionAndOffsets.groupBy{
> >case(topicAndPartition,
> >brokerAndInitialOffset) =>
> >
> >        BrokerAndFetcherId(brokerAndInitialOffset.broker,
> >getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
> >
> >      for ((brokerAndFetcherId, partitionAndOffsets) <-
> >partitionsPerFetcher) {
> >
> >        var fetcherThread: AbstractFetcherThread = null
> >
> >        fetcherThreadMap.get(brokerAndFetcherId) match {
> >
> >          case Some(f) => fetcherThread = f
> >
> >          case None =>
> >
> >            fetcherThread =
> >createFetcherThread(brokerAndFetcherId.fetcherId,
> >brokerAndFetcherId.broker)
> >
> >            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
> >
> >            fetcherThread.start
> >
> >        }
> >
> >
> >
> >fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map
> >{ case (topicAndPartition, brokerAndInitOffset) =>
> >
> >          topicAndPartition -> brokerAndInitOffset.initOffset
> >
> >        })
> >
> >      }
> >
> >    }
> >
> > If I have one topic with one partition and num.consumer.fetchers set to 4
> >there is actually only one fetcher thread created not 4.
> >num.consumer.fetchers essentially set the max value of number of fetcher
> >threads not the actual number of fetcher threads. The actual number of
> >fetcher threads is controlled by this line of code
> >Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
> >
> >Is my assumption correct?
> >
> >--
> >Regards,
> >Tao
>
>


-- 
Regards,
Tao

Reply via email to