Hi Igor,

In normal operation this shouldn’t happen, but it could have happened due to a 
consumer rebalance. We have improved the code recently. From this info alone 
it’s hard to tell what really happened. I can suggest a couple of things.

1. Try reproducing the issue again.
2. Enable logs at ALL (not only DEBUG) to see which tuples get committed and 
which don’t (this may print a lot of messages, however).
3. If you have the chance, test a more up-to-date version of the spout. There 
have been a few bug fixes that could handle this case (if it’s indeed an issue).

Thanks,
Hugo

On Feb 16, 2017, at 12:59 PM, Igor Kuzmenko <f1she...@gmail.com> wrote:

Thanks for the reply, Hugo.
I'll double-check the log tomorrow, looking for KafkaSpoutRetryExponentialBackoff 
calls.

I just noticed a strange thing in the log I have. The first message is 
"Unexpected offset found [2777849]". It's strange because if you look at the 
partition 10 committed offset, it is 2777978, which is a little higher than the 
offset found. The next message in the log was "No offsets ready to commit."

So, after checking offset 2777849 it immediately stopped seeking a new offset to 
commit.

On Thu, Feb 16, 2017 at 8:23 PM, Hugo Da Cruz Louro <hlo...@hortonworks.com> wrote:
Hi,

Most likely this is happening because some messages failed and/or got acked out 
of order.

For example, if you process messages with offsets 1,2,3,X,5,6,7,… where X is the 
message (with offset 4) that failed, the Spout will only commit offset 3. Until 
the message with offset 4 is acked, or reaches the max number of retries (which 
is configurable, but by default is forever), the messages with offsets 5,6,7,… 
will not get committed despite having been acked. That is because you cannot 
call kafkaConsumer.commitSync(…) to commit offset 5 on that TopicPartition while 
the message with offset 4 has not been acked or discarded by reaching the max 
number of retries. Until the spout moves on from the message with offset 4, the 
lag will keep increasing as new messages come in.
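
To make that concrete, here is a rough sketch of that commit rule. This is not 
the actual KafkaSpout/OffsetEntry code, just an illustration against a plain 
KafkaConsumer; the class and field names are made up:

import java.util.Collections;
import java.util.TreeSet;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ContiguousCommitSketch {
    private final TopicPartition tp = new TopicPartition("test_topic", 10);
    private final TreeSet<Long> acked = new TreeSet<>(); // acked offsets, possibly out of order
    private long committedOffset = 3;                    // last offset already committed

    void ack(long offset) {
        acked.add(offset);
    }

    // Commit only the longest contiguous run of acked offsets after committedOffset.
    void commitReadyOffsets(KafkaConsumer<?, ?> consumer) {
        long next = committedOffset + 1;
        while (acked.contains(next)) {
            next++;
        }
        long highestContiguous = next - 1;
        if (highestContiguous > committedOffset) {
            // Kafka expects the offset of the *next* record to consume, hence the +1.
            consumer.commitSync(Collections.singletonMap(
                    tp, new OffsetAndMetadata(highestContiguous + 1)));
            committedOffset = highestContiguous;
            acked.headSet(committedOffset, true).clear();
        }
        // If offset 4 never gets acked, offsets 5,6,7,… stay parked in `acked`,
        // nothing is committed, and the lag shown in the UI keeps growing.
    }
}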

You can try setting the log level to ALL for 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff to see which 
messages are getting retried. You can also set the log level to DEBUG or ALL for 
org.apache.storm.kafka.spout.KafkaSpout to see exactly which offsets/records 
are being processed. However, it will print a lot of messages and may slow 
down processing considerably.
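
The usual way to do this is through the worker's log4j2 configuration (or 
Storm's dynamic log level feature in the UI/CLI), but if it is easier for you, 
you can also raise the levels programmatically from inside the topology, e.g. 
from a bolt's prepare(). A minimal sketch using the log4j2 API that Storm 
workers already ship with (class and method names here are just illustrative):

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class SpoutLogLevels {
    // Call once in the worker JVM to raise the spout loggers' verbosity
    // for a debugging session; only affects that JVM.
    public static void enableVerboseSpoutLogging() {
        Configurator.setLevel("org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff", Level.ALL);
        Configurator.setLevel("org.apache.storm.kafka.spout.KafkaSpout", Level.ALL);
    }
}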

You can also set the max number of retries to a small number (e.g. 3-5) to see 
if that resolves the situation.
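
As a rough sketch (how you hand the retry service to KafkaSpoutConfig depends on 
your storm-kafka-client version: either a Builder constructor argument or a 
setRetry(...) call on the builder; the delay values below are just examples):

import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

public class BoundedRetryExample {
    public static KafkaSpoutRetryService boundedRetry() {
        // Exponential backoff that gives up on a tuple after 5 failed attempts
        // instead of the default of retrying forever.
        return new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.milliSeconds(500), // initial delay
                TimeInterval.milliSeconds(2),   // delay period
                5,                              // max retries
                TimeInterval.seconds(10));      // max delay
    }
}

Keep in mind that once a tuple exhausts its retries it is discarded so the 
commit can move on, so only do this if skipping those messages is acceptable.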

Hugo

> On Feb 16, 2017, at 8:36 AM, Igor Kuzmenko <f1she...@gmail.com> wrote:
>
> Today in Storm UI I saw this Kafka Spouts Lag:
> Id            Topic        Partition   Latest Offset   Spout Committed Offset   Lag
> Kafka Spout   test_topic   0           5591087         5562814                  28273
> Kafka Spout   test_topic   1           2803256         2789090                  14166
> Kafka Spout   test_topic   2           2801927         2787767                  14160
> Kafka Spout   test_topic   3           2800627         2800626                  1
> Kafka Spout   test_topic   4           2799391         2785238                  14153
> Kafka Spout   test_topic   5           2798126         2798125                  1
> Kafka Spout   test_topic   6           2796874         2782726                  14148
> Kafka Spout   test_topic   7           2795669         2781528                  14141
> Kafka Spout   test_topic   8           2794419         2780280                  14139
> Kafka Spout   test_topic   9           2793255         2793254                  1
> Kafka Spout   test_topic   10          2792109         2777978                  14131
> Kafka Spout   test_topic   11          2790939         2776817                  14122
> Kafka Spout   test_topic   12          2789783         2775665                  14118
> Kafka Spout   test_topic   13          2788651         2774539                  14112
> Kafka Spout   test_topic   14          2787521         2773412                  14109
>
>
> There were no new messages in that topic for a while, so I expected that my 
> topology would process all messages. But the lag shows that there are some 
> uncommitted messages in most of the partitions. The topology stopped working 
> and didn't process any messages for a few hours.
>
> In logs I found these messages:
> 2017-02-16 14:50:20.187 o.a.s.k.s.KafkaSpout [DEBUG] Unexpected offset found 
> [2777849]. OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755, 
> committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10, 
> offset=2777849, numFails=0}, {topic-partition=test_topic-10, offset=2777850, 
> numFails=0},
> ........................................
>  {topic-partition=test_topic-10, offset=2792107, numFails=0}, 
> {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
> 2017-02-16 14:50:20.201 o.a.s.k.s.KafkaSpout [DEBUG] No offsets ready to 
> commit. OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755, 
> committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10, 
> offset=2777849, numFails=0},
> .......................................
> {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
>
>
> So, I assume the messages that show as uncommitted were actually processed by 
> the topology and acked. After I started sending new messages to the Kafka 
> topic, the topology started working again, but the spout lag keeps increasing.
> Why could the spout stop committing to Kafka?


