Repository: incubator-beam Updated Branches: refs/heads/master 6721bd584 -> 93a5d390b
Change PAssert's dummy inputs from (Void) null to integer 0 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a5503db Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a5503db Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a5503db Branch: refs/heads/master Commit: 9a5503db954eccfe0215ee473417bfafb495b61e Parents: 6721bd5 Author: Kenneth Knowles <k...@google.com> Authored: Fri May 6 11:19:33 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu May 12 17:45:43 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/testing/PAssert.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a5503db/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 1265acd..c2cd598 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.MapCoder; -import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; @@ -593,7 +593,7 @@ public class PAssert { final PCollectionView<ActualT> actual = input.apply("CreateActual", createActual); input - .apply(Create.<Void>of((Void) null).withCoder(VoidCoder.of())) + .apply(Create.of(0).withCoder(VarIntCoder.of())) .apply(ParDo.named("RunChecks").withSideInputs(actual) .of(new CheckerDoFn<>(checkerFn, actual))); @@ -604,8 +604,11 @@ public class PAssert { /** * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. + * + * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support + * null values. */ - private static class CheckerDoFn<ActualT> extends DoFn<Void, Void> { + private static class CheckerDoFn<ActualT> extends DoFn<Integer, Void> { private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -669,14 +672,17 @@ public class PAssert { final PCollectionView<ExpectedT> expected = input.apply("CreateExpected", createExpected); input - .apply(Create.<Void>of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo.named("RunChecks").withSideInputs(actual, expected) + .apply(Create.of(0).withCoder(VarIntCoder.of())) + .apply("RunChecks", ParDo.withSideInputs(actual, expected) .of(new CheckerDoFn<>(relation, actual, expected))); return PDone.in(input.getPipeline()); } - private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Void, Void> { + /** + * Input is ignored, but is {@link Integer} for runners that do not support null values. + */ + private static class CheckerDoFn<ActualT, ExpectedT> extends DoFn<Integer, Void> { private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); private final Aggregator<Integer, Integer> failure =