I'm trying to orchestrate streams in Storm/Trident based on time and tuple counts. The problems I'm working on include time-based or count-based emission of tuples from aggregators and tumbling windows (where I batch up data and emit either at a fixed interval or on every 500th tuple).
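
To make the tumbling-window case concrete, this is roughly the hand-rolled version I have in mind: a plain-Java sketch that uses no Storm or Trident APIs. `TumblingWindow`, `add`, and `tick` are names I made up for illustration, and I'm assuming the caller supplies timestamps and calls `tick` from some timer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Storm or Trident.
// Buffers tuples and hands back the whole window when either
// maxCount tuples have arrived or maxAgeMillis has elapsed.
final class TumblingWindow<T> {
    private final int maxCount;        // e.g. 500 for "every 500th tuple"
    private final long maxAgeMillis;   // e.g. 5000 for "every 5 seconds"
    private final List<T> buffer = new ArrayList<>();
    private long windowStart;

    TumblingWindow(int maxCount, long maxAgeMillis) {
        this.maxCount = maxCount;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Adds a tuple; returns the full window once the count is reached,
    // otherwise an empty list.
    List<T> add(T tuple, long nowMillis) {
        if (buffer.isEmpty()) {
            windowStart = nowMillis;   // window opens on its first tuple
        }
        buffer.add(tuple);
        return buffer.size() >= maxCount ? flush() : Collections.<T>emptyList();
    }

    // Meant to be called periodically; returns the window if it has
    // been open for at least maxAgeMillis, otherwise an empty list.
    List<T> tick(long nowMillis) {
        boolean expired = !buffer.isEmpty() && nowMillis - windowStart >= maxAgeMillis;
        return expired ? flush() : Collections.<T>emptyList();
    }

    // Copies the buffered tuples out and starts a fresh window.
    private List<T> flush() {
        List<T> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }
}
```

Whatever comes back from `add` or `tick` would then have to be aggregated by hand, which is exactly the part I'd rather get from Trident's built-in aggregators.
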
I'm starting to experiment with sliding windows in a static Storm topology that lets me specify "flows" of data that cycle through the topology in parallel. That lets me do things like collect tuples into a window per grouped set of fields, aggregate counts by field, and emit every 5 seconds.

Other things I want to do are more CEP-like, such as a stop-gate filter: the gate closes when a window fills up with 5 tuples and the time difference between the first and last tuple is less than or equal to 2 seconds. A closed gate filters out tuples for 10 minutes, after which it opens again and the logic repeats.

I noticed a comment at the bottom of the Trident API overview that states:

"*You might be wondering- how do you do something like a "windowed join", where tuples from one side of the join are joined against the last hour of tuples from the other side of the join. To do this, you would make use of partitionPersist and stateQuery. The last hour of tuples from one side of the join would be stored and rotated in a source of state, keyed by the join field. Then the stateQuery would do lookups by the join field and perform the "join".*"

I've been using Storm and stream processing for some time, but I'm very new to some of the concepts underlying Trident, such as state and batching. Because of that, the quote above doesn't make much sense to me. How would I best implement the use cases above using such a paradigm (if one exists)?

I've looked at previous posts on using CEP in Storm, and I would certainly be happy to write my own generic windowing functions, but I'd like to get as much for free out of Trident as possible. For instance, if I write my own time-emitted, count-evicted tumbling window aggregator, I'm not using the aggregation functions Trident already supplies, and that feels like a hack to me.

Thanks!
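
For reference, here is the stop-gate behaviour as a plain-Java sketch, independent of Storm and Trident. `StopGate` and `accept` are hypothetical names. I'm assuming timestamps arrive in non-decreasing order, that the window slides over the last N tuples, and that the tuple which completes the burst still passes before the gate closes; any of those could reasonably be decided differently.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not part of Storm or Trident.
final class StopGate {
    private final int windowSize;      // 5 tuples in my use case
    private final long burstMillis;    // 2 seconds
    private final long closedMillis;   // 10 minutes
    private final Deque<Long> recent = new ArrayDeque<>();
    private long closedUntil = Long.MIN_VALUE;

    StopGate(int windowSize, long burstMillis, long closedMillis) {
        this.windowSize = windowSize;
        this.burstMillis = burstMillis;
        this.closedMillis = closedMillis;
    }

    // Returns true if the tuple passes the gate, false if it is filtered.
    boolean accept(long timestampMillis) {
        if (timestampMillis < closedUntil) {
            return false;                       // gate is closed
        }
        recent.addLast(timestampMillis);
        if (recent.size() > windowSize) {
            recent.removeFirst();               // keep only the last N timestamps
        }
        boolean burst = recent.size() == windowSize
                && recent.peekLast() - recent.peekFirst() <= burstMillis;
        if (burst) {
            closedUntil = timestampMillis + closedMillis;  // close the gate
            recent.clear();                     // start over once it reopens
        }
        return true;                            // this tuple still passes
    }
}
```

With `new StopGate(5, 2_000L, 600_000L)`, five tuples within 2 seconds pass, and everything after that is dropped until 10 minutes after the fifth tuple. What I can't see is how to keep this kind of per-key state in Trident rather than in an instance field.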
