Can you check if you have KAFKA-3003 when you run the code? On Sat, Oct 8, 2016 at 12:52 AM, Kafka <kafka...@126.com> wrote:
> Hi all, > we found our consumer have high cpu load in our product > enviroment,as we know,fetch.min.bytes and fetch.wait.ma < > http://fetch.wait.ma/>x.ms will affect the frequency of consumer’s return, > so we adjust them to very big so that broker is very hard to satisfy it. > then we found the problem is not be solved,then we check the > kafka’s code,we check delayedFetch’s tryComplete() function has these codes, > > if (endOffset.messageOffset != fetchOffset.messageOffset) { > if (endOffset.onOlderSegment(fetchOffset)) { > // Case C, this can happen when the new fetch operation is > on a truncated leader > debug("Satisfying fetch %s since it is fetching later > segments of partition %s.".format(fetchMetadata, topicAndPartition)) > return forceComplete() > } else if (fetchOffset.onOlderSegment(endOffset)) { > // Case C, this can happen when the fetch operation is > falling behind the current segment > // or the partition has just rolled a new segment > debug("Satisfying fetch %s immediately since it is > fetching older segments.".format(fetchMetadata)) > return forceComplete() > } else if (fetchOffset.messageOffset < > endOffset.messageOffset) { > // we need take the partition fetch size as upper bound > when accumulating the bytes > accumulatedSize += > math.min(endOffset.positionDiff(fetchOffset), > fetchStatus.fetchInfo.fetchSize) > } > } > > so we can ensure that our fetchOffset’s segmentBaseOffset is not the same > as endOffset’s segmentBaseOffset,then we check our topic-partition’s > segment, we found the data in the segment is all cleaned by the kafka for > log.retention. > and we guess that the fetchOffset’s segmentBaseOffset is smaller than > endOffset’s segmentBaseOffset leads this problem. > > but my point is should we use we use these code to make client use less > cpu, > if (endOffset.messageOffset != fetchOffset.messageOffset) { > if (endOffset.onOlderSegment(fetchOffset)) { > return false > } else if (fetchOffset.onOlderSegment(endOffset)) { > return false > } > } > > and then it will response after fetch.wait.ma <http://fetch.wait.ma/>x.ms > in this scene instead of immediately return. > > Feedback is greatly appreciated. Thanks. > > > >