+1 to what Gábor said. The hash combine will be part of the upcoming 1.1 release, too.
This could be further amplified by the blocking intermediate results, which currently have a very simplistic implementation that writes out many different files, which can lead to a lot of random I/O.

– Ufuk

On Tue, Jul 26, 2016 at 11:41 AM, Gábor Gévay <gga...@gmail.com> wrote:
> Hello Robert,
>
>> Is there something I could do to optimize the grouping?
>
> You can try to make your `RichGroupReduceFunction` implement the
> `GroupCombineFunction` interface, so that Flink can do combining
> before the shuffle, which might significantly reduce the network load.
> (How much the combiner helps the performance can greatly depend on how
> large your groups are on average.)
>
> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup`, that might also improve the performance.
> Also, if you are using a `reduce`, you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for
> this.)
>
> Best,
> Gábor
>
>
> 2016-07-25 14:06 GMT+02:00 Paschek, Robert <robert.pasc...@tu-berlin.de>:
>> Hi Mailing List,
>>
>> I am currently running some benchmarks with different algorithms. The
>> system has 8 nodes and a configured parallelism of 48 – the IBM-Power-1
>> cluster, if somebody from the TU Berlin reads this :-) – and to
>> “simulate” Hadoop MapReduce, the execution mode is set to “BATCH_FORCED”.
>>
>> It is suspicious that three of the six algorithms show a big gap in
>> runtime (5,000 ms vs. 20,000 ms) for simple (low-dimensional) tuples.
>> Additionally, the algorithms in the “upper” group use a groupBy
>> transformation, while the algorithms in the “lower” group don’t.
>>
>> I attached the plot for better visualization.
>>
>> I also checked the logs, especially the times when the mappers finish
>> and the reducers start _iterating_ – they confirmed my suspicion.
>>
>> So my question is whether it is “normal” that grouping is so
>> cost-intensive that – in my case – the runtime increases by a factor
>> of 4.
>>
>> I have data from the same experiments running with Apache Hadoop
>> MapReduce on a 13-node cluster with 26 cores, where the gap is still
>> present but smaller (50 s vs. 57 s, or 55 s vs. 65 s).
>>
>> Is there something I could do to optimize the grouping? Some code
>> snippets:
>>
>> The job:
>>
>> DataSet<?> output = input
>>     .mapPartition(new MR_GPMRS_Mapper())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
>>     .groupBy(0)
>>     .reduceGroup(new MR_GPMRS_Reducer())
>>         .returns(input.getType())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");
>>
>> MR_GPMRS_Mapper():
>>
>> public class MR_GPMRS_Mapper<T extends Tuple> extends
>>     RichMapPartitionFunction<T,
>>         Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>>
>>
>> MR_GPMRS_Reducer():
>>
>> public class MR_GPMRS_Reducer<T extends Tuple> extends
>>     RichGroupReduceFunction<
>>         Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>, T>
>>
>> The Tuple2 carries the Tuple3 payload at position f1 and the Integer
>> key used for grouping at position f0.
>>
>> Any suggestions (or confirmation that this is “normal” behaviour) are
>> welcome :-)
>>
>> Thank you in advance!
>>
>> Robert
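For readers finding this thread later: below is a minimal, self-contained sketch of Gábor's first suggestion, a group reducer that also implements `GroupCombineFunction` so Flink can pre-aggregate each group locally before the shuffle. The class name `CombiningReducer`, the simplified `Tuple2<Integer, Long>` element type, and the summing logic are illustrative placeholders, not Robert's actual `Tuple3` payload; the real combine logic has to be specific to the algorithm.

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Sketch: a group reducer that doubles as its own combiner. Flink calls
// combine() on the sending side before the shuffle, so records with the
// same key are pre-aggregated locally and less data crosses the network.
// The combiner's input and output types must be identical.
public class CombiningReducer
        extends RichGroupReduceFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
        implements GroupCombineFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {

    @Override
    public void combine(Iterable<Tuple2<Integer, Long>> values,
                        Collector<Tuple2<Integer, Long>> out) {
        // Placeholder pre-aggregation: sum the partial values of one key.
        Integer key = null;
        long sum = 0L;
        for (Tuple2<Integer, Long> value : values) {
            key = value.f0;
            sum += value.f1;
        }
        out.collect(Tuple2.of(key, sum));
    }

    @Override
    public void reduce(Iterable<Tuple2<Integer, Long>> values,
                       Collector<Tuple2<Integer, Long>> out) {
        // Final reduction after the shuffle; summing is associative, so the
        // combine logic can simply be reused here.
        combine(values, out);
    }
}

Note that a combiner only pays off if per-group state can be merged incrementally; if the reducer has to see all records of a group at once (as the ArrayList-of-lists payload above suggests), the gain may be limited.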
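Similarly, a sketch of the `reduce` plus `.setCombineHint(CombineHint.HASH)` variant that Gábor and Ufuk mention (available from Flink 1.1 on). The class name, input data, and the summing lambda are again placeholders:

import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class HashCombineHintExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input: (key, value) pairs.
        DataSet<Tuple2<Integer, Long>> input = env.fromElements(
                Tuple2.of(1, 10L), Tuple2.of(1, 5L), Tuple2.of(2, 7L));

        DataSet<Tuple2<Integer, Long>> sums = input
                .groupBy(0)
                // reduce() instead of reduceGroup(): combinable by construction.
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                // Use a hash table instead of sort-based combining on the map side.
                .setCombineHint(CombineHint.HASH);

        sums.print();
    }
}

Whether this route applies depends on whether the per-group computation can be expressed as a binary, associative reduce on the element type; if not, the `GroupCombineFunction` sketch above is the more direct fit.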