Hi Adrian,

Yes, that is my understanding too. Sometimes I wonder whether it is really a good
idea to use Storm for such a computation (i.e. aggregating data over a certain
time window when the aggregation operation is not additive).
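To make the additive/non-additive distinction above concrete, here is a small stand-alone sketch (not Storm code, just plain Java): a sum can be rebuilt from per-window partial results, while an exact median cannot, which is why a crash forces a replay of the whole window's raw data.

```java
import java.util.*;

public class AggregationDemo {
    // Additive: per-window partial sums can be merged without the raw data.
    static int mergeSums(int[] partialSums) {
        int total = 0;
        for (int s : partialSums) total += s;
        return total;
    }

    // Non-additive: an exact median cannot be computed from per-window
    // medians; it needs all the raw values.
    static double median(List<Integer> rawValues) {
        List<Integer> sorted = new ArrayList<>(rawValues);
        Collections.sort(sorted);
        int n = sorted.size();
        return n % 2 == 1 ? sorted.get(n / 2)
                          : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        // Sum is additive: merging the partials 12 and 7 gives 19,
        // the same answer as summing the raw data.
        System.out.println(mergeSums(new int[]{12, 7}));

        // The median of the per-window medians of {1,2,9} and {3,4} is 2.75,
        // but the true median of {1,2,3,4,9} is 3: the partials are not enough.
        System.out.println(median(Arrays.asList(1, 2, 9, 3, 4)));
    }
}
```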
Abhishek, thank you for pointing that out! I kinda get the point of storm-redis,
but didn't think about extending the same idea to a larger topology. My idea now
(similar to what Tom suggests) is to have the bolt in the middle output the data
to a Kafka queue, and make a spout that listens to this queue and emits the data
to the downstream bolts, as well as handles the re-emission upon failure.
Ideally, I want to make a TopologyBuilder that constructs this automatically, so
that, on the surface, it would look the same as an ordinary topology. Does it
make any sense?

Thank you all so much for the kind replies. This community is great!

Best,
Andy

On Wed, Feb 12, 2014 at 10:22 AM, Adrian Mocanu <[email protected]> wrote:

> Hi
>
> You can fail a tuple from any intermediate bolt, but AFAIK you can't make
> it not resend from the spout. So your precomputed cached result is useless.
> I know in Spark you can save your RDD to a db/cache, but that wouldn't work
> in Storm.
>
> If I'm wrong, someone correct me.
>
> A
>
> *From:* Abhishek Bhattacharjee [mailto:[email protected]]
> *Sent:* February-12-14 3:37 AM
> *To:* [email protected]
> *Subject:* Re: How to efficiently store the intermediate result of a
> bolt, and so it can be replayed after the crashes?
>
> Hi Cheng,
>
> If you look at the repo that Aniket posted the link of and read its README,
> you'll get what you are asking for in the above mail.
> I'll repost the link here: https://github.com/pict2014/storm-redis . This
> does what you are asking for.
> It uses *kafka* for replaying and *redis* for caching the intermediate
> state in batches. If you have a good understanding of Storm, you can read
> the code and understand how it works. It uses transactional topologies.
>
> Thanks,
>
> On Wed, Feb 12, 2014 at 4:17 AM, Cheng-Kang Hsieh (Andy) <
> [email protected]> wrote:
>
> Hi Adrian,
>
> Thank you so much for the input!
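The bolt-to-Kafka-to-spout bridge described at the top of this message can be sketched, leaving out the Kafka specifics, as a queue with at-least-once redelivery. All class and method names below are hypothetical; a real implementation would back the queue with a Kafka topic and consumer offsets.

```java
import java.util.*;

// Toy model of the proposed bridge: the middle bolt publishes its results
// to a queue (standing in for a Kafka topic), and a bridge "spout" replays
// any message that fails downstream, without recomputing upstream.
public class BridgeSketch {
    private final Deque<String> queue = new ArrayDeque<>();    // stand-in for Kafka
    private final Map<Long, String> pending = new HashMap<>(); // emitted, not yet acked
    private long nextId = 0;

    // Middle bolt publishes an intermediate result here instead of
    // anchoring it to the original spout tuple.
    void publish(String result) { queue.addLast(result); }

    // Bridge spout emits the next message, tracking it until acked;
    // returns null when the queue is empty.
    Long nextTuple() {
        String msg = queue.pollFirst();
        if (msg == null) return null;
        long id = nextId++;
        pending.put(id, msg);
        return id;
    }

    // Downstream success: forget the message.
    void ack(long id) { pending.remove(id); }

    // Downstream failure: re-queue for re-emission.
    void fail(long id) {
        String msg = pending.remove(id);
        if (msg != null) queue.addFirst(msg);
    }

    int pendingCount() { return pending.size(); }
    int queuedCount()  { return queue.size(); }
}
```

With this shape, a failure in the downstream bolt costs one re-emission from the queue rather than hours of recomputation in the aggregating bolt.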
If I understand how a Spout
> works correctly, wouldn't the tuple be regarded as failed if it has not been
> fully acked before the timeout (which, by default, is 30 secs)? From my
> understanding (which can be totally wrong), the Storm-ish way to respond to
> a failed tuple is to call the *fail* method in the root Spout, which, in
> turn, re-emits the failed tuple to the topology.
>
> It would be nice if there were a *fail* method in the intermediate bolt
> that is called when the downstream bolts fail; then this bolt
> could just re-emit the intermediate results to the downstream bolt without
> restarting the process all the way up from the root spout.
>
> A use case: say I have 3 components chained together
> as follows: Spout -> Bolt1 -> Bolt2. What Bolt1 does is aggregate the
> data within every fixed-size time window in a day and compute some
> measurements based on that (e.g. the user's activities for each hour of the
> day). With the current design of Storm, when Bolt2 fails, the Spout
> has to manage to resend all the data in the corresponding time window for
> Bolt1 to recompute the results. It would be nice if Bolt1 could cache the
> results and resend them when Bolt2 fails.
>
> Does it make any sense?
> Any input is appreciated!
>
> Best,
> Andy
>
> On Tue, Feb 11, 2014 at 5:03 PM, Adrian Mocanu <[email protected]>
> wrote:
>
> You can have acks from bolt to bolt.
>
> Spout:
>
> //ties the tuple to this UID (the messageId)
> _collector.emit(new Values(queue.dequeue()), *uniqueID*);
>
> Then Bolt1 will ack the tuple only after it emits it to Bolt2, so that the
> ack can be tied to the tuple.
>
> Bolt1:
>
> //emit first, then ack
> _collector.emit(tuple, new Values("stuff")); //**anchoring** - read below
> to see what it means
> _collector.ack(tuple);
>
> At this point the tuple from Spout has been acked in Bolt1, but at the same
> time the newly emitted tuple "stuff" to Bolt2 is "anchored" to the tuple
> from Spout. What this means is that it still needs to be acked later on;
> otherwise, on timeout, it will be resent by the spout.
>
> Bolt2:
>
> _collector.ack(tuple);
>
> Bolt2 needs to ack the tuple received from Bolt1, which will send the
> last ack that Spout was waiting for. If at this point Bolt2 emits a tuple,
> then there must be a Bolt3 which will get it and ack it. If the tuple is
> not acked at the last point, Spout will time it out and resend it.
>
> Each time anchoring is done on an emit statement from bolt to bolt, a new
> node in a "tree" structure is built... well, more like a list in my case,
> since I never send the same tuple to 2 or more bolts; I have a 1-to-1
> relationship.
>
> All nodes in the tree need to be acked, and only then is the tuple marked
> as fully arrived. If the tuple is not acked, and it was sent with a UID and
> anchored later on, then it will be kept in memory forever (until acked).
>
> Hope this helps.
>
> *From:* Tom Brown [mailto:[email protected]]
> *Sent:* February-11-14 4:57 PM
> *To:* [email protected]
> *Subject:* Re: How to efficiently store the intermediate result of a
> bolt, and so it can be replayed after the crashes?
>
> We use 2 Storm topologies, with Kafka in between: Kafka --> TopologyA -->
> Kafka --> TopologyB --> Final output
>
> This allows the two halves of the computation to be scaled and maintained
> independently.
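The ack-tree bookkeeping Adrian describes above can be modeled in a few lines. Storm's acker really does track one 64-bit XOR "ack val" per spout tuple: every tuple id in the tree is XORed in when it is created (emit/anchor) and XORed in again when it is acked, so the value returns to 0 exactly when every node has been acked. This toy class is just a stand-alone illustration of that invariant, not Storm's actual acker code.

```java
import java.util.Random;

// Toy model of Storm's acker bookkeeping for one spout tuple's tree.
public class AckTreeDemo {
    private long ackVal = 0;
    private final Random rnd = new Random(42);

    // Called for the spout emit and for every anchored bolt emit:
    // a fresh random 64-bit id is XORed into the ack val.
    long emit() {
        long id = rnd.nextLong();
        ackVal ^= id;
        return id;
    }

    // Acking XORs the same id in again, cancelling it out.
    void ack(long id) { ackVal ^= id; }

    // Fully acked exactly when every emitted id has been acked.
    boolean fullyAcked() { return ackVal == 0; }

    public static void main(String[] args) {
        AckTreeDemo acker = new AckTreeDemo();
        long spoutTuple = acker.emit();   // Spout -> Bolt1
        long bolt1Out   = acker.emit();   // Bolt1 anchors "stuff" -> Bolt2
        acker.ack(spoutTuple);            // Bolt1 acks its input
        // Not done yet: Bolt2 still has to ack, or the spout re-sends.
        System.out.println(acker.fullyAcked()); // false
        acker.ack(bolt1Out);              // Bolt2 acks
        System.out.println(acker.fullyAcked()); // true
    }
}
```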
>
> --Tom
>
> On Tue, Feb 11, 2014 at 2:36 PM, Cheng-Kang Hsieh (Andy) <
> [email protected]> wrote:
>
> Hi Aniket & Adrian,
>
> Thank you guys so much for the kind reply! Although the replies don't
> directly solve my problem, it has been very rewarding following the code of
> storm-redis and Trident.
>
> I guess storing the intermediate data in an external db (like Cassandra,
> as suggested by Adrian) would work, but what if the Bolt that is supposed
> to receive the intermediate data fails? In this case, the emitter is also a
> Bolt, and does not have the nice ACK mechanism to rely on, so the emitting
> Bolt might never know when it should resend the data to the receiving Bolt.
>
> In other frameworks, like Samza or Spark Streaming, all emitted data,
> whether from a Spout or a Bolt, is treated the same way and so benefits
> from the same fault-tolerance mechanism (they are not as easy to use as
> Storm though). For example, in Samza, all the data output by a component
> is pushed to a Kafka queue with the receiving components as the listeners
> (see here:
> http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> ).
>
> Conceptually, maybe a more general solution for Storm is to make a Bolt
> also a Spout which can receive ACKs from the receiving Bolts; however, it
> seems to violate the assumptions of Storm?
>
> Again, I appreciate any advice or suggestion. Thank you!
>
> Best,
> Andy
>
> On Fri, Feb 7, 2014 at 9:37 AM, Adrian Mocanu <[email protected]>
> wrote:
>
> Hi Andy,
>
> I think you can use Trident to persist the results at any point in your
> stream processing.
> I believe the way you do that is by using STREAM.persistentAggregate(...)
>
> Here's an example from
> https://github.com/nathanmarz/storm/wiki/Trident-tutorial
>
> TridentTopology topology = new TridentTopology();
> TridentState wordCounts =
>     topology.newStream("spout1", spout)
>         .each(new Fields("sentence"), new Split(), new Fields("word"))
>         .groupBy(new Fields("word"))
>         .persistentAggregate(new MemoryMapState.Factory(), new Count(),
>             new Fields("count"))
>         .parallelismHint(6);
>
> In this case the counts (replace counts with whatever operations you are
> doing) are stored in a memory map, but you can make another class that
> saves this intermediate result to a db... at least that's my
> understanding... I am currently also learning these things.
> I'm currently working on a similar problem, and I'm attempting to store
> into Cassandra. Feel free to watch my conversation threads (with Svend and
> Taylor Goetz).
>
> -A
>
> *From:* Aniket Alhat [mailto:[email protected]]
> *Sent:* February-06-14 11:57 PM
> *To:* [email protected]
> *Subject:* Re: How to efficiently store the intermediate result of a
> bolt, and so it can be replayed after the crashes?
>
> I hope this helps:
> https://github.com/pict2014/storm-redis
>
> On Feb 7, 2014 12:07 AM, "Cheng-Kang Hsieh (Andy)" <[email protected]>
> wrote:
>
> Sorry, I realized that question was badly written. Simply put, my
> question is: is there a recommended way to store the tuples emitted by
> a BOLT so that the tuples can be replayed after a crash without repeating
> the process all the way up from the source spout? Any advice would be
> appreciated. Thank you!
>
> Best,
> Andy
>
> On Tue, Feb 4, 2014 at 11:58 AM, Cheng-Kang Hsieh (Andy) <
> [email protected]> wrote:
>
> Hi all,
>
> First of all, thanks to Nathan and all the contributors for putting out
> such a great framework! I am learning a lot, even just reading the
> discussion threads.
>
> I am building a topology that contains one spout along with a chain of
> bolts (e.g.
S -> A -> B, where S is the spout and A, B are bolts).
>
> When S emits a tuple, the next bolt A will buffer the tuple in a DFS,
> compute some aggregated values when it has received a sufficient amount of
> data, and then emit the aggregation results to the next bolt B.
>
> Here comes my question: is there a recommended way to store the
> intermediate results emitted by a bolt, so that when a machine crashes, the
> results can be replayed to the downstream bolts (i.e. bolt B)?
>
> One possible solution could be: don't keep any intermediate results,
> but resort to Storm's ack framework, so that the raw data will be
> replayed from spout S when a crash happens.
>
> However, this approach might not be appropriate in my case, as it might
> take a pretty long time (like a couple of hours) before bolt A has received
> all the required data and emits the aggregated results, so it would be
> very expensive for the ack framework to keep tracking that many tuples for
> that long.
>
> An alternative solution could be: *making bolt A also a spout* and keeping
> the emitted data in a DFS queue. When a result has been acked, bolt A
> removes it from the queue.
>
> I am wondering if it is reasonable to make a task both a bolt and a spout
> at the same time? Or is there a better approach?
>
> Thank you!
>
> --
> Cheng-Kang Hsieh
> UCLA Computer Science PhD Student
> M: (310) 990-4297
> A: 3770 Keystone Ave. Apt 402,
> Los Angeles, CA 90034
>
> --
> *Abhishek Bhattacharjee*
> *Pune Institute of Computer Technology*

--
Cheng-Kang Hsieh
UCLA Computer Science PhD Student
M: (310) 990-4297
A: 3770 Keystone Ave. Apt 402,
Los Angeles, CA 90034
