Hi Tao,

Yes, your understanding is correct. We probably should update the document
to make it more clear. Could you open a ticket for it?

Jiangjie (Becket) Qin

On 3/6/15, 1:23 AM, "tao xiao" <xiaotao...@gmail.com> wrote:

>Hi team,
>
>After reading the source code of AbstractFetcherManager I found out that
>the usage of num.consumer.fetchers may not match what is described in the
>Kafka doc. My interpretation of the Kafka doc is that  the number of
>fetcher threads is controlled by the value of
> property num.consumer.fetchers. If I set num.consumer.fetchers=4 there
>are
>4 fetcher threads in total created after consumer is initialized.
>
>But what I found from the source code tells me a different thing. Below
>code is copied from AbstractFetcherManager
>
>private def getFetcherId(topic: String, partitionId: Int) : Int = {
>
>    Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
>
>  }
>
>
>def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition,
>BrokerAndInitialOffset]) {
>
>    mapLock synchronized {
>
>      val partitionsPerFetcher = partitionAndOffsets.groupBy{
>case(topicAndPartition,
>brokerAndInitialOffset) =>
>
>        BrokerAndFetcherId(brokerAndInitialOffset.broker,
>getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
>
>      for ((brokerAndFetcherId, partitionAndOffsets) <-
>partitionsPerFetcher) {
>
>        var fetcherThread: AbstractFetcherThread = null
>
>        fetcherThreadMap.get(brokerAndFetcherId) match {
>
>          case Some(f) => fetcherThread = f
>
>          case None =>
>
>            fetcherThread =
>createFetcherThread(brokerAndFetcherId.fetcherId,
>brokerAndFetcherId.broker)
>
>            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
>
>            fetcherThread.start
>
>        }
>
>
>
>fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map
>{ case (topicAndPartition, brokerAndInitOffset) =>
>
>          topicAndPartition -> brokerAndInitOffset.initOffset
>
>        })
>
>      }
>
>    }
>
> If I have one topic with one partition and num.consumer.fetchers set to 4
>there is actually only one fetcher thread created not 4.
>num.consumer.fetchers essentially set the max value of number of fetcher
>threads not the actual number of fetcher threads. The actual number of
>fetcher threads is controlled by this line of code
>Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
>
>Is my assumption correct?
>
>-- 
>Regards,
>Tao

Reply via email to