Hi Navin, I could not follow your email completely. Let me clarify a couple of things to get started. If you still have question, just ask again.
A) A IRichBolt interface defines a bolt, and not a tuple. Thus,
> class SomeTuple extends IRichBolt {
> private Integer someID;
> public Integer getSomeID() {return someID;}
> }
is not correct.
A Tuple in Storm has multiple Fields (also called Schema). You define
the fields on a tuple in .declareOutputStream(...)
B) So
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("A","B","C","D"));
> }
does declare, that the tuples that are emitted by this Spout or Bolt
have 4 fields. "A", "B", "C", "D" are the names of those fields (ie,
meta data).
If you want to emit a corresponding tuple you call
output.emit(new Value(1, "x", 3.4, true));
This would emit Tuple with an Integer,String,Double,Boolean as concrete
data types. The values can of course be anything.
C) If you use fieldsGrouping, you always need to specify the fields you
wanna group on (it is not possible to use zero fields for this).
If you have for example an ID field in your tuple and you want that all
tuples with the same ID are processed be the same bolt instance, you can
do it like this:
setBolt("PRODUCER", new MyPBolt());
setBolt("CONSUMER", new MyCBolt(),
parallelism).fieldGrouping("PRODUCER", new Fields("ID"));
Of course, MyPBolt must declare a field with name "ID" in it's
implementation of declareOutputFields() (otherwise Storm will complain).
So answer to first question (the other two should already be covered)
> *1. *The way I'm using the for loop above is wrong isn't it? If I use a
> single builder.setSpout and set the numTasks value, then Storm would
> create those many Spout instances automatically?
Not sure what you want to get -- so not sure if the loop is right or
wrong. And yes, if you use
builder.setSpout("name", new MySpout(), parallelism)
Storm will automatically start <parallelism> instances of MySpout. Do
not confuse this with tasks (ie, setNumTasks()). Tasks are logical units
of parallelism, while <parallelism> defines the number of threads (ie,
physical parallelism). See here for more details:
https://storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html
Hope this clears things up a little bit. If you are still confused, look
at the example in storm-starter.
-Matthias
On 04/24/2016 07:44 PM, Navin Ipe wrote:
> To parallelize some code, I considered having this topology. The single
> [Spout] or [Bolt] represent multiple Spouts or Bolts.
>
> *[Spout]--emit--->[Bolt A]--emit--->[Bolt B]*
>
> If any of the bolts in Bolt A emit a Tuple of value 1, and it gets
> processed by a certain bolt in Bolt B, then it is imperative that if any
> of the bolts in Bolt A again emits the value 1, it should compulsorily
> be processed by the same bolt in Bolt B. I assume fields grouping can
> handle this.
>
> To have many spouts work in parallel, my initial thoughts were to have:
> / Integer numberOfSpouts = 10;
>
> String partialSpoutName = "mongoSpout";
> String partialBoltName = "mongoBolt";
>
> for(Integer i = 0; i < numberOfSpouts; ++i) {
> String spoutName = partialSpoutName + i.toString();
> String boltName = partialBoltName + i.toString();
>
> builder.setSpout(spoutName, new MongoSpout());
> builder.setBolt(boltName, new
> GenBolt()).shuffleGrouping(spoutName);
> }/
>
> But I realized it was probably not the right way, because in case I
> wanted all of Bolt A's tuples to go to a single Bolt B, then I'd have to
> include cases like this:
>
> / switch (numberOfSpouts) {
> case 3:
> builder.setBolt("sqlWriterBolt", new
> SqlWriterBolt(appConfig),3)
> .shuffleGrouping(partialBoltName+"2")
> .shuffleGrouping(partialBoltName+"1")
>
> .shuffleGrouping(partialBoltName+"0");
>
> break;
> case 2:
> builder.setBolt("sqlWriterBolt", new
> SqlWriterBolt(appConfig),2)
> .shuffleGrouping(partialBoltName+"1")
>
> .shuffleGrouping(partialBoltName+"0");
>
> break;
> case 1:
> builder.setBolt("sqlWriterBolt", new
> SqlWriterBolt(appConfig),1)
>
> .shuffleGrouping(partialBoltName+"0");
> break;
> default:
> System.out.println("No case here");
> }//switch/
>
> *So three questions:*
> *1. *The way I'm using the for loop above is wrong isn't it? If I use a
> single builder.setSpout and set the numTasks value, then Storm would
> create those many Spout instances automatically?
> *
> 2.* When you specify something like this for fields grouping:
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("A","B","C","D"));
> }
> What does it mean? Does it mean that 4 types of tuples might be emitted?
> When they are received as builder.setBolt("/sqlWriterBolt/", new
> Bolt_AB(), 3).fieldsGrouping("/mongoBolt/", new Fields("A", "B"));
>
> Does it mean that my the first 2 tuples will go to the Bolt_AB and the
> next 2 tuples may go to any other bolt that receives tuples from
> mongoBolt, and then the next two tuples will go to Bolt_AB again? Is
> that how it works?
>
> *3. *If I don't specify any new Fields("A", "B"), how does Storm know
> the grouping? How does it decide? If I have a tuple like this:
> class SomeTuple extends IRichBolt {
> private Integer someID;
> public Integer getSomeID() {return someID;}
> }
> and if Bolt A sends SomeTuple to Bolt B, with SomeID value (assume a
> SomeID value is 9) and the next time Bolt A generates a Tuple with
> someID = 9 (and it may generate many tuples with someID=9), how do I
> ensure that Storm sees the SomeID value and decides to send it to the
> Bolt B instance that processes all someID's which have a value of 9?
>
>
> --
> Regards,
> Navin
signature.asc
Description: OpenPGP digital signature
