This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 3533779  Fix spotlessJava issues
3533779 is described below

commit 353377986e9b0f006772de7639616fa58cca995c
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Fri Dec 28 15:59:34 2018 +0100

    Fix spotlessJava issues
---
 .../translation/TranslationContext.java           | 10 ++---
 .../translation/batch/DatasetSourceBatch.java     |  9 +++--
 .../translation/batch/DatasetSourceMockBatch.java | 44 ++++++++++++----------
 .../translation/batch/FlattenTranslatorBatch.java |  2 +-
 .../batch/ReadSourceTranslatorBatch.java          | 18 +++++----
 .../batch/ReadSourceTranslatorMockBatch.java      | 18 +++++----
 .../streaming/DatasetSourceStreaming.java         | 16 +-------
 .../streaming/ReadSourceTranslatorStreaming.java  | 22 +++++------
 .../spark/structuredstreaming/SourceTest.java     | 19 +++++++++-
 9 files changed, 88 insertions(+), 70 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index acc49f4..5606886 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -115,12 +115,12 @@ public class TranslationContext {
     }
   }
 
-    public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
-      if (!datasets.containsKey(value)) {
-        datasets.put(value, dataset);
-        leaves.add(dataset);
-      }
+  public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) {
+    if (!datasets.containsKey(value)) {
+      datasets.put(value, dataset);
+      leaves.add(dataset);
     }
+  }
 
   // --------------------------------------------------------------------------------------------
   //  PCollections methods
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
index f4cd885..7726ad7 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceBatch.java
@@ -50,12 +50,13 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
   private TranslationContext context;
 
   private BoundedSource<T> source;
-
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
+  @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
     this.numPartitions = context.getSparkSession().sparkContext().defaultParallelism();
     checkArgument(this.numPartitions > 0, "Number of partitions must be greater than zero.");
     this.bundleSize = context.getOptions().getBundleSize();
-    return new DatasetReader(); }
+    return new DatasetReader();
+  }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
   private class DatasetReader implements DataSourceReader {
@@ -106,7 +107,7 @@ public class DatasetSourceBatch<T> implements DataSourceV2, ReadSupport {
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader} */
+  /** This class can be mapped to Beam {@link BoundedReader}. */
   private class DatasetPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
index b616a6f..5485257 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DatasetSourceMockBatch.java
@@ -34,58 +34,64 @@ import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.types.StructType;
 import org.joda.time.Instant;
 
-/**
- * This is a mock source that gives values between 0 and 999.
- */
+/** This is a mock source that gives values between 0 and 999. */
 public class DatasetSourceMockBatch implements DataSourceV2, ReadSupport {
 
-  @Override public DataSourceReader createReader(DataSourceOptions options) {
+  @Override
+  public DataSourceReader createReader(DataSourceOptions options) {
     return new DatasetReader();
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
   private static class DatasetReader implements DataSourceReader {
 
-    @Override public StructType readSchema() {
+    @Override
+    public StructType readSchema() {
       return new StructType();
     }
 
-    @Override public List<InputPartition<InternalRow>> planInputPartitions() {
+    @Override
+    public List<InputPartition<InternalRow>> planInputPartitions() {
       List<InputPartition<InternalRow>> result = new ArrayList<>();
-      result.add(new InputPartition<InternalRow>() {
+      result.add(
+          new InputPartition<InternalRow>() {
 
-        @Override public InputPartitionReader<InternalRow> createPartitionReader() {
-          return new DatasetPartitionReaderMock();
-        }
-      });
+            @Override
+            public InputPartitionReader<InternalRow> createPartitionReader() {
+              return new DatasetPartitionReaderMock();
+            }
+          });
       return result;
     }
   }
 
-  /** This class is a mocked reader*/
+  /** This class is a mocked reader. */
  private static class DatasetPartitionReaderMock implements InputPartitionReader<InternalRow> {
 
    private ArrayList<Integer> values;
    private int currentIndex = 0;
 
    private DatasetPartitionReaderMock() {
-      for (int i = 0; i < 1000; i++){
+      for (int i = 0; i < 1000; i++) {
        values.add(i);
      }
    }
 
-    @Override public boolean next() throws IOException {
+    @Override
+    public boolean next() throws IOException {
      currentIndex++;
      return (currentIndex <= values.size());
    }
 
-    @Override public void close() throws IOException {
-    }
+    @Override
+    public void close() throws IOException {}
 
-    @Override public InternalRow get() {
+    @Override
+    public InternalRow get() {
      List<Object> list = new ArrayList<>();
-      list.add(WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
+      list.add(
+          WindowedValue.timestampedValueInGlobalWindow(values.get(currentIndex), new Instant()));
      return InternalRow.apply(asScalaBuffer(list).toList());
    }
  }
-}
\ No newline at end of file
+}
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
index 2739e83..b7b3541 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/FlattenTranslatorBatch.java
@@ -45,7 +45,7 @@ class FlattenTranslatorBatch<T>
     for (PValue pValue : inputs.values()) {
       checkArgument(
           pValue instanceof PCollection,
-          "Got non-PCollection input to flatten: %s of type %s",
+          "Got non-PCollection input to flatten: %s of type %s",
           pValue,
           pValue.getClass().getSimpleName());
       @SuppressWarnings("unchecked")
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index d980a52..1aa5a2e 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.SparkSession;
 class ReadSourceTranslatorBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceBatch.class.getCanonicalName();
+  private static final String SOURCE_PROVIDER_CLASS = DatasetSourceBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -46,7 +46,7 @@ class ReadSourceTranslatorBatch<T>
         (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
             context.getCurrentTransform();
 
-    BoundedSource<T> source;
+    BoundedSource<T> source;
     try {
       source = ReadTranslation.boundedSourceFromTransform(rootTransform);
     } catch (IOException e) {
@@ -57,12 +57,14 @@ class ReadSourceTranslatorBatch<T>
     Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
     //TODO pass the source and the translation context serialized as string to the DatasetSource
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
+    MapFunction<Row, WindowedValue> func =
+        new MapFunction<Row, WindowedValue>() {
+          @Override
+          public WindowedValue call(Row value) throws Exception {
+            //there is only one value put in each Row by the InputPartitionReader
+            return value.<WindowedValue>getAs(0);
+          }
+        };
     //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
     // be created ?
     Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index d7b9175..ca3356c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -31,12 +31,14 @@ import org.apache.spark.sql.SparkSession;
 
 /**
  * Mock translator that generates a source of 0 to 999 and prints it.
+ *
  * @param <T>
  */
 class ReadSourceTranslatorMockBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceMockBatch.class.getCanonicalName();
+  private static final String SOURCE_PROVIDER_CLASS =
+      DatasetSourceMockBatch.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -46,12 +48,14 @@ class ReadSourceTranslatorMockBatch<T>
 
     Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load();
 
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
+    MapFunction<Row, WindowedValue> func =
+        new MapFunction<Row, WindowedValue>() {
+          @Override
+          public WindowedValue call(Row value) throws Exception {
+            //there is only one value put in each Row by the InputPartitionReader
+            return value.<WindowedValue>getAs(0);
+          }
+        };
     //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
     // be created ?
     Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index fad68d3..b43f53c 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -28,16 +28,7 @@ import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalog.Catalog;
-import org.apache.spark.sql.catalog.Column;
-import org.apache.spark.sql.catalog.Database;
-import org.apache.spark.sql.catalog.Function;
-import org.apache.spark.sql.catalog.Table;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.ContinuousReadSupport;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
@@ -48,21 +39,18 @@
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.storage.StorageLevel;
-import scala.collection.immutable.Map;
 
 /**
  * This is a spark structured streaming {@link DataSourceV2} implementation. As Continuous streaming
 * is tagged experimental in spark, this class does no implement {@link ContinuousReadSupport}.
 */
-public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport{
+public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSupport {
 
   private int numPartitions;
   private Long bundleSize;
   private TranslationContext context;
   private BoundedSource<T> source;
-
   @Override
   public MicroBatchReader createMicroBatchReader(
       Optional<StructType> schema, String checkpointLocation, DataSourceOptions options) {
@@ -157,7 +145,7 @@ public class DatasetSourceStreaming<T> implements DataSourceV2, MicroBatchReadSu
     }
   }
 
-  /** This class can be mapped to Beam {@link BoundedReader} */
+  /** This class can be mapped to Beam {@link BoundedReader}. */
   private class DatasetMicroBatchPartitionReader implements InputPartitionReader<InternalRow> {
 
     BoundedReader<T> reader;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 6066822..8956ee9 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
-import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,7 +36,8 @@ import org.apache.spark.sql.SparkSession;
 class ReadSourceTranslatorStreaming<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
-  private String SOURCE_PROVIDER_CLASS = DatasetSourceStreaming.class.getCanonicalName();
+  private static final String SOURCE_PROVIDER_CLASS =
+      DatasetSourceStreaming.class.getCanonicalName();
 
   @SuppressWarnings("unchecked")
   @Override
@@ -50,8 +49,7 @@ class ReadSourceTranslatorStreaming<T>
 
     UnboundedSource<T, UnboundedSource.CheckpointMark> source;
     try {
-      source = ReadTranslation
-          .unboundedSourceFromTransform(rootTransform);
+      source = ReadTranslation.unboundedSourceFromTransform(rootTransform);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -60,12 +58,14 @@ class ReadSourceTranslatorStreaming<T>
     Dataset<Row> rowDataset = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS).load();
 
     //TODO pass the source and the translation context serialized as string to the DatasetSource
-    MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() {
-      @Override public WindowedValue call(Row value) throws Exception {
-        //there is only one value put in each Row by the InputPartitionReader
-        return value.<WindowedValue>getAs(0);
-      }
-    };
+    MapFunction<Row, WindowedValue> func =
+        new MapFunction<Row, WindowedValue>() {
+          @Override
+          public WindowedValue call(Row value) throws Exception {
+            //there is only one value put in each Row by the InputPartitionReader
+            return value.<WindowedValue>getAs(0);
+          }
+        };
     //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedVAlue<T>>
     // be created ?
     Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class));
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
index eea9769..662805c 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark.structuredstreaming;
 
 import org.apache.beam.sdk.Pipeline;
@@ -5,6 +22,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 
+/** Source test of primitive pipeline. */
 public class SourceTest {
   public static void main(String[] args) {
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
@@ -12,5 +30,4 @@ public class SourceTest {
     pipeline.apply(Create.of(1));
     pipeline.run();
   }
-
 }
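
Note: "spotlessJava" in the commit message refers to the Java task of the Spotless Gradle
plugin, which Beam's Gradle build uses to enforce formatting. As a minimal sketch, assuming
the standard Spotless task names (the build configuration is not part of this commit),
violations like the ones fixed above can be reported and auto-fixed from the repository
root with:

    # report Java formatting violations (fails the build if any are found)
    ./gradlew spotlessJavaCheck

    # rewrite the offending files in place, producing a change like this commit
    ./gradlew spotlessApply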