Hi Matthias,

Thanks for the image and explanation. I understood the parallelism and
tasks, but even now when I run
builder.setSpout(partialSpoutName, new MongoSpout(),5).setNumTasks(100);
There's a line in MongoSpout's constructor (System.out.println("MongoSpout
created");) which is getting printed only once. This is the reason for all
the confusion on my part. I'd have expected it to get printed at least 5
times.

Am running on LocalCluster localCluster = new LocalCluster(); for now, but
even then I felt it should've created the necessary worker threads.

When does Storm decide to create a new instance of MongoSpout?

When nextTuple of MongoSpout looks is this,
    @Override
    public void nextTuple() {
        while(batch.hasNext()) {
            History_DBO history = batchOfHistories.next();
                this.collector.emit(new Values(history._id), history._id);
            }
        }//while
    }

Does Storm decide to create a new MongoSpout if the first MongoSpout which
emitted a tuple hasn't yet received an ack()? Is that how it works?



On Mon, Apr 25, 2016 at 2:33 PM, Matthias J. Sax <mj...@apache.org> wrote:

> Hi Navin,
>
> If you really want a "forward connection pattern", ie, all data of a
> single spout goes to a single bolt, your idea with a loop should work.
> Of course, as you do actually deploy distinct graphs, ie, the single
> parts of the topology do not exchange data, you could also just deploy
> many topologies with parallelism of one.
>
> If "standard" way in Storm (without the loop), would be like this:
>
> > builder.setSpout(spoutName, new MongoSpout(), 5);
> > builder.setBolt(boltName, new GenBolt(), 5).shuffleGrouping(spoutName);
> > builder.setBolt(boltName2, new SomeBolt(), 5).shuffleGrouping(boltName);
>
> with 5 being the number of instances (if you don't specify any number,
> the default is 1. The third parameter (parallilism_hint) define the
> number of executor-thread (each thread will run a bolt instance).
>
> However, using shuffleGrouping (or localOrShuffleGrouping), data is not
> forwarded to a single instance, but randomly re-distirbuted over all
> bolt instances. (see attached png showing a topology with 2 spouts and 4
> bolts -- each represented by a quare. The dots within each square
> illustrates the parallelism of each -- the connection pattern would be
> shuffleGrouping)
>
> As an alternative, you can also use custom-grouping (or direct streams)
> to achieve a "forwarding" pattern. (with regard to dynamic scaling --
> see below -- this might be the correct way to go for your case). Using
> the loop, you cannot add more instances during runtime.
>
> However, I am not sure what you mean by
>
> "Storm should automatically create the required number of instances of
> spouts and bolts to be able to scale horizontally when I add more
> machines to process the data"
>
> If you deploy a topology, the number of parallel instances is stable.
> Storm does not automatically change it. If you want to change the
> parallelism during runtime, you need to do this manually via
>
> bin/storm rebalance ...
>
> The rebalance operation requires, that the topology has enough TASKS
> (the parallelism cannot be larger as the number of tasks).
>
> Thus, you need to "prepare" your topology during setup for dynamic
> scaling via .setNumTasks();
>
> > builder.setSpout(spoutName, new MongoSpout(), 5).setNumTasks(100);
> > builder.setBolt(boltName, new GenBolt(),
> 5).shuffleGrouping(spoutName).setNumTasks(100);
> > builder.setBolt(boltName2, new SomeBolt(),
> 5).shuffleGrouping(boltName).setNumTasks(100);
>
> Here, 100 define the maximum number of parallel instances you can run,
> and the initial deployment will start 5. Using rebalance you can change
> the parallelism to up to 100 now.
>
> Hope this makes sens.
>
>
> -Matthias
>
>
>
>
> On 04/25/2016 09:11 AM, Navin Ipe wrote:
> > Thank you Matthias for your time and patient explanation. I'm now clear
> > about the Fields grouping (an answer on Stackoverflow had confused me
> > <
> http://stackoverflow.com/questions/33512554/multiple-fields-grouping-in-storm
> >).
> > The first question still stands, where I'm unable to understand when
> > multiple instances of spouts and bolts get created.
> >
> > To get a topology like this:
> > Spout ---->Bolt--->Bolt
> > Spout ---->Bolt--->Bolt
> > Spout ---->Bolt--->Bolt
> >
> > is what I'm trying to achieve, but if I simply say:
> >
> > /builder.setSpout(spoutName, new MongoSpout());
> > builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);/
> > /builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> > /
> > Then I don't see how multiple instances of MongoSpout() will get
> > created. I've already been through a lot of tutorials and documentation
> > pages, but they don't explain that (in a way that I understand). I had
> > also run some code where I set .setNumTasks(12); but the constructor of
> > MongoSpout() got called only once.
> > So under what situation does storm create multiple instances of
> MongoSpout?
> >
> > The objective is that when there are a lot of things to process, Storm
> > should automatically create the required number of instances of spouts
> > and bolts to be able to scale horizontally when I add more machines to
> > process the data.
> >
> > But if I simply create this:
> > /builder.setSpout(spoutName, new MongoSpout());
> > builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);/
> > /builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> >
> > /
> > how will Storm scale?
> >
> > I need one spout to iterate the first 1000 rows of a database and
> > extract data while parallelly another spout iterates through the next
> > 1000 rows of a database and so on. So if the database has 1 million
> > rows, Storm should automatically create that many spouts. Is that
> > possible? (there would be a limit on the number of spouts of course)
> >
> > Then, based on the number of spouts created, the same number of bolts
> > are created to process the data the spouts emit.
> >
> > So my biggest confusion has always been just this. About how are the
> > multiple spout and bolt instances created so that the processing can
> scale?
> >
> > ps: Yes I knew the tuple does not extend IRichBolt. It was a silly
> > mistake while I was typing :-)
> > //
> >
> >
> > On Mon, Apr 25, 2016 at 3:26 AM, Matthias J. Sax <mj...@apache.org
> > <mailto:mj...@apache.org>> wrote:
> >
> >     Hi Navin,
> >
> >     I could not follow your email completely. Let me clarify a couple of
> >     things to get started. If you still have question, just ask again.
> >
> >
> >
> >     A) A IRichBolt interface defines a bolt, and not a tuple. Thus,
> >
> >     > class SomeTuple extends IRichBolt {
> >     >   private Integer someID;
> >     >   public Integer getSomeID() {return someID;}
> >     > }
> >
> >     is not correct.
> >
> >     A Tuple in Storm has multiple Fields (also called Schema). You define
> >     the fields on a tuple in .declareOutputStream(...)
> >
> >
> >
> >     B) So
> >
> >     >     public void declareOutputFields(OutputFieldsDeclarer declarer)
> {
> >     >         declarer.declare(new Fields("A","B","C","D"));
> >     >     }
> >
> >     does declare, that the tuples that are emitted by this Spout or Bolt
> >     have 4 fields. "A", "B", "C", "D" are the names of those fields (ie,
> >     meta data).
> >
> >     If you want to emit a corresponding tuple you call
> >
> >         output.emit(new Value(1, "x", 3.4, true));
> >
> >     This would emit Tuple with an Integer,String,Double,Boolean as
> concrete
> >     data types. The values can of course be anything.
> >
> >
> >
> >     C) If you use fieldsGrouping, you always need to specify the fields
> you
> >     wanna group on (it is not possible to use zero fields for this).
> >
> >     If you have for example an ID field in your tuple and you want that
> all
> >     tuples with the same ID are processed be the same bolt instance, you
> can
> >     do it like this:
> >
> >         setBolt("PRODUCER", new MyPBolt());
> >         setBolt("CONSUMER", new MyCBolt(),
> >     parallelism).fieldGrouping("PRODUCER", new Fields("ID"));
> >
> >     Of course, MyPBolt must declare a field with name "ID" in it's
> >     implementation of declareOutputFields() (otherwise Storm will
> complain).
> >
> >
> >     So answer to first question (the other two should already be covered)
> >
> >     > *1. *The way I'm using the for loop above is wrong isn't it? If I
> >     use a
> >     > single builder.setSpout and set the numTasks value, then Storm
> would
> >     > create those many Spout instances automatically?
> >
> >     Not sure what you want to get -- so not sure if the loop is right or
> >     wrong. And yes, if you use
> >
> >     builder.setSpout("name", new MySpout(), parallelism)
> >
> >     Storm will automatically start <parallelism> instances of MySpout. Do
> >     not confuse this with tasks (ie, setNumTasks()). Tasks are logical
> units
> >     of parallelism, while <parallelism> defines the number of threads
> (ie,
> >     physical parallelism). See here for more details:
> >
> https://storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html
> >
> >
> >     Hope this clears things up a little bit. If you are still confused,
> look
> >     at the example in storm-starter.
> >
> >
> >     -Matthias
> >
> >
> >     On 04/24/2016 07:44 PM, Navin Ipe wrote:
> >     > To parallelize some code, I considered having this topology. The
> single
> >     > [Spout] or [Bolt] represent multiple Spouts or Bolts.
> >     >
> >     > *[Spout]--emit--->[Bolt A]--emit--->[Bolt B]*
> >     >
> >     > If any of the bolts in Bolt A emit a Tuple of value 1, and it gets
> >     > processed by a certain bolt in Bolt B, then it is imperative that
> if any
> >     > of the bolts in Bolt A again emits the value 1, it should
> compulsorily
> >     > be processed by the same bolt in Bolt B. I assume fields grouping
> can
> >     > handle this.
> >     >
> >     > To have many spouts work in parallel, my initial thoughts were to
> have:
> >     > /        Integer numberOfSpouts = 10;
> >     >
> >     >         String partialSpoutName = "mongoSpout";
> >     >         String partialBoltName = "mongoBolt";
> >     >
> >     >         for(Integer i = 0; i < numberOfSpouts; ++i) {
> >     >             String spoutName = partialSpoutName + i.toString();
> >     >             String boltName = partialBoltName + i.toString();
> >     >
> >     >             builder.setSpout(spoutName, new MongoSpout());
> >     >             builder.setBolt(boltName, new
> >     > GenBolt()).shuffleGrouping(spoutName);
> >     >         }/
> >     >
> >     > But I realized it was probably not the right way, because in case I
> >     > wanted all of Bolt A's tuples to go to a single Bolt B, then I'd
> have to
> >     > include cases like this:
> >     >
> >     > /        switch (numberOfSpouts) {
> >     >             case 3:
> >     >                 builder.setBolt("sqlWriterBolt", new
> >     > SqlWriterBolt(appConfig),3)
> >     >                         .shuffleGrouping(partialBoltName+"2")
> >     >                         .shuffleGrouping(partialBoltName+"1")
> >     >
> >     > .shuffleGrouping(partialBoltName+"0");
> >     >
> >     >                 break;
> >     >             case 2:
> >     >                 builder.setBolt("sqlWriterBolt", new
> >     > SqlWriterBolt(appConfig),2)
> >     >                         .shuffleGrouping(partialBoltName+"1")
> >     >
> >     > .shuffleGrouping(partialBoltName+"0");
> >     >
> >     >                 break;
> >     >             case 1:
> >     >                 builder.setBolt("sqlWriterBolt", new
> >     > SqlWriterBolt(appConfig),1)
> >     >
> >     > .shuffleGrouping(partialBoltName+"0");
> >     >                 break;
> >     >             default:
> >     >                 System.out.println("No case here");
> >     >         }//switch/
> >     >
> >     > *So three questions:*
> >     > *1. *The way I'm using the for loop above is wrong isn't it? If I
> >     use a
> >     > single builder.setSpout and set the numTasks value, then Storm
> would
> >     > create those many Spout instances automatically?
> >     > *
> >     > 2.* When you specify something like this for fields grouping:
> >     >     public void declareOutputFields(OutputFieldsDeclarer declarer)
> {
> >     >         declarer.declare(new Fields("A","B","C","D"));
> >     >     }
> >     > What does it mean? Does it mean that 4 types of tuples might be
> emitted?
> >     > When they are received as builder.setBolt("/sqlWriterBolt/", new
> >     > Bolt_AB(), 3).fieldsGrouping("/mongoBolt/", new Fields("A", "B"));
> >     >
> >     > Does it mean that my the first 2 tuples will go to the Bolt_AB and
> the
> >     > next 2 tuples may go to any other bolt that receives tuples from
> >     > mongoBolt, and then the next two tuples will go to Bolt_AB again?
> Is
> >     > that how it works?
> >     >
> >     > *3. *If I don't specify any new Fields("A", "B"), how does Storm
> know
> >     > the grouping? How does it decide? If I have a tuple like this:
> >     > class SomeTuple extends IRichBolt {
> >     >   private Integer someID;
> >     >   public Integer getSomeID() {return someID;}
> >     > }
> >     > and if Bolt A sends SomeTuple to Bolt B, with SomeID value (assume
> a
> >     > SomeID value is 9) and the next time Bolt A generates a Tuple with
> >     > someID = 9 (and it may generate many tuples with someID=9), how do
> I
> >     > ensure that Storm sees the SomeID value and decides to send it to
> the
> >     > Bolt B instance that processes all someID's which have a value of
> 9?
> >     >
> >     >
> >     > --
> >     > Regards,
> >     > Navin
> >
> >
> >
> >
> > --
> > Regards,
> > Navin
>



-- 
Regards,
Navin

Reply via email to