Two things here, extending what Ravi talked about:

1. You fail tuples, either explicitly or by letting them time out, to signal a recoverable issue in the topology. If the error is not recoverable, don't fail the tuple: ack it and forward the error to another bolt so you can record it somewhere for further investigation, such as Kafka (we have a topic in Kafka for this).

2. Real-time processing means you sometimes have to worry about latencies at the nanosecond level, so a fail-fast strategy must be used. Point-to-point failure handling at the granularity of a single tuple can be implemented using transactions with a batch size of 1, but this will slow down the topology substantially. You can try an implementation yourself and see. The XOR-based tuple tree is a genius innovation from Nathan Marz that does tuple tracking very, very fast while using predictable memory: regardless of how many hops your tuple has to go through, Storm uses about 20 bytes per spout tuple to track it.

####################

Now about exactly-once processing. There is no such thing as exactly-once processing unless you use transactions with a batch size of 1 (and that includes Trident).

What topology developers should focus on is idempotent processing! What does that mean? Idempotent processing means that if your tuple is replayed, the result does not change. So whether you use Trident micro-batching or wrote your own micro-batching in Storm, the net result is the same: in case of failures your tuples will replay, but you are okay with that since the end result is unchanged.

Trident will not process the next batch until the current one is processed. That means the entire batch has to be handled either via transactions that can be rolled back (as in, you flush to the DB at the end of the batch) or, better, by writing to the DB in an idempotent manner, where each tuple has an ID such that writing it again just rewrites the same info. Most modern data stores have the concept of a key that can be used for this, e.g. Elasticsearch document ID, HBase row key, MySQL primary key.

Now, how to get a UUID for the tuple?

1. Handle it in your application logic if you already know what constitutes a unique event.
2. Worry about it from Kafka onwards (we do this): use partition ID + offset + event timestamp (inside the event payload) as the UUID.
3. MD5 the payload of the event (there is a risk of collision here, depending on your event volume and application logic).

For things like unique counting, you can use an in-memory approach like we did (Hendrix) or use something like Redis with structures like sets and HyperLogLog.

Thanks,
Ambud

On Sep 14, 2016 1:38 AM, "Cheney Chen" <[email protected]> wrote:

> Thank you guys for the discussion.
>
> What if I want exact-once processing for all nodes (bolts), even when
> failure happens, will Trident be the one?
>
> On Wed, Sep 14, 2016 at 3:49 PM, Ravi Sharma <[email protected]> wrote:
>
>> Hi T.I.
>> Few things why Spout is responsible for replay rather than Various Bolts.
>>
>> 1. ack and fail messages carry only message ID, Usually your spouts
>> generate message Id and knows what tuple/message is linked to it(via
>> source i.e. jms etc). If ack or fail happens then Spout can do various
>> things like on ack delete from queue, on fail put in some dead letter
>> queue. intermediate Bolt won't know what message it sent, unless you
>> implement something of your own. Technically you can put Delete message
>> from JMS in bolts but then your whole topology knows from where you are
>> getting data, what if tomorrow you start processing data from JMS, Http
>> rest service, Database and file system etc.
>>
>> 2. BoltB fails, it tells BoltA, BoltA retry 3 times, it fails 3 times,
>> now what BoltA should do,? Send it to another bolt(say BoltPreA exists
>> between him and spout) or send it to Spout.?
>> If it sends to BoltPreA that means BoltPreA will retry 3
>> times(just using 3 number consider as N), that means for each try to
>> BoltPreA, BoltA will retry again 3 times, so total 9 retries.(basically
>> total retries will be based on Total bolt from Spout to Failure Bolt TB and
>> total Retries TR, it will be like TR + Power(TR,2) .....
>> + Power(TR,TB)
>> If you send back from failure from BoltA to Spout then we can
>> argue why not send it to Spout from BoltB, as a framework i shouldn't be
>> looking into if BoltB is really costly or BoltA is really costly.
>>
>> 3. Also failure scenarios are supposed to be really really low, and if your
>> database is down(means 100% tuple will fail), then performance won't be your
>> only concern. your concern will be to make sure database comes up and
>> reprocess all failed tuples.
>>
>> 4. Also you will have to take care of retry logic in every Bolt.
>> Currently it's only at one place.
>>
>> *There is one thing i am looking forward from Storm is to inform Spout
>> about what kind of failure it was*. i.e. if it was ConnectionTimeout or
>> ReadTimeout etc, that means if i retry it may pass. But say it was null
>> pointer exception(java world) , i know the data which is being expected is
>> not there and my code is not handling that scenario, so either i will have
>> to change code or ask data provider to send that field, but retry won't help
>> me.
>>
>> Currently only way to do is use an outside datastore like Redis, whichever
>> Bolt you fail add a key with messageId and Exception/error detail in redis
>> before calling fail. and then let Spout read that data from redis with
>> messageId received in onFail call and then spout can decide if i want to
>> retry or not. I would usually Create two wrappers Retry-able Exception and
>> *non* Retry-able Exception, so each bolt can inform whether retry can
>> help or not. It's up to you where you put this decision making logic.
>>
>> Thanks
>> Ravi.
>>
>> On Wed, Sep 14, 2016 at 6:43 AM, Tech Id <[email protected]>
>> wrote:
>>
>>> Thanks Ambud,
>>>
>>> I did read some very good things about acking mechanism in Storm but I
>>> am not sure it explains why point to point checking is expensive.
>>>
>>> Consider the example: Spout--> BoltA--->BoltB.
>>>
>>> If BoltB fails, it will report failure to the acker.
>>> If the acker can ask the Spout to replay, then why can't the acker ask
>>> the parent of BoltB to replay at this point?
>>> I don't think keeping parent of a bolt could be expensive.
>>>
>>> On a related note, I am a little confused about a statement "When a new
>>> tupletree is born, the spout sends the XORed edge-ids of each tuple
>>> recipient, which the acker records in its pending ledger" in
>>> Acking-framework-implementation.html
>>> <http://storm.apache.org/releases/current/Acking-framework-implementation.html>
>>> .
>>> How does the spout know before hand which bolts would receive the tuple?
>>> Bolts forward tuples to other bolts based on groupings and dynamically
>>> generated fields. How does spout know what fields will be generated and
>>> which bolts will receive the tuples? If it does not know that, then how
>>> does it send the XOR of each tuple recipient in a tuple's path because each
>>> tuple's path will be different (I think, not sure though).
>>>
>>> Thx,
>>> T.I.
>>>
>>> On Tue, Sep 13, 2016 at 6:37 PM, Ambud Sharma <[email protected]>
>>> wrote:
>>>
>>>> Here is a post on it
>>>> https://bryantsai.com/fault-tolerant-message-processing-in-storm/.
>>>>
>>>> Point to point tracking is expensive unless you are using transactions.
>>>> Flume does point to point transfers using transactions.
>>>>
>>>> On Sep 13, 2016 3:27 PM, "Tech Id" <[email protected]> wrote:
>>>>
>>>>> I agree with this statement about code/architecture but in case of
>>>>> some system outages, like one of the end-points (Solr, Couchbase,
>>>>> Elastic-Search etc.) being down temporarily, a very large number of other
>>>>> fully-functional and healthy systems will receive a large number of
>>>>> duplicate replays (especially in heavy throughput topologies).
>>>>>
>>>>> If you can elaborate a little more on the performance cost of tracking
>>>>> tuples or point to a document reflecting the same, that will be of great
>>>>> help.
>>>>>
>>>>> Best,
>>>>> T.I.
>>>>>
>>>>> On Tue, Sep 13, 2016 at 12:26 PM, Hart, James W. <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Failures should be very infrequent, if they are not then rethink the
>>>>>> code and architecture. The performance cost of tracking tuples in the
>>>>>> way that would be required to replay at the failure is large, basically
>>>>>> that method would slow everything way down for very infrequent failures.
>>>>>>
>>>>>> *From:* S G [mailto:[email protected]]
>>>>>> *Sent:* Tuesday, September 13, 2016 3:17 PM
>>>>>> *To:* [email protected]
>>>>>> *Subject:* Re: How will storm replay the tuple tree?
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am a little curious to know why we begin at the spout level for
>>>>>> case 1.
>>>>>>
>>>>>> If we replay at the failing bolt's parent level (BoltA in this case),
>>>>>> then it should be more performant due to a decrease in duplicate
>>>>>> processing (as compared to whole tuple tree replays).
>>>>>>
>>>>>> If BoltA crashes due to some reason while replaying, only then the
>>>>>> Spout should receive this as a failure and whole tuple tree should be
>>>>>> replayed.
>>>>>>
>>>>>> This saving in duplicate processing will be more visible with several
>>>>>> layers of bolts.
>>>>>>
>>>>>> I am sure there is a good reason to replay the whole tuple-tree, and
>>>>>> want to know the same.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> SG
>>>>>>
>>>>>> On Tue, Sep 13, 2016 at 10:22 AM, P. Taylor Goetz <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Cheney,
>>>>>>
>>>>>> Replays happen at the spout level.
>>>>>> So if there is a failure at any point in the tuple tree (the tuple
>>>>>> tree being the anchored emits, unanchored emits don’t count), the
>>>>>> original spout tuple will be replayed.
>>>>>> So the replayed tuple will traverse the topology again, including
>>>>>> unanchored points.
>>>>>>
>>>>>> If an unanchored tuple fails downstream, it will not trigger a replay.
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> -Taylor
>>>>>>
>>>>>> On Sep 13, 2016, at 4:42 AM, Cheney Chen <[email protected]> wrote:
>>>>>>
>>>>>> Hi there,
>>>>>>
>>>>>> We're using storm 1.0.1, and I'm checking through
>>>>>> http://storm.apache.org/releases/1.0.1/Guaranteeing-message-processing.html
>>>>>>
>>>>>> Got questions for below two scenarios.
>>>>>>
>>>>>> Assume topology: S (spout) --> BoltA --> BoltB
>>>>>>
>>>>>> 1. S: anchored emit, BoltA: anchored emit
>>>>>>
>>>>>> Suppose BoltB processing failed w/ ack, what will the replay be, will
>>>>>> it execute both BoltA and BoltB or only failed BoltB processing?
>>>>>>
>>>>>> 2. S: anchored emit, BoltA: unanchored emit
>>>>>>
>>>>>> Suppose BoltB processing failed w/ ack, replay will not happen,
>>>>>> correct?
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Regards,
>>>>>> Qili Chen (Cheney)
>>>>>>
>>>>>> E-mail: [email protected]
>>>>>> MP: (+1) 4086217503
>>>>>>
>>>>>
>>>
>>
>
> --
> Regards,
> Qili Chen (Cheney)
>
> E-mail: [email protected]
> MP: (+1) 4086217503
>
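
P.S. On the XOR-based tuple tracking mentioned at the top of this message: below is a toy sketch of the idea in Python, not Storm's actual acker code. The class name ToyAcker, the helper new_tuple_id and the message id "kafka-msg-1" are all made up for illustration. It assumes every tuple gets a random 64-bit id and that each id is XORed into the tracking value once when the tuple is emitted and once when it is acked, so the value returns to zero exactly when everything emitted has been acked.

```python
import random


class ToyAcker:
    """Toy model of XOR-based tuple-tree tracking; not Storm's real acker."""

    def __init__(self):
        # One integer per spout tuple, however large its tuple tree grows.
        self.pending = {}

    def update(self, root_id, value):
        """XOR value into the entry for root_id; return True once it hits zero."""
        current = self.pending.get(root_id, 0) ^ value
        if current == 0:
            self.pending.pop(root_id, None)
            return True
        self.pending[root_id] = current
        return False


def new_tuple_id():
    # Random 64-bit id; zero is avoided so a lone id never looks "complete".
    return random.getrandbits(64) or 1


def run_demo():
    acker = ToyAcker()
    root = "kafka-msg-1"  # hypothetical spout message id

    spout_tuple = new_tuple_id()
    acker.update(root, spout_tuple)            # spout emits the root tuple

    a1, a2 = new_tuple_id(), new_tuple_id()
    acker.update(root, spout_tuple ^ a1 ^ a2)  # BoltA acks its input, anchors two emits

    acker.update(root, a1)                     # BoltB acks the first tuple
    return acker.update(root, a2)              # BoltB acks the second tuple


if __name__ == "__main__":
    print("tree complete:", run_demo())
```

The point of the sketch is that the acker keeps a single fixed-size value per spout tuple no matter how many bolts the tree passes through, which is why the tracking cost stays predictable.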

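
P.P.S. On idempotent writes with a key built from partition ID + offset + event timestamp: here is a minimal sketch in Python, assuming a key/value style store. KeyedStore is only an in-memory stand-in for something like an Elasticsearch index or an HBase table, and event_key, handle and the event fields ("ts", "user", "amount") are hypothetical names, not part of any Storm or Kafka API.

```python
class KeyedStore:
    """In-memory stand-in for a keyed datastore (document id, row key, primary key)."""

    def __init__(self):
        self.rows = {}
        self.writes = 0

    def upsert(self, key, value):
        # Writing the same key again overwrites the row with identical data.
        self.rows[key] = value
        self.writes += 1


def event_key(partition, offset, event_ts_ms):
    """Deterministic id: a replayed Kafka record maps to the same key."""
    return f"{partition}-{offset}-{event_ts_ms}"


def handle(store, partition, offset, event):
    """Process one event by upserting it under its deterministic key."""
    key = event_key(partition, offset, event["ts"])
    store.upsert(key, {"user": event["user"], "amount": event["amount"]})
    return key


if __name__ == "__main__":
    store = KeyedStore()
    event = {"ts": 1473840000000, "user": "u1", "amount": 5}
    handle(store, 3, 42, event)
    handle(store, 3, 42, event)  # simulated replay after a failure
    print(len(store.rows), "row after", store.writes, "writes")
```

Because the key depends only on where the record sits in Kafka and on the timestamp inside the payload, a replay lands on the same row and the stored result does not change.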