It is a good question. I think it is related to groupings. You should study the different types of stream groupings to understand the problem fully. Let me know if this helps. You can refer to this book: http://shop.oreilly.com/product/0636920024835.do
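As a sketch of why fields grouping matters here: Storm routes a tuple to a task by hashing the grouping field's value modulo the number of tasks, and a rebalance changes workers/executors but not the task count, so the word-to-task mapping stays stable (note, though, that the existing in-memory counts are not migrated between tasks). This is a minimal standalone illustration, not Storm's internal code; the class name and exact hash scheme are assumptions:

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingSketch {
    // Hypothetical stand-in for fields grouping: pick a target task by
    // hashing the grouping-field value modulo the number of tasks.
    static int targetTask(String word, int numTasks) {
        return Math.abs(word.hashCode()) % numTasks;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList(
            "one", "two", "three", "four", "five",
            "six", "seven", "eight", "nine", "ten");

        // Parallelism hint from builder.setBolt("count", new WordCount(), 6).
        // The task count is fixed at topology submission, so rebalancing to
        // more workers does not change the word -> task mapping below.
        int numTasks = 6;

        for (String word : words) {
            System.out.println(word + " -> task " + targetTask(word, numTasks));
        }
    }
}
```

Since the mapping depends only on the word and the (fixed) task count, the same word always lands on the same task before and after a rebalance.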
On Tue, Jan 14, 2014 at 5:41 AM, Spico Florin <[email protected]> wrote:
> Hello!
> I'm a newbie in Storm and I have some questions regarding scaling the
> number of workers/executors across the cluster and how data is correctly
> handled between them.
> In the case of the WordCountTopology, the WordCount bolt is used to
> count the words from a text. From my observations and understanding, it
> uses an internal Map that keeps the counts for the words arriving at
> each task.
>
> public static class WordCount extends BaseBasicBolt {
>     Map<String, Integer> counts = new HashMap<String, Integer>();
>
>     public void execute(Tuple tuple, BasicOutputCollector collector) {
>         String word = tuple.getString(0);
>         Integer count = counts.get(word);
>         if (count == null)
>             count = 0;
>         count++;
>         counts.put(word, count);
>         // emit the updated count downstream
>         collector.emit(new Values(word, count));
>     }
>
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("word", "count"));
>     }
> }
>
> From the configuration of the topology:
>
> builder.setBolt("count", new WordCount(), 6).fieldsGrouping("split", new Fields("word"));
>
> I understand that each task will receive the same words it received the
> first time, or new ones, thus keeping the counts consistent (no task
> will receive words that were processed by a different task).
>
> Now suppose that:
> - you have a huge text with a small number of distinct words (say 10MB
>   of text containing only the words one, two, three, four, five, six,
>   seven, eight, nine, ten)
> - you start the topology with 2 workers
> - at some moment in time (after all the words are distributed across
>   the tasks and already have counts), we add one more worker.
>
> Here are my questions:
> 1. When we rebalance the topology, will the newly added worker get data?
> 2. If it does get data, how are the word counts kept consistent, since
>    it will receive some already processed words? Are the count values
>    for the already processed words passed to the newly created worker?
> 3. Given that I don't have a remote cluster installed, are my
>    assumptions correct?
>
> I look forward to your answers and opinions.
>
> Thanks.
> Regards,
> Florin
>
> Example.
> Given:
> Time T1:
> worker 1:
>   task1: counts{(one,10),(two,21)}
>   task2: counts{(three,5),(four,2)}
>   task3: counts{(five,8)}
>
> worker 2:
>   task1: counts{(six,10),(seven,21)}
>   task2: counts{(eight,5),(nine,2)}
>   task3: counts{(ten,8)}
>
> Time T10: (rebalance to 3 workers in place)
>
> worker 1:
>   task1: counts{(one,10),(two,21)}
>   task2: counts{(three,5),(four,2)}
>
> worker 2:
>   task1: counts{(six,10),(seven,21)}
>   task2: counts{(eight,5),(nine,2)}
>
> worker 3:
>   task1: counts{(five,15)}
>   task2: counts{(ten,25)}
>
> For worker 3, task 1: the value 15 should come from the previous value 8
> (computed by w1/t3) plus 7 newly counted.
> For worker 3, task 2: the value 25 should come from the previous value 8
> (computed by w2/t3) plus 17 newly counted.

-- 
*Abhishek Bhattacharjee*
*Pune Institute of Computer Technology*
