Hey Andrey,

Excellent thoughts, thanks!
I do need to ack between Bolts 1 and 2, so I cannot disable acking, but
that's a good suggestion. Michael Noll's blog post is great and I did try
tuning the send/receive buffers; while this helped a little, the
fieldsGrouping ended up much too slow, unfortunately. Your custom stream
grouping sounds really cool. I don't think I can use it for my current
topology, but it sure seems like a good idea. Do you have a GitHub repo I
could check out the code from?

Thanks again for your thoughts and for responding to this thread--much
appreciated!

--John

On Mon, Oct 5, 2015 at 8:27 PM, Andrey Yegorov <[email protected]> wrote:

> Hi John,
>
> I guess you are using acks for messages between Bolt1 and Bolt2. If you do
> not really need acks, disabling them could help.
> In the same situation I did the following:
>
> 1. Read
> http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
> and spent some time tuning the Storm topology to improve throughput. Also
> make sure you have one acker per worker.
>
> 2. Tuned GC for workers.
>
> 3. Wrote a custom stream grouping (GroupLocal(..)) which groups by field
> but directs each tuple to a bolt in the same worker process to use faster
> intra-process data transfer. As a result I have to shuffle data once
> between machines (KafkaSpout to Bolt1) while the rest of the flow stays
> within the same worker despite being grouped by field name. I still rely
> on the default Storm scheduler to assign bolts to workers; it schedules
> tasks evenly enough.
>
> ----------
> Andrey Yegorov
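Andrey's point 1 is plain topology configuration. A minimal sketch,
assuming the backtype.storm API current at the time, with the example
values from Michael Noll's post (the right numbers are workload-specific):

    import backtype.storm.Config;

    Config conf = new Config();
    conf.setNumWorkers(4);   // hypothetical worker count
    conf.setNumAckers(4);    // one acker executor per worker
    // Buffer sizes from the blog post; starting points, not answers.
    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);

Andrey's GroupLocal source is not in the thread (hence John's GitHub
question), but a minimal sketch of such a grouping, assuming Storm's
CustomStreamGrouping interface (the class name, field lookup, and fallback
behavior here are guesses, not Andrey's actual code):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import backtype.storm.generated.GlobalStreamId;
    import backtype.storm.grouping.CustomStreamGrouping;
    import backtype.storm.task.WorkerTopologyContext;

    // Hash on one field, but only across the target tasks that live in
    // this worker process, so grouped tuples never leave the worker.
    public class GroupLocal implements CustomStreamGrouping {

        private final String fieldName;
        private int fieldIndex;
        private List<Integer> localTargets;

        public GroupLocal(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
                            List<Integer> targetTasks) {
            fieldIndex = context.getComponentOutputFields(stream).fieldIndex(fieldName);
            // Keep only the target tasks running in this worker; fall back
            // to all targets if this worker hosts none of them.
            localTargets = new ArrayList<Integer>(targetTasks);
            localTargets.retainAll(context.getThisWorkerTasks());
            if (localTargets.isEmpty()) {
                localTargets = targetTasks;
            }
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            Object key = values.get(fieldIndex);
            int hash = (key == null) ? 0 : key.hashCode();
            int idx = (hash & Integer.MAX_VALUE) % localTargets.size();
            return Collections.singletonList(localTargets.get(idx));
        }
    }

This would be wired in with something like
builder.setBolt("bolt2", new Bolt2(), 50).customGrouping("bolt1", new GroupLocal("key")).
Note the trade-off Andrey describes: a key is then only routed consistently
within one worker, which works because the data was already shuffled across
machines exactly once (KafkaSpout to Bolt1).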
> On Mon, Oct 5, 2015 at 4:45 PM, Enno Shioji <[email protected]> wrote:
>
>> If you are doing unnecessary repartitioning (sending tuples to remote
>> processes) now and if you can remove/reduce that, that could help a lot.
>> That said, it's not obvious to me whether that's the case here (e.g. is
>> Kafka partitioned in a way you could exploit to reduce repartitioning?).
>>
>> If the above is not the case, it's not clear to me why it should help,
>> because it's just the same work being done in a different place. I think
>> it won't hurt to try, though.
>>
>> Another thing you could try is to combine Bolt 1 and Bolt 2 into one
>> bolt. That's what Trident tries to do by default as an optimization.
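Combining the two bolts removes the extra queue hop and (de)serialization
between them entirely. A rough sketch of the idea, assuming 0.9-era
backtype.storm classes, with step1/step2 as hypothetical stand-ins for the
two bolts' logic:

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // Bolt 1 and Bolt 2 fused: both steps run inside one execute() call,
    // so no tuple crosses an executor boundary between them.
    public class CombinedBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            Object intermediate = step1(input.getValue(0)); // formerly Bolt 1
            Object result = step2(intermediate);            // formerly Bolt 2
            collector.emit(new Values(result));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("result"));
        }

        // Placeholders for the real per-bolt logic.
        private Object step1(Object v) { return v; }
        private Object step2(Object v) { return v; }
    }

A fieldsGrouping from the spout onto CombinedBolt would then give key
affinity for free, since there is no longer an intermediate hop to lose it
on.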
>> On 5 Oct 2015, at 16:38, Javier Gonzalez <[email protected]> wrote:
>>
>> If you get one bolt2 per worker, it should work as you say. Though I'm
>> not completely sure it's *guaranteed* that every message will go local.
>>
>> Regards,
>> Javier
>>
>> On Oct 5, 2015 10:01 AM, "John Yost" <[email protected]> wrote:
>>
>>> Hi Javier,
>>>
>>> I apologize, I don't think I am making myself clear. I am attempting to
>>> get all the tuples for a given key sent to the same Bolt 2 executor
>>> instance. I previously used fieldsGrouping on Bolt 1, as this is a
>>> well-established pattern. However, there are roughly four Bolt 1
>>> executors for every Bolt 2 executor, and I was finding the throughput
>>> between Bolts 1 and 2 was very low. Once I switched to
>>> localOrShuffleGrouping between Bolt 1 and Bolt 2, the throughput
>>> tripled. I did this based upon advice from this board to use
>>> localOrShuffleGrouping for large fan-in patterns like this (great
>>> advice, it definitely worked!).
>>>
>>> Unfortunately, this also means that there is no guarantee that all
>>> tuples for a given key will be sent to the same Bolt 2. To hopefully get
>>> the best of both worlds, I am thinking I can do the fieldsGrouping
>>> between the KafkaSpout and Bolt 1 and get the same effect of all tuples
>>> for a given key going to the same Bolt 2. Of course, the key (pun
>>> intended) is that there is one Bolt 2 per worker: fieldsGrouping ensures
>>> all tuples for the same key go to the same Bolt 1, which then forwards
>>> them to the single local Bolt 2.
>>>
>>> Please confirm whether this seems logical and should work. I think it
>>> should, but I may be missing something.
>>>
>>> Thanks! :)
>>>
>>> --John
>>>
>>> On Mon, Oct 5, 2015 at 9:20 AM, Javier Gonzalez <[email protected]>
>>> wrote:
>>>
>>>> If I'm reading this correctly, I think you're not getting the result
>>>> you want - having all tuples with a given key processed in the same
>>>> bolt2 instance.
>>>>
>>>> If you want all messages for a given key to be processed in the same
>>>> Bolt2, you need to do fields grouping from bolt1 to bolt2. By doing
>>>> fields grouping in the spout-bolt1 hop and shuffle/local in the
>>>> bolt1-bolt2 hop, you're ensuring that each key always lands on the same
>>>> bolt1 instance, but is there any guarantee that the bolt2 you want is
>>>> the nearest/only local bolt available to any given instance of bolt1?
>>>>
>>>> Regards,
>>>> Javier
>>>>
>>>> On Oct 5, 2015 7:33 AM, "John Yost" <[email protected]> wrote:
>>>>
>>>>> Hi Everyone,
>>>>>
>>>>> I am currently prototyping FieldsGrouping at the KafkaSpout level vs.
>>>>> the Bolt level. I am curious as to whether anyone else has tried this
>>>>> and, if so, how well it worked.
>>>>>
>>>>> The reason I am attempting to do FieldsGrouping in the KafkaSpout is
>>>>> that I moved from fieldsGrouping to localOrShuffleGrouping between
>>>>> Bolt 1 and Bolt 2 in my topology due to a 4-to-1 fan-in from Bolt 1 to
>>>>> Bolt 2 (for example, 200 Bolt 1 executors and 50 Bolt 2 executors),
>>>>> which was dramatically slowing throughput. It is still highly
>>>>> preferable to do fieldsGrouping one way or another so that all values
>>>>> for a given key reach the same Bolt 2 executor, which is the impetus
>>>>> for attempting to do fieldsGrouping in the KafkaSpout.
>>>>>
>>>>> If anyone has tried this approach, I'd very much like to hear your
>>>>> thoughts.
>>>>>
>>>>> Thanks
>>>>>
>>>>> --John
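A sketch of the wiring John describes, again assuming the backtype.storm
API of the era (Bolt1, Bolt2, the "key" field, the KafkaSpout configuration,
and the worker count are stand-ins, not values confirmed in the thread):

    import backtype.storm.Config;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import storm.kafka.KafkaSpout;

    TopologyBuilder builder = new TopologyBuilder();
    int numWorkers = 50;

    // KafkaSpout construction omitted; spoutConfig is a stand-in.
    builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 50);

    // fieldsGrouping moved up to the spout -> Bolt 1 hop...
    builder.setBolt("bolt1", new Bolt1(), 200)
           .fieldsGrouping("kafka-spout", new Fields("key"));

    // ...and localOrShuffleGrouping on the 4-to-1 fan-in hop.
    // Parallelism equal to the worker count aims at one Bolt 2 per
    // worker, though, per Javier, locality is not strictly guaranteed.
    builder.setBolt("bolt2", new Bolt2(), numWorkers)
           .localOrShuffleGrouping("bolt1");

    Config conf = new Config();
    conf.setNumWorkers(numWorkers);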