Hi Matthias,

Thanks for the image and explanation. I understood the parallelism and tasks, but even now, when I run builder.setSpout(partialSpoutName, new MongoSpout(), 5).setNumTasks(100); there's a line in MongoSpout's constructor (System.out.println("MongoSpout created");) which gets printed only once. This is the reason for all the confusion on my part. I'd have expected it to be printed at least 5 times.
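A minimal stand-alone sketch of plain Java serialization (with a hypothetical FakeSpout standing in for MongoSpout -- this is not Storm code) shows why a single println is consistent with multiple instances: the constructor runs once in the JVM that builds the topology, the object is serialized, and deserializing a Serializable class never invokes its own constructor:

```java
import java.io.*;

public class SpoutCopies {
    // Hypothetical stand-in for MongoSpout: Serializable, with a
    // constructor side effect like the println in question.
    static class FakeSpout implements Serializable {
        static int constructorCalls = 0;
        FakeSpout() { constructorCalls++; }
    }

    public static void main(String[] args) throws Exception {
        // What setSpout(..., new MongoSpout(), 5) does: one constructor call.
        FakeSpout original = new FakeSpout();

        // Serialize the instance once (roughly what happens at submit time).
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(original);
        out.flush();

        // Deserialize one copy per task -- no constructor runs here,
        // because deserialization bypasses the constructor of a
        // Serializable class.
        for (int task = 0; task < 5; task++) {
            ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()));
            FakeSpout copy = (FakeSpout) in.readObject();
        }

        System.out.println("constructor calls: " + FakeSpout.constructorCalls);
    }
}
```

So the per-instance place to log (or open connections) would be open(), not the constructor.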
I'm running on LocalCluster localCluster = new LocalCluster(); for now, but even then I felt it should have created the necessary worker threads. When does Storm decide to create a new instance of MongoSpout? The nextTuple() of MongoSpout looks like this:

@Override
public void nextTuple() {
    while (batch.hasNext()) {
        History_DBO history = batch.next();
        this.collector.emit(new Values(history._id), history._id);
    }//while
}

Does Storm decide to create a new MongoSpout if the first MongoSpout which emitted a tuple hasn't yet received an ack()? Is that how it works?

On Mon, Apr 25, 2016 at 2:33 PM, Matthias J. Sax <mj...@apache.org> wrote:
> Hi Navin,
>
> If you really want a "forward connection pattern", i.e., all data of a
> single spout goes to a single bolt, your idea with a loop should work.
> Of course, since you actually deploy distinct graphs, i.e., the single
> parts of the topology do not exchange data, you could also just deploy
> many topologies with a parallelism of one.
>
> The "standard" way in Storm (without the loop) would be like this:
>
> > builder.setSpout(spoutName, new MongoSpout(), 5);
> > builder.setBolt(boltName, new GenBolt(), 5).shuffleGrouping(spoutName);
> > builder.setBolt(boltName2, new SomeBolt(), 5).shuffleGrouping(boltName);
>
> with 5 being the number of instances (if you don't specify any number,
> the default is 1). The third parameter (parallelism_hint) defines the
> number of executor threads (each thread will run a bolt instance).
>
> However, using shuffleGrouping (or localOrShuffleGrouping), data is not
> forwarded to a single instance, but randomly redistributed over all
> bolt instances. (See the attached png showing a topology with 2 spouts
> and 4 bolts -- each represented by a square. The dots within each
> square illustrate the parallelism of each -- the connection pattern
> would be shuffleGrouping.)
>
> As an alternative, you can also use custom grouping (or direct streams)
> to achieve a "forwarding" pattern.
> (With regard to dynamic scaling -- see below -- this might be the
> correct way to go for your case.) Using the loop, you cannot add more
> instances during runtime.
>
> However, I am not sure what you mean by
>
> "Storm should automatically create the required number of instances of
> spouts and bolts to be able to scale horizontally when I add more
> machines to process the data"
>
> If you deploy a topology, the number of parallel instances is stable.
> Storm does not automatically change it. If you want to change the
> parallelism during runtime, you need to do this manually via
>
> bin/storm rebalance ...
>
> The rebalance operation requires that the topology has enough TASKS
> (the parallelism cannot be larger than the number of tasks).
>
> Thus, you need to "prepare" your topology during setup for dynamic
> scaling via .setNumTasks():
>
> > builder.setSpout(spoutName, new MongoSpout(), 5).setNumTasks(100);
> > builder.setBolt(boltName, new GenBolt(), 5).shuffleGrouping(spoutName).setNumTasks(100);
> > builder.setBolt(boltName2, new SomeBolt(), 5).shuffleGrouping(boltName).setNumTasks(100);
>
> Here, 100 defines the maximum number of parallel instances you can
> run, and the initial deployment will start 5. Using rebalance, you can
> now change the parallelism to anything up to 100.
>
> Hope this makes sense.
>
> -Matthias
>
> On 04/25/2016 09:11 AM, Navin Ipe wrote:
> > Thank you Matthias for your time and patient explanation. I'm now
> > clear about the fields grouping (an answer on Stackoverflow had
> > confused me:
> > http://stackoverflow.com/questions/33512554/multiple-fields-grouping-in-storm).
> > The first question still stands, where I'm unable to understand when
> > multiple instances of spouts and bolts get created.
> >
> > To get a topology like this:
> >
> > Spout ---->Bolt--->Bolt
> > Spout ---->Bolt--->Bolt
> > Spout ---->Bolt--->Bolt
> >
> > is what I'm trying to achieve, but if I simply say:
> >
> > builder.setSpout(spoutName, new MongoSpout());
> > builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> > builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> >
> > then I don't see how multiple instances of MongoSpout() will get
> > created. I've already been through a lot of tutorials and
> > documentation pages, but they don't explain that (in a way that I
> > understand). I had also run some code where I set .setNumTasks(12);
> > but the constructor of MongoSpout() got called only once. So under
> > what situation does Storm create multiple instances of MongoSpout?
> >
> > The objective is that when there are a lot of things to process,
> > Storm should automatically create the required number of instances
> > of spouts and bolts to be able to scale horizontally when I add more
> > machines to process the data.
> >
> > But if I simply create this:
> >
> > builder.setSpout(spoutName, new MongoSpout());
> > builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> > builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> >
> > how will Storm scale?
> >
> > I need one spout to iterate over the first 1000 rows of a database
> > and extract data while, in parallel, another spout iterates through
> > the next 1000 rows, and so on. So if the database has 1 million
> > rows, Storm should automatically create that many spouts. Is that
> > possible? (There would be a limit on the number of spouts, of
> > course.)
> >
> > Then, based on the number of spouts created, the same number of
> > bolts are created to process the data the spouts emit.
> >
> > So my biggest confusion has always been just this: how are the
> > multiple spout and bolt instances created so that the processing
> > can scale?
> >
> > ps: Yes, I knew the tuple does not extend IRichBolt. It was a silly
> > mistake while I was typing :-)
> >
> > On Mon, Apr 25, 2016 at 3:26 AM, Matthias J. Sax <mj...@apache.org> wrote:
> >
> > Hi Navin,
> >
> > I could not follow your email completely. Let me clarify a couple of
> > things to get started. If you still have questions, just ask again.
> >
> > A) The IRichBolt interface defines a bolt, not a tuple. Thus,
> >
> > > class SomeTuple extends IRichBolt {
> > >     private Integer someID;
> > >     public Integer getSomeID() {return someID;}
> > > }
> >
> > is not correct.
> >
> > A tuple in Storm has multiple fields (also called a schema). You
> > declare the fields of a tuple in .declareOutputFields(...).
> >
> > B) So
> >
> > > public void declareOutputFields(OutputFieldsDeclarer declarer) {
> > >     declarer.declare(new Fields("A","B","C","D"));
> > > }
> >
> > declares that the tuples emitted by this spout or bolt have 4
> > fields. "A", "B", "C", "D" are the names of those fields (i.e.,
> > meta data).
> >
> > If you want to emit a corresponding tuple you call
> >
> > output.emit(new Values(1, "x", 3.4, true));
> >
> > This would emit a tuple with an Integer, String, Double, and Boolean
> > as concrete data types. The values can of course be anything.
> >
> > C) If you use fieldsGrouping, you always need to specify the fields
> > you want to group on (it is not possible to use zero fields for
> > this).
> >
> > If you have, for example, an ID field in your tuple and you want all
> > tuples with the same ID to be processed by the same bolt instance,
> > you can do it like this:
> >
> > setBolt("PRODUCER", new MyPBolt());
> > setBolt("CONSUMER", new MyCBolt(), parallelism).fieldsGrouping("PRODUCER", new Fields("ID"));
> >
> > Of course, MyPBolt must declare a field with name "ID" in its
> > implementation of declareOutputFields() (otherwise Storm will
> > complain).
> >
> > So, the answer to the first question (the other two should already
> > be covered):
> >
> > > *1. *The way I'm using the for loop above is wrong, isn't it? If I
> > > use a single builder.setSpout and set the numTasks value, then
> > > Storm would create that many Spout instances automatically?
> >
> > Not sure what you want to get -- so not sure if the loop is right or
> > wrong. And yes, if you use
> >
> > builder.setSpout("name", new MySpout(), parallelism)
> >
> > Storm will automatically start <parallelism> instances of MySpout.
> > Do not confuse this with tasks (i.e., setNumTasks()). Tasks are
> > logical units of parallelism, while <parallelism> defines the number
> > of threads (i.e., physical parallelism). See here for more details:
> > https://storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html
> >
> > Hope this clears things up a little bit. If you are still confused,
> > look at the examples in storm-starter.
> >
> > -Matthias
> >
> > On 04/24/2016 07:44 PM, Navin Ipe wrote:
> > > To parallelize some code, I considered having this topology, where
> > > a single [Spout] or [Bolt] represents multiple spouts or bolts:
> > >
> > > *[Spout]--emit--->[Bolt A]--emit--->[Bolt B]*
> > >
> > > If any of the bolts in Bolt A emits a tuple of value 1, and it
> > > gets processed by a certain bolt in Bolt B, then it is imperative
> > > that if any of the bolts in Bolt A again emits the value 1, it
> > > must be processed by the same bolt in Bolt B. I assume fields
> > > grouping can handle this.
> > >
> > > To have many spouts work in parallel, my initial thought was to
> > > have:
> > >
> > > Integer numberOfSpouts = 10;
> > >
> > > String partialSpoutName = "mongoSpout";
> > > String partialBoltName = "mongoBolt";
> > >
> > > for (Integer i = 0; i < numberOfSpouts; ++i) {
> > >     String spoutName = partialSpoutName + i.toString();
> > >     String boltName = partialBoltName + i.toString();
> > >
> > >     builder.setSpout(spoutName, new MongoSpout());
> > >     builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> > > }
> > >
> > > But I realized it was probably not the right way, because if I
> > > wanted all of Bolt A's tuples to go to a single Bolt B, then I'd
> > > have to include cases like this:
> > >
> > > switch (numberOfSpouts) {
> > >     case 3:
> > >         builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 3)
> > >                .shuffleGrouping(partialBoltName+"2")
> > >                .shuffleGrouping(partialBoltName+"1")
> > >                .shuffleGrouping(partialBoltName+"0");
> > >         break;
> > >     case 2:
> > >         builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 2)
> > >                .shuffleGrouping(partialBoltName+"1")
> > >                .shuffleGrouping(partialBoltName+"0");
> > >         break;
> > >     case 1:
> > >         builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 1)
> > >                .shuffleGrouping(partialBoltName+"0");
> > >         break;
> > >     default:
> > >         System.out.println("No case here");
> > > }//switch
> > >
> > > *So three questions:*
> > > *1. *The way I'm using the for loop above is wrong, isn't it? If I
> > > use a single builder.setSpout and set the numTasks value, then
> > > Storm would create that many Spout instances automatically?
> > >
> > > *2.* When you specify something like this for fields grouping:
> > >
> > > public void declareOutputFields(OutputFieldsDeclarer declarer) {
> > >     declarer.declare(new Fields("A","B","C","D"));
> > > }
> > >
> > > what does it mean? Does it mean that 4 types of tuples might be
> > > emitted?
> > > When they are received as builder.setBolt("sqlWriterBolt", new
> > > Bolt_AB(), 3).fieldsGrouping("mongoBolt", new Fields("A", "B"));
> > > does it mean that the first 2 tuples will go to Bolt_AB, the next
> > > 2 tuples may go to any other bolt that receives tuples from
> > > mongoBolt, and then the next two tuples will go to Bolt_AB again?
> > > Is that how it works?
> > >
> > > *3. *If I don't specify any new Fields("A", "B"), how does Storm
> > > know the grouping? How does it decide? If I have a tuple like
> > > this:
> > >
> > > class SomeTuple extends IRichBolt {
> > >     private Integer someID;
> > >     public Integer getSomeID() {return someID;}
> > > }
> > >
> > > and if Bolt A sends SomeTuple to Bolt B with a someID value
> > > (assume the someID value is 9), and the next time Bolt A generates
> > > a tuple with someID = 9 (and it may generate many tuples with
> > > someID = 9), how do I ensure that Storm sees the someID value and
> > > decides to send it to the Bolt B instance that processes all
> > > someIDs which have a value of 9?
> > >
> > > --
> > > Regards,
> > > Navin
> >
> > --
> > Regards,
> > Navin

--
Regards,
Navin
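As a footnote to question 3 above: the idea behind fieldsGrouping can be sketched in plain Java (a conceptual sketch, not Storm's actual internal code). The configured field values are hashed, and the hash deterministically selects the target task, so every tuple with the same someID lands on the same consumer instance:

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingDemo {
    // Conceptual sketch of fields grouping: hash the grouping-field
    // values and map the hash onto one of the consumer's tasks.
    // Equal values always produce the same task index.
    static int targetTask(List<?> groupingValues, int numTasks) {
        return Math.abs(Arrays.deepHashCode(groupingValues.toArray()) % numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;  // e.g., 4 Bolt B instances

        // Two separate tuples, both with someID = 9, emitted at
        // different times (possibly by different Bolt A instances):
        int first = targetTask(List.of(9), numTasks);
        int second = targetTask(List.of(9), numTasks);

        System.out.println("same task: " + (first == second));
    }
}
```

This is why the grouping works regardless of which upstream instance emits the tuple: the routing depends only on the field values, not on the sender.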