I've resolved this problem now. It had nothing to do with the MapState after all. It was caused by the serialization that occurs when a topology uses multiple workers distributed across multiple nodes.
I was using a Protobufs library to deserialize the byte array emitted by my Kafka Spout, and then passing the resulting object around in my tuples. It turns out the objects generated by the Protobufs library aren't serializable. Storm let me use those objects without throwing an exception, but all the fields were set to null! This caused my tuples to be filtered out before they reached the MapState. (The problem only happens when serialization is required, i.e. when more than 1 worker is used.)

The solution was to pass around a byte array in the tuples, and then deserialize it with the Protobufs library every time it needs to be used.

On Wed, May 20, 2015 at 4:49 PM, Josh <[email protected]> wrote:

> Hi all,
>
> I am using a Trident MapState to persist state to an external store.
>
> I am seeing a very strange problem: when I run the topology on a single
> worker process only, everything works fine. My topology processes around
> 20k tuples/sec and writes around 10k/sec to the external store.
>
> However, if I rebalance the same topology to run on 2 workers instead of
> 1, the map state doesn't work at all - no writes are made to the external
> store. However the topology still processes 20k tuples/sec with no explicit
> errors/failures.
>
> What could cause this to happen?
>
> It feels like the persistentAggregate part of the trident topology is just
> completely ignored when using more than 1 worker.
>
> Thanks for any help with this,
> Josh
>
>
> P.S. the trident topology looks like this:
>
> requestStream
>     .each(new Fields("msg"), new MsgFilter())
>     .each(new Fields("msg"), new FieldExtractorFunction(), new Fields("field1",
>         "field2"))
>     .groupBy(new Fields("field1", "field2"))
>     .persistentAggregate(MyMapState.getNonTransactional(),
>         new CountAggregator(), new Fields("output_count"))
>     .parallelismHint(8)
>
> For now, I am using an OpaqueTridentKafkaSpout to provide the input stream and
> a NonTransactional implementation of the MapState.
>
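For anyone who finds this thread later, here is a rough sketch of the workaround described at the top of this message: keep the raw byte array in the tuple and decode it only at the point of use. It is a plain-Java illustration, not my actual topology code. The names RawBytesSketch, Msg and extractFields are made up for the example, Msg is a hand-written stand-in for a Protobufs-generated class, and its byte encoding is NOT the Protobufs wire format. It also has no Storm dependency, so that it runs on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class RawBytesSketch {

    // Hypothetical stand-in for a Protobufs-generated message class.
    // A real generated class would supply its own parseFrom(byte[]).
    static final class Msg {
        final String field1;
        final String field2;

        Msg(String field1, String field2) {
            this.field1 = field1;
            this.field2 = field2;
        }

        // Stand-in encoding: "field1\nfield2" as UTF-8 (not Protobufs).
        byte[] toByteArray() {
            return (field1 + "\n" + field2).getBytes(StandardCharsets.UTF_8);
        }

        static Msg parseFrom(byte[] raw) {
            String[] parts = new String(raw, StandardCharsets.UTF_8).split("\n", 2);
            return new Msg(parts[0], parts[1]);
        }
    }

    // Roughly what a function such as FieldExtractorFunction would do:
    // read the raw bytes from the tuple, decode them locally, and emit
    // only plain values. The decoded object never goes into a tuple.
    static List<Object> extractFields(byte[] raw) {
        Msg msg = Msg.parseFrom(raw);
        return Arrays.<Object>asList(msg.field1, msg.field2);
    }

    public static void main(String[] args) {
        // The spout would emit these bytes as the "msg" field.
        byte[] raw = new Msg("a", "b").toByteArray();
        System.out.println(extractFields(raw)); // prints [a, b]
    }
}
```

The cost is that every function or filter that needs the message has to decode it again, but byte arrays and strings are serialized reliably between workers, so the tuples arrive intact.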
