Hi Navin,

If you really want a "forward connection pattern", i.e., all data from a
single spout goes to a single bolt, your idea with a loop should work.
Of course, since you actually deploy distinct graphs, i.e., the individual
parts of the topology do not exchange data, you could also just deploy
many topologies with a parallelism of one.

The "standard" way in Storm (without the loop) would be like this:

> builder.setSpout(spoutName, new MongoSpout(), 5);
> builder.setBolt(boltName, new GenBolt(), 5).shuffleGrouping(spoutName);
> builder.setBolt(boltName2, new SomeBolt(), 5).shuffleGrouping(boltName);

with 5 being the number of instances (if you don't specify a number,
the default is 1). The third parameter (parallelism_hint) defines the
number of executor threads (each thread will run a bolt instance).

However, with shuffleGrouping (or localOrShuffleGrouping), data is not
forwarded to a single instance, but randomly redistributed over all
bolt instances. (See the attached PNG showing a topology with 2 spouts and 4
bolts -- each represented by a square. The dots within each square
illustrate its parallelism -- the connection pattern would be
shuffleGrouping.)

As an alternative, you can also use a custom grouping (or direct streams)
to achieve a "forwarding" pattern. (With regard to dynamic scaling --
see below -- this might be the correct way to go for your case.) With
the loop, you cannot add more instances at runtime.
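
To illustrate the custom-grouping idea, here is a minimal plain-Java sketch
of the selection logic only. It is not Storm code: the class ForwardSelector
and its methods are hypothetical, and it assumes producer and consumer tasks
are paired one-to-one by their sorted position. In a real topology, logic
like this would sit inside an implementation of Storm's CustomStreamGrouping
interface (in chooseTasks) and be attached with customGrouping(...).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: maps each producer task to exactly one consumer task.
final class ForwardSelector {
    private final List<Integer> sortedSources;
    private final List<Integer> sortedTargets;

    ForwardSelector(List<Integer> sourceTasks, List<Integer> targetTasks) {
        if (targetTasks.isEmpty()) {
            throw new IllegalArgumentException("need at least one target task");
        }
        // Sort copies so the pairing does not depend on the input order.
        sortedSources = new ArrayList<>(sourceTasks);
        sortedTargets = new ArrayList<>(targetTasks);
        Collections.sort(sortedSources);
        Collections.sort(sortedTargets);
    }

    // Returns the single consumer task that receives all tuples of the
    // given producer task (assumption: pairing by sorted position, wrapping
    // around if there are fewer consumers than producers).
    List<Integer> chooseTasks(int sourceTaskId) {
        int idx = sortedSources.indexOf(sourceTaskId);
        if (idx < 0) {
            throw new IllegalArgumentException("unknown source task " + sourceTaskId);
        }
        return Collections.singletonList(sortedTargets.get(idx % sortedTargets.size()));
    }
}
```

With producer tasks [1, 2, 3] and consumer tasks [10, 11, 12], everything
emitted by task 2 would go to task 11 only.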

However, I am not sure what you mean by

"Storm should automatically create the required number of instances of
spouts and bolts to be able to scale horizontally when I add more
machines to process the data"

If you deploy a topology, the number of parallel instances is fixed.
Storm does not automatically change it. If you want to change the
parallelism during runtime, you need to do this manually via

bin/storm rebalance ...

The rebalance operation requires that the topology has enough TASKS
(the parallelism cannot be larger than the number of tasks).

Thus, you need to "prepare" your topology during setup for dynamic
scaling via .setNumTasks();

> builder.setSpout(spoutName, new MongoSpout(), 5).setNumTasks(100);
> builder.setBolt(boltName, new GenBolt(), 5).shuffleGrouping(spoutName).setNumTasks(100);
> builder.setBolt(boltName2, new SomeBolt(), 5).shuffleGrouping(boltName).setNumTasks(100);

Here, 100 defines the maximum number of parallel instances you can run,
and the initial deployment will start 5. Using rebalance, you can now
change the parallelism to up to 100.
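
To make the numbers concrete, here is a small plain-Java sketch (no Storm
dependency) of how many tasks each executor thread would run for a given
parallelism. The class TaskMath and its method are hypothetical, and the
even split with leftover tasks going to the first executors is an assumption
for illustration; Storm's actual assignment order may differ.

```java
// Hypothetical helper illustrating the task/executor relationship.
final class TaskMath {
    // Splits numTasks as evenly as possible across numExecutors.
    // Rejects a parallelism larger than the number of tasks, mirroring the
    // rule that rebalance cannot go beyond what setNumTasks() prepared.
    static int[] tasksPerExecutor(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numExecutors > numTasks) {
            throw new IllegalArgumentException(
                "parallelism must be between 1 and the number of tasks");
        }
        int[] counts = new int[numExecutors];
        int base = numTasks / numExecutors;
        int extra = numTasks % numExecutors;
        for (int i = 0; i < numExecutors; i++) {
            // Assumption: the first 'extra' executors take one more task.
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }
}
```

With 100 tasks and 5 executors, each executor runs 20 tasks; after
rebalancing to 100 executors, each runs exactly one. A request for 101 is
rejected because the parallelism cannot exceed the number of tasks.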

Hope this makes sense.


-Matthias




On 04/25/2016 09:11 AM, Navin Ipe wrote:
> Thank you Matthias for your time and patient explanation. I'm now clear
> about the Fields grouping (an answer on Stackoverflow had confused me
> <http://stackoverflow.com/questions/33512554/multiple-fields-grouping-in-storm>).
> The first question still stands, where I'm unable to understand when
> multiple instances of spouts and bolts get created.
> 
> To get a topology like this:
> Spout ---->Bolt--->Bolt
> Spout ---->Bolt--->Bolt
> Spout ---->Bolt--->Bolt
> 
> is what I'm trying to achieve, but if I simply say:
> 
> /builder.setSpout(spoutName, new MongoSpout());
> builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);/
> /builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> /
> Then I don't see how multiple instances of MongoSpout() will get
> created. I've already been through a lot of tutorials and documentation
> pages, but they don't explain that (in a way that I understand). I had
> also run some code where I set .setNumTasks(12); but the constructor of
> MongoSpout() got called only once.
> So under what situation does storm create multiple instances of MongoSpout?
> 
> The objective is that when there are a lot of things to process, Storm
> should automatically create the required number of instances of spouts
> and bolts to be able to scale horizontally when I add more machines to
> process the data.
> 
> But if I simply create this:
> /builder.setSpout(spoutName, new MongoSpout());
> builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);/
> /builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> 
> /
> how will Storm scale?
> 
> I need one spout to iterate the first 1000 rows of a database and
> extract data while parallelly another spout iterates through the next
> 1000 rows of a database and so on. So if the database has 1 million
> rows, Storm should automatically create that many spouts. Is that
> possible? (there would be a limit on the number of spouts of course)
> 
> Then, based on the number of spouts created, the same number of bolts
> are created to process the data the spouts emit.
> 
> So my biggest confusion has always been just this. About how are the
> multiple spout and bolt instances created so that the processing can scale?
> 
> ps: Yes I knew the tuple does not extend IRichBolt. It was a silly
> mistake while I was typing :-)
> //
> 
> 
> On Mon, Apr 25, 2016 at 3:26 AM, Matthias J. Sax <[email protected]
> <mailto:[email protected]>> wrote:
> 
>     Hi Navin,
> 
>     I could not follow your email completely. Let me clarify a couple of
>     things to get started. If you still have question, just ask again.
> 
> 
> 
>     A) The IRichBolt interface defines a bolt, not a tuple. Thus,
> 
>     > class SomeTuple extends IRichBolt {
>     >   private Integer someID;
>     >   public Integer getSomeID() {return someID;}
>     > }
> 
>     is not correct.
> 
>     A Tuple in Storm has multiple Fields (also called Schema). You define
>     the fields of a tuple in declareOutputFields(...)
> 
> 
> 
>     B) So
> 
>     >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     >         declarer.declare(new Fields("A","B","C","D"));
>     >     }
> 
>     does declare, that the tuples that are emitted by this Spout or Bolt
>     have 4 fields. "A", "B", "C", "D" are the names of those fields (ie,
>     meta data).
> 
>     If you want to emit a corresponding tuple you call
> 
>         output.emit(new Values(1, "x", 3.4, true));
> 
>     This would emit a Tuple with Integer, String, Double, and Boolean as
>     concrete data types. The values can of course be anything.
> 
> 
> 
>     C) If you use fieldsGrouping, you always need to specify the fields you
>     want to group on (it is not possible to use zero fields for this).
> 
>     If you have, for example, an ID field in your tuple and you want all
>     tuples with the same ID to be processed by the same bolt instance, you can
>     do it like this:
> 
>         setBolt("PRODUCER", new MyPBolt());
>         setBolt("CONSUMER", new MyCBolt(), parallelism)
>             .fieldsGrouping("PRODUCER", new Fields("ID"));
> 
>     Of course, MyPBolt must declare a field with name "ID" in its
>     implementation of declareOutputFields() (otherwise Storm will complain).
> 
> 
>     So answer to first question (the other two should already be covered)
> 
>     > *1. *The way I'm using the for loop above is wrong isn't it? If I
>     use a
>     > single builder.setSpout and set the numTasks value, then Storm would
>     > create those many Spout instances automatically?
> 
>     Not sure what you want to get -- so not sure if the loop is right or
>     wrong. And yes, if you use
> 
>     builder.setSpout("name", new MySpout(), parallelism)
> 
>     Storm will automatically start <parallelism> instances of MySpout. Do
>     not confuse this with tasks (ie, setNumTasks()). Tasks are logical units
>     of parallelism, while <parallelism> defines the number of threads (ie,
>     physical parallelism). See here for more details:
>     
> https://storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html
> 
> 
>     Hope this clears things up a little bit. If you are still confused, look
>     at the example in storm-starter.
> 
> 
>     -Matthias
> 
> 
>     On 04/24/2016 07:44 PM, Navin Ipe wrote:
>     > To parallelize some code, I considered having this topology. The single
>     > [Spout] or [Bolt] represent multiple Spouts or Bolts.
>     >
>     > *[Spout]--emit--->[Bolt A]--emit--->[Bolt B]*
>     >
>     > If any of the bolts in Bolt A emit a Tuple of value 1, and it gets
>     > processed by a certain bolt in Bolt B, then it is imperative that if any
>     > of the bolts in Bolt A again emits the value 1, it should compulsorily
>     > be processed by the same bolt in Bolt B. I assume fields grouping can
>     > handle this.
>     >
>     > To have many spouts work in parallel, my initial thoughts were to have:
>     > /        Integer numberOfSpouts = 10;
>     >
>     >         String partialSpoutName = "mongoSpout";
>     >         String partialBoltName = "mongoBolt";
>     >
>     >         for(Integer i = 0; i < numberOfSpouts; ++i) {
>     >             String spoutName = partialSpoutName + i.toString();
>     >             String boltName = partialBoltName + i.toString();
>     >
>     >             builder.setSpout(spoutName, new MongoSpout());
>     >             builder.setBolt(boltName, new
>     > GenBolt()).shuffleGrouping(spoutName);
>     >         }/
>     >
>     > But I realized it was probably not the right way, because in case I
>     > wanted all of Bolt A's tuples to go to a single Bolt B, then I'd have to
>     > include cases like this:
>     >
>     > /        switch (numberOfSpouts) {
>     >             case 3:
>     >                 builder.setBolt("sqlWriterBolt", new
>     > SqlWriterBolt(appConfig),3)
>     >                         .shuffleGrouping(partialBoltName+"2")
>     >                         .shuffleGrouping(partialBoltName+"1")
>     >
>     > .shuffleGrouping(partialBoltName+"0");
>     >
>     >                 break;
>     >             case 2:
>     >                 builder.setBolt("sqlWriterBolt", new
>     > SqlWriterBolt(appConfig),2)
>     >                         .shuffleGrouping(partialBoltName+"1")
>     >
>     > .shuffleGrouping(partialBoltName+"0");
>     >
>     >                 break;
>     >             case 1:
>     >                 builder.setBolt("sqlWriterBolt", new
>     > SqlWriterBolt(appConfig),1)
>     >
>     > .shuffleGrouping(partialBoltName+"0");
>     >                 break;
>     >             default:
>     >                 System.out.println("No case here");
>     >         }//switch/
>     >
>     > *So three questions:*
>     > *1. *The way I'm using the for loop above is wrong isn't it? If I
>     use a
>     > single builder.setSpout and set the numTasks value, then Storm would
>     > create those many Spout instances automatically?
>     > *
>     > 2.* When you specify something like this for fields grouping:
>     >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     >         declarer.declare(new Fields("A","B","C","D"));
>     >     }
>     > What does it mean? Does it mean that 4 types of tuples might be emitted?
>     > When they are received as builder.setBolt("/sqlWriterBolt/", new
>     > Bolt_AB(), 3).fieldsGrouping("/mongoBolt/", new Fields("A", "B"));
>     >
>     > Does it mean that my the first 2 tuples will go to the Bolt_AB and the
>     > next 2 tuples may go to any other bolt that receives tuples from
>     > mongoBolt, and then the next two tuples will go to Bolt_AB again? Is
>     > that how it works?
>     >
>     > *3. *If I don't specify any new Fields("A", "B"), how does Storm know
>     > the grouping? How does it decide? If I have a tuple like this:
>     > class SomeTuple extends IRichBolt {
>     >   private Integer someID;
>     >   public Integer getSomeID() {return someID;}
>     > }
>     > and if Bolt A sends SomeTuple to Bolt B, with SomeID value (assume a
>     > SomeID value is 9) and the next time Bolt A generates a Tuple with
>     > someID = 9 (and it may generate many tuples with someID=9), how do I
>     > ensure that Storm sees the SomeID value and decides to send it to the
>     > Bolt B instance that processes all someID's which have a value of 9?
>     >
>     >
>     > --
>     > Regards,
>     > Navin
> 
> 
> 
> 
> -- 
> Regards,
> Navin
