Thanks. The bolts look fine to me. I'd look at whether the tuples are being
acked on the spout (use the debug setting on Config), and the OffsetManager
class logs I linked earlier. I don't know if it's relevant to your case,
but please note that there are some cases where setting a low
maxUncommittedOffsets can cause the spout to stop polling for tuples. It's
being fixed, but if you're setting maxUncommittedOffsets to a custom value,
please leave it at the default for now.

What is your retry service configuration?

2017-09-02 0:11 GMT+02:00 pradeep s <>:

> Yes Stig. The code posted is for DataBaseInsertBolt. The emit from the last
> bolt is not needed.
> Problem 2 was for a separate topic. Problem 1 was observed for topics
> where processing failures were encountered previously.
> I have attached the error processing and bolt files.
> Thanks
> Pradeep
> On Fri, Sep 1, 2017 at 1:09 PM, Stig Rohde Døssing <>
> wrote:
>> Just to make sure I understand:
>> This is your topology
>> KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt
>> The bolt you posted the execute method for is the DataBaseInsertBolt,
>> right?
>> What are these statements for if this is the last bolt in the topology? "
>> super.getOutputCollector().emit(tuple, new Values(fullMessage));"
>> Are the topics you mention in problem 1 and 2 the same topic? Essentially
>> what I'm asking is whether the topic that is stuck is also the one with
>> failures that is starting over on an old offset?
>> Can you post your RetryService configuration?
>> You talked about moving tuples to an error queue if they fail
>> deserialization in the Avro bolt. Can you post that execute too?
>> 2017-09-01 20:14 GMT+02:00 pradeep s <>:
>>> Thanks Stig for the response. I can give some more detail on the issue
>>> we are facing now.
>>> For any database failure, we retry the tuple up to 10 times.
>>> Database failures are mostly caused by parent-child relations, since we are
>>> processing out of order.
>>> Our consumer group has more than 10 topics, and each topic corresponds
>>> to one table. For example, topics A, B and C in a group correspond
>>> to tables A, B and C in the database.
>>> Here, table A is the parent and tables B and C are child
>>> tables.
>>> Spout parallelism is set to 50 and each topic has 50 partitions. These
>>> 50 threads go round robin across all the topics in the group.
>>> Issues observed with the current setup are:
>>> 1) One partition for one topic alone is getting stuck. The lag on all the
>>> other partitions is cleared.
>>> 2) Any topic that had failures earlier goes back to an old offset.
>>> DB Bolt Execute Method below
>>> =======================
>>> exceptionCount will have a value greater than 0 once the message is
>>> moved to the error queue. In that case I am acknowledging the message. In
>>> other cases I am calling
>>> There is no downstream bolt for this. This is the final bolt in the
>>> topology.
>>>  @Override
>>>     public void execute(final Tuple tuple) {
>>>         String fullMessage = (String) tuple.getValueByField(EXTRACTE
>>>         GGMessageDTO ggMessage = (GGMessageDTO) tuple.getValueByField(
>>>         try {
>>>             // Call to handler for generating Sql
>>>             Date date = new Date();
>>>             super.getMessageHandler().handleMessage(ggMessage, super.getGenericMessageDAO());
>>>             super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>             super.getOutputCollector().ack(tuple);
>>>   "DbActionBolt Ack time in ms: {}", new Date().getTime() - date.getTime());
>>>         } catch (Exception e) {
>>>             LOGGER.error("DB bolt exception occurred from Aurora : ", e);
>>>             int exceptionCount = handleException(fullMessage, ggMessage, e, isNormalProcessing);
>>>             if (exceptionCount != -1) {
>>>                 // If message write is success acknowledge the message so
>>>                 // that it will be removed from kafka queue
>>>                 super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>                 super.getOutputCollector().ack(tuple);
>>>             } else {
>>>                 super.getOutputCollector().reportError(e);
>>>                 super.getOutputCollector().fail(tuple);
>>>             }
>>>         }
>>>     }
>>> On Fri, Sep 1, 2017 at 9:59 AM, Stig Rohde Døssing <>
>>> wrote:
>>>> Hi Pradeep,
>>>> When you move the message to an error queue, is this happening from
>>>> inside the Avro bolt or are you emitting a tuple? Can you verify that the
>>>> tuple is being acked in the Avro bolt exactly once (double acking will
>>>> cause the tuple to fail)?
>>>> Storm will ack messages on the spout as long as all edges in the tuple
>>>> tree are acked, and the topology message timeout hasn't expired before this
>>>> occurs.
>>>> For example, if the Kafka spout emits t0 and your AvroDeserializerBolt
>>>> is the only bolt consuming from the spout, the bolt will receive t0 and
>>>> must ack it exactly once. If the AvroDeserializerBolt emits any tuples
>>>> anchored to t0 (using any of the
>>>> ses/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html methods
>>>> that take a Tuple anchor), the downstream bolts must ack those exactly once
>>>> too. Let's say the Avro bolt emits t0_0 and t0_1 based on t0. The root
>>>> tuple on the spout is only acked if t0, t0_0 and t0_1 are acked once each,
>>>> and they all get acked before the message timeout elapses.
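
To make the exactly-once rule above concrete, here is a minimal sketch in plain Java. It is a simplified model, not Storm's acker code: TupleTreeModel, its methods and the edge ids are all made up for illustration, and the message timeout is not modeled. The idea is that a running XOR of edge ids is kept per root tuple, so the value returns to zero only when every emitted edge has been acked exactly once.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of tuple tree completion. Hypothetical names throughout;
// this is not Storm's acker implementation and it ignores the message timeout.
class TupleTreeModel {
    // One running XOR per root tuple; 0 means every edge was acked exactly once.
    private final Map<String, Long> pending = new HashMap<>();

    // The spout emits a root tuple with the given edge id.
    void emitRoot(String root, long edgeId) {
        pending.put(root, edgeId);
    }

    // A bolt emits a tuple anchored to the root: XOR the new edge id in.
    void emitAnchored(String root, long edgeId) {
        pending.merge(root, edgeId, (a, b) -> a ^ b);
    }

    // A bolt acks an edge: XOR its id in again, which cancels it out.
    void ack(String root, long edgeId) {
        pending.merge(root, edgeId, (a, b) -> a ^ b);
    }

    // The root counts as acked on the spout only when the running XOR is 0.
    boolean isComplete(String root) {
        Long value = pending.get(root);
        return value != null && value == 0L;
    }

    public static void main(String[] args) {
        TupleTreeModel model = new TupleTreeModel();
        model.emitRoot("msg", 1L);      // t0
        model.emitAnchored("msg", 2L);  // t0_0
        model.emitAnchored("msg", 4L);  // t0_1
        model.ack("msg", 1L);
        model.ack("msg", 2L);
        model.ack("msg", 4L);
        System.out.println(model.isComplete("msg")); // prints true
    }
}
```

If t0 is acked twice in this model, the second ack puts its id back into the XOR, the value never returns to zero, and in a real topology the root tuple would eventually time out and fail.
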
>>>> Depending on your throughput this may be infeasible, but you might try
>>>> enabling debug logging
>>>> ses/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-
>>>> which will let you tell whether the tuple is being acked on the spout.
>>>> If the tuple is being acked on the spout, you might want to look at
>>>> some of the logs from this method
>>>> m/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/
>>>> apache/storm/kafka/spout/internal/ They should
>>>> show you what the spout is doing internally. Keep in mind that the spout
>>>> can only commit e.g. offset 10 if offsets 0-9 have all been
>>>> acked/committed, so if an earlier tuple failed and is waiting for retry
>>>> when you restart, that could also cause this.
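
The commit rule above (offset 10 can only be committed once 0-9 are acked) can be sketched like this. This is a hypothetical helper in plain Java, not the storm-kafka-client OffsetManager implementation; CommitSketch and highestCommittable are names invented for illustration.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; not the storm-kafka-client OffsetManager code.
class CommitSketch {
    // Returns the last offset in the unbroken run of acked offsets that starts
    // at firstUncommitted, or -1 if firstUncommitted itself is not acked yet.
    static long highestCommittable(long firstUncommitted, Set<Long> acked) {
        long next = firstUncommitted;
        while (acked.contains(next)) {
            next++;
        }
        return next > firstUncommitted ? next - 1 : -1;
    }

    public static void main(String[] args) {
        // Offset 3 failed and is waiting for retry; 4 and 5 are already acked.
        Set<Long> acked = new TreeSet<>(Arrays.asList(0L, 1L, 2L, 4L, 5L));
        System.out.println(highestCommittable(0L, acked)); // prints 2
    }
}
```

In this example offsets 4 and 5 are acked but cannot be committed while 3 is still pending, so a restart at that point would resume after offset 2 and the topic would appear to go back to an old offset.
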
>>>> 2017-09-01 <20%2017%2009%2001> 7:04 GMT+02:00 pradeep s <
>>>>> Hi,
>>>>> I am using Storm 1.1.0, storm kafka client version 1.1.1 and Kafka
>>>>> server is
>>>>> The Kafka spout polling strategy used is UNCOMMITTED_EARLIEST.
>>>>> The message flow is as below, and it's a normal topology:
>>>>> KafkaSpout --> AvroDeserializerBolt-->DataBaseInsertBolt.
>>>>> If the message fails Avro deserialization, I move the message to
>>>>> an error queue and acknowledge it from the Avro bolt. This message is not
>>>>> emitted to the database bolt.
>>>>> But it's observed that after I restart the topology, the offset for the
>>>>> topic goes back to an old offset.
>>>>> Will Kafka commit the offset only if the message is acked from all
>>>>> bolts?
>>>>> Is the offset going back to a previous value because of this?
>>>>> Thanks
>>>>> Pradeep
