Hi Tao,

Yes, your understanding is correct. We should probably update the documentation to make this clearer. Could you open a ticket for it?

Jiangjie (Becket) Qin

On 3/6/15, 1:23 AM, "tao xiao" <xiaotao...@gmail.com> wrote:

>Hi team,
>
>After reading the source code of AbstractFetcherManager, I found that
>the usage of num.consumer.fetchers may not match what is described in the
>Kafka doc. My interpretation of the Kafka doc is that the number of
>fetcher threads is controlled by the value of the property
>num.consumer.fetchers. If I set num.consumer.fetchers=4, 4 fetcher
>threads in total are created after the consumer is initialized.
>
>But the source code tells me something different. The code below is
>copied from AbstractFetcherManager:
>
>private def getFetcherId(topic: String, partitionId: Int) : Int = {
>  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
>}
>
>def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
>  mapLock synchronized {
>    val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
>      BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
>    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
>      var fetcherThread: AbstractFetcherThread = null
>      fetcherThreadMap.get(brokerAndFetcherId) match {
>        case Some(f) => fetcherThread = f
>        case None =>
>          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
>          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
>          fetcherThread.start
>      }
>
>      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
>        topicAndPartition -> brokerAndInitOffset.initOffset
>      })
>    }
>  }
>
>If I have one topic with one partition and num.consumer.fetchers set to 4,
>only one fetcher thread is actually created, not 4.
>num.consumer.fetchers essentially sets the maximum number of fetcher
>threads, not the actual number. The actual number of fetcher threads is
>controlled by this line of code:
>
>Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
>
>Is my assumption correct?
>
>--
>Regards,
>Tao
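
To make the behaviour in the quoted code concrete, here is a minimal sketch, not the Kafka implementation itself. It reuses the getFetcherId formula from the snippet above and counts distinct (broker, fetcher id) pairs, since fetcherThreadMap is keyed by BrokerAndFetcherId. The names FetcherCountSketch, safeAbs and fetcherThreadCount, and the shape of the leaders map, are assumptions made for illustration; safeAbs is only a stand-in for Utils.abs.

```scala
object FetcherCountSketch {
  // Stand-in for Utils.abs (an assumption): always returns a non-negative Int,
  // including for Int.MinValue, where math.abs would stay negative.
  def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Same formula as getFetcherId in the quoted code.
  def fetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
    safeAbs(31 * topic.hashCode + partitionId) % numFetchers

  // Counts the distinct (broker, fetcherId) keys, i.e. how many fetcher
  // threads the quoted addFetcherForPartitions would end up creating.
  // `leaders` maps (topic, partition) -> leader broker id (hypothetical shape).
  def fetcherThreadCount(leaders: Map[(String, Int), Int], numFetchers: Int): Int =
    leaders.toSeq.map { case ((topic, partition), broker) =>
      (broker, fetcherId(topic, partition, numFetchers))
    }.distinct.size

  def main(args: Array[String]): Unit = {
    // One topic, one partition, num.consumer.fetchers = 4.
    println(fetcherThreadCount(Map(("t", 0) -> 0), 4))

    // Eight partitions of one topic, all led by broker 0.
    val oneBroker = (0 until 8).map(p => ("t", p) -> 0).toMap
    println(fetcherThreadCount(oneBroker, 4))

    // Same eight partitions, split across two brokers.
    val twoBrokers = (0 until 8).map(p => ("t", p) -> (p / 4)).toMap
    println(fetcherThreadCount(twoBrokers, 4))
  }
}
```

Under these assumptions, one topic with one partition yields a single thread regardless of num.consumer.fetchers. Because the map key includes the broker, the setting appears to cap the number of fetcher threads per broker rather than in total.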