This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 18820bb Create a custom hash partitioner that deals with arrays during combines when used in spark new 32bc6da Merge pull request #8042: [BEAM-6812]: Convert keys to ByteArray in Combine.perKey to make sure hashCode is consistent 18820bb is described below commit 18820bb49d123030a6ba2712692c2b2bb51dac6a Author: Ankit Jhalaria <ajhala...@godaddy.com> AuthorDate: Tue Mar 12 14:16:45 2019 -0700 Create a custom hash partitioner that deals with arrays during combines when used in spark --- .../runners/spark/translation/GroupCombineFunctions.java | 12 ++++++++---- .../beam/runners/spark/translation/TranslationUtils.java | 10 +++++++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java index 95ff95a..0ec217d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java @@ -143,10 +143,10 @@ public class GroupCombineFunctions { // Once Spark provides a way to include keys in the arguments of combine/merge functions, // we won't need to duplicate the keys anymore. // Key has to be windowed in order to group by window as well. 
- JavaPairRDD<K, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair = - rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue()); + JavaPairRDD<ByteArray, WindowedValue<KV<K, InputT>>> inRddDuplicatedKeyPair = + rdd.mapToPair(TranslationUtils.toPairByKeyInWindowedValue(keyCoder)); - JavaPairRDD<K, SerializableAccumulator<KV<K, AccumT>>> accumulatedResult = + JavaPairRDD<ByteArray, SerializableAccumulator<KV<K, AccumT>>> accumulatedResult = inRddDuplicatedKeyPair.combineByKey( input -> SerializableAccumulator.of(sparkCombineFn.createCombiner(input), iterAccumCoder), @@ -160,7 +160,11 @@ public class GroupCombineFunctions { acc1.getOrDecode(iterAccumCoder), acc2.getOrDecode(iterAccumCoder)), iterAccumCoder)); - return accumulatedResult.mapToPair(i -> new Tuple2<>(i._1, i._2.getOrDecode(iterAccumCoder))); + return accumulatedResult.mapToPair( + i -> + new Tuple2<>( + CoderHelpers.fromByteArray(i._1.getValue(), keyCoder), + i._2.getOrDecode(iterAccumCoder))); } /** An implementation of {@link Reshuffle} for the Spark runner. 
*/ diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index 8186a87..35ac89a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.runners.spark.util.ByteArray; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; @@ -154,9 +155,12 @@ public final class TranslationUtils { /** Extract key from a {@link WindowedValue} {@link KV} into a pair. */ public static <K, V> - PairFunction<WindowedValue<KV<K, V>>, K, WindowedValue<KV<K, V>>> - toPairByKeyInWindowedValue() { - return windowedKv -> new Tuple2<>(windowedKv.getValue().getKey(), windowedKv); + PairFunction<WindowedValue<KV<K, V>>, ByteArray, WindowedValue<KV<K, V>>> + toPairByKeyInWindowedValue(final Coder<K> keyCoder) { + return windowedKv -> + new Tuple2<>( + new ByteArray(CoderHelpers.toByteArray(windowedKv.getValue().getKey(), keyCoder)), + windowedKv); } /** Extract window from a {@link KV} with {@link WindowedValue} value. */