Hello Robert,

> Is there something I could do to optimize the grouping?

You can try making your `RichGroupReduceFunction` also implement the
`GroupCombineFunction` interface, so that Flink can combine records
before the shuffle, which might significantly reduce the network load.
(How much the combiner helps can depend greatly on how large your
groups are on average.)
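To illustrate the remark about group size, here is a minimal plain-Java sketch. It is not Flink code and does not use the Flink API; it only models how many records would cross the network with and without a per-partition pre-aggregation. The class name `CombinerSketch`, the round-robin data generator, and the use of summation as the aggregation are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of combining before the shuffle. */
final class CombinerSketch {

    /** Without a combiner, every record of every partition is shuffled. */
    static int shuffledWithoutCombiner(List<List<int[]>> partitions) {
        int count = 0;
        for (List<int[]> partition : partitions) {
            count += partition.size();
        }
        return count;
    }

    /** With a combiner, each partition sends one pre-aggregated record per key it holds. */
    static int shuffledWithCombiner(List<List<int[]>> partitions) {
        int count = 0;
        for (List<int[]> partition : partitions) {
            Map<Integer, Integer> partial = new HashMap<>();
            for (int[] record : partition) {
                // record[0] is the grouping key, record[1] the value;
                // summing is only an assumed example aggregation.
                partial.merge(record[0], record[1], Integer::sum);
            }
            count += partial.size();
        }
        return count;
    }

    /** Builds partitions whose records are spread round-robin over distinctKeys keys. */
    static List<List<int[]>> generate(int partitions, int recordsPerPartition, int distinctKeys) {
        List<List<int[]>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<int[]> partition = new ArrayList<>();
            for (int i = 0; i < recordsPerPartition; i++) {
                partition.add(new int[] {i % distinctKeys, 1});
            }
            result.add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<int[]>> largeGroups = generate(4, 1000, 10);   // 100 records per key and partition
        List<List<int[]>> tinyGroups = generate(4, 1000, 1000);  // 1 record per key and partition
        System.out.println(shuffledWithoutCombiner(largeGroups) + " vs " + shuffledWithCombiner(largeGroups));
        System.out.println(shuffledWithoutCombiner(tinyGroups) + " vs " + shuffledWithCombiner(tinyGroups));
    }
}
```

In this model, large groups shrink from 4000 shuffled records to 40, while groups of size one per partition gain nothing from the combiner.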

Alternatively, if you can reformulate your algorithm to use a `reduce`
instead of a `reduceGroup`, that might also improve performance.
With a `reduce`, you can additionally try calling
`.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
hint is a relatively new feature, so you need the current master for
this.)
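As a rough intuition for why a pairwise `reduce` lends itself to a hash-based combine, here is a small plain-Java sketch. Again, this is not Flink code and says nothing definitive about Flink's internals; the class `HashCombineSketch`, its method, and the sample data are hypothetical. The point is only that a pairwise function lets you keep one running aggregate per key in a hash table, without sorting or buffering whole groups.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Plain-Java model (not Flink code) of a hash-based combine for a pairwise reduce. */
final class HashCombineSketch {

    /**
     * Keeps one running aggregate per key in a hash table and folds each
     * incoming record into it with the pairwise reduce function.
     */
    static <V> Map<Integer, V> hashCombine(int[] keys, V[] values, BinaryOperator<V> reduce) {
        Map<Integer, V> table = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            // First record for a key is stored as-is; later ones are reduced into it.
            table.merge(keys[i], values[i], reduce);
        }
        return table;
    }

    public static void main(String[] args) {
        int[] keys = {1, 2, 1, 3, 2, 1};
        Integer[] values = {10, 20, 30, 40, 50, 60};
        // Summation is an assumed example; the function should be
        // associative and commutative for the result to be order-independent.
        System.out.println(hashCombine(keys, values, Integer::sum));
    }
}
```

A `reduceGroup` function, by contrast, receives the whole group as an iterable, so this kind of record-at-a-time folding is not available unless you supply a combiner yourself.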

Best,
Gábor



2016-07-25 14:06 GMT+02:00 Paschek, Robert <robert.pasc...@tu-berlin.de>:
> Hi Mailing List,
>
> I am currently running some benchmarks with different algorithms. The system
> has 8 nodes and a configured parallelism of 48 (the IBM-Power-1 cluster, if
> somebody from TU Berlin reads this :-) ), and to “simulate” Hadoop
> MapReduce, the execution mode is set to “BATCH_FORCED”.
>
>
>
> It is suspicious that three of the six algorithms show a big gap in runtime
> (5000ms vs. 20000ms) for easy (low-dimensional) tuples. Additionally, the
> algorithms in the “upper” group use a groupBy transformation, while the
> algorithms in the “lower” group don’t use groupBy.
>
> I attached the plot for better visualization.
>
>
>
> I also checked the logs, especially the times when the mappers finish and
> the reducers start _iterating_; they support my suspicion.
>
>
>
> So my question is whether it is “normal” that grouping is so cost-intensive
> that, in my case, the runtime increases by a factor of 4?
>
> I have data from the same experiments running on a 13-node cluster with 26
> cores with Apache Hadoop MapReduce, where the gap is still present but
> smaller (50s vs 57s or 55s vs 65s).
>
>
>
> Is there something I could do to optimize the grouping? Some
> code snippets:
>
>
>
> The Job:
>
> DataSet<?> output = input
>     .mapPartition(new MR_GPMRS_Mapper())
>     .withBroadcastSet(metaData, "MetaData")
>     .withParameters(parameters)
>     .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
>     .groupBy(0)
>     .reduceGroup(new MR_GPMRS_Reducer())
>     .returns(input.getType())
>     .withBroadcastSet(metaData, "MetaData")
>     .withParameters(parameters)
>     .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");
>
>
>
> MR_GPMRS_Mapper():
>
> public class MR_GPMRS_Mapper<T extends Tuple> extends
>     RichMapPartitionFunction<T, Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>>
>
> MR_GPMRS_Reducer():
>
> public class MR_GPMRS_Reducer<T extends Tuple> extends
>     RichGroupReduceFunction<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>, T>
>
>
>
> The Tuple2 carries the Integer key for grouping at position f0 and the
> Tuple3 as payload at position f1.
>
>
>
> Any suggestions (or comments that this is “normal” behaviour) are welcome :-)
>
>
>
> Thank you in advance!
>
> Robert
