[beam] branch master updated: [BEAM-6908] New Jenkins branch for Python35 benchmark
This is an automated email from the ASF dual-hosted git repository. markliu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 86b5b19 [BEAM-6908] New Jenkins branch for Python35 benchmark new d9c2de8 Merge pull request #8393 from markflyhigh/py35-benchmark 86b5b19 is described below commit 86b5b191ee50d215bed846bebaf10a56fe8453bc Author: Mark Liu AuthorDate: Wed Apr 24 14:08:31 2019 -0700 [BEAM-6908] New Jenkins branch for Python35 benchmark --- .test-infra/jenkins/README.md | 1 + .../jenkins/job_Performancetests_Python35.groovy | 63 ++ 2 files changed, 64 insertions(+) diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md index f478d31..775569e 100644 --- a/.test-infra/jenkins/README.md +++ b/.test-infra/jenkins/README.md @@ -84,6 +84,7 @@ Beam Jenkins overview page: [link](https://builds.apache.org/view/A-D/view/Beam/ | beam_PerformanceTests_ManyFiles_TextIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT_HDFS/) | `Run Java ManyFilesTextIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT) [![Build Status](https://builds.apache.org/job/beam_Per [...] | beam_PerformanceTests_ParquetIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/) | `Run Java ParquetIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/badge/icon [...] 
| beam_PerformanceTests_Python | [cron](https://builds.apache.org/job/beam_PerformanceTests_Python/) | `Run Python Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Python/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Python) | +| beam_PerformanceTests_Python35 | [cron](https://builds.apache.org/job/beam_PerformanceTests_Python35/) | `Run Python35 Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Python35/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Python35) | | beam_PerformanceTests_Spark | [cron](https://builds.apache.org/job/beam_PerformanceTests_Spark/) | `Run Spark Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Spark/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Spark) | | beam_PerformanceTests_TFRecordIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | `Run Java JdbcIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT) | | beam_PerformanceTests_TextIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/) | `Run Java TextIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/badge/icon)](https://builds.apa [...] 
diff --git a/.test-infra/jenkins/job_Performancetests_Python35.groovy b/.test-infra/jenkins/job_Performancetests_Python35.groovy new file mode 100644 index 000..842df97 --- /dev/null +++ b/.test-infra/jenkins/job_Performancetests_Python35.groovy @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties + +// This job runs the Beam Python35 performance benchmark on PerfKit Benchmarker.
[beam] branch master updated: refactor : standardize the kotlin samples
This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a13121c refactor : standardize the kotlin samples new b6c3c74 Merge pull request #8392 from harshithdwivedi/stdKotlin a13121c is described below commit a13121cdc6457bf4af7d9e9e22e9189860e121d8 Author: Harshit Dwivedi AuthorDate: Thu Apr 25 01:29:54 2019 +0530 refactor : standardize the kotlin samples Signed-off-by: Harshit Dwivedi --- .../beam/examples/kotlin/DebuggingWordCount.kt | 1 - .../beam/examples/kotlin/WindowedWordCount.kt | 8 +- .../org/apache/beam/examples/kotlin/WordCount.kt | 6 +- .../kotlin/common/ExampleBigQueryTableOptions.kt | 4 +- .../ExamplePubsubTopicAndSubscriptionOptions.kt| 4 +- .../kotlin/common/ExamplePubsubTopicOptions.kt | 5 +- .../beam/examples/kotlin/common/ExampleUtils.kt| 94 +++--- .../kotlin/common/WriteOneFilePerWindow.kt | 15 +--- 8 files changed, 58 insertions(+), 79 deletions(-) diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt index bd16582..bd61479 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/DebuggingWordCount.kt @@ -156,7 +156,6 @@ public object DebuggingWordCount { @JvmStatic fun main(args: Array) { val options = (PipelineOptionsFactory.fromArgs(*args).withValidation() as WordCountOptions) - runDebuggingWordCount(options) } } diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt index f57a62b..f82a528 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt +++ 
b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WindowedWordCount.kt @@ -116,16 +116,12 @@ public object WindowedWordCount { /** A [DefaultValueFactory] that returns the current system time. */ public class DefaultToCurrentSystemTime : DefaultValueFactory { -override fun create(options: PipelineOptions): Long? { -return System.currentTimeMillis() -} +override fun create(options: PipelineOptions) = System.currentTimeMillis() } /** A [DefaultValueFactory] that returns the minimum timestamp plus one hour. */ public class DefaultToMinTimestampPlusOneHour : DefaultValueFactory { -override fun create(options: PipelineOptions): Long? { -return (options as Options).minTimestampMillis!! + Duration.standardHours(1).millis -} +override fun create(options: PipelineOptions) = (options as Options).minTimestampMillis!! + Duration.standardHours(1).millis } /** diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt index 9cf6d3f..fd215a4 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/WordCount.kt @@ -103,7 +103,7 @@ public object WordCount { // Output each word encountered into the output PCollection. for (word in words) { -if (!word.isEmpty()) { +if (word.isNotEmpty()) { receiver.output(word) } } @@ -112,9 +112,7 @@ public object WordCount { /** A SimpleFunction that converts a Word and Count into a printable string. 
*/ public class FormatAsTextFn : SimpleFunction, String>() { -override fun apply(input: KV): String { -return "${input.key} : ${input.value}" -} +override fun apply(input: KV) = "${input.key} : ${input.value}" } /** diff --git a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt index 176ff7a..06decd7 100644 --- a/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt +++ b/examples/kotlin/src/main/java/org/apache/beam/examples/kotlin/common/ExampleBigQueryTableOptions.kt @@ -42,8 +42,6 @@ interface ExampleBigQueryTableOptions : GcpOptions { /** Returns the job name as the default BigQuery table name. */ class BigQueryTableFactory : DefaultValueFactory { -override fun create(options: PipelineOptions): String { -return options.jobName.replace('-', '_') -} +
[beam] branch new-io-Dashboards-link deleted (was f69e960)
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a change to branch new-io-Dashboards-link in repository https://gitbox.apache.org/repos/asf/beam.git. was f69e960 Update IOIT Dashboards url The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[beam] branch dot-log deleted (was c33dd9e)
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a change to branch dot-log in repository https://gitbox.apache.org/repos/asf/beam.git. was c33dd9e Add the appropriate extension to log files. The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
[beam] branch master updated: [BEAM-7012] Support TestStream in streaming Flink Runner
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 6f3bcd9 [BEAM-7012] Support TestStream in streaming Flink Runner new c44c30f Merge pull request #8383: [BEAM-7012] Support TestStream in streaming Flink Runner 6f3bcd9 is described below commit 6f3bcd932d4fe70711cb3653192171dad67867dc Author: Maximilian Michels AuthorDate: Tue Apr 23 18:49:26 2019 +0200 [BEAM-7012] Support TestStream in streaming Flink Runner TestStream is a way to construct a stream with control over elements and time. In total, 40 ValidatesRunner tests make use of this feature (tagged via UsesTestStream). So far only the DirectRunner supported TestStream which meant that those tests were not executed for other Runners, e.g. Flink. Implementing TestStream for Flink was not hard, except for supporting the processing time functionality for which a clean solution seems impossible. However, only 2 of the 40 UsesTestStream tests make use of processing time. An annotation (UsesTestStreamWithProcessingTime) was added to allow Runners to exclude those tests. This still adds 38 new ValidatesRunner tests in Flink streaming mode. 
--- runners/flink/flink_runner.gradle | 3 +- .../flink/FlinkStreamingTransformTranslators.java | 37 + .../wrappers/streaming/io/TestStreamSource.java| 80 .../org/apache/beam/sdk/testing/TestStream.java| 88 ++ .../apache/beam/sdk/testing/UsesTestStream.java| 2 +- java => UsesTestStreamWithProcessingTime.java} | 7 +- .../apache/beam/sdk/testing/TestStreamTest.java| 21 +- .../apache/beam/sdk/transforms/GroupByKeyTest.java | 4 +- .../org/apache/beam/sdk/transforms/ParDoTest.java | 7 +- 9 files changed, 240 insertions(+), 9 deletions(-) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index db6d3e3..4bc9944 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -151,10 +151,11 @@ def createValidatesRunnerTask(Map m) { excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' if (config.streaming) { excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' +excludeCategories 'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime' } else { excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' +excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' } - excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream' } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 8d42d18..760efdd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -49,14 +49,17 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDo import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector; import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.io.TestStreamSource; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.transforms.DoFn; @@ -70,6 +73,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; @@ -147,6 +151,8 @@ class FlinkStreamingTransformTranslators { TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new
[beam] branch master updated: Fix Python wordcount benchmark by specifying beam_it_module
This is an automated email from the ASF dual-hosted git repository. markliu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new ad14cb9 Fix Python wordcount benchmark by specifiying beam_it_module new 8ea9a8e Merge pull request #8389 from markflyhigh/fix-py-benchmark ad14cb9 is described below commit ad14cb90638effd4446784ae9c0ffc0a57ebbf15 Author: Mark Liu AuthorDate: Wed Apr 24 11:02:34 2019 -0700 Fix Python wordcount benchmark by specifiying beam_it_module --- .test-infra/jenkins/job_PerformanceTests_Python.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/.test-infra/jenkins/job_PerformanceTests_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_Python.groovy index be734ba..5354d46 100644 --- a/.test-infra/jenkins/job_PerformanceTests_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_Python.groovy @@ -51,6 +51,7 @@ job('beam_PerformanceTests_Python'){ benchmarks : 'beam_integration_benchmark', bigquery_table : 'beam_performance.wordcount_py_pkb_results', beam_it_class: 'apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it', + beam_it_module : 'sdks/python', beam_prebuilt: 'true', // skip beam prebuild beam_python_sdk_location : 'build/apache-beam.tar.gz', beam_runner : 'TestDataflowRunner',
[beam] branch master updated: [BEAM-7015] Remove duplicate standard_coders.yaml
This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 1840b43 [BEAM-7015] Remove duplicate standard_coders.yaml new a1680b5 Merge pull request #8319: [BEAM-7015] Remove duplicate standard_coders.yaml 1840b43 is described below commit 1840b4339c44391fd2057244d09bb9cac60c812d Author: Thomas Weise AuthorDate: Mon Apr 15 22:16:27 2019 -0700 [BEAM-7015] Remove duplicate standard_coders.yaml --- .gitignore | 1 + .../apache_beam/coders/standard_coders_test.py | 6 +- .../apache_beam/testing/data/standard_coders.yaml | 235 - sdks/python/gen_protos.py | 9 + sdks/python/setup.py | 3 +- 5 files changed, 15 insertions(+), 239 deletions(-) diff --git a/.gitignore b/.gitignore index 6a5feac..385b99f 100644 --- a/.gitignore +++ b/.gitignore @@ -42,6 +42,7 @@ sdks/python/LICENSE sdks/python/NOTICE sdks/python/README.md sdks/python/apache_beam/portability/api/*pb2*.* +sdks/python/apache_beam/portability/api/*.yaml sdks/python/nosetests.xml sdks/python/postcommit_requirements.txt diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index b4d8443..437d2be 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -37,8 +37,8 @@ from apache_beam.transforms.window import IntervalWindow from apache_beam.utils import windowed_value from apache_beam.utils.timestamp import Timestamp -STANDARD_CODERS_YAML = os.path.join( -os.path.dirname(__file__), '..', 'testing', 'data', 'standard_coders.yaml') +STANDARD_CODERS_YAML = os.path.normpath(os.path.join( +os.path.dirname(__file__), '../portability/api/standard_coders.yaml')) def _load_test_cases(test_yaml): @@ -76,7 +76,7 @@ class StandardCodersTest(unittest.TestCase): 'beam:coder:timer:v1': lambda x, payload_parser: dict( 
payload=payload_parser(x['payload']), - timestamp=Timestamp(micros=x['timestamp'])), + timestamp=Timestamp(micros=x['timestamp'] * 1000)), 'beam:coder:double:v1': lambda x: float(x), } diff --git a/sdks/python/apache_beam/testing/data/standard_coders.yaml b/sdks/python/apache_beam/testing/data/standard_coders.yaml deleted file mode 100644 index e3927ab..000 --- a/sdks/python/apache_beam/testing/data/standard_coders.yaml +++ /dev/null @@ -1,235 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# This file is broken into multiple sections delimited by ---. Each section specifies a set of -# reference encodings for a single standardized coder used in a specific context. -# -# Each section contains up to 3 properties: -# -# coder: a common coder spec. Currently, a URN and URNs for component coders as necessary. -# nested: a boolean meaning whether the coder was used in the nested context. Missing means to -# test both contexts, a shorthand for when the coder is invariant across context. -# examples: a map of {encoded bytes: original JSON object} encoded with the coder in the context. -# The LHS (key) is a byte array encoded as a JSON-escaped string. 
The RHS (value) is -# one of a few standard JSON types such as numbers, strings, dicts that map naturally -# to the type encoded by the coder. -# -# These choices were made to strike a balance between portability, ease of use, and simple -# legibility of this file itself. -# -# It is expected that future work will move the `coder` field into a format that it would be -# represented by the Runner API, so that it can be understood by all SDKs and harnesses. -# -# If a coder is marked non-deterministic in the coder spec, then only the decoding should be validated. - - -coder: - urn: "beam:coder:bytes:v1" -nested: false -examples: - "abc": abc - "ab\0c": "ab\0c" - - -coder: - urn: "beam:coder:bytes:v1" -nested: true -examples: -
[beam] branch master updated: [SQL] expose a public entry for toRowList.
This is an automated email from the ASF dual-hosted git repository. apilloud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c8fc0c3 [SQL] expose a public entry for toRowList. new dd3737e Merge pull request #8382: [SQL] expose a public entry for toRowList in BeamEnumerableConverter c8fc0c3 is described below commit c8fc0c3f4ec7e46f45273cc80cd46d775c29abdd Author: amaliujia AuthorDate: Tue Apr 23 09:04:06 2019 -0700 [SQL] expose a public entry for toRowList. --- .../sql/impl/rel/BeamEnumerableConverter.java | 40 -- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 755d589..60bd76a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -118,6 +118,17 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable } } + public static List toRowList(BeamRelNode node) { +final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); +try { + Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); + final PipelineOptions options = createPipelineOptions(node.getPipelineOptions()); + return toRowList(options, node); +} finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); +} + } + public static PipelineOptions createPipelineOptions(Map map) { final String[] args = new String[map.size()]; int i = 0; @@ -135,7 +146,7 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable } else if 
(isLimitQuery(node)) { throw new UnsupportedOperationException("Does not support queries with LIMIT in toRowList."); } -return collectRowList(options, node); +return collectRows(options, node).stream().collect(Collectors.toList()); } static Enumerable toEnumerable(PipelineOptions options, BeamRelNode node) { @@ -144,8 +155,7 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable } else if (isLimitQuery(node)) { return limitCollect(options, node); } - -return collect(options, node); +return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues((collectRows(options, node; } private static PipelineResult limitRun( @@ -191,7 +201,7 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable result.waitUntilFinish(); } - private static List collectRowList(PipelineOptions options, BeamRelNode node) { + private static Queue collectRows(PipelineOptions options, BeamRelNode node) { long id = options.getOptionsId(); Queue values = new ConcurrentLinkedQueue<>(); @@ -207,27 +217,7 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable runCollector(options, node); Collector.globalValues.remove(id); -return values.stream().collect(Collectors.toList()); - } - - private static Enumerable collect(PipelineOptions options, BeamRelNode node) { -long id = options.getOptionsId(); -Queue values = new ConcurrentLinkedQueue<>(); - -checkArgument( -options -.getRunner() -.getCanonicalName() -.equals("org.apache.beam.runners.direct.DirectRunner"), -"SELECT without INSERT is only supported in DirectRunner in SQL Shell."); - -Collector.globalValues.put(id, values); - -runCollector(options, node); - -Collector.globalValues.remove(id); - -return Linq4j.asEnumerable(rowToAvaticaAndUnboxValues(values)); +return values; } private static Enumerable limitCollect(PipelineOptions options, BeamRelNode node) {
[beam] branch master updated (de2d929 -> 47d44b1)
This is an automated email from the ASF dual-hosted git repository. apilloud pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/beam.git. from de2d929 Merge pull request #8369 from youngoli/beam7087 new 9424517 [BEAM-6760] website: disable testing external links by default new 5dfb1a0 add website postcommit test that checks external links new 47d44b1 Merge pull request #8318 from ibzib/test-website-disable-external The 21119 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: ...te_Publish.groovy => job_PostCommit_Website_Test.groovy} | 13 - website/Rakefile| 1 + website/build.gradle| 2 +- 3 files changed, 6 insertions(+), 10 deletions(-) copy .test-infra/jenkins/{job_PostCommit_Website_Publish.groovy => job_PostCommit_Website_Test.groovy} (73%)
[beam] branch master updated: [BEAM-7087] Creating "errors" package internal to Beam Go SDK
This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 5bfa896 [BEAM-7087] Creating "errors" package internal to Beam Go SDK new de2d929 Merge pull request #8369 from youngoli/beam7087 5bfa896 is described below commit 5bfa896dc249dff2bd4038641775ba50aee16e78 Author: Daniel Oliveira AuthorDate: Fri Apr 19 17:33:36 2019 -0700 [BEAM-7087] Creating "errors" package internal to Beam Go SDK I'm creating a package that prints well formatted errors for Beam. In particular this package differentiates between context and error messages and allows the top-level error message to be sent any time in the chain of errors and still be the first error to display. --- sdks/go/pkg/beam/internal/errors.go | 186 sdks/go/pkg/beam/internal/errors_test.go | 203 +++ 2 files changed, 389 insertions(+) diff --git a/sdks/go/pkg/beam/internal/errors.go b/sdks/go/pkg/beam/internal/errors.go new file mode 100644 index 000..0f3d78f --- /dev/null +++ b/sdks/go/pkg/beam/internal/errors.go @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package errors + +import ( + "fmt" + "io" + "strings" +) + +// New returns an error with the given message. +func New(message string) error { + return fmt.Errorf("%s", message) +} + +// Errorf returns an error with a message formatted according to the format +// specifier. +func Errorf(format string, args ...interface{}) error { + return fmt.Errorf(format, args...) +} + +// Wrap returns a new error annotating err with a new message. +func Wrap(err error, message string) error { + if err == nil { + return nil + } + return { + cause: err, + msg: message, + top: getTop(err), + } +} + +// Wrapf returns a new error annotating err with a new message according to +// the format specifier. +func Wrapf(err error, format string, args ...interface{}) error { + if err == nil { + return nil + } + return { + cause: err, + msg: fmt.Sprintf(format, args...), + top: getTop(err), + } +} + +// WithContext returns a new error adding additional context to err. +func WithContext(err error, context string) error { + if err == nil { + return nil + } + return { + cause: err, + context: context, + top: getTop(err), + } +} + +// WithContextf returns a new error adding additional context to err according +// to the format specifier. +func WithContextf(err error, format string, args ...interface{}) error { + if err == nil { + return nil + } + return { + cause: err, + context: fmt.Sprintf(format, args...), + top: getTop(err), + } +} + +// SetTopLevelMsg returns a new error with the given top level message. The top +// level message is the first error message that gets printed when Error() +// is called on the returned error or any error wrapping it. +func SetTopLevelMsg(err error, top string) error { + if err == nil { + return nil + } + return { + cause: err, + top: top, + } +} + +// SetTopLevelMsgf returns a new error with the given top level message +// according to the format specifier. 
The top level message is the first error +// message that gets printed when Error() is called on the returned error or +// any error wrapping it. +func SetTopLevelMsgf(err error, format string, args ...interface{}) error { + if err == nil { + return nil + } + return { + cause: err, + top: fmt.Sprintf(format, args...), + } +} + +func getTop(e error) string { + if be, ok := e.(*beamError); ok { + return be.top + } + return e.Error() +} + +// beamError represents one
[beam] branch master updated: Add null check to fieldToAvatica
This is an automated email from the ASF dual-hosted git repository. apilloud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 88875bc Add null check to fieldToAvatica new 1075ff3 Merge pull request #8375: [BEAM-7129] BeamEnumerableConverter does not handle null values for all types 88875bc is described below commit 88875bcf0407713ebf22d11761556549fd9da970 Author: Brian Hulette AuthorDate: Mon Apr 22 10:18:38 2019 -0700 Add null check to fieldToAvatica Also parameterizes testToEnumerable_collectNullValue to test null values of all primitive types. --- .../sql/impl/rel/BeamEnumerableConverter.java | 4 + .../sql/impl/rel/BeamEnumerableConverterTest.java | 287 - 2 files changed, 169 insertions(+), 122 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 755d589..3b0e94c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -303,6 +303,10 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable } private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) { +if (beamValue == null) { + return null; +} + switch (type.getTypeName()) { case LOGICAL_TYPE: String logicalId = type.getLogicalType().getIdentifier(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java index 76e952e..5dfa38d 100644 --- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java @@ -30,6 +30,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; @@ -49,6 +51,10 @@ import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexLiteral; import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; /** Test for {@code BeamEnumerableConverter}. 
*/ public class BeamEnumerableConverterTest { @@ -57,137 +63,174 @@ public class BeamEnumerableConverterTest { static PipelineOptions options = PipelineOptionsFactory.create(); static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder); - @Test - public void testToEnumerable_collectSingle() { -Schema schema = Schema.builder().addInt64Field("id").build(); -RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); -ImmutableList> tuples = - ImmutableList.of(ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ZERO))); -BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); - -Enumerable enumerable = BeamEnumerableConverter.toEnumerable(options, node); -Enumerator enumerator = enumerable.enumerator(); - -assertTrue(enumerator.moveNext()); -assertEquals(0L, enumerator.current()); -assertFalse(enumerator.moveNext()); -enumerator.close(); - } - - @Test - public void testToEnumerable_collectMultiple() { -Schema schema = Schema.builder().addInt64Field("id").addInt64Field("otherid").build(); -RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); -ImmutableList> tuples = -ImmutableList.of( -ImmutableList.of( -rexBuilder.makeBigintLiteral(BigDecimal.ZERO), -rexBuilder.makeBigintLiteral(BigDecimal.ONE))); -BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); - -Enumerable enumerable = BeamEnumerableConverter.toEnumerable(options, node); -Enumerator enumerator = enumerable.enumerator(); - -assertTrue(enumerator.moveNext()); -Object[] row = (Object[]) enumerator.current(); -assertEquals(2, row.length); -assertEquals(0L, row[0]); -assertEquals(1L, row[1]);
[beam] branch spark-runner_structured-streaming updated: Use PAssert in Spark Structured Streaming transform tests
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new b6ddc0b Use PAssert in Spark Structured Streaming transform tests b6ddc0b is described below commit b6ddc0b3fd727c78cd4c110453add122bc5892bc Author: Ismaël Mejía AuthorDate: Wed Apr 24 11:39:52 2019 +0200 Use PAssert in Spark Structured Streaming transform tests --- .../translation/batch/CombineTest.java | 47 ++ .../translation/batch/ComplexSourceTest.java | 37 +++-- .../translation/batch/FlattenTest.java | 15 +- .../translation/batch/GroupByKeyTest.java | 21 ++- .../translation/batch/ParDoTest.java | 164 + .../translation/batch/SimpleSourceTest.java| 13 +- .../translation/batch/WindowAssignTest.java| 29 +++- .../translation/streaming/SimpleSourceTest.java| 8 +- 8 files changed, 163 insertions(+), 171 deletions(-) diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java index bd5df66..cdce691 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java @@ -24,8 +24,9 @@ import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamin import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; 
import org.apache.beam.sdk.values.PCollection; import org.junit.BeforeClass; @@ -36,20 +37,21 @@ import org.junit.runners.JUnit4; /** Test class for beam to spark {@link org.apache.beam.sdk.transforms.Combine} translation. */ @RunWith(JUnit4.class) public class CombineTest implements Serializable { - private static Pipeline pipeline; + private static Pipeline p; @BeforeClass public static void beforeClass() { PipelineOptions options = PipelineOptionsFactory.create().as(PipelineOptions.class); options.setRunner(SparkStructuredStreamingRunner.class); -pipeline = Pipeline.create(options); +p = Pipeline.create(options); } @Test public void testCombineGlobally() { -PCollection input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); -input.apply(Combine.globally(new IntegerCombineFn())); -pipeline.run(); +PCollection input = +p.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).apply(Sum.integersGlobally()); +PAssert.that(input).containsInAnyOrder(55); +p.run(); } @Test @@ -62,35 +64,8 @@ public class CombineTest implements Serializable { elems.add(KV.of(2, 4)); elems.add(KV.of(2, 6)); -PCollection> input = pipeline.apply(Create.of(elems)); -input.apply(Combine.perKey(new IntegerCombineFn())); -pipeline.run(); - } - - private static class IntegerCombineFn extends Combine.CombineFn { - -@Override -public Long createAccumulator() { - return 0L; -} - -@Override -public Long addInput(Long accumulator, Integer input) { - return accumulator + input; -} - -@Override -public Long mergeAccumulators(Iterable accumulators) { - Long result = 0L; - for (Long value : accumulators) { -result += value; - } - return result; -} - -@Override -public Long extractOutput(Long accumulator) { - return accumulator; -} +PCollection> input = p.apply(Create.of(elems)).apply(Sum.integersPerKey()); +PAssert.that(input).containsInAnyOrder(KV.of(1, 9), KV.of(2, 12)); +p.run(); } } diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java index 8e2e224..968cbd4 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ComplexSourceTest.java @@ -30,6 +30,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import
[beam] branch asf-site updated: Publishing website 2019/04/24 09:18:14 at commit 865727c
This is an automated email from the ASF dual-hosted git repository. git-site-role pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/asf-site by this push: new 3396219 Publishing website 2019/04/24 09:18:14 at commit 865727c 3396219 is described below commit 33962191f90fb2693d9c25a6a9434c0927cfe62a Author: jenkins AuthorDate: Wed Apr 24 09:18:14 2019 +0000 Publishing website 2019/04/24 09:18:14 at commit 865727c --- .../documentation/io/built-in/google-bigquery/index.html | 16 +++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/website/generated-content/documentation/io/built-in/google-bigquery/index.html b/website/generated-content/documentation/io/built-in/google-bigquery/index.html index 69e4cdf..502b5ad 100644 --- a/website/generated-content/documentation/io/built-in/google-bigquery/index.html +++ b/website/generated-content/documentation/io/built-in/google-bigquery/index.html @@ -1278,7 +1278,21 @@ only usable if you are writing to a single table. This example generates one partition per day. -// Java snippet not yet available and tracked in JIRA: BEAM-5503 +weatherData.apply( +BigQueryIO.<WeatherData>write() +.to(tableSpec + "_partitioning") +.withSchema(tableSchema) +.withFormatFunction( +(WeatherData elem) -> +new TableRow() +.set("year", elem.year) +.set("month", elem.month) +.set("day", elem.day) +.set("maxTemp", elem.maxTemp)) +// NOTE: an existing table without time partitioning set up will not work +.withTimePartitioning(new TimePartitioning().setType("DAY")) +.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) +.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); # The Beam SDK for Python does not currently support time partitioning.
[beam] branch master updated: [BEAM-5503] Update BigQueryIO time partitioning doc snippet
This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new a847ac2 [BEAM-5503] Update BigQueryIO time partitioning doc snippet new 865727c Merge pull request #8384: [BEAM-5503] Update BigQueryIO time partitioning doc snippet a847ac2 is described below commit a847ac2c5b6c060f5dced090e3066e9d7a4c14f4 Author: ttanay AuthorDate: Wed Apr 24 00:31:52 2019 +0530 [BEAM-5503] Update BigQueryIO time partitioning doc snippet Added link to code snippet for Java SDK showing usage of time-partitioning in BigQuery. --- website/src/documentation/io/built-in-google-bigquery.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/website/src/documentation/io/built-in-google-bigquery.md b/website/src/documentation/io/built-in-google-bigquery.md index 855b5cf..200d35e 100644 --- a/website/src/documentation/io/built-in-google-bigquery.md +++ b/website/src/documentation/io/built-in-google-bigquery.md @@ -727,8 +727,8 @@ To use BigQuery time partitioning, use one of these two methods: This example generates one partition per day. ```java -// Java snippet not yet available and tracked in JIRA: BEAM-5503 -``` +{% github_sample /apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java tag:BigQueryTimePartitioning +%}``` ```py # The Beam SDK for Python does not currently support time partitioning. ```
[beam] branch spark-runner_structured-streaming updated: Rename SparkPipelineResult to SparkStructuredStreamingPipelineResult This is done to avoid an eventual collision with the one in SparkRunner. H
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new dafcca8 Rename SparkPipelineResult to SparkStructuredStreamingPipelineResult This is done to avoid an eventual collision with the one in SparkRunner. However this cannot happen at this moment because it is package private, so it is also done for consistency. dafcca8 is described below commit dafcca8ee7764b6b1908f89f31b9e3138dca2daa Author: Ismaël Mejía AuthorDate: Wed Apr 24 10:22:04 2019 +0200 Rename SparkPipelineResult to SparkStructuredStreamingPipelineResult This is done to avoid an eventual collision with the one in SparkRunner. However this cannot happen at this moment because it is package private, so it is also done for consistency. --- ...lt.java => SparkStructuredStreamingPipelineResult.java} | 2 +- .../SparkStructuredStreamingRunner.java| 14 -- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java similarity index 96% rename from runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java rename to runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java index 48c117d..d0198d4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingPipelineResult.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.metrics.MetricResults; import org.joda.time.Duration; /** Represents 
a Spark pipeline execution result. */ -class SparkPipelineResult implements PipelineResult { +class SparkStructuredStreamingPipelineResult implements PipelineResult { @Nullable // TODO: remove once method will be implemented @Override diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java index acb5615..c63efe4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/SparkStructuredStreamingRunner.java @@ -37,17 +37,19 @@ import org.slf4j.LoggerFactory; * Beam pipeline with the default options of a single threaded spark instance in local mode, we * would do the following: * - * {@code Pipeline p = [logic for pipeline creation] SparkPipelineResult result = - * (SparkPipelineResult) p.run(); } + * {@code Pipeline p = [logic for pipeline creation] SparkStructuredStreamingPipelineResult + * result = (SparkStructuredStreamingPipelineResult) p.run(); } * * To create a pipeline runner to run against a different spark cluster, with a custom master url * we would do the following: * * {@code Pipeline p = [logic for pipeline creation] SparkStructuredStreamingPipelineOptions * options = SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port"); - * SparkPipelineResult result = (SparkPipelineResult) p.run(); } + * SparkStructuredStreamingPipelineResult result = (SparkStructuredStreamingPipelineResult) p.run(); + * } */ -public final class SparkStructuredStreamingRunner extends PipelineRunner<SparkPipelineResult> { +public final class SparkStructuredStreamingRunner +extends PipelineRunner<SparkStructuredStreamingPipelineResult> { private static final Logger LOG = LoggerFactory.getLogger(SparkStructuredStreamingRunner.class); @@ -111,12 +113,12 @@ public final class
SparkStructuredStreamingRunner extends PipelineRunner
[beam] branch spark-runner_structured-streaming updated: Add SparkStructuredStreamingPipelineOptions and SparkCommonPipelineOptions - SparkStructuredStreamingPipelineOptions was added to have the new
This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 29e95af Add SparkStructuredStreamingPipelineOptions and SparkCommonPipelineOptions - SparkStructuredStreamingPipelineOptions was added to have the new runner rely only on its specific options. 29e95af is described below commit 29e95af93fc0812f71ab37c2307b211aa489ce28 Author: Ismaël Mejía AuthorDate: Wed Apr 24 10:10:54 2019 +0200 Add SparkStructuredStreamingPipelineOptions and SparkCommonPipelineOptions - SparkStructuredStreamingPipelineOptions was added to have the new runner rely only on its specific options. - SparkCommonPipelineOptions is used to share options common to all Spark runners. --- ...ptions.java => SparkCommonPipelineOptions.java} | 98 +++--- .../beam/runners/spark/SparkPipelineOptions.java | 46 +- .../SparkStructuredStreamingPipelineOptions.java | 27 ++ .../SparkStructuredStreamingRunner.java| 21 ++--- .../translation/PipelineTranslator.java| 10 ++- .../translation/TranslationContext.java| 8 +- .../translation/batch/DatasetSourceBatch.java | 13 ++- .../translation/batch/PipelineTranslatorBatch.java | 4 +- .../streaming/DatasetSourceStreaming.java | 10 +-- .../streaming/PipelineTranslatorStreaming.java | 4 +- .../translation/batch/CombineTest.java | 3 +- .../translation/batch/ComplexSourceTest.java | 3 +- .../translation/batch/FlattenTest.java | 3 +- .../translation/batch/GroupByKeyTest.java | 3 +- .../translation/batch/ParDoTest.java | 3 +- .../translation/batch/SimpleSourceTest.java| 3 +- .../translation/batch/WindowAssignTest.java| 3 +- .../translation/streaming/SimpleSourceTest.java| 3 +- 18 files changed, 82 insertions(+), 183 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java similarity index 51% copy from runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java copy to runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java index 6935b54..b70bdea 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.spark; import java.util.List; -import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -28,9 +27,9 @@ import org.apache.beam.sdk.options.StreamingOptions; /** * Spark runner {@link PipelineOptions} handles Spark execution-related configurations, such as the - * master address, batch-interval, and other user-related knobs. + * master address, and other user-related knobs. */ -public interface SparkPipelineOptions +public interface SparkCommonPipelineOptions extends PipelineOptions, StreamingOptions, ApplicationNameOptions { @Description("The url of the spark master to connect to, (e.g. 
spark://host:port, local[4]).") @@ -39,38 +38,6 @@ public interface SparkPipelineOptions void setSparkMaster(String master); - @Description("Batch interval for Spark streaming in milliseconds.") - @Default.Long(500) - Long getBatchIntervalMillis(); - - void setBatchIntervalMillis(Long batchInterval); - - @Description("Batch default storage level") - @Default.String("MEMORY_ONLY") - String getStorageLevel(); - - void setStorageLevel(String storageLevel); - - @Description("Minimum time to spend on read, for each micro-batch.") - @Default.Long(200) - Long getMinReadTimeMillis(); - - void setMinReadTimeMillis(Long minReadTimeMillis); - - @Description("Max records per micro-batch. For streaming sources only.") - @Default.Long(-1) - Long getMaxRecordsPerBatch(); - - void setMaxRecordsPerBatch(Long maxRecordsPerBatch); - - @Description( - "A value between 0-1 to describe the percentage of a micro-batch dedicated " - + "to reading from UnboundedSource.") - @Default.Double(0.1) - Double getReadTimePercentage(); - - void setReadTimePercentage(Double readTimePercentage); - @Description( "A checkpoint directory for streaming resilience, ignored in batch. " + "For durability, a reliable filesystem such as HDFS/S3/GS is necessary.") @@ -80,50 +47,6 @@ public interface SparkPipelineOptions void setCheckpointDir(String checkpointDir); /** - *