Re: client high CPU usage caused by delayedFetch operation returning immediately

2016-10-18 Thread Becket Qin
Glad to know :)

On Tue, Oct 18, 2016 at 1:24 AM, Json Tu  wrote:

> Thanks. I patched it, and everything is OK now.


Re: client high CPU usage caused by delayedFetch operation returning immediately

2016-10-18 Thread Json Tu
Thanks. I patched it, and everything is OK now.
> On Oct 9, 2016, at 12:39 PM, Becket Qin wrote:
> 
> Can you check if you have KAFKA-3003 when you run the code?
> 




Re: client high CPU usage caused by delayedFetch operation returning immediately

2016-10-08 Thread Becket Qin
Can you check if you have KAFKA-3003 when you run the code?

On Sat, Oct 8, 2016 at 12:52 AM, Kafka  wrote:

> Hi all,
> We found that our consumers have high CPU load in our production
> environment. As we know, fetch.min.bytes and fetch.wait.max.ms affect how
> often the broker responds to a consumer fetch, so we set both to very large
> values so that the broker can rarely satisfy the fetch request.
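As an illustration of that kind of tuning, here is a minimal sketch. It assumes the old Scala consumer API; the ZooKeeper address, group id, and concrete values are placeholders, not the actual configuration described in this thread.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    // Illustrative values only: make the broker wait for a large batch (or the
    // timeout) instead of answering every fetch request immediately.
    val props = new Properties()
    props.put("zookeeper.connect", "zk1:2181")   // placeholder address
    props.put("group.id", "cpu-test-group")      // placeholder group id
    props.put("fetch.min.bytes", "1048576")      // respond once at least 1 MB is available...
    props.put("fetch.wait.max.ms", "5000")       // ...or after 5 seconds, whichever comes first

    val connector = Consumer.create(new ConsumerConfig(props))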
> The problem was still not solved, so we looked at Kafka's code and found
> that DelayedFetch's tryComplete() function contains the following:
>
>  if (endOffset.messageOffset != fetchOffset.messageOffset) {
>    if (endOffset.onOlderSegment(fetchOffset)) {
>      // Case C, this can happen when the new fetch operation is on a truncated leader
>      debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
>      return forceComplete()
>    } else if (fetchOffset.onOlderSegment(endOffset)) {
>      // Case C, this can happen when the fetch operation is falling behind the current segment
>      // or the partition has just rolled a new segment
>      debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
>      return forceComplete()
>    } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
>      // we need take the partition fetch size as upper bound when accumulating the bytes
>      accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
>    }
>  }
>
> From this code we can tell that, in our case, fetchOffset's segmentBaseOffset
> must differ from endOffset's segmentBaseOffset. We then checked the
> topic-partition's segments and found that the data in the old segment had all
> been cleaned up by Kafka's log.retention. So we guess the problem is that
> fetchOffset's segmentBaseOffset is smaller than endOffset's
> segmentBaseOffset: the delayed fetch is then force-completed on every
> request, so the consumer keeps re-fetching in a tight loop and burns CPU.
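To make that comparison concrete, here is a rough sketch (not the actual Kafka LogOffsetMetadata class) of the check that decides whether one offset lies on an older segment than another:

    // Simplified stand-in for the offset metadata used in tryComplete(): an offset
    // is "on an older segment" when its segment's base offset is smaller than the
    // other offset's segment base offset.
    case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long) {
      def onOlderSegment(that: OffsetMeta): Boolean =
        this.segmentBaseOffset < that.segmentBaseOffset
    }

    // Hypothetical numbers: the fetch offset still points into a segment starting
    // at 0 that retention has deleted, while the log end offset lives in a newer
    // segment starting at 5000, so the delayed fetch is force-completed right away.
    val fetchOffset = OffsetMeta(messageOffset = 1200, segmentBaseOffset = 0)
    val endOffset   = OffsetMeta(messageOffset = 6300, segmentBaseOffset = 5000)
    assert(fetchOffset.onOlderSegment(endOffset))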
>
> But my point is: should we use code like the following so that the client
> uses less CPU?
>
>    if (endOffset.messageOffset != fetchOffset.messageOffset) {
>      if (endOffset.onOlderSegment(fetchOffset)) {
>        return false
>      } else if (fetchOffset.onOlderSegment(endOffset)) {
>        return false
>      }
>    }
>
> Then, in this scenario, the broker would respond after fetch.wait.max.ms
> instead of returning immediately.
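Put together, the branch with that change applied might read roughly like the sketch below. This is illustrative only, a fragment of the tryComplete() body rather than a tested patch against any particular Kafka version:

    // Sketch of the proposed behaviour: when the fetch offset and the log end offset
    // sit on different segments (for example because the fetched segment was removed
    // by log.retention, or a new segment just rolled), keep the request in purgatory
    // (return false) instead of force-completing it, so the response is only sent
    // once fetch.wait.max.ms expires or enough bytes accumulate.
    if (endOffset.messageOffset != fetchOffset.messageOffset) {
      if (endOffset.onOlderSegment(fetchOffset) || fetchOffset.onOlderSegment(endOffset)) {
        return false
      } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
        // Same segment: count the available bytes, capped at the partition fetch size.
        accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
      }
    }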
>
> Feedback is greatly appreciated. Thanks.
>
>
>
>