@Jungtaek: This person (http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology) claims that Storm automatically manages the flow of data between spouts and bolts on different workers. Can anyone confirm this? If that's the case, I won't have to bother using Trident.
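If that's right, then plain wiring like the below should be enough, with Storm moving tuples between workers on its own. This is only a minimal sketch of what I mean -- MongoSpout, ProcessBolt, and WriterBolt are placeholder class names for the pipeline discussed further down this thread, not real classes:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class RowBatchTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            // 10 spout instances, each intended to read a different 1000-row batch
            builder.setSpout("mongoSpout", new MongoSpout(), 10);

            // 10 processing bolts; Storm routes each tuple to whichever worker
            // hosts the chosen task, local or remote
            builder.setBolt("processBolt", new ProcessBolt(), 10)
                   .shuffleGrouping("mongoSpout");

            // A single writer bolt that receives every processed tuple
            builder.setBolt("writerBolt", new WriterBolt(), 1)
                   .globalGrouping("processBolt");

            Config conf = new Config();
            conf.setNumWorkers(11); // tasks get spread across these workers
            StormSubmitter.submitTopology("row-batches", conf, builder.createTopology());
        }
    }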
On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <[email protected]> wrote:

> @Jason: Thanks. I tried searching for the Storm code that creates
> ephemeral nodes, but couldn't find it. (I'm new to Hadoop and Storm, so
> perhaps I was searching for the wrong thing.)
>
> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
> Trident, but didn't go ahead because it wasn't clear how I could implement
> multiple spouts in Trident, where each spout would iterate over a certain
> number of rows of a database. Any idea how that could be done?
>
> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <[email protected]> wrote:
>
>> There's another approach that doesn't rely on Zookeeper: use the ordinal
>> of the task id among the tasks of the same component (spout).
>>
>> Task ids are issued across all tasks, including system tasks, so you
>> can't assume the spout tasks have sequential task ids. But you can still
>> do the trick: check the "ordinal" of this spout task's id among the tasks
>> of the same spout. Please refer to
>> GeneralTopologyContext.getComponentTasks(String componentId).
>>
>> Btw, Spout1 -> Bolt2 can be done in various ways, but it would not be
>> easy to aggregate the results of Bolt2 in Bolt3. You should consider
>> windowing by processing time, or Trident, or maintaining your own
>> buffers.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
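For reference, here is how I read that ordinal trick -- just a sketch, untested; BATCH_SIZE, the offset field, and the empty method bodies are placeholders of mine, not from Jungtaek's mail:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;

    public class MongoSpout extends BaseRichSpout {
        private static final int BATCH_SIZE = 1000; // rows per spout instance
        private long offset;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            // Task ids of every task of this spout component; sorted defensively
            // so all instances agree on the same ordering
            List<Integer> siblings = new ArrayList<>(
                    context.getComponentTasks(context.getThisComponentId()));
            Collections.sort(siblings);

            // This task's ordinal among its siblings: 0 .. numTasks-1
            int ordinal = siblings.indexOf(context.getThisTaskId());

            // Each instance claims its own slice of the table
            this.offset = (long) ordinal * BATCH_SIZE;
        }

        @Override
        public void nextTuple() {
            // read rows starting at this.offset and emit tuples...
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // declare output fields here
        }
    }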
>> On Tue, Apr 19, 2016 at 10:02 PM, Jason Kusar <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I've done a similar thing before, except that I was reading from
>>> Cassandra. The concept is the same, though. Assuming you know you have
>>> 10,000 records and you want each spout to read 1,000 of them, you would
>>> launch 10 instances of the spout. The first thing each one does during
>>> init is connect to Zookeeper and create an ephemeral node
>>> (http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes),
>>> starting with one called '0'. If '0' already exists, you'll get an
>>> exception, which means you try to create '1', and so on until you
>>> successfully create a node. That tells you which batch of records that
>>> instance of the spout is responsible for. I.e., if you successfully
>>> created '3', then this spout needs to set its offset to 3,000.
>>>
>>> The reason for using ephemeral nodes is that they are automatically
>>> deleted if the Zookeeper client disconnects. That way, if a spout
>>> crashes, once Storm relaunches it, it will be able to reclaim that
>>> token and resume work on that batch. You'll obviously need some way to
>>> keep track of which records you've already processed, but that's going
>>> to be specific to your implementation.
>>>
>>> Hope that helps!
>>> Jason
>>>
>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <[email protected]> wrote:
>>>
>>>> Thanks guys.
>>>> I didn't understand "...spout instances by utilizing Zookeeper." How
>>>> does one utilize Zookeeper? Is it the same as ".setNumTasks(10)" for a
>>>> spout?
>>>>
>>>> As of now I've set
>>>> config.setNumWorkers(2);
>>>> and
>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>
>>>> I'm able to get the spout id in open() using
>>>> this.spoutId = context.getThisTaskId();
>>>> Strangely, my spout id always begins with 3 instead of 0.
>>>>
>>>> By partition id, I understand that's the fieldsGrouping's id.
>>>>
>>>> Even if I do all this, will the spout's tasks actually be distributed
>>>> across multiple workers? Won't I have to create separate spouts?
>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>> and so on?
>>>>
>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <[email protected]> wrote:
>>>>
>>>>> Correction - group on partition id
>>>>>
>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <[email protected]> wrote:
>>>>>
>>>>>> I've seen this:
>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>> but it doesn't explain how workers coordinate with each other, so I'm
>>>>>> requesting a bit of clarity.
>>>>>>
>>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>>> MongoDB.
>>>>>>
>>>>>> 1. I want to use a spout to read the first 1000 rows and send the
>>>>>> processed output to a bolt. This happens in Worker 1.
>>>>>> 2. I want a different instance of the same spout class to read the
>>>>>> next 1000 rows in parallel with the spout of 1, then send the
>>>>>> processed output to an instance of the same bolt used in 1. This
>>>>>> happens in Worker 2.
>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>> 4. I might set up 10 workers like this.
>>>>>> 5. When all the bolts in the workers are finished, they send their
>>>>>> outputs to a single bolt in Worker 11.
>>>>>> 6. The bolt in Worker 11 writes the processed value to a new MySQL
>>>>>> table.
>>>>>>
>>>>>> *My confusion here is in how to make the database iterations happen
>>>>>> batch by batch, in parallel.* Obviously the database connection would
>>>>>> have to be made in some static class outside the workers, but if the
>>>>>> workers are started with just "conf.setNumWorkers(2);", then how do I
>>>>>> tell the workers to iterate over different rows of the database?
>>>>>> Assume the workers are running on different machines.
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Navin
>>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>
>
> --
> Regards,
> Navin

--
Regards,
Navin
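P.S. For anyone who lands on this thread later, here is a rough sketch of the ephemeral-node claim Jason described above. It is untested, it assumes a persistent parent node /spout-tokens has already been created, and the path and class name are made up for illustration:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;

    public class TokenClaimer {
        /**
         * Called from the spout's open(). Claims the first free token under
         * /spout-tokens; the claimed index tells this spout instance which
         * batch of rows it owns (index 3 => offset 3000, and so on).
         */
        public static int claimToken(ZooKeeper zk, int numSpouts) throws Exception {
            for (int i = 0; i < numSpouts; i++) {
                try {
                    // Ephemeral: Zookeeper deletes the node when this client's
                    // session ends, so a relaunched spout can reclaim the token.
                    zk.create("/spout-tokens/" + i, new byte[0],
                              Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                    return i;
                } catch (KeeperException.NodeExistsException e) {
                    // Token already owned by a live spout; try the next one.
                }
            }
            throw new IllegalStateException("No free token under /spout-tokens");
        }
    }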
