Hi all

I would like to better understand how groupBy/persistentAggregate works in
a "distributed environment"

I got the following topology (just an extract)

Stream currStream = topology.newStream(this.getTopologyName(), jmsspout)
...
currStream.groupBy(new Fields(A,B))

 .persistentAggregate(myDistributedCacheStateFactory(hosts),
                            new Fields(C),
                            (CombinerAggregator) sum,
                            new Fields("output"))
                    .parallelismHint(parallHintValue);

MyDistributedCacheStateFactory is an implementation of MapState (should
work because of the groupBy) that persist into a distributed cache. to keep
it simple, an entry is identified by the value of [A,B]

So as far as I understand, my batch will be grouped by [A, B] and my
CombinerAggregator will do a sum on all the objects C.

1/ correct me if I am wrong but in a distributed env, I will have several
instances of MyDistributedCacheStateFactory executed in different JVM or
threads

2/ Does Storm guarantee me that each "groupBy" will always go to the same
"thread"? or not?
if not, how can I ensure a "select for update where key=[A,B]"? how can I
ensure I will not update the same entry of the distributed cache from two
different threads (ie. a kind of race conditions)

3/ is it a better option to use groupBy().aggregate().partitionPersist()?

Hope my questions are not too blur.

Regards.

Reply via email to