Hi,

I have been using LevelDB for this purpose; it is like a persistent and
quite fast Map. In your case you could use it to store the counts. Here is
the link https://code.google.com/p/leveldb/ and here is the Maven dependency
for the Java wrapper:

 <dependency>
    <groupId>org.fusesource.leveldbjni</groupId>
    <artifactId>leveldbjni-all</artifactId>
    <version>1.7</version>
 </dependency>

Working with LevelDB is super easy, just like a map, e.g.:

db.put(bytes(key), bytes(value));
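
Just to make it a bit more concrete, here is a minimal, self-contained sketch
of how I use the leveldbjni API (the class name LevelDbExample and the
directory "counts-db" are just made up for the example):

import static org.fusesource.leveldbjni.JniDBFactory.*;

import java.io.File;
import java.io.IOException;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

public class LevelDbExample {
    public static void main(String[] args) throws IOException {
        Options options = new Options();
        options.createIfMissing(true);
        // open (or create) the database directory
        DB db = factory.open(new File("counts-db"), options);
        try {
            // keys and values are plain byte[]; bytes()/asString() convert Strings
            db.put(bytes("storm"), bytes("42"));
            String value = asString(db.get(bytes("storm")));
            System.out.println("storm -> " + value);
        } finally {
            db.close(); // flushes and unlocks the files
        }
    }
}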


If multiple bolt instances run in parallel, you have to somehow know which
stream partition each instance is handling; you can get that from the
TopologyContext as the task id (if I am not totally wrong).

@Override
public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
        TopologyContext topologyContext, OutputCollector collector) {
    Logger.log(RecommendationWorkflowBolt.class, "prepare", "enter",
            topologyContext.getThisComponentId());
    this.collector = collector;
    this.partition = topologyContext.getThisTaskId();
    try {
        // open a separate LevelDB directory per task, so parallel bolt
        // instances never share the same files
        db = factory.open(new File(path + "/" + partition), options);
    } catch (IOException e) {
        throw new RuntimeException("could not open LevelDB for task " + partition, e);
    }
}
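
For the word count case this means execute() can update the counts directly
in that DB. This is just a sketch of what I mean (assuming the db field opened
in prepare() above, the bytes()/asString() helpers from JniDBFactory, and that
the word arrives in field 0 of the tuple):

@Override
public void execute(Tuple tuple) {
    String word = tuple.getString(0);
    // read the current count (null if the word was never seen), increment, write back
    byte[] stored = db.get(bytes(word));
    long count = stored == null ? 0L : Long.parseLong(asString(stored));
    db.put(bytes(word), bytes(Long.toString(count + 1)));
    collector.ack(tuple);
}

Since every count lives in LevelDB instead of a HashMap, a restarted task on
the same machine simply reopens its directory in prepare() and continues where
it left off.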


Then you have a DB for each stream partition. This works fine as long as the
bolt is not moved to another physical machine, since the LevelDB files cannot
be accessed from there anymore. You could try to use some kind of network
storage that can be reached from all servers in the cluster. Samza restores
the state by having a separate Kafka topic that stores all changes applied to
the DB for the partition; during the init phase that topic is consumed to
restore the state. I am not sure whether that would work on really large
systems, as the topic could become quite large...
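
If you wanted to try the same changelog idea with Storm, the write side could
look roughly like this. This is only my guess at how one might wire it up with
the plain Kafka producer client (kafka-clients on the classpath); the topic
name "counts-changelog" and the ChangelogWriter class are made up:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ChangelogWriter {
    private final KafkaProducer<String, String> producer;
    private final int partition; // the task id, so restore only replays our own changes

    public ChangelogWriter(int partition) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
        this.partition = partition;
    }

    // call this next to every db.put(...) so the change also lands in the changelog
    public void logChange(String word, long count) {
        producer.send(new ProducerRecord<>("counts-changelog", partition,
                word, Long.toString(count)));
    }
}

On restart, prepare() would consume its partition of the changelog topic from
the beginning and replay every record into LevelDB before processing new
tuples (the topic needs at least as many partitions as there are tasks).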

Cheers,

Klaus





On Fri, Jan 17, 2014 at 1:25 PM, Nathan Leung <[email protected]> wrote:

> Your main() code can run on any host; the computer it's running on doesn't
> even have to be in the storm cluster at all.  When you serialize the "bolt"
> in the main code it has no information about the state of the bolt that is
> running on the cluster.  As Klausen said, you need to sync the data in the
> bolt code to some external data store, and then if necessary read it in the
> "prepare" method when starting the bolt up again.
>
>
> On Fri, Jan 17, 2014 at 7:18 AM, Aniket Alhat <[email protected]>wrote:
>
>> Thanks for the reply. I am able to restore the serialized object, but the
>> HashMap "counters" in WordCounter.java returns null when I try to access it
>> after de-serialization. Can you check that?
>> On Jan 17, 2014 5:04 PM, "Klausen Schaefersinho" <
>> [email protected]> wrote:
>>
>>> Hi, you could try to sync to some remote database. Of course this is
>>> somewhat slow; maybe batching could help. In another thread someone
>>> discussed using Cassandra with some partition tricks... Cheers
>>> Am 17.01.2014 11:32 schrieb "Aniket Alhat" <[email protected]>:
>>>
>>>> Hello,
>>>> I want to periodically serialize the state of a bolt and remember what
>>>> tuples it has processed up until a particular point. Then, when you restart
>>>> the bolt, you can deserialize that state into the new bolt instance and
>>>> replay/*process* the tuples from where it left off.
>>>>
>>>> Consider the WordCount topology: I want to store the WordCounter.java
>>>> bolt state, for which I wrote the following code in TopologyMain.java
>>>>
>>>> TopologyMain.java *http://pastebin.com/suyL4dSB*
>>>> WordCounter.java *http://pastebin.com/pzEgXCgx*
>>>>
>>>> Now, whenever I de-serialize the object back, the HashMap in the WordCounter
>>>> bolt behaves as if it has nothing stored in it and the bolt starts again as a
>>>> normal bolt instead of resuming the word counts; the HashMap remains NULL.
>>>>
>>>> *Should I write the serialization logic anywhere else?*
>>>> --
>>>>
>>>> *Aniket Alhat*
>>>> linkedin.com/in/aniketalhat
>>>> +91 976 603 9317
>>>>
>>>
>
