This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 3533779  Fix spotlessJava issues
3533779 is described below

commit 353377986e9b0f006772de7639616fa58cca995c
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Fri Dec 28 15:59:34 2018 +0100

    Fix spotlessJava issues
---
 .../translation/TranslationContext.java            | 10 ++---
 .../translation/batch/DatasetSourceBatch.java      |  9 +++--
 .../translation/batch/DatasetSourceMockBatch.java  | 44 ++++++++++++----------
 .../translation/batch/FlattenTranslatorBatch.java  |  2 +-
 .../batch/ReadSourceTranslatorBatch.java           | 18 +++++----
 .../batch/ReadSourceTranslatorMockBatch.java       | 18 +++++----
 .../streaming/DatasetSourceStreaming.java          | 16 +-------
 .../streaming/ReadSourceTranslatorStreaming.java   | 22 +++++------
 .../spark/structuredstreaming/SourceTest.java      | 19 +++++++++-
 9 files changed, 88 insertions(+), 70 deletions(-)
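
The changes below are mostly mechanical spotlessJava (google-java-format) fixes: @Override annotations moved onto their own lines, anonymous classes and long statements re-wrapped, Javadoc summaries terminated with a period, unused imports removed, the SOURCE_PROVIDER_CLASS fields made static final, and a missing license header and trailing newline added. As a rough illustration of the target style, here is a hypothetical stand-in class (not one of the files in this diff):

/** Hypothetical example showing the shape spotlessJava enforces. */
final class FormattingExample {

  /** Annotation on its own line, two-space indent, closing brace on its own line. */
  @Override
  public String toString() {
    return "formatted";
  }
}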

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index acc49f4..5606886 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -115,12 +115,12 @@ public class TranslationContext {
     }
   }
 
-    public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
-      if (!datasets.containsKey(value)) {
-        datasets.put(value, dataset);
-        leaves.add(dataset);
-      }
+  public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
+    if (!datasets.containsKey(value)) {
+      datasets.put(value, dataset);
+      leaves.add(dataset);
     }
+  }
 
   // --------------------------------------------------------------------------------------------
   //  PCollections methods
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index f4cd885..7726ad7 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -50,12 +50,13 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
   private TranslationContext context;
   private BoundedSource<T> source;
 
-
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
+  @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
    this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
    checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
     this.bundleSize = context.getOptions().getBundleSize();
-    return new DatasetReader();  }
+    return new DatasetReader();
+  }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
   private class DatasetReader implements DataSourceReader {
@@ -106,7 +107,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader} */
+  /** This class can be mapped to Beam {@link BoundedReader}. */
   private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
index b616a6f..5485257 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
@@ -34,58 +34,64 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.types.StructType;
 import org.joda.time.Instant;
 
-/**
- * This is a mock source that gives values between 0 and 999.
- */
+/** This is a mock source that gives values between 0 and 999. */
 public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
 
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
+  @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
     return new DatasetReader();
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
   private static class DatasetReader implements DataSourceReader {
 
-    @Override public StructType readSchema() {
+    @Override
+    public StructType readSchema() {
       return new StructType();
     }
 
-    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
-      result.add(new InputPartition<InternalRow>() {
+      result.add(
+          new InputPartition<InternalRow>() {
 
-        @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-          return new DatasetPartitionReaderMock();
-        }
-      });
+            @Override
+            public InputPartitionReader<InternalRow> createPartitionReader() {
+              return new DatasetPartitionReaderMock();
+            }
+          });
       return result;
     }
   }
 
-  /** This class is a mocked reader*/
+  /** This class is a mocked reader. */
   private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
 
     private ArrayList<Integer> values;
     private int currentIndex = 0;
 
     private DatasetPartitionReaderMock() {
-      for (int i = 0; i < 1000; i++){
+      for (int i = 0; i < 1000; i++) {
         values.add(i);
       }
     }
 
-    @Override public boolean next() throws IOException {
+    @Override
+    public boolean next() throws IOException {
       currentIndex++;
       return (currentIndex <= values.size());
     }
 
-    @Override public void close() throws IOException {
-    }
+    @Override
+    public void close() throws IOException {}
 
-    @Override public InternalRow get() {
+    @Override
+    public InternalRow get() {
       List<Object> list = new ArrayList<>();
-      list.add(WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
+      list.add(
+          WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
       return InternalRow.apply(asScalaBuffer(list).toList());
     }
   }
-}
\ No newline at end of file
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
index 2739e83..b7b3541 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
@@ -45,7 +45,7 @@ class FlattenTranslatorBatch<T>
       for (PValue pValue : inputs.values()) {
         checkArgument(
             pValue instanceof PCollection,
-            "Got non-PCollection input to flatten: %s of type %s",
+            "Got non-PCollection input to  flatten: %s of type %s",
             pValue,
             pValue.getClass().getSimpleName());
         @SuppressWarnings("unchecked")
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index d980a52..1aa5a2e 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceBatch.class.getCanonicalName();
+  private static final String SOURCE_PROVIDER_CLASS = DatasetSourceBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -46,7 +46,7 @@ class ReadSourceTranslatorBatch<T>
         (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
             context.getCurrentTransform();
 
-        BoundedSource<T> source;
+    BoundedSource<T> source;
     try {
       source = ReadTranslation.boundedSourceFromTransform(rootTransform);
     } catch (IOException e) {
@@ -57,12 +57,14 @@ class ReadSourceTranslatorBatch<T>
    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
    //TODO pass the source and the translation context serialized as string to the DatasetSource
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
+    MapFunction<Row, WindowedValue> func =
+        new MapFunction<Row, WindowedValue>() {
+          @Override
+          public WindowedValue call(Row value) throws Exception {
+            //there is only one value put in each Row by the InputPartitionReader
+            return value.<WindowedValue>getAs(0);
+          }
+        };
    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
     // be created ?
    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index d7b9175..ca3356c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -31,12 +31,14 @@ import org.apache.spark.sql.SparkSession;
 
 /**
  * Mock translator that generates a source of 0 to 999 and prints it.
+ *
  * @param <T>
  */
 class ReadSourceTranslatorMockBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceMockBatch.class.getCanonicalName();
+  private static final String SOURCE_PROVIDER_CLASS =
+      DatasetSourceMockBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -46,12 +48,14 @@ class ReadSourceTranslatorMockBatch<T>
 
    Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
+    MapFunction<Row, WindowedValue> func =
+        new MapFunction<Row, WindowedValue>() {
+          @Override
+          public WindowedValue call(Row value) throws Exception {
+            //there is only one value put in each Row by the InputPartitionReader
+            return value.<WindowedValue>getAs(0);
+          }
+        };
    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
     // be created ?
    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index fad68d3..b43f53c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -28,16 +28,7 @@ import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalog.Catalog;
-import org.apache.spark.sql.catalog.Column;
-import org.apache.spark.sql.catalog.Database;
-import org.apache.spark.sql.catalog.Function;
-import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -48,21 +39,18 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.storage.StorageLevel;
-import scala.collection.immutable.Map;
 
 /**
 * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
 * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
  */
-public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport{
+public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport {
 
   private int numPartitions;
   private Long bundleSize;
   private TranslationContext context;
   private BoundedSource<T> source;
 
-
   @Override
   public MicroBatchReader createMicroBatchReader(
       Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
@@ -157,7 +145,7 @@ public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSu
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader} */
+  /** This class can be mapped to Beam {@link BoundedReader}. */
   private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 6066822..8956ee9 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
-import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,7 +36,8 @@ import org.apache.spark.sql.SparkSession;
 class ReadSourceTranslatorStreaming<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceStreaming.class.getCanonicalName();
+  private static final String SOURCE_PROVIDER_CLASS =
+      DatasetSourceStreaming.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -50,8 +49,7 @@ class ReadSourceTranslatorStreaming<T>
 
     UnboundedSource<T, UnboundedSource.CheckpointMark> source;
     try {
-       source = ReadTranslation
-          .unboundedSourceFromTransform(rootTransform);
+      source = ReadTranslation.unboundedSourceFromTransform(rootTransform);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -60,12 +58,14 @@ class ReadSourceTranslatorStreaming<T>
    Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS).load();
 
    //TODO pass the source and the translation context serialized as string to the DatasetSource
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
+    MapFunction<Row, WindowedValue> func =
+        new MapFunction<Row, WindowedValue>() {
+          @Override
+          public WindowedValue call(Row value) throws Exception {
+            //there is only one value put in each Row by the InputPartitionReader
+            return value.<WindowedValue>getAs(0);
+          }
+        };
    //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
     // be created ?
    Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
index eea9769..662805c 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark.structuredstreaming;
 
 import org.apache.beam.sdk.Pipeline;
@@ -5,6 +22,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 
+/** Source test of primitive pipeline. */
 public class SourceTest {
   public static void main(String[] args) {
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
@@ -12,5 +30,4 @@ public class SourceTest {
     pipeline.apply(Create.of(1));
     pipeline.run();
   }
-
 }

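A side note on the TODO repeated in the translators above ("is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>> be created?"): one option is to keep the kryo-based encoder but hand it a generically typed class token via an unchecked cast, so the element type at least stays in the Dataset signature. A minimal sketch of that idea, not part of this commit and only assuming the Spark Encoders.kryo(Class) API already used in the diff:

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

/** Sketch only: a typed (but still kryo-serialized) Encoder for WindowedValue. */
class WindowedValueEncoderSketch {

  // Class tokens are not generic, so the usual double cast is needed; serialization is
  // still kryo, but the element type no longer disappears from the Dataset signature.
  @SuppressWarnings("unchecked")
  static <T> Encoder<WindowedValue<T>> windowedValueEncoder() {
    return Encoders.kryo((Class<WindowedValue<T>>) (Class<?>) WindowedValue.class);
  }

  static <T> Dataset<WindowedValue<T>> toWindowedValues(Dataset<Row> rowDataset) {
    MapFunction<Row, WindowedValue<T>> func =
        new MapFunction<Row, WindowedValue<T>>() {
          @Override
          public WindowedValue<T> call(Row value) {
            // there is only one value put in each Row by the InputPartitionReader
            return value.getAs(0);
          }
        };
    return rowDataset.map(func, windowedValueEncoder());
  }
}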