I guess this problem is not uncommon. MapReduce also suffers from stragglers...
The problem you want to solve is also a hard one. There is already a (quite old) JIRA for it: https://issues.apache.org/jira/browse/STORM-162 -- hope this helps.

Implementing a custom grouping is simple in general: see TopologyBuilder.setBolt(..).customGrouping(...). You just need to implement the "CustomStreamGrouping" interface. In your case it is tricky, because you need a feedback loop from the consumers back to the CustomStreamGrouping used at the producer. Maybe you can exploit Storm's "Metric" or "TopologyInfo" to build that feedback loop, but I am not sure whether this results in a "clean" solution. (I put a rough sketch of the idea at the bottom of this mail.)

-Matthias

On 06/25/2015 07:54 AM, Aditya Rajan wrote:
> Hey Matthias,
>
> We've been running the topology for about 16 hours; for the last three
> hours it has been failing. Here's a screenshot of the clogging.
>
> It is my assumption that all the tuples are of equal size, since they are
> all objects of the same class.
>
> Is this not a common problem? Has anyone implemented a load-balancing
> shuffle? Could someone guide us on how to build such a custom grouping?
>
> On Wed, Jun 24, 2015 at 1:40 PM, Matthias J. Sax <[email protected]> wrote:
>
> Worried might not be the right term. However, as a rule of thumb,
> capacity should not exceed 1.0 -- a higher value indicates an overload.
> Usually this problem is tackled by increasing the parallelism. However,
> since you have an imbalance in terms of processing time -- some tuples
> seem to need more time to finish than others -- increasing the degree of
> parallelism might not help.
>
> The question to answer would be: why do the heavy-weight tuples seem to
> cluster at certain instances instead of being distributed evenly over all
> executors?
>
> Also confusing are the values of "execute latency" and "process
> latency". For some instances "execute latency" is 10x higher than
> "process latency" -- for other instances it is the other way round. This
> is not only inconsistent; in general I would expect "process latency"
> to be larger than "execute latency".
>
> -Matthias
>
> On 06/24/2015 09:07 AM, Aditya Rajan wrote:
> > Hey Matthias,
> >
> > What can be inferred from the high capacity values? Should we be worried?
> > What should we do to change it?
> >
> > Thanks
> > Aditya
> >
> > On Tue, Jun 23, 2015 at 5:54 PM, Nathan Leung <[email protected]> wrote:
> >
> > Also to clarify, unless you change the sample frequency, the counts
> > in the UI are not precise. Note that they are all multiples of 20.
> >
> > On Jun 23, 2015 7:16 AM, "Matthias J. Sax" <[email protected]> wrote:
> >
> > I don't see any imbalance. The value of "Executed" is 440/460 for each
> > bolt. Thus each bolt processed about the same number of tuples.
> >
> > Shuffle grouping does a round-robin distribution and balances the
> > number of tuples sent to each receiver.
> >
> > If you refer to the values "capacity", "execute latency", or "process
> > latency", shuffle grouping cannot balance those. Furthermore, Storm
> > does not give any support to balance them. You would need to implement
> > a "CustomStreamGrouping" or use direct grouping to take care of load
> > balancing with regard to those metrics.
> >
> > -Matthias
> >
> > On 06/23/2015 11:42 AM, bhargav sarvepalli wrote:
> > > I'm feeding a spout with 30 executors into this bolt, which has 90
> > > executors. Despite using shuffle grouping, the load seems to be
> > > unbalanced. Attached is a screenshot showing the same. Would anyone
> > > happen to know why this is happening or how this can be solved?
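
P.S.: To sketch the custom-grouping idea from above -- this is only an
illustration, not Storm API, except for the "CustomStreamGrouping" interface
itself; the class name, the static reportLoad() hook, and the shared load map
are made up, and the feedback mechanism that would actually fill the map is
exactly the open problem:

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative load-aware grouping: each tuple is routed to the target task
// that currently reports the smallest load.
public class LoadAwareGrouping implements CustomStreamGrouping {

    // task id -> last reported load (e.g. pending-tuple count or capacity);
    // assumed to be updated asynchronously inside each producer worker
    private static final ConcurrentMap<Integer, Double> TASK_LOAD =
            new ConcurrentHashMap<Integer, Double>();

    private List<Integer> targetTasks;

    // Hypothetical hook the feedback mechanism would call.
    public static void reportLoad(int taskId, double load) {
        TASK_LOAD.put(taskId, load);
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
                        List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // pick the least-loaded known task; unknown tasks count as idle (0.0)
        int best = targetTasks.get(0);
        double bestLoad = Double.MAX_VALUE;
        for (Integer task : targetTasks) {
            Double reported = TASK_LOAD.get(task);
            double load = (reported == null) ? 0.0 : reported.doubleValue();
            if (load < bestLoad) {
                bestLoad = load;
                best = task;
            }
        }
        return Collections.singletonList(best);
    }
}

You would attach it to the bolt like this (component names made up):

builder.setBolt("heavy-bolt", new HeavyBolt(), 90)
       .customGrouping("my-spout", new LoadAwareGrouping());

Keep in mind that chooseTasks() runs inside the worker hosting the producer
task, so the feedback has to reach every such worker -- a static map like the
one above only works within a single JVM.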
