Hi all,

I am using a Trident MapState to persist state to an external store.

I am seeing a very strange problem: when I run the topology on a single
worker process, everything works fine. The topology processes around
20k tuples/sec and writes around 10k/sec to the external store.

However, if I rebalance the same topology to run on 2 workers instead of 1,
the map state doesn't work at all: no writes are made to the external
store. Yet the topology still processes 20k tuples/sec with no explicit
errors or failures.

What could cause this to happen?

It feels like the persistentAggregate part of the Trident topology is
silently ignored whenever the topology runs on more than 1 worker.

Thanks for any help with this,
Josh


P.S. The Trident topology looks like this:

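requestStream
    // keep only the messages that MsgFilter accepts
    .each(new Fields("msg"), new MsgFilter())
    // extract field1 and field2 from each message
    .each(new Fields("msg"), new FieldExtractorFunction(),
          new Fields("field1", "field2"))
    // count tuples per (field1, field2) and persist the counts in the MapState
    .groupBy(new Fields("field1", "field2"))
    .persistentAggregate(MyMapState.getNonTransactional(),
          new CountAggregator(), new Fields("output_count"))
    .parallelismHint(8);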
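To make the data flow of the topology above concrete, here is a minimal, stdlib-only Java sketch (not Storm code) of what `groupBy` followed by a counting `persistentAggregate` does with one batch: each (field1, field2) key is routed to one of the 8 partitions, and repeated keys within the batch collapse to a single count, so the store sees roughly one write per distinct key per batch rather than one per tuple. `GroupedCountSketch`, `key`, `partitionFor`, and the hash-mod routing are hypothetical illustrations; Storm's actual fields-grouping hash may differ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, stdlib-only illustration of groupBy + count on one batch.
// The hash-mod routing mimics fields grouping; it is not Storm's exact code.
final class GroupedCountSketch {
    // Build a (field1, field2) grouping key.
    static List<Object> key(Object field1, Object field2) {
        return List.of(field1, field2);
    }

    // Route a key to one of numPartitions partitions; equal keys always
    // land on the same partition.
    static int partitionFor(List<Object> key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Collapse a batch into one count per distinct key, bucketed by the
    // partition that owns the key.
    static Map<Integer, Map<List<Object>, Long>> aggregateBatch(
            List<List<Object>> batch, int numPartitions) {
        Map<Integer, Map<List<Object>, Long>> byPartition = new TreeMap<>();
        for (List<Object> k : batch) {
            byPartition
                .computeIfAbsent(partitionFor(k, numPartitions), p -> new HashMap<>())
                .merge(k, 1L, Long::sum);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<List<Object>> batch = List.of(
            key("a", "x"), key("a", "x"), key("b", "y"), key("a", "x"));
        Map<Integer, Map<List<Object>, Long>> updates = aggregateBatch(batch, 8);
        int writes = updates.values().stream().mapToInt(Map::size).sum();
        System.out.println("tuples=" + batch.size() + " writes=" + writes);
    }
}
```

Running `main` prints `tuples=4 writes=2`: four tuples, but only two distinct keys to persist. That collapsing is one plausible reason the write rate (about 10k/sec) is lower than the tuple rate (about 20k/sec).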

For now, I am using an OpaqueTridentKafkaSpout to provide the input
stream and a NonTransactional implementation of the MapState.
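In Trident, a non-transactional MapState is usually built by wrapping an `IBackingMap<T>` (which has `multiGet` and `multiPut`) with `NonTransactionalMap.build(...)`. The sketch below is a stdlib-only simulation of that read-modify-write cycle under stated assumptions: `BackingMap` is a local stand-in mirroring `IBackingMap`'s two methods, and `InMemoryStore` and `NonTransactionalCount` are hypothetical names standing in for the external store and `MyMapState`. Because no transaction id is stored alongside each value, a replayed batch would be applied twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-in mirroring the two methods of Trident's IBackingMap<T>.
// Declared here (hypothetically) so the sketch compiles without Storm.
interface BackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Hypothetical in-memory store standing in for the external store.
final class InMemoryStore implements BackingMap<Long> {
    final Map<List<Object>, Long> data = new HashMap<>();
    int putCalls = 0; // number of multiPut round trips

    @Override
    public List<Long> multiGet(List<List<Object>> keys) {
        List<Long> out = new ArrayList<>();
        for (List<Object> k : keys) {
            out.add(data.get(k)); // null when the key has never been written
        }
        return out;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<Long> vals) {
        putCalls++;
        for (int i = 0; i < keys.size(); i++) {
            data.put(keys.get(i), vals.get(i));
        }
    }
}

// Hypothetical non-transactional update: read, combine, write back.
// No txid is stored, so replaying the same batch would count it twice.
final class NonTransactionalCount {
    static void applyBatch(BackingMap<Long> store, Map<List<Object>, Long> batchCounts) {
        List<List<Object>> keys = new ArrayList<>(batchCounts.keySet());
        List<Long> current = store.multiGet(keys);
        List<Long> updated = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            Long old = current.get(i);
            updated.add((old == null ? 0L : old) + batchCounts.get(keys.get(i)));
        }
        store.multiPut(keys, updated);
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        Map<List<Object>, Long> batch1 = new HashMap<>();
        batch1.put(List.<Object>of("a", "x"), 3L);
        batch1.put(List.<Object>of("b", "y"), 1L);
        applyBatch(store, batch1);

        Map<List<Object>, Long> batch2 = new HashMap<>();
        batch2.put(List.<Object>of("a", "x"), 2L);
        applyBatch(store, batch2);

        System.out.println(store.data.get(List.<Object>of("a", "x"))); // 5
    }
}
```

Each batch costs one `multiGet` and one `multiPut` per partition, so if no writes reach the store at all, those two calls on the backing map are a natural place to look.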
