Repository: incubator-beam Updated Branches: refs/heads/master 9ff426964 -> 0bb4f9c1e
Add display data to ParDo transforms Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77a77c9d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77a77c9d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77a77c9d Branch: refs/heads/master Commit: 77a77c9d159d7f9b1e6f645e54f0a4de86180bfe Parents: 9ff4269 Author: Scott Wegner <sweg...@google.com> Authored: Tue Apr 12 10:08:37 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Fri Apr 15 12:12:44 2016 -0700 ---------------------------------------------------------------------- .../runners/DataflowPipelineTranslatorTest.java | 94 +++++++++++--------- .../runners/inprocess/ForwardingPTransform.java | 6 ++ .../beam/sdk/transforms/DoFnReflector.java | 6 ++ .../beam/sdk/transforms/DoFnWithContext.java | 14 ++- .../org/apache/beam/sdk/transforms/Filter.java | 27 ++++++ .../apache/beam/sdk/transforms/GroupByKey.java | 9 ++ .../transforms/IntraBundleParallelization.java | 9 ++ .../org/apache/beam/sdk/transforms/ParDo.java | 62 ++++++++++--- .../apache/beam/sdk/transforms/Partition.java | 13 +++ .../inprocess/ForwardingPTransformTest.java | 10 +++ .../sdk/transforms/DoFnWithContextTest.java | 11 +++ .../apache/beam/sdk/transforms/FilterTest.java | 20 +++++ .../beam/sdk/transforms/GroupByKeyTest.java | 15 ++++ .../IntraBundleParallelizationTest.java | 26 ++++++ .../apache/beam/sdk/transforms/ParDoTest.java | 74 ++++++++++++--- .../beam/sdk/transforms/PartitionTest.java | 13 +++ 16 files changed, 341 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java index 7a3caa6..0d58601 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/runners/DataflowPipelineTranslatorTest.java @@ -72,9 +72,9 @@ import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Rule; @@ -890,10 +890,12 @@ public class DataflowPipelineTranslatorTest implements Serializable { } }; + ParDo.Bound<Integer, Integer> parDo1 = ParDo.of(fn1); + ParDo.Bound<Integer, Integer> parDo2 = ParDo.of(fn2); pipeline .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(fn1)) - .apply(ParDo.of(fn2)); + .apply(parDo1) + .apply(parDo2); Job job = translator @@ -910,43 +912,53 @@ public class DataflowPipelineTranslatorTest implements Serializable { Map<String, Object> parDo2Properties = steps.get(2).getProperties(); assertThat(parDo1Properties, hasKey("display_data")); - - @SuppressWarnings("unchecked") - Collection<Map<String, Object>> fn1displayData = - (Collection<Map<String, Object>>) parDo1Properties.get("display_data"); - @SuppressWarnings("unchecked") - Collection<Map<String, Object>> fn2displayData = - (Collection<Map<String, Object>>) parDo2Properties.get("display_data"); - - @SuppressWarnings("unchecked") - Matcher<Iterable<? 
extends Map<String, Object>>> fn1expectedData = - Matchers.<Map<String, Object>>containsInAnyOrder( - ImmutableMap.<String, Object>builder() - .put("namespace", fn1.getClass().getName()) - .put("key", "foo") - .put("type", "STRING") - .put("value", "bar") - .build(), - ImmutableMap.<String, Object>builder() - .put("namespace", fn1.getClass().getName()) - .put("key", "foo2") - .put("type", "JAVA_CLASS") - .put("value", DataflowPipelineTranslatorTest.class.getName()) - .put("shortValue", DataflowPipelineTranslatorTest.class.getSimpleName()) - .put("label", "Test Class") - .put("linkUrl", "http://www.google.com") - .build()); - - @SuppressWarnings("unchecked") - Matcher<Iterable<? extends Map<String, Object>>> fn2expectedData = - Matchers.<Map<String, Object>>contains( - ImmutableMap.<String, Object>builder() - .put("namespace", fn2.getClass().getName()) - .put("key", "foo3") - .put("type", "INTEGER") - .put("value", 1234L) - .build()); - assertThat(fn1displayData, fn1expectedData); - assertThat(fn2displayData, fn2expectedData); + Collection<Map<String, String>> fn1displayData = + (Collection<Map<String, String>>) parDo1Properties.get("display_data"); + Collection<Map<String, String>> fn2displayData = + (Collection<Map<String, String>>) parDo2Properties.get("display_data"); + + ImmutableSet<ImmutableMap<String, Object>> expectedFn1DisplayData = ImmutableSet.of( + ImmutableMap.<String, Object>builder() + .put("key", "foo") + .put("type", "STRING") + .put("value", "bar") + .put("namespace", fn1.getClass().getName()) + .build(), + ImmutableMap.<String, Object>builder() + .put("key", "fn") + .put("type", "JAVA_CLASS") + .put("value", fn1.getClass().getName()) + .put("shortValue", fn1.getClass().getSimpleName()) + .put("namespace", parDo1.getClass().getName()) + .build(), + ImmutableMap.<String, Object>builder() + .put("key", "foo2") + .put("type", "JAVA_CLASS") + .put("value", DataflowPipelineTranslatorTest.class.getName()) + .put("shortValue", 
DataflowPipelineTranslatorTest.class.getSimpleName()) + .put("namespace", fn1.getClass().getName()) + .put("label", "Test Class") + .put("linkUrl", "http://www.google.com") + .build() + ); + + ImmutableSet<ImmutableMap<String, Object>> expectedFn2DisplayData = ImmutableSet.of( + ImmutableMap.<String, Object>builder() + .put("key", "fn") + .put("type", "JAVA_CLASS") + .put("value", fn2.getClass().getName()) + .put("shortValue", fn2.getClass().getSimpleName()) + .put("namespace", parDo2.getClass().getName()) + .build(), + ImmutableMap.<String, Object>builder() + .put("key", "foo3") + .put("type", "INTEGER") + .put("value", 1234L) + .put("namespace", fn2.getClass().getName()) + .build() + ); + + assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData)); + assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java index 7833d42..85aa1c4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransform.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.runners.inprocess; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.TypedPValue; @@ -53,4 +54,9 @@ public abstract class ForwardingPTransform<InputT 
extends PInput, OutputT extend TypedPValue<T> output) throws CannotProvideCoderException { return delegate().getDefaultOutputCoder(input, output); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + delegate().populateDisplayData(builder); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java index 08c4391..bbc0220 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory; import org.apache.beam.sdk.transforms.DoFnWithContext.FinishBundle; import org.apache.beam.sdk.transforms.DoFnWithContext.ProcessElement; import org.apache.beam.sdk.transforms.DoFnWithContext.StartBundle; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; @@ -653,6 +654,11 @@ public abstract class DoFnReflector { return fn.getOutputTypeDescriptor(); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + fn.populateDisplayData(builder); + } + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java index 835730c..7143626 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java @@ -25,6 +25,8 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn.DelegatingAggregator; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowingInternals; @@ -82,7 +84,7 @@ import java.util.Map; * @param <OutputT> the type of the (main) output elements */ @Experimental -public abstract class DoFnWithContext<InputT, OutputT> implements Serializable { +public abstract class DoFnWithContext<InputT, OutputT> implements Serializable, HasDisplayData { /** Information accessible to all methods in this {@code DoFnWithContext}. */ public abstract class Context { @@ -414,4 +416,14 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable { void prepareForProcessing() { aggregatorsAreFinal = true; } + + /** + * {@inheritDoc} + * + * <p>By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. 
+ */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java index 547254d..0e5e4a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; /** @@ -99,9 +100,15 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { c.output(c.element()); } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Filter.populateDisplayData(builder, String.format("x < %s", value)); + } }); } + /** * Returns a {@code PTransform} that takes an input * {@code PCollection<T>} and returns a {@code PCollection<T>} with @@ -131,6 +138,11 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { c.output(c.element()); } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Filter.populateDisplayData(builder, String.format("x > %s", value)); + } }); } @@ -163,6 +175,11 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { c.output(c.element()); } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Filter.populateDisplayData(builder, String.format("x ≤ %s", value)); + } }); } @@ -195,6 +212,11 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { c.output(c.element()); } } + + @Override + public void 
populateDisplayData(DisplayData.Builder builder) { + Filter.populateDisplayData(builder, String.format("x ≥ %s", value)); + } }); } @@ -232,4 +254,9 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> { protected Coder<T> getDefaultOutputCoder(PCollection<T> input) { return input.getCoder(); } + + private static void populateDisplayData( + DisplayData.Builder builder, String predicateDescription) { + builder.add("predicate", predicateDescription); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index 42c1f78..1b3c454 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; @@ -273,4 +274,12 @@ public class GroupByKey<K, V> public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) { return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder)); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (fewKeys) { + builder.add("fewKeys", true); + } + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java index c66aa8d..1b91562 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowingInternals; @@ -172,6 +173,14 @@ public class IntraBundleParallelization { return input.apply( ParDo.of(new MultiThreadedIntraBundleProcessingDoFn<>(doFn, maxParallelism))); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("maxParallelism", maxParallelism) + .add("fn", doFn.getClass()) + .include(doFn); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index b448c26..d266155 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.runners.DirectPipelineRunner; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.DirectModeExecutionContext; @@ -556,7 +557,12 @@ public class ParDo { * properties can be set on it first. */ public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - return new Unbound().of(fn); + return of(fn, fn.getClass()); + } + + private static <InputT, OutputT> Bound<InputT, OutputT> of( + DoFn<InputT, OutputT> fn, Class<?> fnClass) { + return new Unbound().of(fn, fnClass); } private static <InputT, OutputT> DoFn<InputT, OutputT> @@ -579,7 +585,7 @@ public class ParDo { */ @Experimental public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) { - return of(adapt(fn)); + return of(adapt(fn), fn.getClass()); } /** @@ -666,9 +672,15 @@ public class ParDo { * still be specified. */ public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - return new Bound<>(name, sideInputs, fn); + return of(fn, fn.getClass()); + } + + private <InputT, OutputT> Bound<InputT, OutputT> of( + DoFn<InputT, OutputT> fn, Class<?> fnClass) { + return new Bound<>(name, sideInputs, fn, fnClass); } + /** * Returns a new {@link ParDo} {@link PTransform} that's like this * transform but which will invoke the given {@link DoFnWithContext} @@ -678,7 +690,7 @@ public class ParDo { * still be specified. 
*/ public <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) { - return of(adapt(fn)); + return of(adapt(fn), fn.getClass()); } } @@ -699,13 +711,16 @@ public class ParDo { // Inherits name. private final List<PCollectionView<?>> sideInputs; private final DoFn<InputT, OutputT> fn; + private final Class<?> fnClass; Bound(String name, List<PCollectionView<?>> sideInputs, - DoFn<InputT, OutputT> fn) { + DoFn<InputT, OutputT> fn, + Class<?> fnClass) { super(name); this.sideInputs = sideInputs; this.fn = SerializableUtils.clone(fn); + this.fnClass = fnClass; } /** @@ -716,7 +731,7 @@ public class ParDo { * <p>See the discussion of Naming above for more explanation. */ public Bound<InputT, OutputT> named(String name) { - return new Bound<>(name, sideInputs, fn); + return new Bound<>(name, sideInputs, fn, fnClass); } /** @@ -744,7 +759,7 @@ public class ParDo { ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder(); builder.addAll(this.sideInputs); builder.addAll(sideInputs); - return new Bound<>(name, builder.build(), fn); + return new Bound<>(name, builder.build(), fn, fnClass); } /** @@ -758,7 +773,7 @@ public class ParDo { public BoundMulti<InputT, OutputT> withOutputTags(TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) { return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn); + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); } @Override @@ -799,7 +814,7 @@ public class ParDo { */ @Override public void populateDisplayData(Builder builder) { - builder.include(fn); + ParDo.populateDisplayData(builder, fn, fnClass); } public DoFn<InputT, OutputT> getFn() { @@ -891,8 +906,12 @@ public class ParDo { * more properties can still be specified. 
*/ public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + return of(fn, fn.getClass()); + } + + public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn, Class<?> fnClass) { return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn); + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); } /** @@ -904,7 +923,7 @@ public class ParDo { * more properties can still be specified. */ public <InputT> BoundMulti<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) { - return of(adapt(fn)); + return of(adapt(fn), fn.getClass()); } } @@ -926,17 +945,20 @@ public class ParDo { private final TupleTag<OutputT> mainOutputTag; private final TupleTagList sideOutputTags; private final DoFn<InputT, OutputT> fn; + private final Class<?> fnClass; BoundMulti(String name, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags, - DoFn<InputT, OutputT> fn) { + DoFn<InputT, OutputT> fn, + Class<?> fnClass) { super(name); this.sideInputs = sideInputs; this.mainOutputTag = mainOutputTag; this.sideOutputTags = sideOutputTags; this.fn = SerializableUtils.clone(fn); + this.fnClass = fnClass; } /** @@ -948,7 +970,7 @@ public class ParDo { */ public BoundMulti<InputT, OutputT> named(String name) { return new BoundMulti<>( - name, sideInputs, mainOutputTag, sideOutputTags, fn); + name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass); } /** @@ -979,7 +1001,7 @@ public class ParDo { builder.addAll(sideInputs); return new BoundMulti<>( name, builder.build(), - mainOutputTag, sideOutputTags, fn); + mainOutputTag, sideOutputTags, fn, fnClass); } @@ -1027,6 +1049,11 @@ public class ParDo { } } + @Override + public void populateDisplayData(Builder builder) { + ParDo.populateDisplayData(builder, fn, fnClass); + } + public DoFn<InputT, OutputT> getFn() { return fn; } @@ -1233,6 +1260,13 @@ public class ParDo { return DirectSideInputReader.of(sideInputValues); } + private static void 
populateDisplayData( + DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) { + builder + .include(fn, fnClass) + .add("fn", fnClass); + } + /** * A {@code DoFnRunner.OutputManager} that provides facilities for checking output values for * illegal mutations. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index 47d49f7..5366fd0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; @@ -121,6 +122,11 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>> return pcs; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.include(partitionDoFn); + } + private final transient PartitionDoFn<T> partitionDoFn; private Partition(PartitionDoFn<T> partitionDoFn) { @@ -170,5 +176,12 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>> partition + " not in [0.." 
+ numPartitions + ")"); } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("numPartitions", numPartitions) + .add("partitionFn", partitionFn.getClass()); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java index ca3753c..366dfc5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ForwardingPTransformTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.when; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.junit.Before; @@ -99,4 +100,13 @@ public class ForwardingPTransformTest { when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder); assertThat(forwarding.getDefaultOutputCoder(input, output), equalTo(outputCoder)); } + + @Test + public void populateDisplayDataDelegates() { + DisplayData.Builder builder = mock(DisplayData.Builder.class); + doThrow(RuntimeException.class).when(delegate).populateDisplayData(builder); + + thrown.expect(RuntimeException.class); + forwarding.populateDisplayData(builder); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java index 40c80b7..391081a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java @@ -18,8 +18,10 @@ package org.apache.beam.sdk.transforms; import static org.hamcrest.CoreMatchers.isA; +import static org.hamcrest.Matchers.empty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -29,6 +31,7 @@ import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.junit.Rule; import org.junit.Test; @@ -158,6 +161,14 @@ public class DoFnWithContextTest implements Serializable { } @Test + public void testDefaultPopulateDisplayDataImplementation() { + DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>() { + }; + DisplayData displayData = DisplayData.from(fn); + assertThat(displayData.items(), empty()); + } + + @Test public void testCreateAggregatorInStartBundleThrows() { TestPipeline p = createTestPipeline(new DoFnWithContext<String, String>() { @StartBundle http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java index f15f48e..f58ba17 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java @@ -17,9 +17,14 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.MatcherAssert.assertThat; + import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.junit.Test; @@ -158,4 +163,19 @@ public class FilterTest implements Serializable { PAssert.that(output).containsInAnyOrder(5, 6, 7); p.run(); } + + @Test + public void testDisplayData() { + ParDo.Bound<Integer, Integer> lessThan = Filter.lessThan(123); + assertThat(DisplayData.from(lessThan), hasDisplayItem("predicate", "x < 123")); + + ParDo.Bound<Integer, Integer> lessThanOrEqual = Filter.lessThanEq(234); + assertThat(DisplayData.from(lessThanOrEqual), hasDisplayItem("predicate", "x ≤ 234")); + + ParDo.Bound<Integer, Integer> greaterThan = Filter.greaterThan(345); + assertThat(DisplayData.from(greaterThan), hasDisplayItem("predicate", "x > 345")); + + ParDo.Bound<Integer, Integer> greaterThanOrEqual = Filter.greaterThanEq(456); + assertThat(DisplayData.from(greaterThanOrEqual), hasDisplayItem("predicate", "x ≥ 456")); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 1a7b0b7..b84845a 
100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -18,7 +18,9 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.core.Is.is; @@ -35,6 +37,7 @@ import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; @@ -375,4 +378,16 @@ public class GroupByKeyTest { public void testGroupByKeyGetName() { Assert.assertEquals("GroupByKey", GroupByKey.<String, Integer>create().getName()); } + + @Test + public void testDisplayData() { + GroupByKey<String, String> groupByKey = GroupByKey.create(); + GroupByKey<String, String> groupByFewKeys = GroupByKey.create(true); + + DisplayData gbkDisplayData = DisplayData.from(groupByKey); + DisplayData fewKeysDisplayData = DisplayData.from(groupByFewKeys); + + assertThat(gbkDisplayData.items(), empty()); + assertThat(fewKeysDisplayData, hasDisplayItem("fewKeys", true)); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java index dd01919..80f6188 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsString; @@ -31,6 +33,7 @@ import static org.junit.Assert.fail; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.junit.Before; import org.junit.Test; @@ -215,6 +218,29 @@ public class IntraBundleParallelizationTest { IntraBundleParallelization.of(new DelayFn<Integer>()).withMaxParallelism(1).getName()); } + @Test + public void testDisplayData() { + DoFn<String, String> fn = new DoFn<String, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + + PTransform<?, ?> transform = IntraBundleParallelization + .withMaxParallelism(1234) + .of(fn); + + DisplayData displayData = DisplayData.from(transform); + assertThat(displayData, includes(fn)); + assertThat(displayData, hasDisplayItem("fn", fn.getClass())); + assertThat(displayData, hasDisplayItem("maxParallelism", 1234)); + } + /** * Runs the provided doFn inside of an {@link IntraBundleParallelization} transform. 
* http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 5724dd6..44154e6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -19,10 +19,13 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.isA; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; @@ -46,6 +49,7 @@ import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.display.DisplayDataMatchers; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.IllegalMutationException; @@ -1525,20 +1529,66 @@ public class ParDoTest 
implements Serializable { } @Test - public void testIncludesDoFnDisplayData() { - Bound<String, String> parDo = - ParDo.of( - new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) {} + public void testDoFnDisplayData() { + DoFn<String, String> fn = new DoFn<String, String>() { + @Override + public void processElement(ProcessContext c) { + } - @Override - public void populateDisplayData(Builder builder) { - builder.add("foo", "bar"); - } - }); + @Override + public void populateDisplayData(Builder builder) { + builder.add("doFnMetadata", "bar"); + } + }; + + Bound<String, String> parDo = ParDo.of(fn); + + DisplayData displayData = DisplayData.from(parDo); + assertThat(displayData, hasDisplayItem(allOf( + hasKey("fn"), + hasType(DisplayData.Type.JAVA_CLASS), + DisplayDataMatchers.hasValue(fn.getClass().getName())))); + + assertThat(displayData, includes(fn)); + } + + @Test + public void testDoFnWithContextDisplayData() { + DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>() { + @ProcessElement + public void proccessElement(ProcessContext c) {} + + @Override + public void populateDisplayData(Builder builder) { + builder.add("fnMetadata", "foobar"); + } + }; + + Bound<String, String> parDo = ParDo.of(fn); + + DisplayData displayData = DisplayData.from(parDo); + assertThat(displayData, includes(fn)); + assertThat(displayData, hasDisplayItem("fn", fn.getClass())); + } + + @Test + public void testWithOutputTagsDisplayData() { + DoFnWithContext<String, String> fn = new DoFnWithContext<String, String>() { + @ProcessElement + public void proccessElement(ProcessContext c) {} + + @Override + public void populateDisplayData(Builder builder) { + builder.add("fnMetadata", "foobar"); + } + }; + + ParDo.BoundMulti parDo = ParDo + .withOutputTags(new TupleTag(), TupleTagList.empty()) + .of(fn); DisplayData displayData = DisplayData.from(parDo); - assertThat(displayData, hasDisplayItem(hasKey("foo"))); + assertThat(displayData, 
includes(fn)); + assertThat(displayData, hasDisplayItem("fn", fn.getClass())); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77a77c9d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java index dba6c16..608da0f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.transforms; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -25,6 +28,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Partition.PartitionFn; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -138,4 +142,13 @@ public class PartitionTest implements Serializable { public void testPartitionGetName() { assertEquals("Partition", Partition.of(3, new ModFn()).getName()); } + + @Test + public void testDisplayData() { + Partition<?> partition = Partition.of(123, new IdentityFn()); + DisplayData displayData = DisplayData.from(partition); + + assertThat(displayData, hasDisplayItem("numPartitions", 123)); + assertThat(displayData, hasDisplayItem("partitionFn", IdentityFn.class)); + } }