Hi Matthias,
Thanks for the image and explanation. I understood the parallelism and
tasks, but even now, when I run

builder.setSpout(partialSpoutName, new MongoSpout(), 5).setNumTasks(100);

there's a line in MongoSpout's constructor (System.out.println("MongoSpout
created");) that gets printed only once. This is the reason for all the
confusion on my part. I'd have expected it to be printed at least 5 times.
I'm running on a LocalCluster (LocalCluster localCluster = new LocalCluster();)
for now, but even then I felt it should've created the necessary executor threads.
When does Storm decide to create a new instance of MongoSpout?
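A side note on why the constructor may print only once: Java deserialization does not invoke the constructor of a Serializable class, so if the spout object is created once while building the topology and then shipped to executors in serialized form, only that first construction prints. A minimal plain-Java sketch (made-up class names, no Storm involved):

```java
import java.io.*;

// Made-up stand-in for a spout, only to observe constructor behaviour
// under Java serialization.
class FakeSpout implements Serializable {
    static int constructorCalls = 0;
    FakeSpout() { constructorCalls++; }
}

public class SerDemo {
    // Serialize and immediately deserialize, like shipping an object
    // from the submitting JVM to a worker JVM.
    static FakeSpout roundTrip(FakeSpout s) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(s);
        ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (FakeSpout) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        FakeSpout original = new FakeSpout();   // constructor runs here, once
        roundTrip(original);                    // deserialization: no constructor
        roundTrip(original);                    // still no constructor
        System.out.println(FakeSpout.constructorCalls); // prints 1
    }
}
```

So the single print is consistent with one construction on the submitting side plus deserialized copies on the executors.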
The nextTuple of MongoSpout looks like this:

@Override
public void nextTuple() {
    while (batchOfHistories.hasNext()) {
        History_DBO history = batchOfHistories.next();
        this.collector.emit(new Values(history._id), history._id);
    }
}
Does Storm decide to create a new MongoSpout if the first MongoSpout that
emitted a tuple hasn't yet received an ack()? Is that how it works?
On Mon, Apr 25, 2016 at 2:33 PM, Matthias J. Sax <[email protected]> wrote:
> Hi Navin,
>
> If you really want a "forward connection pattern", ie, all data of a
> single spout goes to a single bolt, your idea with a loop should work.
> Of course, since you actually deploy distinct graphs, ie, the single
> parts of the topology do not exchange data, you could also just deploy
> many topologies with a parallelism of one.
>
> If "standard" way in Storm (without the loop), would be like this:
>
> > builder.setSpout(spoutName, new MongoSpout(), 5);
> > builder.setBolt(boltName, new GenBolt(), 5).shuffleGrouping(spoutName);
> > builder.setBolt(boltName2, new SomeBolt(), 5).shuffleGrouping(boltName);
>
> with 5 being the number of instances (if you don't specify any number,
> the default is 1). The third parameter (parallelism_hint) defines the
> number of executor threads (each thread will run one bolt instance).
>
> However, using shuffleGrouping (or localOrShuffleGrouping), data is not
> forwarded to a single instance, but randomly redistributed over all
> bolt instances. (See the attached png showing a topology with 2 spouts and 4
> bolts -- each represented by a square. The dots within each square
> illustrate the parallelism of each component -- the connection pattern
> would be shuffleGrouping.)
>
> As an alternative, you can also use a custom grouping (or direct streams)
> to achieve a "forwarding" pattern. (With regard to dynamic scaling --
> see below -- this might be the correct way to go for your case.) Using
> the loop, you cannot add more instances during runtime.
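The forwarding idea can be sketched as the task selection such a custom grouping would perform. This is plain Java with made-up names, standing in for what `CustomStreamGrouping.chooseTasks()` would return -- source instance i always picks target instance i:

```java
import java.util.Arrays;
import java.util.List;

public class ForwardGroupingSketch {
    // Pick the single target task for a given source instance, so that
    // source i always forwards to target (i % numTargets). With equal
    // parallelism on both sides this yields a strict 1-to-1 forwarding
    // pattern instead of shuffle's random redistribution.
    static int chooseTarget(int sourceIndex, List<Integer> targetTaskIds) {
        return targetTaskIds.get(sourceIndex % targetTaskIds.size());
    }

    public static void main(String[] args) {
        List<Integer> targets = Arrays.asList(7, 8, 9, 10, 11); // task ids
        for (int src = 0; src < 5; src++) {
            System.out.println("source " + src + " -> task "
                               + chooseTarget(src, targets));
        }
    }
}
```

In an actual CustomStreamGrouping implementation this selection would live inside chooseTasks(), using the target task list Storm hands to prepare().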
>
> However, I am not sure what you mean by
>
> "Storm should automatically create the required number of instances of
> spouts and bolts to be able to scale horizontally when I add more
> machines to process the data"
>
> If you deploy a topology, the number of parallel instances is stable.
> Storm does not automatically change it. If you want to change the
> parallelism during runtime, you need to do this manually via
>
> bin/storm rebalance ...
>
> The rebalance operation requires that the topology has enough TASKS
> (the parallelism cannot be larger than the number of tasks).
>
> Thus, you need to "prepare" your topology during setup for dynamic
> scaling via .setNumTasks():
>
> > builder.setSpout(spoutName, new MongoSpout(), 5).setNumTasks(100);
> > builder.setBolt(boltName, new GenBolt(), 5)
> >        .shuffleGrouping(spoutName).setNumTasks(100);
> > builder.setBolt(boltName2, new SomeBolt(), 5)
> >        .shuffleGrouping(boltName).setNumTasks(100);
>
> Here, 100 defines the maximum number of parallel instances you can run,
> and the initial deployment will start 5. Using rebalance, you can then
> change the parallelism to anything up to 100.
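For completeness, a rebalance invocation might look like the sketch below. The topology and component names are assumptions taken from the snippets in this thread; check `bin/storm help rebalance` for the exact flags of your Storm version:

```shell
# Scale the spout from 5 to 20 executors without redeploying.
# -w: seconds to wait for the topology to settle before rebalancing
# -e: component=new_parallelism (must not exceed setNumTasks, here 100)
bin/storm rebalance myTopology -w 10 -e mongoSpout=20
```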
>
> Hope this makes sense.
>
>
> -Matthias
>
>
>
>
> On 04/25/2016 09:11 AM, Navin Ipe wrote:
> > Thank you Matthias for your time and patient explanation. I'm now clear
> > about fields grouping (an answer on Stackoverflow had confused me:
> > http://stackoverflow.com/questions/33512554/multiple-fields-grouping-in-storm).
> > The first question still stands: I'm unable to understand when
> > multiple instances of spouts and bolts get created.
> >
> > To get a topology like this:
> > Spout ---->Bolt--->Bolt
> > Spout ---->Bolt--->Bolt
> > Spout ---->Bolt--->Bolt
> >
> > is what I'm trying to achieve, but if I simply say:
> >
> > builder.setSpout(spoutName, new MongoSpout());
> > builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> > builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> > Then I don't see how multiple instances of MongoSpout() will get
> > created. I've already been through a lot of tutorials and documentation
> > pages, but they don't explain it (in a way that I understand). I had
> > also run some code where I set .setNumTasks(12);, but the constructor of
> > MongoSpout() got called only once.
> > So under what situation does Storm create multiple instances of MongoSpout?
> >
> > The objective is that when there are a lot of things to process, Storm
> > should automatically create the required number of instances of spouts
> > and bolts to be able to scale horizontally when I add more machines to
> > process the data.
> >
> > But if I simply create this:
> > builder.setSpout(spoutName, new MongoSpout());
> > builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> > builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> >
> > how will Storm scale?
> >
> > I need one spout to iterate over the first 1000 rows of a database and
> > extract data while, in parallel, another spout iterates through the next
> > 1000 rows, and so on. So if the database has 1 million rows, Storm
> > should automatically create that many spouts. Is that possible? (There
> > would be a limit on the number of spouts, of course.)
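One common alternative to spawning a spout per 1000 rows is a fixed pool of spout tasks that partition the row space among themselves by task index. A plain-Java sketch of that partitioning (names here are made up, no Storm involved):

```java
import java.util.ArrayList;
import java.util.List;

public class RowPartitionSketch {
    // Instead of one spout per 1000 rows, a fixed pool of spout tasks can
    // split the row id space: task t of n handles the 1000-row pages where
    // pageNumber % n == t, i.e. whole pages assigned round-robin to tasks.
    static List<Long> pagesFor(int taskIndex, int numTasks,
                               long totalRows, long pageSize) {
        List<Long> pageStarts = new ArrayList<>();
        long numPages = (totalRows + pageSize - 1) / pageSize; // ceil division
        for (long page = 0; page < numPages; page++) {
            if (page % numTasks == taskIndex) {
                pageStarts.add(page * pageSize);
            }
        }
        return pageStarts;
    }

    public static void main(String[] args) {
        // 10,000 rows, 4 spout tasks, 1000-row pages: task 0 gets pages 0, 4, 8
        System.out.println(pagesFor(0, 4, 10_000, 1_000)); // [0, 4000, 8000]
    }
}
```

Inside a spout, each instance could derive its taskIndex from the topology context and query only its own pages, so parallelism stays fixed while the data volume can grow.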
> >
> > Then, based on the number of spouts created, the same number of bolts
> > are created to process the data the spouts emit.
> >
> > So my biggest confusion has always been just this: how are the
> > multiple spout and bolt instances created so that the processing can
> > scale?
> >
> > ps: Yes, I knew the tuple does not extend IRichBolt. It was a silly
> > mistake while I was typing :-)
> >
> >
> > On Mon, Apr 25, 2016 at 3:26 AM, Matthias J. Sax <[email protected]
> > <mailto:[email protected]>> wrote:
> >
> > Hi Navin,
> >
> > I could not follow your email completely. Let me clarify a couple of
> > things to get started. If you still have questions, just ask again.
> >
> >
> >
> > A) An IRichBolt interface defines a bolt, not a tuple. Thus,
> >
> > > class SomeTuple extends IRichBolt {
> > > private Integer someID;
> > > public Integer getSomeID() {return someID;}
> > > }
> >
> > is not correct.
> >
> > A Tuple in Storm has multiple Fields (also called a schema). You define
> > the fields of a tuple in declareOutputFields(...).
> >
> >
> >
> > B) So
> >
> > > public void declareOutputFields(OutputFieldsDeclarer declarer) {
> > >     declarer.declare(new Fields("A","B","C","D"));
> > > }
> >
> > declares that the tuples emitted by this Spout or Bolt have 4 fields.
> > "A", "B", "C", "D" are the names of those fields (ie, metadata).
> >
> > If you want to emit a corresponding tuple, you call
> >
> > output.emit(new Values(1, "x", 3.4, true));
> >
> > This would emit a Tuple with Integer, String, Double, and Boolean as
> > concrete data types. The values can of course be anything.
> >
> >
> >
> > C) If you use fieldsGrouping, you always need to specify the fields you
> > wanna group on (it is not possible to use zero fields for this).
> >
> > If you have, for example, an ID field in your tuple and you want all
> > tuples with the same ID to be processed by the same bolt instance, you
> > can do it like this:
> >
> > setBolt("PRODUCER", new MyPBolt());
> > setBolt("CONSUMER", new MyCBolt(), parallelism)
> >     .fieldsGrouping("PRODUCER", new Fields("ID"));
> >
> > Of course, MyPBolt must declare a field with name "ID" in its
> > implementation of declareOutputFields() (otherwise Storm will complain).
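Conceptually, fields grouping hashes the selected field values and takes the result modulo the number of target tasks, which is why equal IDs always land on the same instance. A plain-Java sketch of the idea (not Storm's exact internals):

```java
import java.util.Objects;

public class FieldsGroupingSketch {
    // Map a grouping key to one of numTasks target tasks. Equal keys
    // always map to the same task; Storm's real implementation differs
    // in detail but follows the same hash-partitioning idea.
    static int taskFor(Object key, int numTasks) {
        return Math.floorMod(Objects.hashCode(key), numTasks);
    }

    public static void main(String[] args) {
        int t1 = taskFor(9, 5);
        int t2 = taskFor(9, 5);
        System.out.println(t1 == t2); // prints true: same ID -> same task
    }
}
```

This also shows why the grouping needs at least one field: without a key there is nothing deterministic to hash on.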
> >
> >
> > So, the answer to the first question (the other two should already be covered):
> >
> > > *1. *The way I'm using the for loop above is wrong, isn't it? If I use
> > > a single builder.setSpout and set the numTasks value, then Storm would
> > > create that many Spout instances automatically?
> >
> > Not sure what you want to get -- so not sure if the loop is right or
> > wrong. And yes, if you use
> >
> > builder.setSpout("name", new MySpout(), parallelism)
> >
> > Storm will automatically start <parallelism> instances of MySpout. Do
> > not confuse this with tasks (ie, setNumTasks()). Tasks are logical units
> > of parallelism, while <parallelism> defines the number of threads (ie,
> > physical parallelism). See here for more details:
> > https://storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html
> >
> >
> > Hope this clears things up a little bit. If you are still confused, look
> > at the examples in storm-starter.
> >
> >
> > -Matthias
> >
> >
> > On 04/24/2016 07:44 PM, Navin Ipe wrote:
> > > To parallelize some code, I considered having this topology, where a
> > > single [Spout] or [Bolt] represents multiple Spouts or Bolts:
> > >
> > > *[Spout]--emit--->[Bolt A]--emit--->[Bolt B]*
> > >
> > > If any of the bolts in Bolt A emit a Tuple of value 1, and it gets
> > > processed by a certain bolt in Bolt B, then it is imperative that if any
> > > of the bolts in Bolt A again emits the value 1, it should compulsorily
> > > be processed by the same bolt in Bolt B. I assume fields grouping can
> > > handle this.
> > >
> > > To have many spouts work in parallel, my initial thoughts were to have:
> > >
> > > Integer numberOfSpouts = 10;
> > >
> > > String partialSpoutName = "mongoSpout";
> > > String partialBoltName = "mongoBolt";
> > >
> > > for (Integer i = 0; i < numberOfSpouts; ++i) {
> > >     String spoutName = partialSpoutName + i.toString();
> > >     String boltName = partialBoltName + i.toString();
> > >
> > >     builder.setSpout(spoutName, new MongoSpout());
> > >     builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> > > }
> > >
> > > But I realized it was probably not the right way, because in case I
> > > wanted all of Bolt A's tuples to go to a single Bolt B, I'd have to
> > > include cases like this:
> > >
> > > switch (numberOfSpouts) {
> > >     case 3:
> > >         builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 3)
> > >                .shuffleGrouping(partialBoltName + "2")
> > >                .shuffleGrouping(partialBoltName + "1")
> > >                .shuffleGrouping(partialBoltName + "0");
> > >         break;
> > >     case 2:
> > >         builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 2)
> > >                .shuffleGrouping(partialBoltName + "1")
> > >                .shuffleGrouping(partialBoltName + "0");
> > >         break;
> > >     case 1:
> > >         builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 1)
> > >                .shuffleGrouping(partialBoltName + "0");
> > >         break;
> > >     default:
> > >         System.out.println("No case here");
> > > }//switch
> > >
> > > *So three questions:*
> > > *1. *The way I'm using the for loop above is wrong, isn't it? If I use
> > > a single builder.setSpout and set the numTasks value, then Storm would
> > > create that many Spout instances automatically?
> > > *2.* When you specify something like this for fields grouping:
> > > public void declareOutputFields(OutputFieldsDeclarer declarer) {
> > >     declarer.declare(new Fields("A","B","C","D"));
> > > }
> > > what does it mean? Does it mean that 4 types of tuples might be emitted?
> > > When they are received as builder.setBolt("sqlWriterBolt", new
> > > Bolt_AB(), 3).fieldsGrouping("mongoBolt", new Fields("A", "B"));
> > >
> > > does it mean that the first 2 tuples will go to Bolt_AB, the next 2
> > > tuples may go to any other bolt that receives tuples from mongoBolt,
> > > and then the next two tuples will go to Bolt_AB again? Is that how it
> > > works?
> > >
> > > *3. *If I don't specify any new Fields("A", "B"), how does Storm know
> > > the grouping? How does it decide? If I have a tuple like this:
> > > class SomeTuple extends IRichBolt {
> > >     private Integer someID;
> > >     public Integer getSomeID() { return someID; }
> > > }
> > > and if Bolt A sends SomeTuple to Bolt B with a SomeID value (assume the
> > > SomeID value is 9), and the next time Bolt A generates a Tuple with
> > > someID = 9 (and it may generate many tuples with someID = 9), how do I
> > > ensure that Storm sees the SomeID value and decides to send it to the
> > > Bolt B instance that processes all someIDs which have a value of 9?
> > >
> > >
> > > --
> > > Regards,
> > > Navin
> >
> >
> >
> >
> > --
> > Regards,
> > Navin
>
--
Regards,
Navin