Hi Taylor,

Wowsers, thanks for getting back to me so quickly! Basically, the flow is as follows:
The Kafka topic is composed of 20 partitions, which 20 KafkaSpout executors read from. The KafkaSpout uses a shuffle grouping to send tuples to the 1000 Bolt A executors. Bolt A parses each incoming tuple and generates 20-25 tuples, which are emitted anchored to the incoming tuple from the KafkaSpout. The anchored tuples are emitted via fieldsGrouping to the 200 Bolt B executors.

When I use localOrShuffleGrouping for the 1000 Bolt A executors to 200 Bolt B executors portion of the DAG, my throughput is 12 million tuples/minute. When I use fieldsGrouping, throughput is 1 million tuples/minute initially, drops below 500K within 30 minutes, and then I start seeing tuple failures. Again, 99% of the Bolt A executor thread's time is spent in the com.lmax.disruptor.BlockingWaitStrategy.waitFor method.

Going from the 20 KafkaSpout executors to the 1000 Bolt A executors works great. I am thinking the fan-in from 1000 Bolt A executors to 200 Bolt B executors, which uses remote messaging via fieldsGrouping, is the problem. I am hoping that local messaging via localOrShuffleGrouping, as follows, will help (a wiring sketch for this layout is appended after the quoted thread below):

200 workers
1000 Bolt A executors
200 Fan In Bolt executors (1 executor per worker to ensure local shuffle)
50 Bolt B executors

--John

On Wed, Dec 9, 2015 at 9:43 PM, P. Taylor Goetz <[email protected]> wrote:

> Hi John,
>
> I think it *may* make sense, but without more details like code/sample
> data, it is hard to say.
>
> Whenever you use a fields grouping, key distribution can come into play
> and affect scaling.
>
> -Taylor
>
> > On Dec 9, 2015, at 9:31 PM, John Yost <[email protected]> wrote:
> >
> > Hi Everyone,
> >
> > I have a large fan-in within my topology where I go from 1000 Bolt A
> > executors to 50 Bolt B executors via fieldsGrouping. When I profile
> > via jvisualvm, it shows that the Bolt A thread spends 99% of its time
> > in the com.lmax.disruptor.BlockingWaitStrategy.waitFor method.
> >
> > The topology details are as follows:
> >
> > 200 workers
> > 20 KafkaSpout executors
> > 1000 Bolt A executors
> > 50 Bolt B executors
> >
> > I use a fieldsGrouping from Bolt A -> Bolt B because I am caching in
> > Bolt B, building up large Key/Value pairs for HFile import into HBase.
> >
> > I am thinking that if I add an extra bolt between Bolt A and Bolt B,
> > where I do a localOrShuffleGrouping to go from 1000 -> 200 locally,
> > followed by a fieldsGrouping to go from 200 -> 50, it will lessen
> > network I/O wait time.
> >
> > Please confirm whether this makes sense or if there are any better
> > ideas.
> >
> > Thanks
> >
> > --John
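
For reference, here is a minimal sketch of the proposed wiring, assuming Storm 0.10.x (backtype.storm and storm.kafka packages). BoltA, FanInBolt, and BoltB are stand-ins for the actual bolt implementations, and the ZooKeeper address, topic name, and the "key" field are placeholders:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class FanInTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // 20 spout executors, one per Kafka partition.
        builder.setSpout("kafka-spout", buildKafkaSpout(), 20);

        // 1000 Bolt A executors; shuffle grouping from the spout.
        // (BoltA is a stand-in for the actual parsing bolt.)
        builder.setBolt("bolt-a", new BoltA(), 1000)
               .shuffleGrouping("kafka-spout");

        // 200 fan-in executors (one per worker, given 200 workers and
        // even scheduling), fed via localOrShuffleGrouping so the
        // A -> fan-in hop stays in-process and off the network.
        builder.setBolt("fan-in", new FanInBolt(), 200)
               .localOrShuffleGrouping("bolt-a");

        // Only the final 200 -> 50 hop uses remote messaging; the
        // fieldsGrouping on "key" (placeholder field name) keeps each
        // key on a single Bolt B executor for the HFile cache.
        builder.setBolt("bolt-b", new BoltB(), 50)
               .fieldsGrouping("fan-in", new Fields("key"));

        Config conf = new Config();
        conf.setNumWorkers(200);
        StormSubmitter.submitTopology("fan-in-topology", conf,
                builder.createTopology());
    }

    private static KafkaSpout buildKafkaSpout() {
        // ZooKeeper connect string, topic, zkRoot, and consumer id are
        // all placeholders.
        ZkHosts hosts = new ZkHosts("zk1:2181");
        SpoutConfig spoutConfig =
                new SpoutConfig(hosts, "events", "/kafka-spout", "fan-in");
        return new KafkaSpout(spoutConfig);
    }
}

Note that localOrShuffleGrouping only avoids serialization and network transfer when the downstream component has at least one executor in the same worker as each upstream executor, which is why the fan-in parallelism matches the 200 workers. The key-distribution caveat Taylor raised still applies to the last hop: a skewed key under fieldsGrouping can overload a single Bolt B executor regardless of the fan-in stage.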
