As long as the number of instances of B is a multiple of the number of
instances of A, and the number of instances of A is a multiple of the number
of workers, we will get a nice even distribution with Storm.
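
To make the "multiple of" condition concrete, here is a rough sketch in plain
Java. It assumes executors are simply dealt out to workers round-robin, which
is a simplification of what Storm's scheduler does; the class and method names
are made up for illustration and are not part of Storm.

```java
import java.util.Arrays;

class ExecutorPlacement {
    // Hypothetical helper (not a Storm API): deals `executors` out to
    // `workers` round-robin and returns the executor count per worker.
    static int[] perWorker(int executors, int workers) {
        int[] counts = new int[workers];
        for (int i = 0; i < executors; i++) {
            counts[i % workers]++;
        }
        return counts;
    }

    // True when every worker ends up with the same number of executors.
    static boolean isEven(int[] counts) {
        return Arrays.stream(counts).distinct().count() == 1;
    }

    public static void main(String[] args) {
        int workers = 120; // worker count mentioned in this thread
        // 400 is not a multiple of 120, so some workers get 4 and some get 3.
        System.out.println("400 over 120 even? " + isEven(perWorker(400, workers)));
        // 360 is a multiple of 120, so every worker gets exactly 3.
        System.out.println("360 over 120 even? " + isEven(perWorker(360, workers)));
    }
}
```

Under that assumption, 400 Bolt A executors on 120 workers cannot be spread
evenly, while 360 can.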

Yes, please choose local-or-shuffle grouping on the path that carries more
data. For example, if Bolt A emits two tuples to Bolt B for every tuple it
receives, it makes sense to use local-or-shuffle grouping on that path. But
if Bolt A emits to Bolt B only 10% of the time, then it makes sense to use
local-or-shuffle grouping on the path upstream of Bolt A.
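
The rule of thumb above can be sketched as a small plain-Java calculation. It
only compares tuple counts on the two edges and ignores tuple size, which is
an assumption on my part; the class and method names are hypothetical and not
part of Storm.

```java
class GroupingChoice {
    // Tuples per minute on the A -> B edge, given Bolt A's input rate and the
    // average number of tuples A emits per tuple it receives.
    static double downstreamRate(double inputPerMinute, double emitRatio) {
        return inputPerMinute * emitRatio;
    }

    // Returns the edge that carries more tuples, i.e. the one worth keeping
    // in-process with local-or-shuffle grouping.
    static String edgeForLocalOrShuffle(double inputPerMinute, double emitRatio) {
        double upstream = inputPerMinute;
        double downstream = downstreamRate(inputPerMinute, emitRatio);
        return downstream >= upstream ? "A->B" : "upstream->A";
    }

    public static void main(String[] args) {
        double input = 7_000_000; // tuples/minute, the figure quoted in this thread
        // Bolt A emits two tuples per input tuple: the A -> B edge is heavier.
        System.out.println(edgeForLocalOrShuffle(input, 2.0));
        // Bolt A emits for only 10% of input tuples: the upstream edge is heavier.
        System.out.println(edgeForLocalOrShuffle(input, 0.1));
    }
}
```

Whichever edge comes out heavier is the one to keep local; the other edge can
then take the fields grouping.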


On Sat, Aug 15, 2015 at 4:54 AM, John Yost <[email protected]>
wrote:

> Actually, I only need to match the number of Bolt B executors to the
> number of workers to ensure local shuffling in the Bolt A to Bolt B step,
> correct?  I am hoping that Storm would put one Bolt B executor in each Java
> process.  Is there something special I need to configure to make that
> happen?
>
> --John
>
> On Sat, Aug 15, 2015 at 7:27 AM, John Yost <[email protected]>
> wrote:
>
>> Hi Kishore,
>>
>> This is an excellent response--thanks for taking time to write back!
>> This is great analysis of the problem space. I/O overhead--disk plus
>> network--is a big problem for us.
>>
>> The suggestion to use Local grouping is an excellent one, but I need to
>> use the Fields grouping at some point--either from the KafkaSpout to Bolt
>> A, or Bolt A to Bolt B. The reason I need to do this is we're caching
>> Key/Value pairs and then persisting each collection to a SequenceFile once
>> each cache collection reaches a certain size.
>>
>> I currently have fields grouping for Bolt A to Bolt B, and would like to
>> maintain this approach as the KafkaSpout has 10 partitions as input, any
>> one of which can develop hotspots. Consequently, I am using the Shuffle
>> grouping for the KafkaSpout to Bolt A step to evenly distribute the tuples
>> coming from Kafka to Bolt A. I then use Fields grouping to ensure the same
>> Key/Value pairs go to the same Bolt B instance. However, as you point out,
>> the network overhead is really throttling throughput, so it's worth a shot
>> to do the fields grouping at the KafkaSpout-Bolt A step and then do a Local
>> grouping at the Bolt A-Bolt B step.
>>
>> Since I have one worker per node as I mentioned in a response to Javier's
>> earlier post, I will need to up the number of Bolt B instances to match
>> Bolt A to ensure Local shuffling for all tuples. Not totally sure how much
>> adding 300 additional executors for 120 workers will negatively impact
>> performance, but I am guessing that it would be more than offset by the
>> greatly decreased network and disk I/O, since Bolt A-Bolt B will be all
>> intra-Java process messaging via LMAX.
>>
>> Thanks again for your excellent thoughts!
>>
>> --John
>>
>> On Fri, Aug 14, 2015 at 9:48 PM, Kishore Senji <[email protected]> wrote:
>>
>>> 7 million/minute with 200 instances, implies a latency
>>> of 200*1000/(7*1000^2/60) ~ 1.71 ms for bolt A. For throughput
>>> calculations, I would normalize it and visualize a single instance of the
>>> bolt running. So for 1 bolt instance, the latency would be 1.71/200 ms.
>>>
>>> By adding bolt B with 50 instances, the throughput came to 900000/min,
>>> which means
>>>
>>> 900000 = 1000*60 / ((1.71/200)+(B/50)); solving this, B would be around
>>> 2.9 ms. Even though you mentioned there is nothing in the Bolt B impl, the
>>> overall latency is 2.9 ms, which means there is a lot of overhead in the
>>> network. Are you using fields grouping, or are the tuples being sent over
>>> the network because of the number of instances? Try using local grouping.
>>>
>>> Bumping bolt B to 100 instances, you got 1.1 million/minute. But this
>>> equation should give us 1000*60 / ((1.71/200)+(2.9/100)), around 1.6
>>> million/min. This means adding more instances added more overhead; it could
>>> be that more tuples are going over the network (because they went to
>>> different worker processes). Calculating the Bolt B latency for 1.1
>>> million/min gives 4.6 ms of overhead for Bolt B.
>>>
>>> Adding more instances can be costly because of the IPC. The key would be
>>> to get more local shuffling. Check if you have any failures in the overall
>>> system and also check the Storm UI, which will show the bottleneck in your
>>> topology (the capacity of the bolts, the latencies, etc.).
>>>
>>>
>>> On Fri, Aug 14, 2015 at 2:43 PM Javier Gonzalez <[email protected]>
>>> wrote:
>>>
>>>> Do you actually have 170 machines? Try sticking to one worker per
>>>> machine (tweak memory parameters in storm.yaml); it makes inter-bolt
>>>> traffic much faster.
>>>> On Aug 14, 2015 5:28 PM, "John Yost" <[email protected]> wrote:
>>>>
>>>>> Hey Javier,
>>>>>
>>>>> Cool, thanks for your response!  I have 50 workers for 200 Bolt A/5
>>>>> Bolt B and 120 workers for 400 Bolt A/100 Bolt B (this latter config is
>>>>> optimal, but cluster resources make it tricky to actually launch this).
>>>>>
>>>>> I will up the number of Ackers and see if that helps. If not, then I
>>>>> will try to vary the number of B bolts beyond 100.
>>>>>
>>>>> Thanks Again!
>>>>>
>>>>> --John
>>>>>
>>>>> On Fri, Aug 14, 2015 at 2:59 PM, Javier Gonzalez <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Wiring in Bolt B will have a detrimental effect, even if it
>>>>>> does nothing but ack. Every tuple you have processed from A has to travel
>>>>>> to a B bolt, and the ack has to travel back.
>>>>>>
>>>>>> You could try modifying the number of ackers, and playing with the
>>>>>> number of A and B bolts. How many workers do you have for the topology?
>>>>>>
>>>>>> Regards,
>>>>>> JG
>>>>>> On Aug 14, 2015 12:31 PM, "John Yost" <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Everyone,
>>>>>>>
>>>>>>> I have a topology where a highly CPU-intensive bolt (Bolt A)
>>>>>>> requires a much higher degree of parallelism than the bolt it emits
>>>>>>> tuples to (Bolt B) (200 Bolt A executors vs <= 100 Bolt B executors).
>>>>>>>
>>>>>>> I find that the throughput, as measured in number of tuples acked,
>>>>>>> goes from 7 million/minute to ~ 1 million/minute when I wire in Bolt
>>>>>>> B--even if all of the logic within the Bolt B execute method is disabled
>>>>>>> and the Bolt B is therefore simply acking the input tuples from Bolt A.
>>>>>>> In addition, I find that, going from 50 to 100 Bolt B executors causes
>>>>>>> the throughput to go from 900K/minute to ~ 1.1 million/minute.
>>>>>>>
>>>>>>> Is the fact that I am going from 200 bolt instances to 100 or less
>>>>>>> the problem?   I've already experimented with executor.send.buffer.size
>>>>>>> and executor.receive.buffer.size, which helped drive throughput from
>>>>>>> 800K to 900K. I will try topology.transfer.buffer.size, perhaps set
>>>>>>> that higher to 2048. Any other ideas?
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> --John
>>>>>>>
>>>>>>>
>>>>>
>>
>
