There's other idea without relying on Zookeeper : use ordinal of task id
between same components (spout)

Task id is issued across all tasks including system tasks so you can't
assume spout tasks are having task id sequentially, but whatever you can do
the trick - check "ordinal" of this spout task's id around same spouts.
Please refer GeneralTopologyContext.getComponentTasks(String componentId).

Btw, Spout1 -> Bolt2 can be done with various ways but it would not be easy
to aggregate the results of Bolt2 from Bolt3.
You should consider windowing by processed time or Trident or maintain your
own buffers.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

2016년 4월 19일 (화) 오후 10:02, Jason Kusar <[email protected]>님이 작성:

> Hi,
>
> I've done a similar thing before with the exception that I was reading
> from Cassandra.  The concept is the same though.  Assuming you know that
> you have 10,000 records and you want each spout to read 1,000 of them, then
> you would launch 10 instances of the spouts.  The first thing they do
> during init is to connect to zookeeper and create an ephemeral node (
> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
> starting with one called '0'.  If 0 already exists, you'll get an exception
> which means you try to create '1' and so on until you successfully create a
> node.  That tells you which batch of records that instance of the spout is
> responsible for.  I.e., if you successfully created '3', then this spout
> needs to set its offset to 3,000.
>
> The reason for using ephemeral nodes is that they are automatically
> deleted if the zookeeper client disconnects.  That way if a spout crashes,
> once Storm relaunches the spout, it will be able to re-claim that token and
> resume work on that batch.  You'll obviously need to have some way to keep
> track of which records you've already processed, but that's going to be
> specific to your implementation.
>
> Hope that helps!
> Jason
>
> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <[email protected]>
> wrote:
>
>> Thanks guys.
>> I didn't understand "*...spout instances by utilizing Zookeper.*". How
>> does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a Spout?
>>
>> As of now I've set
>> config.setNumWorkers(2);
>> and
>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>
>> I'm able to get spoutID in open() using this.spoutId =
>> context.getThisTaskId();
>> Strangely, my spoutID always begins with 3 instead of 0.
>>
>> By partitionID I understand that's the fieldGrouping's id.
>>
>> Even if I do all this, will the spout's tasks actually be distributed
>> across multiple workers? Won't I have to create separate spouts?
>> builder.setSpout("mongoSpout1", new MongoSpout());
>> builder.setSpout("mongoSpout2", new MongoSpout());
>> builder.setSpout("mongoSpout3", new MongoSpout());
>> and so on?
>>
>>
>>
>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <[email protected]>
>> wrote:
>>
>>> Coreection - group on partition id
>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <[email protected]>
>>> wrote:
>>>
>>>> I've seen this:
>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>> but it doesn't explain how workers coordinate with each other, so
>>>> requesting a bit of clarity.
>>>>
>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>> MongoDB.
>>>>
>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>> processed output to a Bolt. This happens in Worker1.
>>>> 2. I want a different instance of the same Spout class to read the next
>>>> 1000 rows in parallel with the working of the Spout of 1, then send the
>>>> processed output to an instance of the same Bolt used in 1. This happens in
>>>> Worker2.
>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>> 4. I might setup 10 workers like this.
>>>> 5. When all the Bolts in the workers are finished, they send their
>>>> outputs to a single Bolt in Worker 11.
>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>> table.
>>>>
>>>> *My confusion here is in how to make the database iterations happen
>>>> batch by batch, parallelly*. Obviously the database connection would
>>>> have to be made in some static class outside the workers, but if workers
>>>> are started with just "conf.setNumWorkers(2);", then how do I tell the
>>>> workers to iterate different rows of the database? Assuming that the
>>>> workers are running in different machines.
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>

Reply via email to