Hello ,
is there any way to achieve chain level transaction using trident that is I
want below mentioned whole chain to maintain transaction integrity ( at
chain level )
Batch input -> init()-> tuple ->persistentaggregation -> MultiReduction
->complete()
OR
If I create topology like this
.each() // will call init
.groupBy(key).persistentAggregate(<>) // aggregation
// stream 1 output ---------------------------------------- 1st stream
chain
.each() // will call init
.groupBy(key).persistentAggregate(<>) // aggregation
// stream 2 output ---------------------------------------- 2nd stream
chain
multireduce(s1 and s2).groupBy(key).persistentAggregate(<function3>) //
aggregation -- final output
Now my problem is my fields are related ,I want to emit aggregated fields
only if all begin intermediate and end {indicators of my tuple} fields
have arrived for that key, that is I want the aggregation to be
transactional however instead of emitting the whole batch map ,
only data against selective keys I want to emit and if for particular key
begin and end tuple indicator has arrived then data should be pass to next
multireduce level and if exception occurs at function3 , the whole batch is
replayed, finally my multireduce should get the same filtered aggregated
values against keys which it earlier got.
{I am using ITridentSpout}
How can I achieve this ?
Regards