I've resolved this problem now.

It had nothing to do with the MapState after all. The cause was the
serialization that occurs when using multiple workers distributed across
multiple nodes.

I was using a Protobufs library to deserialize the byte array emitted by my
Kafka Spout, and then passing the resulting object around in my tuples. It
turns out the objects generated by the Protobufs library aren't serializable.
Storm let me use those objects without throwing an exception, but all of
their fields were set to null! This caused my tuples to be filtered out
before they reached the MapState. The problem only happens when
serialization is required, i.e. when more than one worker is used.

The solution was to pass around a byte array in the tuples, and then
deserialize this with the Protobufs library every time it needs to be used.
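
A minimal sketch of that pattern, using only the JDK so it runs on its own.
The Request class here is a hypothetical stand-in for a Protobufs-generated
message (with a toy encoding, not the real wire format), and keep() and
extractFields() are hypothetical helpers standing in for the bodies of
MsgFilter and FieldExtractorFunction. The point is only that the tuple
carries a byte[] and each step decodes it when needed:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BytesInTupleSketch {

    // Hypothetical stand-in for a Protobufs-generated message class.
    // Generated classes expose parseFrom(byte[]) and toByteArray();
    // the encoding below is a toy one, used only to keep this runnable.
    static final class Request {
        final String field1;
        final int field2;

        Request(String field1, int field2) {
            this.field1 = field1;
            this.field2 = field2;
        }

        // Toy encoding: 4-byte int followed by the UTF-8 bytes of field1.
        byte[] toByteArray() {
            byte[] text = field1.getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(4 + text.length)
                    .putInt(field2)
                    .put(text)
                    .array();
        }

        static Request parseFrom(byte[] bytes) {
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            int number = buf.getInt();
            byte[] text = new byte[buf.remaining()];
            buf.get(text);
            return new Request(new String(text, StandardCharsets.UTF_8), number);
        }
    }

    // What a filter step would do: decode the raw bytes, then decide.
    static boolean keep(byte[] msg) {
        return !Request.parseFrom(msg).field1.isEmpty();
    }

    // What a field-extracting step would do: decode again and emit
    // only plain values (String, Integer) into the tuple.
    static Object[] extractFields(byte[] msg) {
        Request r = Request.parseFrom(msg);
        return new Object[] { r.field1, r.field2 };
    }

    public static void main(String[] args) {
        // The "msg" tuple field holds raw bytes, never the decoded object.
        byte[] msg = new Request("home", 7).toByteArray();
        if (keep(msg)) {
            Object[] fields = extractFields(msg);
            System.out.println(fields[0] + " " + fields[1]);
        }
    }
}
```

Decoding in every step costs some CPU, but a byte array and plain values
survive the hop between workers unchanged.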

On Wed, May 20, 2015 at 4:49 PM, Josh <[email protected]> wrote:

> Hi all,
>
> I am using a Trident MapState to persist state to an external store.
>
> I am seeing a very strange problem: when I run the topology on a single
> worker process only, everything works fine. My topology processes around
> 20k tuples/sec and writes around 10k/sec to the external store.
>
> However, if I rebalance the same topology to run on 2 workers instead of
> 1, the map state doesn't work at all - no writes are made to the external
> store. However the topology still processes 20k tuples/sec with no explicit
> errors/failures.
>
> What could cause this to happen?
>
> It feels like the persistentAggregate part of the trident topology is just
> completely ignored when using more than 1 worker.
>
> Thanks for any help with this,
> Josh
>
>
> P.S. the trident topology looks like this:
>
> requestStream
> .each(new Fields("msg"), new MsgFilter())
> .each(new Fields("msg"), new FieldExtractorFunction(),
>     new Fields("field1", "field2"))
> .groupBy(new Fields("field1", "field2"))
> .persistentAggregate(MyMapState.getNonTransactional(),
>     new CountAggregator(), new Fields("output_count"))
> .parallelismHint(8)
>
> For now, I am using an OpaqueTridentKafkaSpout to provide the input stream and
> a NonTransactional implementation of the MapState.
>
>
