Created https://issues.apache.org/jira/browse/KAFKA-2008
On Sat, Mar 7, 2015 at 1:17 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote: > Hi Tao, > > Yes, your understanding is correct. We probably should update the document > to make it more clear. Could you open a ticket for it? > > Jiangjie (Becket) Qin > > On 3/6/15, 1:23 AM, "tao xiao" <xiaotao...@gmail.com> wrote: > > >Hi team, > > > >After reading the source code of AbstractFetcherManager I found out that > >the usage of num.consumer.fetchers may not match what is described in the > >Kafka doc. My interpretation of the Kafka doc is that the number of > >fetcher threads is controlled by the value of > > property num.consumer.fetchers. If I set num.consumer.fetchers=4 there > >are > >4 fetcher threads in total created after consumer is initialized. > > > >But what I found from the source code tells me a different thing. Below > >code is copied from AbstractFetcherManager > > > >private def getFetcherId(topic: String, partitionId: Int) : Int = { > > > > Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers > > > > } > > > > > >def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, > >BrokerAndInitialOffset]) { > > > > mapLock synchronized { > > > > val partitionsPerFetcher = partitionAndOffsets.groupBy{ > >case(topicAndPartition, > >brokerAndInitialOffset) => > > > > BrokerAndFetcherId(brokerAndInitialOffset.broker, > >getFetcherId(topicAndPartition.topic, topicAndPartition.partition))} > > > > for ((brokerAndFetcherId, partitionAndOffsets) <- > >partitionsPerFetcher) { > > > > var fetcherThread: AbstractFetcherThread = null > > > > fetcherThreadMap.get(brokerAndFetcherId) match { > > > > case Some(f) => fetcherThread = f > > > > case None => > > > > fetcherThread = > >createFetcherThread(brokerAndFetcherId.fetcherId, > >brokerAndFetcherId.broker) > > > > fetcherThreadMap.put(brokerAndFetcherId, fetcherThread) > > > > fetcherThread.start > > > > } > > > > > > > >fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map > >{ case (topicAndPartition, brokerAndInitOffset) => > > > > topicAndPartition -> brokerAndInitOffset.initOffset > > > > }) > > > > } > > > > } > > > > If I have one topic with one partition and num.consumer.fetchers set to 4 > >there is actually only one fetcher thread created not 4. > >num.consumer.fetchers essentially set the max value of number of fetcher > >threads not the actual number of fetcher threads. The actual number of > >fetcher threads is controlled by this line of code > >Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers > > > >Is my assumption correct? > > > >-- > >Regards, > >Tao > > -- Regards, Tao