This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9fad3d438d13326e88f74a08d8370eeed8288935 Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Fri Jan 11 11:23:30 2019 +0100 Simplify beam reader creation as it is created once the source has already been partitioned --- .../translation/batch/DatasetSourceBatch.java | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java index c35f62e..d9e1722 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static scala.collection.JavaConversions.asScalaBuffer; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils; @@ -63,7 +64,7 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { } /** This class is mapped to Beam {@link BoundedSource}. 
*/ - private static class DatasetReader<T> implements DataSourceReader { + private static class DatasetReader<T> implements DataSourceReader, Serializable { private int numPartitions; private BoundedSource<T> source; @@ -135,26 +136,22 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport { private boolean started; private boolean closed; private BoundedReader<T> reader; - private BoundedSource<T> source; - private SerializablePipelineOptions serializablePipelineOptions; DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) { - this.source = source; - this.serializablePipelineOptions = serializablePipelineOptions; this.started = false; this.closed = false; + // reader is not serializable so lazy initialize it + try { + reader = source + .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class)); + } catch (IOException e) { + throw new RuntimeException("Error creating BoundedReader ", e); + } } @Override public boolean next() throws IOException { if (!started) { - // reader is not serializable so lazy initialize it - try { - reader = source - .createReader(serializablePipelineOptions.get().as(SparkPipelineOptions.class)); - } catch (IOException e) { - throw new RuntimeException("Error creating BoundedReader ", e); - } started = true; return reader.start(); } else {