Actually, I only need to match the number of Bolt B executors to the number
of workers to ensure local shuffling in the Bolt A to Bolt B step,
correct?  I am hoping that Storm would put one Bolt B executor in each Java
process.  Is there something special I need to configure to make that
happen?

--John

On Sat, Aug 15, 2015 at 7:27 AM, John Yost <[email protected]>
wrote:

> Hi Kishore,
>
> This is an excellent response--thanks for taking the time to write back!
> This is a great analysis of the problem space. I/O overhead--disk plus
> network--is a big problem for us.
>
> The suggestion to use Local grouping is an excellent one, but I need to
> use the Fields grouping at some point--either from the KafkaSpout to Bolt
> A, or Bolt A to Bolt B. The reason I need to do this is we're caching
> Key/Value pairs and then persisting each collection to a SequenceFile once
> each cache collection reaches a certain size.
>
> I currently have fields grouping for Bolt A to Bolt B, and would like to
> maintain this approach as the KafkaSpout has 10 partitions as input, any
> one of which can develop hotspots. Consequently, I am using the Shuffle
> grouping for the KafkaSpout to Bolt A step to evenly distribute the tuples
> coming from Kafka to Bolt A. I then use Fields grouping to ensure the same
> Key/Value pairs go to the same Bolt B instance. However, as you point out,
> the network overhead is really throttling throughput, so it's worth a shot
> to do the fields grouping at the KafkaSpout-Bolt A step and then do a Local
> grouping at the Bolt A-Bolt B step.
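
To illustrate the property the fields grouping provides here, the following is a minimal sketch in plain Java, with no Storm dependency. It assumes a simplified model in which the target task is the key's hash modulo the number of Bolt B tasks; this is not Storm's actual implementation, and the class name, method name, and sample keys are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a fields grouping (an assumption for illustration,
// not Storm's real code): the target task depends only on the key.
class GroupingSketch {

    // Map a key to one of numTasks downstream tasks.
    // floorMod keeps the result in [0, numTasks) even for negative hash codes.
    static int fieldsGroupingTask(String key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int boltBTasks = 100; // Bolt B parallelism mentioned in the thread
        String[] keys = {"user-1", "user-2", "user-1", "user-3", "user-2"}; // hypothetical keys

        Map<String, Integer> seen = new HashMap<>();
        for (String key : keys) {
            int task = fieldsGroupingTask(key, boltBTasks);
            // A repeated key must land on the task it landed on before.
            Integer previous = seen.putIfAbsent(key, task);
            if (previous != null && previous != task) {
                throw new IllegalStateException("key routed to two different tasks: " + key);
            }
            System.out.println(key + " -> Bolt B task " + task);
        }
    }
}
```

Because the chosen task depends only on the key, it may live in a different worker than the emitting Bolt A executor, which is where the network traffic discussed in this thread comes from.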
>
> Since I have one worker per node as I mentioned in a response to Javier's
> earlier post, I will need to up the number of Bolt B instances to match
> Bolt A to ensure Local shuffling for all tuples. Not totally sure how much
> adding 300 additional executors for 120 workers will negatively impact
> performance, but I am guessing that it would be more than offset by the
> greatly decreased network and disk I/O, since Bolt A-Bolt B will be all
> intra-Java process messaging via the LMAX Disruptor.
>
> Thanks again for your excellent thoughts!
>
> --John
>
> On Fri, Aug 14, 2015 at 9:48 PM, Kishore Senji <[email protected]> wrote:
>
>> 7 million/minute with 200 instances, implies a latency
>> of 200*1000/(7*1000^2/60) ~ 1.71 ms for bolt A. For throughput
>> calculations, I would normalize it and visualize a single instance of the
>> bolt running. So for 1 bolt instance, the latency would be 1.71/200.
>>
>> By adding bolt B with 50 instances, the throughput came to 900000/min,
>> which means
>>
>> 900000 = 1000*60 / ((1.71/200)+(B/50)). Solving this, B would be around 2.9
>> ms. Even though you mentioned there is nothing in the Bolt B impl, the
>> overall latency is 2.9 ms, which means there is a lot of overhead in the
>> network. Are you using fields grouping, or are the tuples being sent over
>> the network because of the number of instances? Try using local grouping.
>>
>> Bumping Bolt B to 100 instances, you got 1.1 million/minute. But this
>> equation should give us 1000*60 / ((1.71/200)+(2.9/100)), around 1.6
>> million/min. This means adding more instances added more overhead; it could
>> be that more tuples are going over the network (because they went to
>> different worker processes). Calculating the Bolt B latency for 1.1
>> million gives about 4.6 ms of overhead for Bolt B.
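
The arithmetic above can be checked with a short, self-contained Java sketch. It only restates the model used in this message (each stage contributes its latency divided by its instance count, and throughput is 60,000 ms divided by the sum); the class and method names are hypothetical, and the figures are the ones quoted in the thread.

```java
// Sketch of the two-stage throughput model from this message.
// All names are illustrative; the inputs are the numbers quoted in the thread.
class ThroughputModel {

    // Per-instance latency (ms) implied by an observed single-stage throughput.
    static double boltALatencyMs(double tuplesPerMinute, int instances) {
        return instances * 60_000.0 / tuplesPerMinute;
    }

    // Time (ms) a stage adds per tuple once its latency is spread over its instances.
    static double perTupleMs(double latencyMs, int instances) {
        return latencyMs / instances;
    }

    // Predicted tuples per minute for Bolt A followed by Bolt B.
    static double tuplesPerMinute(double aMs, int aInstances, double bMs, int bInstances) {
        return 60_000.0 / (perTupleMs(aMs, aInstances) + perTupleMs(bMs, bInstances));
    }

    // Solve the same equation for Bolt B's effective latency (ms).
    static double impliedBoltBMs(double observedPerMinute, double aMs, int aInstances, int bInstances) {
        return (60_000.0 / observedPerMinute - perTupleMs(aMs, aInstances)) * bInstances;
    }

    public static void main(String[] args) {
        double aMs = boltALatencyMs(7_000_000, 200);             // about 1.71 ms
        double b50 = impliedBoltBMs(900_000, aMs, 200, 50);      // about 2.9 ms
        double predicted = tuplesPerMinute(aMs, 200, b50, 100);  // about 1.6 million/min
        double b100 = impliedBoltBMs(1_100_000, aMs, 200, 100);  // about 4.6 ms

        System.out.printf("Bolt A latency: %.2f ms%n", aMs);
        System.out.printf("Bolt B latency at 50 instances: %.2f ms%n", b50);
        System.out.printf("Predicted throughput at 100 instances: %.0f/min%n", predicted);
        System.out.printf("Bolt B latency at 100 instances: %.2f ms%n", b100);
    }
}
```

The gap between the predicted 1.6 million/min and the observed 1.1 million/min is what the model attributes to extra per-tuple overhead after doubling the Bolt B instances.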
>>
>> Adding more instances can be costly because of the IPC. The key would be
>> to get more local shuffling. Check if you have any failures in the overall
>> system and also check the Storm UI which will show the bottleneck in your
>> topology (the capacity of the bolts, the latencies, etc.).
>>
>>
>> On Fri, Aug 14, 2015 at 2:43 PM Javier Gonzalez <[email protected]>
>> wrote:
>>
>>> Do you actually have 170 machines? Try sticking to one worker per
>>> machine (tweak memory parameters in storm.yaml), which makes inter-bolt
>>> traffic much faster.
>>> On Aug 14, 2015 5:28 PM, "John Yost" <[email protected]> wrote:
>>>
>>>> Hey Javier,
>>>>
>>>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5
>>>> Bolt B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is
>>>> optimal, but cluster resources make it tricky to actually launch this).
>>>>
>>>> I will up the number of Ackers and see if that helps. If not, then I
>>>> will try to vary the number of B bolts beyond 100.
>>>>
>>>> Thanks Again!
>>>>
>>>> --John
>>>>
>>>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <[email protected]>
>>>> wrote:
>>>>
>>>>> Wiring in Bolt B will have a detrimental effect, even if it does
>>>>> nothing but ack. Every tuple you have processed from A has to travel to a
>>>>> B bolt, and the ack has to travel back.
>>>>>
>>>>> You could try modifying the number of ackers, and playing with the
>>>>> number of A and B bolts. How many workers do you have for the topology?
>>>>>
>>>>> Regards,
>>>>> JG
>>>>> On Aug 14, 2015 12:31 PM, "John Yost" <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Everyone,
>>>>>>
>>>>>> I have a topology where a highly CPU-intensive bolt (Bolt A) requires
>>>>>> a much higher degree of parallelism than the bolt it emits tuples to
>>>>>> (Bolt B) (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>>>
>>>>>> I find that the throughput, as measured in number of tuples acked,
>>>>>> goes from 7 million/minute to ~ 1 million/minute when I wire in Bolt
>>>>>> B--even if all of the logic within the Bolt B execute method is disabled
>>>>>> and Bolt B is therefore simply acking the input tuples from Bolt A. In
>>>>>> addition, I find that going from 50 to 100 Bolt B executors causes the
>>>>>> throughput to go from 900K/minute to ~ 1.1 million/minute.
>>>>>>
>>>>>> Is the fact that I am going from 200 bolt instances to 100 or fewer
>>>>>> the problem? I've already experimented with executor.send.buffer.size
>>>>>> and executor.receive.buffer.size, which helped drive throughput from
>>>>>> 800K to 900K. I will try topology.transfer.buffer.size next, perhaps
>>>>>> setting it higher, to 2048. Any other ideas?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> --John
>>>>>>
>>>>>>
>>>>
>
