Great thoughts and info--thanks Kishore!

--John

> On Aug 15, 2015, at 2:42 PM, Kishore Senji <[email protected]> wrote:
> 
> As long as the number of instances of B is a multiple of the number of
> instances of A, and the number of instances of A is a multiple of the number
> of workers, we will get a nice even distribution with Storm.
> 
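To make the divisibility point concrete, here is a minimal sketch that spreads executors over workers round-robin and checks whether every worker gets the same count. The round-robin placement and the helper names are assumptions for illustration, not Storm's actual scheduler code; the figures are the ones mentioned in this thread.

```python
def executors_per_worker(num_executors, num_workers):
    """Spread executors round-robin over workers; return the count on each worker."""
    counts = [0] * num_workers
    for i in range(num_executors):
        counts[i % num_workers] += 1
    return counts


def is_even(num_executors, num_workers):
    """True when every worker ends up with the same number of executors."""
    return len(set(executors_per_worker(num_executors, num_workers))) == 1


# Figures from this thread: 120 workers, 400 Bolt A and 100 Bolt B executors.
print(executors_per_worker(100, 120).count(0))  # 20 workers get no Bolt B executor
print(is_even(400, 120), is_even(120, 120))     # False True
```

Under this simplified placement, a worker without a Bolt B executor has no local target, so its Bolt A output has to cross the network.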
> Yes, please choose local or shuffle grouping on the path where there is more
> data. For example, if Bolt A emits two tuples to Bolt B for every tuple it
> receives, it makes sense to have that path use the local or shuffle
> grouping. But if Bolt A only emits to Bolt B 10% of the time, then it makes
> sense to have the path upstream of Bolt A use the local or shuffle grouping
> instead.
> 
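The rule of thumb above can be sketched as a small comparison of tuple volume on the two hops. The function name and the hop labels are hypothetical, and a tie is resolved arbitrarily in favour of the upstream hop.

```python
def heavier_path(tuples_into_a, emit_ratio):
    """Return the hop that carries more tuples, given A's emits per input tuple."""
    tuples_into_b = tuples_into_a * emit_ratio
    return "A->B" if tuples_into_b > tuples_into_a else "upstream->A"


print(heavier_path(1000, 2.0))  # A emits two tuples per input: A->B
print(heavier_path(1000, 0.1))  # A emits for only 10% of inputs: upstream->A
```

The hop it returns is the candidate for local or shuffle grouping; the other hop can then carry the fields grouping.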
> 
>> On Sat, Aug 15, 2015 at 4:54 AM, John Yost <[email protected]> wrote:
>> Actually, I only need to match the number of Bolt B executors to the number 
>> of workers to ensure local shuffling in the Bolt A to Bolt B step, correct?  
>> I am hoping that Storm would put one Bolt B executor in each Java process.  
>> Is there something special I need to configure to make that happen?
>> 
>> --John
>> 
>>> On Sat, Aug 15, 2015 at 7:27 AM, John Yost <[email protected]> 
>>> wrote:
>>> Hi Kishore,
>>> 
>>> This is an excellent response--thanks for taking the time to write back!
>>> This is a great analysis of the problem space. I/O overhead--disk plus
>>> network--is a big problem for us.
>>> 
>>> The suggestion to use Local grouping is an excellent one, but I need to use 
>>> the Fields grouping at some point--either from the KafkaSpout to Bolt A, or 
>>> Bolt A to Bolt B. The reason I need to do this is we're caching Key/Value 
>>> pairs and then persisting each collection to a SequenceFile once each cache 
>>> collection reaches a certain size. 
>>> 
>>> I currently have fields grouping for Bolt A to Bolt B, and would like to 
>>> maintain this approach as the KafkaSpout has 10 partitions as input, any 
>>> one of which can develop hotspots. Consequently, I am using the Shuffle 
>>> grouping for the KafkaSpout to Bolt A step to evenly distribute the tuples 
>>> coming from Kafka to Bolt A. I then use Fields grouping to ensure the same 
>>> Key/Value pairs go to the same Bolt B instance. However, as you point out, 
>>> the network overhead is really throttling throughput, so it's worth a shot 
>>> to do the fields grouping at the KafkaSpout-Bolt A step and then do a Local 
>>> grouping at the Bolt A-Bolt B step.
>>> 
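As a conceptual illustration of why a fields grouping keeps a given key on one instance, the sketch below routes each key by a stable hash modulo the number of target tasks. The use of `zlib.crc32` and the helper names are assumptions made for the example; Storm applies its own hashing to the grouping fields.

```python
import zlib


def target_task(key, num_tasks):
    """Pick a task index from a stable hash of the key (conceptual fields grouping)."""
    return zlib.crc32(key.encode("utf-8")) % num_tasks


def route(keys, num_tasks):
    """Group keys by the task index they are routed to."""
    routed = {}
    for key in keys:
        routed.setdefault(target_task(key, num_tasks), []).append(key)
    return routed


keys = ["user-1", "user-2", "user-1", "user-3", "user-1"]
for task, task_keys in sorted(route(keys, num_tasks=4).items()):
    print(task, task_keys)
```

Every occurrence of a key lands on the same task, which is what the per-key cache needs. It is also why a skewed key distribution shows up as a hotspot on one instance.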
>>> Since I have one worker per node as I mentioned in a response to Javier's 
>>> earlier post, I will need to up the number of Bolt B instances to match 
>>> Bolt A to ensure Local shuffling for all tuples. Not totally sure how much 
>>> adding 300 additional executors for 120 workers will negatively impact 
>>> performance, but I am guessing that it would be more than offset by the 
>>> greatly decreased network and disk I/O, since Bolt A-Bolt B will be all 
>>> intra-Java process messaging via LMAX.
>>> 
>>> Thanks again for your excellent thoughts!
>>> 
>>> --John
>>> 
>>>> On Fri, Aug 14, 2015 at 9:48 PM, Kishore Senji <[email protected]> wrote:
>>>> 7 million/minute with 200 instances, implies a latency of 
>>>> 200*1000/(7*1000^2/60) ~ 1.71 ms for bolt A. For throughput calculations, 
>>>> I would normalize it and visualize a single instance of the bolt running. 
>>>> So for 1 bolt instance, the latency would be 1.71/200. 
>>>> 
>>>> By adding bolt B with 50 instances, the throughput came to 900000/min, 
>>>> which means
>>>> 
>>>> 900000 = 1000*60 / ((1.71/200)+(B/50)). Solving this, B would be around
>>>> 2.9 ms. Even though you mentioned there is nothing in the Bolt B impl, the
>>>> overall latency is 2.9 ms, which means there is a lot of overhead in the
>>>> network. Are you using fields grouping, or are the tuples being sent over
>>>> the network because of the number of instances? Try using local grouping.
>>>> 
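The arithmetic above can be reproduced with a short sketch. The function names are made up for illustration, and the model (throughput = 60000 ms divided by the sum of the normalized per-stage latencies) is the simplification used in this message, not a measured property of Storm.

```python
def per_tuple_latency_ms(tuples_per_min, instances):
    """Latency of one instance in ms, given the aggregate throughput."""
    tuples_per_sec = tuples_per_min / 60
    return instances * 1000 / tuples_per_sec


def solve_downstream_latency_ms(observed_per_min, a_ms, a_instances, b_instances):
    """Solve observed = 60000 / (a_ms/a_instances + b_ms/b_instances) for b_ms."""
    ms_per_tuple = 60 * 1000 / observed_per_min
    return (ms_per_tuple - a_ms / a_instances) * b_instances


a_ms = per_tuple_latency_ms(7_000_000, 200)                 # Bolt A alone
b_ms = solve_downstream_latency_ms(900_000, a_ms, 200, 50)  # after adding 50 Bolt B
print(round(a_ms, 2), round(b_ms, 1))  # 1.71 2.9
```

Since Bolt B does nothing but ack in this test, the 2.9 ms is almost entirely transfer overhead under this model.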
>>>> Bumping Bolt B to 100 instances, you got 1.1 million/minute. But this
>>>> equation should give us 1000*60 / ((1.71/200)+(2.9/100)), or around 1.6
>>>> million/min. This means adding more instances added more overhead; it
>>>> could be that more tuples are going over the network (because they went
>>>> to different worker processes). Calculating the Bolt B latency for 1.1
>>>> million/min gives 4.6 ms of overhead for Bolt B.
>>>> 
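A minimal sketch of the predicted-versus-observed comparison, using the same simplified two-stage model and the rounded figures quoted in this message. The function names are hypothetical.

```python
def throughput_per_min(a_ms, a_instances, b_ms, b_instances):
    """Tuples per minute for a two-stage pipeline under the model in this thread."""
    return 60 * 1000 / (a_ms / a_instances + b_ms / b_instances)


def implied_b_ms(observed_per_min, a_ms, a_instances, b_instances):
    """Bolt B latency in ms that would explain the observed throughput."""
    return (60 * 1000 / observed_per_min - a_ms / a_instances) * b_instances


predicted = throughput_per_min(1.71, 200, 2.9, 100)  # if overhead stayed at 2.9 ms
implied = implied_b_ms(1_100_000, 1.71, 200, 100)    # what 1.1 million/min implies
print(round(predicted / 1e6, 1), round(implied, 1))  # 1.6 4.6
```

The gap between 2.9 ms and 4.6 ms is the extra per-tuple overhead that appeared when Bolt B went from 50 to 100 instances.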
>>>> Adding more instances can be costly because of the IPC. The key would be 
>>>> to get more local shuffling. Check if you have any failures in the overall 
>>>> system and also check the Storm UI which will show the bottleneck in your 
>>>> topology (the capacity of the bolts, the latencies, etc.).
>>>> 
>>>> 
>>>>> On Fri, Aug 14, 2015 at 2:43 PM Javier Gonzalez <[email protected]> 
>>>>> wrote:
>>>>> Do you actually have 170 machines? Try sticking to one worker per
>>>>> machine (tweak memory parameters in storm.yaml); it makes inter-bolt
>>>>> traffic much faster.
>>>>> 
>>>>>> On Aug 14, 2015 5:28 PM, "John Yost" <[email protected]> wrote:
>>>>>> Hey Javier,
>>>>>> 
>>>>>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5 Bolt 
>>>>>> B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is 
>>>>>> optimal, but cluster resources make it tricky to actually launch this).
>>>>>> 
>>>>>> I will up the number of Ackers and see if that helps. If not, then I 
>>>>>> will try to vary the number of B bolts beyond 100.
>>>>>> 
>>>>>> Thanks Again!
>>>>>> 
>>>>>> --John
>>>>>> 
>>>>>>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <[email protected]> 
>>>>>>> wrote:
>>>>>>> Wiring in Bolt B will have a detrimental effect, even if it does
>>>>>>> nothing but ack. Every tuple you have processed from A has to travel to
>>>>>>> a B bolt, and the ack has to travel back.
>>>>>>> 
>>>>>>> You could try modifying the number of ackers, and playing with the 
>>>>>>> number of A and B bolts. How many workers do you have for the topology?
>>>>>>> 
>>>>>>> Regards,
>>>>>>> JG
>>>>>>> 
>>>>>>>> On Aug 14, 2015 12:31 PM, "John Yost" <[email protected]> 
>>>>>>>> wrote:
>>>>>>>> Hi Everyone,
>>>>>>>> 
>>>>>>>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires 
>>>>>>>> a much higher degree of parallelism than the bolt it emits tuples to 
>>>>>>>> (Bolt B) (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>>>>> 
>>>>>>>> I find that the throughput, as measured in number of tuples acked, 
>>>>>>>> goes from 7 million/minute to ~ 1 million/minute when I wire in Bolt 
>>>>>>>> B--even if all of the logic within the Bolt B execute method is 
>>>>>>>> disabled and the Bolt B is therefore simply acking the input tuples 
>>>>>>>> from Bolt A. In addition, I find that, going from 50 to 100 Bolt B 
>>>>>>>> executors causes the throughput to go from 900K/minute to ~ 1.1 
>>>>>>>> million/minute.  
>>>>>>>> 
>>>>>>>> Is the fact that I am going from 200 bolt instances to 100 or fewer the
>>>>>>>> problem? I've already experimented with executor.send.buffer.size
>>>>>>>> and executor.receive.buffer.size, which helped drive throughput from 
>>>>>>>> 800K to 900K. I will try topology.transfer.buffer.size, perhaps set 
>>>>>>>> that higher to 2048. Any other ideas?  
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> 
>>>>>>>> --John
> 
