Use CountingSource in ForceStreamingTest

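Removes the need for the hand-written FakeUnboundedSource stub, whose
methods only returned null or did nothing. Reading from
CountingSource.unbounded() instead means the read in the test is fully
specified.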


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b053be46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b053be46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b053be46

Branch: refs/heads/master
Commit: b053be460c2e6ff486faed1b1a0996af63f93db2
Parents: 57d9bbd
Author: Thomas Groh <tg...@google.com>
Authored: Tue Dec 20 14:23:21 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Dec 21 15:26:17 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/spark/ForceStreamingTest.java  | 39 +-------------------
 1 file changed, 2 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b053be46/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index eb17eea..1b2ff08 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -23,10 +23,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -59,7 +58,7 @@ public class ForceStreamingTest {
     // apply the BoundedReadFromUnboundedSource.
     @SuppressWarnings("unchecked")
     BoundedReadFromUnboundedSource boundedRead =
-        Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+        Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
     //noinspection unchecked
     pipeline.apply(boundedRead);
 
@@ -86,38 +85,4 @@ public class ForceStreamingTest {
     }
 
   }
-
-  /**
-   * A fake {@link UnboundedSource} to satisfy the compiler.
-   */
-  private static class FakeUnboundedSource extends UnboundedSource {
-
-    @Override
-    public List<? extends UnboundedSource> generateInitialSplits(
-        int desiredNumSplits,
-        PipelineOptions options) throws Exception {
-      return null;
-    }
-
-    @Override
-    public UnboundedReader createReader(
-        PipelineOptions options,
-        CheckpointMark checkpointMark) throws IOException {
-      return null;
-    }
-
-    @Override
-    public Coder getCheckpointMarkCoder() {
-      return null;
-    }
-
-    @Override
-    public void validate() { }
-
-    @Override
-    public Coder getDefaultOutputCoder() {
-      return null;
-    }
-  }
-
 }

Reply via email to