Thanks. The bolts look fine to me. I'd check whether the tuples are being acked on the spout (use the debug setting on Config), and look at the logs from the OffsetManager class I linked earlier. I don't know if it's relevant to your case, but note that in some cases setting a low maxUncommittedOffsets can cause the spout to stop polling for tuples. This is being fixed, but in the meantime, if you've set maxUncommittedOffsets to a custom value, please put it back to the default.
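In case it helps when reading the OffsetManager logs, here is a rough sketch of the commit rule I described earlier: the spout can only move the committed offset past a contiguous run of acked offsets. This is a toy model, not the actual storm-kafka-client code; the class and method names (ContiguousCommitSketch, ack, commit) are made up for illustration.

```java
import java.util.TreeSet;

/** Toy model of the "commit only contiguous acked offsets" rule. Hypothetical names. */
class ContiguousCommitSketch {
    // Every offset below this value is considered committed.
    private long committed;
    // Offsets that were acked but cannot be committed yet because of a gap.
    private final TreeSet<Long> acked = new TreeSet<>();

    ContiguousCommitSketch(long initialCommitted) {
        this.committed = initialCommitted;
    }

    /** Record that the tuple for this offset was acked on the spout. */
    void ack(long offset) {
        if (offset >= committed) {
            acked.add(offset);
        }
    }

    /** Advance over the contiguous acked run and return the new commit position. */
    long commit() {
        while (acked.remove(committed)) {
            committed++;
        }
        return committed;
    }

    public static void main(String[] args) {
        ContiguousCommitSketch partition = new ContiguousCommitSketch(0);
        // Offsets 0-9 were emitted; offset 4 failed and is waiting for retry.
        for (long offset = 0; offset < 10; offset++) {
            if (offset != 4) {
                partition.ack(offset);
            }
        }
        System.out.println(partition.commit()); // prints 4
        // Once the retried tuple is finally acked, the commit can jump ahead.
        partition.ack(4);
        System.out.println(partition.commit()); // prints 10
    }
}
```

If the topology is restarted while offset 4 is still pending, a consumer in this model would resume from 4 even though 5-9 were processed, which would look like the topic going back to an old offset.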
What is your retry service configuration?

2017-09-02 0:11 GMT+02:00 pradeep s <sreekumar.prad...@gmail.com>:

> Yes Stig. The code posted is for DataBaseInsertBolt. The emit from the
> last bolt is not needed.
>
> Problem 2 was for a separate topic. Problem 1 was observed for topics
> where processing failures were encountered previously.
>
> I have attached the error processing and bolt files.
>
> Thanks
> Pradeep
>
> On Fri, Sep 1, 2017 at 1:09 PM, Stig Rohde Døssing <s...@apache.org>
> wrote:
>
>> Just to make sure I understand:
>>
>> This is your topology
>> KafkaSpout --> AvroDeserializerBolt --> DataBaseInsertBolt
>>
>> The bolt you posted the execute method for is the DataBaseInsertBolt,
>> right?
>> What are these statements for if this is the last bolt in the topology?
>> "super.getOutputCollector().emit(tuple, new Values(fullMessage));"
>> Are the topics you mention in problem 1 and 2 the same topic? Essentially
>> what I'm asking is whether the topic that is stuck is also the one with
>> failures that is starting over on an old offset?
>> Can you post your RetryService configuration?
>> You talked about moving tuples to an error queue if they fail
>> deserialization in the Avro bolt. Can you post that execute too?
>>
>> 2017-09-01 20:14 GMT+02:00 pradeep s <sreekumar.prad...@gmail.com>:
>>
>>> Thanks Stig for the response. I can give some more detail on the issue
>>> we are facing now.
>>> For any database failure, we retry the tuple up to 10 times.
>>> Database failures are mostly because of parent-child relations, since we
>>> are processing out of order.
>>> Our consumer group has more than 10 topics and each topic corresponds
>>> to one table. For example, we have topics A, B and C in a group,
>>> corresponding to tables A, B and C in the database.
>>> In this case, table A will be the parent and tables B and C will be
>>> child tables.
>>> Spout parallelism is set to 50 and each topic has 50 partitions. These
>>> 50 threads go round robin across all the topics in the group.
>>>
>>> Issues observed with the current setup are:
>>>
>>> 1) One partition for one topic alone is getting stuck. The lag for all
>>> the other partitions is cleared.
>>>
>>> 2) Whatever topic had failures earlier is going to an old offset.
>>>
>>> DB Bolt Execute Method below
>>> =======================
>>> exceptionCount will have a value greater than 0 once the message is
>>> moved to the error queue. In that case I am acknowledging the message. In
>>> other cases I am calling tuple.fail.
>>> There is no downstream bolt for this. This is the final bolt in the
>>> topology.
>>>
>>> @Override
>>> public void execute(final Tuple tuple) {
>>>     String fullMessage = (String) tuple.getValueByField(EXTRACTED_MESSAGE);
>>>     GGMessageDTO ggMessage = (GGMessageDTO) tuple.getValueByField(GG_MESSAGE);
>>>     try {
>>>         // Call to handler for generating Sql
>>>         Date date = new Date();
>>>         super.getMessageHandler().handleMessage(ggMessage, super.getGenericMessageDAO());
>>>         super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>         super.getOutputCollector().ack(tuple);
>>>         LOGGER.info("DbActionBolt Ack time in ms: {}", new Date().getTime() - date.getTime());
>>>     } catch (Exception e) {
>>>         LOGGER.error("DB bolt exception occurred from Aurora : ", e);
>>>         int exceptionCount = handleException(fullMessage, ggMessage, e, isNormalProcessing);
>>>         if (exceptionCount != -1) {
>>>             // If message write is success acknowledge the message so
>>>             // that it will be removed from kafka queue
>>>             super.getOutputCollector().emit(tuple, new Values(fullMessage));
>>>             super.getOutputCollector().ack(tuple);
>>>         } else {
>>>             super.getOutputCollector().reportError(e);
>>>             super.getOutputCollector().fail(tuple);
>>>         }
>>>     }
>>> }
>>>
>>> On Fri, Sep 1, 2017 at 9:59 AM, Stig Rohde Døssing <s...@apache.org>
>>> wrote:
>>>
>>>> Hi Pradeep,
>>>>
>>>> When you move the message to an error queue, is this happening from
>>>> inside the Avro bolt or are you emitting a tuple? Can you verify that the
>>>> tuple is being acked in the Avro bolt exactly once (double acking will
>>>> cause the tuple to fail)?
>>>>
>>>> Storm will ack messages on the spout as long as all edges in the tuple
>>>> tree are acked, and the topology message timeout hasn't expired before
>>>> this occurs.
>>>>
>>>> For example, if the Kafka spout emits t0 and your AvroDeserializerBolt
>>>> is the only bolt consuming from the spout, the bolt will receive t0 and
>>>> must ack it exactly once. If the AvroDeserializerBolt emits any tuples
>>>> anchored to t0 (using any of the
>>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/task/OutputCollector.html
>>>> methods that take a Tuple anchor), the downstream bolts must ack those
>>>> exactly once too. Let's say the Avro bolt emits t0_0 and t0_1 based on
>>>> t0. The root tuple on the spout is only acked if t0, t0_0 and t0_1 are
>>>> acked once each, and they all get acked before the message timeout
>>>> elapses.
>>>>
>>>> Depending on your throughput this may be infeasible, but you might try
>>>> enabling debug logging
>>>> https://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/Config.html#setDebug-boolean-
>>>> which will let you tell whether the tuple is being acked on the spout.
>>>>
>>>> If the tuple is being acked on the spout, you might want to look at
>>>> some of the logs from this method
>>>> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L64.
>>>> They should show you what the spout is doing internally. Keep in mind
>>>> that the spout can only commit e.g. offset 10 if offsets 0-9 have all
>>>> been acked/committed, so if an earlier tuple failed and is waiting for
>>>> retry when you restart, that could also cause this.
>>>>
>>>> 2017-09-01 7:04 GMT+02:00 pradeep s <sreekumar.prad...@gmail.com>:
>>>>
>>>>> Hi,
>>>>> I am using Storm 1.1.0, storm kafka client version 1.1.1, and the
>>>>> Kafka server is 0.10.1.1.
>>>>>
>>>>> The Kafka spout polling strategy used is UNCOMMITTED_EARLIEST.
>>>>>
>>>>> The message flow is like below and it's a normal topology:
>>>>>
>>>>> KafkaSpout --> AvroDeserializerBolt --> DataBaseInsertBolt
>>>>>
>>>>> If the message fails Avro deserialization, I am moving the message to
>>>>> an error queue and acknowledging from the Avro bolt. This message is
>>>>> not emitted to the database bolt.
>>>>>
>>>>> But it's observed that after I restart the topology, the offset for
>>>>> the topic goes back to an old offset.
>>>>>
>>>>> Will Kafka commit the offset only if the message is acked from all
>>>>> bolts?
>>>>>
>>>>> Is the offset going back to a previous value because of this?
>>>>>
>>>>> Thanks
>>>>> Pradeep