You can save the state per partition (I am not sure what your partition key is, but you should be able to use it as the key in a key-value store). Then, when you receive a message from topic C, check whether you already have the state for that message's partition, and if not, fetch it from the store. You should almost always have the state already, so this should not cause a performance issue.
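To make the lookup concrete, here is a minimal sketch of the idea in plain Java. It is not tied to Storm or to any particular store. The `KeyValueStore` interface, the `InMemoryStore` stand-in, the `PartitionStateCache` class, the `account-42` key and the query string are all hypothetical names made up for illustration. In a real topology the store would be something like mongo or memcached, and the cache would live inside the bolt.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: per-partition state with lazy loading from a key-value store. */
public class PartitionStateSketch {

    /** Assumed interface standing in for an external store (e.g. mongo or memcached). */
    interface KeyValueStore {
        String get(String key);              // returns null when nothing was saved
        void put(String key, String value);
    }

    /** In-memory stand-in so the sketch runs without a real store. */
    static class InMemoryStore implements KeyValueStore {
        private final Map<String, String> data = new HashMap<>();
        int reads = 0;                       // counts lookups that reach the store

        public String get(String key) {
            reads++;
            return data.get(key);
        }

        public void put(String key, String value) {
            data.put(key, value);
        }
    }

    /** What a bolt instance would hold: a local cache in front of the store. */
    static class PartitionStateCache {
        private final KeyValueStore store;
        private final Map<String, String> local = new HashMap<>();

        PartitionStateCache(KeyValueStore store) {
            this.store = store;
        }

        /** After processing a topic A message: keep the state locally and persist it. */
        void saveState(String partitionKey, String state) {
            local.put(partitionKey, state);
            store.put(partitionKey, state);
        }

        /** For each topic C message: use the local state, or fetch it once from the store. */
        String stateFor(String partitionKey) {
            String state = local.get(partitionKey);
            if (state == null) {
                state = store.get(partitionKey);
                if (state != null) {
                    local.put(partitionKey, state);
                }
            }
            return state;
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        PartitionStateCache original = new PartitionStateCache(store);
        original.saveState("account-42", "select * from Events where account = 42");

        // Simulate a failover: the new bolt instance starts with an empty local cache.
        PartitionStateCache restarted = new PartitionStateCache(store);
        System.out.println(restarted.stateFor("account-42")); // fetched from the store
        System.out.println(restarted.stateFor("account-42")); // served from the local cache
        System.out.println("store reads: " + store.reads);
    }
}
```

Only the first message for a partition after a restart pays for a store read. Every later message for that partition is served from local memory, which is why the lookup should rarely matter for throughput.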
The details are somewhat vague, but it sounds like you are trying to do stateful processing, which in my opinion is much more natural with Trident.

On Tue, Oct 21, 2014 at 2:44 PM, Manoj Jaiswal <[email protected]> wrote:

> Thanks Yair,
>
> The messages picked up from topic A create the queries. These messages are
> partitioned and so are the real time messages from topic C.
> If I persist the state, then how do I get the same partitioned data?
> The partitioning of data is dynamic and based on worker nodes alive, isn't
> it?
> So even if we read the state, how to make sure it's the same relative
> state, as in realtime data may be now flowing to another bolt due to
> partitioning.
>
> -Manoj
>
> On Tue, Oct 21, 2014 at 2:34 PM, Yair Weinberger <[email protected]>
> wrote:
>
>> Hi,
>> It sounds like your bolt actually has a state (initialized by the
>> messages picked up from topic A).
>> When restarting the bolt in case of failover, Storm does not provide any
>> inherent mechanism to keep the bolt's previous state.
>>
>> In my opinion, your best option would be to move to Trident, which
>> provides the notion of a state. See
>> https://storm.apache.org/documentation/Trident-state.
>>
>> Alternatively, you can use any external storage (e.g. mongo or memcached)
>> to save your state.
>> After the processing of the messages from topic A you should write your
>> state to the external storage.
>> Then, you can read it in the prepare method. It would be empty in case
>> the topology was just started, or have the data that was previously written
>> there if it is a failover restart.
>>
>> Take a look at MongoBolt for some ideas (
>> https://github.com/stormprocessor/storm-mongo/)
>>
>> Yair
>> http://www.alooma.io
>>
>> On Mon, Oct 20, 2014 at 1:28 PM, Manoj Jaiswal <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> Let me explain my use case in brief:
>>>
>>> Kafka spout A picks up messages from Kafka topic topic-A and creates
>>> queries via Esper in Storm bolt B.
>>> This is done only once, as soon as the topology is deployed.
>>> Another Kafka spout C picks up realtime messages from Kafka topic topic-C,
>>> which will be processed by the Esper engine in the same bolt B.
>>>
>>> The spout data from A and B are both partitioned by account numbers so
>>> that the Esper engine in different worker processes gets same account
>>> numbers.
>>>
>>> Now the problem:
>>> In case of worker threads dying due to some issue or the supervisor node
>>> getting kicked out of the cluster, we are observing that the bolt instance
>>> may get assigned to a new process/worker.
>>> But the prepare method of the bolt is initializing the Esper query
>>> configuration, so every time the Esper query engine in that worker process
>>> is initialized. Hence it loses the queries set up by the one-time messages
>>> from Kafka spout A.
>>>
>>> Any suggestions, how do we handle this?
>>>
>>> -Manoj
