This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7ff0a262f2ae4c57ab5e7f5e213ab17317f70a69
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Tue Jan 15 13:24:09 2019 +0100

    Serialize windowedValue to byte[] in source to be able to specify a binary
    dataset schema, and deserialize windowedValue from Row to get a
    Dataset<WindowedValue>
---
 .../translation/batch/DatasetSourceBatch.java      | 29 ++++++++++++++++------
 .../batch/ReadSourceTranslatorBatch.java           |  9 ++++++-
 2 files changed, 30 insertions(+), 8 deletions(-)
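[Editor's note: the round trip described in the commit message, as a minimal standalone sketch; it is not part of the patch. The class name WindowedValueRoundTrip, the StringUtf8Coder element coder, and the sample value are placeholders -- the patch itself uses source.getOutputCoder() for the element coder.]

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.joda.time.Instant;

    public class WindowedValueRoundTrip {
      public static void main(String[] args) throws IOException {
        // Source side: wrap an element in the global window, as DatasetPartitionReader.get() does.
        WindowedValue<String> original =
            WindowedValue.timestampedValueInGlobalWindow("hello", Instant.now());

        // The same coder pair must be used on both sides of the Dataset<Row> boundary.
        WindowedValue.FullWindowedValueCoder<String> coder =
            WindowedValue.FullWindowedValueCoder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

        // Encode to the byte[] that fills the single binary column of each Row.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode(original, out);
        byte[] bytes = out.toByteArray();

        // Translator side: decode the byte[] back into a WindowedValue.
        WindowedValue<String> decoded = coder.decode(new ByteArrayInputStream(bytes));
        System.out.println(decoded.getValue() + " @ " + decoded.getTimestamp());
      }
    }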

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index d9e1722..c4cfeaf 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 import static com.google.common.base.Preconditions.checkArgument;
 import static scala.collection.JavaConversions.asScalaBuffer;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
@@ -93,10 +95,11 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
 
     @Override
     public StructType readSchema() {
+      // TODO: find a way to extend schema with a WindowedValue schema
       StructField[] array = new StructField[1];
-      StructField dummyStructField = StructField
-          .apply("dummyStructField", DataTypes.NullType, true, Metadata.empty());
-      array[0] = dummyStructField;
+      StructField binaryStructField = StructField
+          .apply("binaryStructField", DataTypes.BinaryType, true, Metadata.empty());
+      array[0] = binaryStructField;
       return new StructType(array);
     }
 
@@ -135,11 +138,13 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
   private static class DatasetPartitionReader<T> implements InputPartitionReader<InternalRow> {
     private boolean started;
     private boolean closed;
+    private BoundedSource<T> source;
     private BoundedReader<T> reader;
 
     DatasetPartitionReader(BoundedSource<T> source, SerializablePipelineOptions serializablePipelineOptions) {
       this.started = false;
       this.closed = false;
+      this.source = source;
       // reader is not serializable so lazy initialize it
       try {
         reader = source
@@ -162,10 +167,20 @@ public class DatasetSourceBatch implements DataSourceV2, ReadSupport {
     @Override
     public InternalRow get() {
       List<Object> list = new ArrayList<>();
-      list.add(
-          WindowedValue.timestampedValueInGlobalWindow(
-              reader.getCurrent(), reader.getCurrentTimestamp()));
-      return InternalRow.apply(asScalaBuffer(list).toList());
+      WindowedValue<T> windowedValue = WindowedValue
+          .timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp());
+      // serialize the windowedValue to a byte array to comply with the dataset binary schema
+      WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+          .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+      ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+      try {
+        windowedValueCoder.encode(windowedValue, byteArrayOutputStream);
+        byte[] bytes = byteArrayOutputStream.toByteArray();
+        list.add(bytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return InternalRow.apply(asScalaBuffer(list).toList());
     }
 
     @Override
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 8810e21..fec0fd3 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
 import org.apache.beam.runners.core.construction.ReadTranslation;
@@ -26,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.Translation
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -64,10 +66,15 @@ class ReadSourceTranslatorBatch<T>
         .option(DatasetSourceBatch.PIPELINE_OPTIONS,
             PipelineOptionsSerializationUtils.serializeToJson(context.getOptions())).load();
 
+    // extract windowedValue from Row
     MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
       @Override public WindowedValue call(Row value) throws Exception {
         //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
+        byte[] bytes = (byte[]) value.get(0);
+        WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder
+            .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+        WindowedValue<T> windowedValue = windowedValueCoder.decode(new ByteArrayInputStream(bytes));
+        return windowedValue;
       }
     };
     //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>>
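[Editor's note: for reference, the one-column binary schema that the new readSchema() declares can be reproduced standalone with Spark's types API. The class name BinarySchemaSketch is hypothetical and the printed tree is only illustrative; the field name matches the patch.]

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    public class BinarySchemaSketch {
      public static void main(String[] args) {
        // A single nullable binary column; each Row carries one coder-encoded WindowedValue.
        StructField[] fields = new StructField[1];
        fields[0] = StructField.apply("binaryStructField", DataTypes.BinaryType, true, Metadata.empty());
        System.out.println(new StructType(fields).treeString());
      }
    }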
