Use CountingSource in ForceStreamingTest Removes the requirement to have a FakeUnboundedSource, plus the read is fully specified.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b053be46 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b053be46 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b053be46 Branch: refs/heads/master Commit: b053be460c2e6ff486faed1b1a0996af63f93db2 Parents: 57d9bbd Author: Thomas Groh <tg...@google.com> Authored: Tue Dec 20 14:23:21 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Wed Dec 21 15:26:17 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/spark/ForceStreamingTest.java | 39 +------------------- 1 file changed, 2 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b053be46/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java index eb17eea..1b2ff08 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java @@ -23,10 +23,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import java.io.IOException; import java.util.List; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; +import org.apache.beam.sdk.io.CountingSource; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -59,7 +58,7 @@ public class ForceStreamingTest { // apply the BoundedReadFromUnboundedSource. @SuppressWarnings("unchecked") BoundedReadFromUnboundedSource boundedRead = - Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1); + Read.from(CountingSource.unbounded()).withMaxNumRecords(-1); //noinspection unchecked pipeline.apply(boundedRead); @@ -86,38 +85,4 @@ public class ForceStreamingTest { } } - - /** - * A fake {@link UnboundedSource} to satisfy the compiler. - */ - private static class FakeUnboundedSource extends UnboundedSource { - - @Override - public List<? extends UnboundedSource> generateInitialSplits( - int desiredNumSplits, - PipelineOptions options) throws Exception { - return null; - } - - @Override - public UnboundedReader createReader( - PipelineOptions options, - CheckpointMark checkpointMark) throws IOException { - return null; - } - - @Override - public Coder getCheckpointMarkCoder() { - return null; - } - - @Override - public void validate() { } - - @Override - public Coder getDefaultOutputCoder() { - return null; - } - } - }