Thank you, Matthias, for your time and patient explanation. I'm now clear about fields grouping (an answer on Stack Overflow had confused me: <http://stackoverflow.com/questions/33512554/multiple-fields-grouping-in-storm>). My first question still stands, though: I don't understand when multiple instances of spouts and bolts get created.

What I'm trying to achieve is a topology like this:

    Spout ---->Bolt--->Bolt
    Spout ---->Bolt--->Bolt
    Spout ---->Bolt--->Bolt

but if I simply say:

    builder.setSpout(spoutName, new MongoSpout());
    builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
    builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);

then I don't see how multiple instances of MongoSpout() will get created. I've already been through a lot of tutorials and documentation pages, but they don't explain that (in a way that I understand). I also ran some code where I set .setNumTasks(12), but the constructor of MongoSpout() got called only once. So under what situation does Storm create multiple instances of MongoSpout?

The objective is that when there is a lot to process, Storm should automatically create the required number of spout and bolt instances, so that the topology scales horizontally when I add more machines to process the data. But if I simply create this:

    builder.setSpout(spoutName, new MongoSpout());
    builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
    builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);

how will Storm scale? I need one spout to iterate over the first 1000 rows of a database and extract data while, in parallel, another spout iterates over the next 1000 rows, and so on. So if the database has 1 million rows, Storm should automatically create that many spouts. Is that possible? (There would be a limit on the number of spouts, of course.) Then, based on the number of spouts created, the same number of bolts would be created to process the data the spouts emit.

So my biggest confusion has always been just this: how are the multiple spout and bolt instances created so that the processing can scale?

PS: Yes, I knew the tuple does not extend IRichBolt. It was a silly mistake while I was typing :-)

On Mon, Apr 25, 2016 at 3:26 AM, Matthias J.
Sax <[email protected]> wrote:

> Hi Navin,
>
> I could not follow your email completely. Let me clarify a couple of
> things to get started. If you still have questions, just ask again.
>
>
> A) An IRichBolt interface defines a bolt, and not a tuple. Thus,
>
> > class SomeTuple extends IRichBolt {
> >     private Integer someID;
> >     public Integer getSomeID() {return someID;}
> > }
>
> is not correct.
>
> A Tuple in Storm has multiple Fields (also called Schema). You define
> the fields of a tuple in declareOutputFields(...).
>
>
> B) So
>
> > public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >     declarer.declare(new Fields("A","B","C","D"));
> > }
>
> does declare that the tuples that are emitted by this Spout or Bolt
> have 4 fields. "A", "B", "C", "D" are the names of those fields (i.e.,
> metadata).
>
> If you want to emit a corresponding tuple you call
>
>     output.emit(new Values(1, "x", 3.4, true));
>
> This would emit a Tuple with an Integer, String, Double, Boolean as
> concrete data types. The values can of course be anything.
>
>
> C) If you use fieldsGrouping, you always need to specify the fields you
> want to group on (it is not possible to use zero fields for this).
>
> If you have for example an ID field in your tuple and you want all
> tuples with the same ID to be processed by the same bolt instance, you
> can do it like this:
>
>     setBolt("PRODUCER", new MyPBolt());
>     setBolt("CONSUMER", new MyCBolt(),
>             parallelism).fieldsGrouping("PRODUCER", new Fields("ID"));
>
> Of course, MyPBolt must declare a field with name "ID" in its
> implementation of declareOutputFields() (otherwise Storm will complain).
>
>
> So answer to first question (the other two should already be covered)
>
> > 1. The way I'm using the for loop above is wrong isn't it? If I use a
> > single builder.setSpout and set the numTasks value, then Storm would
> > create those many Spout instances automatically?
>
> Not sure what you want to get -- so not sure if the loop is right or
> wrong. And yes, if you use
>
>     builder.setSpout("name", new MySpout(), parallelism)
>
> Storm will automatically start <parallelism> instances of MySpout. Do
> not confuse this with tasks (i.e., setNumTasks()). Tasks are logical units
> of parallelism, while <parallelism> defines the number of threads (i.e.,
> physical parallelism). See here for more details:
>
> https://storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html
>
>
> Hope this clears things up a little bit. If you are still confused, look
> at the example in storm-starter.
>
>
> -Matthias
>
>
> On 04/24/2016 07:44 PM, Navin Ipe wrote:
> > To parallelize some code, I considered having this topology. The single
> > [Spout] or [Bolt] represent multiple Spouts or Bolts.
> >
> > [Spout]--emit--->[Bolt A]--emit--->[Bolt B]
> >
> > If any of the bolts in Bolt A emit a Tuple of value 1, and it gets
> > processed by a certain bolt in Bolt B, then it is imperative that if any
> > of the bolts in Bolt A again emits the value 1, it should compulsorily
> > be processed by the same bolt in Bolt B. I assume fields grouping can
> > handle this.
> >
> > To have many spouts work in parallel, my initial thoughts were to have:
> >
> >     Integer numberOfSpouts = 10;
> >
> >     String partialSpoutName = "mongoSpout";
> >     String partialBoltName = "mongoBolt";
> >
> >     for(Integer i = 0; i < numberOfSpouts; ++i) {
> >         String spoutName = partialSpoutName + i.toString();
> >         String boltName = partialBoltName + i.toString();
> >
> >         builder.setSpout(spoutName, new MongoSpout());
> >         builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> >     }
> >
> > But I realized it was probably not the right way, because in case I
> > wanted all of Bolt A's tuples to go to a single Bolt B, then I'd have to
> > include cases like this:
> >
> >     switch (numberOfSpouts) {
> >         case 3:
> >             builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig),3)
> >                     .shuffleGrouping(partialBoltName+"2")
> >                     .shuffleGrouping(partialBoltName+"1")
> >                     .shuffleGrouping(partialBoltName+"0");
> >             break;
> >         case 2:
> >             builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig),2)
> >                     .shuffleGrouping(partialBoltName+"1")
> >                     .shuffleGrouping(partialBoltName+"0");
> >             break;
> >         case 1:
> >             builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig),1)
> >                     .shuffleGrouping(partialBoltName+"0");
> >             break;
> >         default:
> >             System.out.println("No case here");
> >     }//switch
> >
> > So three questions:
> >
> > 1. The way I'm using the for loop above is wrong isn't it? If I use a
> > single builder.setSpout and set the numTasks value, then Storm would
> > create those many Spout instances automatically?
> >
> > 2. When you specify something like this for fields grouping:
> >
> >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >         declarer.declare(new Fields("A","B","C","D"));
> >     }
> >
> > What does it mean? Does it mean that 4 types of tuples might be emitted?
> > When they are received as
> >
> >     builder.setBolt("sqlWriterBolt", new Bolt_AB(), 3)
> >             .fieldsGrouping("mongoBolt", new Fields("A", "B"));
> >
> > does it mean that the first 2 tuples will go to the Bolt_AB and the
> > next 2 tuples may go to any other bolt that receives tuples from
> > mongoBolt, and then the next two tuples will go to Bolt_AB again? Is
> > that how it works?
> >
> > 3. If I don't specify any new Fields("A", "B"), how does Storm know
> > the grouping? How does it decide? If I have a tuple like this:
> >
> >     class SomeTuple extends IRichBolt {
> >         private Integer someID;
> >         public Integer getSomeID() {return someID;}
> >     }
> >
> > and if Bolt A sends SomeTuple to Bolt B with a someID value (assume the
> > someID value is 9), and the next time Bolt A generates a Tuple with
> > someID = 9 (and it may generate many tuples with someID = 9), how do I
> > ensure that Storm sees the someID value and decides to send it to the
> > Bolt B instance that processes all someIDs which have a value of 9?
> >
> >
> > --
> > Regards,
> > Navin
>

--
Regards,
Navin
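
On the open question of when multiple spout instances come into existence: as far as I understand it, the object passed to setSpout() is serialized once when the topology is submitted, and every task then works on its own deserialized copy. That would explain why the MongoSpout constructor was seen only once even with setNumTasks(12). Per-copy setup then belongs in open(), where each copy can look up its own position (in Storm, via TopologyContext.getThisTaskIndex() and the size of getComponentTasks(getThisComponentId())) and claim its share of the rows. The sketch below only imitates this in plain Java, with no Storm dependency. FakeSpout, ownsBatch, firstRow and SpoutCopiesDemo are hypothetical names made up for illustration, and the round-robin split of 1000-row batches is just one possible scheme, not something Storm prescribes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a spout; NOT a Storm class. */
class FakeSpout implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Counts how often the constructor ran in this JVM. */
    static int constructorCalls = 0;

    private int taskIndex = -1;
    private int numTasks = -1;

    FakeSpout() {
        constructorCalls++;
    }

    /** Imitates open(): called once per copy, after deserialization. */
    void open(int taskIndex, int numTasks) {
        this.taskIndex = taskIndex;
        this.numTasks = numTasks;
    }

    /** True if this copy is responsible for the given batch (round-robin). */
    boolean ownsBatch(long batchNumber) {
        return batchNumber % numTasks == taskIndex;
    }

    /** First row (inclusive) of a batch for a fixed batch size. */
    static long firstRow(long batchNumber, int batchSize) {
        return batchNumber * batchSize;
    }
}

class SpoutCopiesDemo {
    /** Serializes one object, then deserializes independent copies of it. */
    static List<FakeSpout> copiesOf(FakeSpout original, int copies) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            List<FakeSpout> result = new ArrayList<>();
            for (int i = 0; i < copies; i++) {
                try (ObjectInputStream in = new ObjectInputStream(
                        new ByteArrayInputStream(bytes.toByteArray()))) {
                    // Deserialization does not run the FakeSpout constructor.
                    FakeSpout copy = (FakeSpout) in.readObject();
                    copy.open(i, copies);
                    result.add(copy);
                }
            }
            return result;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<FakeSpout> spouts = copiesOf(new FakeSpout(), 3);
        System.out.println("constructor calls: " + FakeSpout.constructorCalls);
        for (long batch = 0; batch < 6; batch++) {
            for (int i = 0; i < spouts.size(); i++) {
                if (spouts.get(i).ownsBatch(batch)) {
                    System.out.println("rows from "
                            + FakeSpout.firstRow(batch, 1000) + " -> copy " + i);
                }
            }
        }
    }
}
```

Running main reports a single constructor call even though three copies exist. Note that Storm does not grow the number of copies with the data volume on its own. The count comes from the parallelism hint and setNumTasks(), so the row split has to be derived from the task index, along the lines sketched here.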

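
For the someID = 9 case discussed in the thread, fields grouping can be pictured as "hash the grouping field values, then take the result modulo the number of target tasks". The sketch below shows only that idea in plain Java. The class FieldsGroupingSketch and the method targetTask are hypothetical, and the hash function Storm actually uses is an assumption here and may differ.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration only; not Storm's actual routing code. */
class FieldsGroupingSketch {

    /**
     * Maps the values of the grouping fields to one of numTargetTasks
     * task indexes. Equal values always map to the same index.
     */
    static int targetTask(List<?> groupingValues, int numTargetTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTargetTasks);
    }

    public static void main(String[] args) {
        int boltBTasks = 3;
        // Two tuples with someID = 9, emitted at different times.
        int first = targetTask(Arrays.asList(9), boltBTasks);
        int second = targetTask(Arrays.asList(9), boltBTasks);
        System.out.println("someID=9 -> task " + first + ", then task " + second);
        // A different ID may or may not land on the same task.
        System.out.println("someID=10 -> task "
                + targetTask(Arrays.asList(10), boltBTasks));
    }
}
```

The point of the sketch is that the routing depends only on the values of the fields named in fieldsGrouping(), so every tuple with someID = 9 reaches the same Bolt B task, while several different IDs can still share one task.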