[3/3] incubator-beam git commit: [BEAM-1097] Provide a better error message for non-existing gcpTempLocation
[BEAM-1097] Provide a better error message for non-existing gcpTempLocation This closes #1522 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/acd2196c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/acd2196c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/acd2196c Branch: refs/heads/master Commit: acd2196cf54e30e18e69c4dd30b57e6179909ecf Parents: 2f4b803 96d3931 Author: Luke Cwik Authored: Tue Dec 20 13:04:31 2016 -0800 Committer: Luke Cwik Committed: Tue Dec 20 13:04:31 2016 -0800 -- .../beam/runners/dataflow/DataflowRunner.java | 25 ++ .../options/DataflowPipelineOptions.java| 19 .../runners/dataflow/DataflowRunnerTest.java| 48 ++-- .../options/DataflowPipelineOptionsTest.java| 20 +--- .../org/apache/beam/sdk/options/GcpOptions.java | 19 .../apache/beam/sdk/util/GcsPathValidator.java | 3 +- .../apache/beam/sdk/options/GcpOptionsTest.java | 32 +++-- .../beam/sdk/util/GcsPathValidatorTest.java | 15 +- 8 files changed, 117 insertions(+), 64 deletions(-) --
[2/3] incubator-beam git commit: Provide a better error message for non-existing gcpTempLocation
Provide a better error message for non-existing gcpTempLocation gcpTempLocation will default to using the value for tmpLocation, as long as the value is a valid GCP path. Non-valid GCP paths are silently discarded. This change removes existence validation from the default value logic such that downstream validation can provide a better error message. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1a8583 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1a8583 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1a8583 Branch: refs/heads/master Commit: ef1a858347e475cd15f7dcd8873464f506527b2a Parents: 2f4b803 Author: Scott WegnerAuthored: Tue Dec 6 14:19:12 2016 -0800 Committer: Luke Cwik Committed: Tue Dec 20 13:04:24 2016 -0800 -- .../beam/runners/dataflow/DataflowRunner.java | 25 .../options/DataflowPipelineOptions.java| 19 - .../runners/dataflow/DataflowRunnerTest.java| 42 +++- .../options/DataflowPipelineOptionsTest.java| 20 ++ .../org/apache/beam/sdk/options/GcpOptions.java | 19 + .../apache/beam/sdk/util/GcsPathValidator.java | 3 +- .../apache/beam/sdk/options/GcpOptionsTest.java | 32 +-- .../beam/sdk/util/GcsPathValidatorTest.java | 15 +-- 8 files changed, 114 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 711b1b0..1a15eaf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -244,14 +244,23 
@@ public class DataflowRunner extends PipelineRunner { } PathValidator validator = dataflowOptions.getPathValidator(); -checkArgument( -!isNullOrEmpty(dataflowOptions.getGcpTempLocation()), -"DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions."); - validator.validateOutputFilePrefixSupported(dataflowOptions.getGcpTempLocation()); -checkArgument( -!isNullOrEmpty(dataflowOptions.getStagingLocation()), -"DataflowRunner requires stagingLocation, and it is missing in PipelineOptions."); - validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation()); +String gcpTempLocation; +try { + gcpTempLocation = dataflowOptions.getGcpTempLocation(); +} catch (Exception e) { + throw new IllegalArgumentException("DataflowRunner requires gcpTempLocation, " + + "but failed to retrieve a value from PipelineOptions", e); +} +validator.validateOutputFilePrefixSupported(gcpTempLocation); + +String stagingLocation; +try { + stagingLocation = dataflowOptions.getStagingLocation(); +} catch (Exception e) { + throw new IllegalArgumentException("DataflowRunner requires stagingLocation, " + + "but failed to retrieve a value from PipelineOptions", e); +} +validator.validateOutputFilePrefixSupported(stagingLocation); if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) { validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 66632ad..5ddc5d0 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -17,9 +17,6 @@ */ package org.apache.beam.runners.dataflow.options; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.isNullOrEmpty; - import java.io.IOException; import
[1/3] incubator-beam git commit: Fixup usage of canonical name with name since canonical name != name for inner classes.
Repository: incubator-beam Updated Branches: refs/heads/master 2f4b80312 -> acd2196cf Fixup usage of canonical name with name since canonical name != name for inner classes. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96d39314 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96d39314 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96d39314 Branch: refs/heads/master Commit: 96d393147c365d0911a091d0b3600fef621709f4 Parents: ef1a858 Author: Luke CwikAuthored: Tue Dec 20 11:47:42 2016 -0800 Committer: Luke Cwik Committed: Tue Dec 20 13:04:24 2016 -0800 -- .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96d39314/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index b29c4cd..21d575a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -267,7 +267,7 @@ public class DataflowRunnerTest { "--runner=DataflowRunner", "--tempLocation=/tmp/not/a/gs/path", "--project=test-project", -"--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(), +"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(), }; try { @@ -286,7 +286,7 @@ public class DataflowRunnerTest { "--runner=DataflowRunner", "--tempLocation=gs://does/not/exist", "--project=test-project", -"--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(), 
+"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(), }; try { @@ -306,8 +306,8 @@ public class DataflowRunnerTest { "--runner=DataflowRunner", "--tempLocation=/tmp/testing", "--project=test-project", -"--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(), -"--pathValidatorClass=" + NoopPathValidator.class.getCanonicalName(), +"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(), +"--pathValidatorClass=" + NoopPathValidator.class.getName(), }; // Should not crash, because gcpTempLocation should get set from tempLocation TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
[1/2] incubator-beam git commit: [BEAM-59] initial interfaces and classes of Beam FileSystem.
Repository: incubator-beam Updated Branches: refs/heads/master 4f97efc11 -> 28d7913be [BEAM-59] initial interfaces and classes of Beam FileSystem. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/467f7d17 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/467f7d17 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/467f7d17 Branch: refs/heads/master Commit: 467f7d17c4c96bc57b0160c2d4768ceb303bc561 Parents: 4f97efc Author: Pei HeAuthored: Wed Dec 7 17:35:23 2016 -0800 Committer: Luke Cwik Committed: Mon Dec 19 15:20:37 2016 -0800 -- .../java/org/apache/beam/sdk/io/FileSystem.java | 29 .../apache/beam/sdk/io/FileSystemRegistrar.java | 49 ++ .../org/apache/beam/sdk/io/FileSystems.java | 155 +++ .../org/apache/beam/sdk/io/LocalFileSystem.java | 27 .../beam/sdk/io/LocalFileSystemRegistrar.java | 41 + .../org/apache/beam/sdk/io/FileSystemsTest.java | 104 + .../sdk/io/LocalFileSystemRegistrarTest.java| 44 ++ sdks/java/io/google-cloud-platform/pom.xml | 6 + .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 34 .../io/gcp/storage/GcsFileSystemRegistrar.java | 42 + .../beam/sdk/io/gcp/storage/package-info.java | 21 +++ .../gcp/storage/GcsFileSystemRegistrarTest.java | 51 ++ sdks/java/io/hdfs/pom.xml | 6 + .../beam/sdk/io/hdfs/HadoopFileSystem.java | 29 .../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 42 + .../io/hdfs/HadoopFileSystemRegistrarTest.java | 52 +++ 16 files changed, 732 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java new file mode 100644 index 000..d990403 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +/** + * File system interface in Beam. + * + * It defines APIs for writing file systems agnostic code. + * + * All methods are protected, and they are for file system providers to implement. + * Clients should use {@link FileSystems} utility. + */ +public abstract class FileSystem { +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/467f7d17/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java new file mode 100644 index 000..1d81c1e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystemRegistrar.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io; + +import com.google.auto.service.AutoService; +import
[2/2] incubator-beam git commit: [BEAM-59] initial interfaces and classes of Beam FileSystem
[BEAM-59] initial interfaces and classes of Beam FileSystem This closes #1558 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/28d7913b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/28d7913b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/28d7913b Branch: refs/heads/master Commit: 28d7913be5d9bba9d4cb23187c59c9dfd3ab0cae Parents: 4f97efc 467f7d1 Author: Luke Cwik Authored: Mon Dec 19 15:21:02 2016 -0800 Committer: Luke Cwik Committed: Mon Dec 19 15:21:02 2016 -0800 -- .../java/org/apache/beam/sdk/io/FileSystem.java | 29 .../apache/beam/sdk/io/FileSystemRegistrar.java | 49 ++ .../org/apache/beam/sdk/io/FileSystems.java | 155 +++ .../org/apache/beam/sdk/io/LocalFileSystem.java | 27 .../beam/sdk/io/LocalFileSystemRegistrar.java | 41 + .../org/apache/beam/sdk/io/FileSystemsTest.java | 104 + .../sdk/io/LocalFileSystemRegistrarTest.java| 44 ++ sdks/java/io/google-cloud-platform/pom.xml | 6 + .../beam/sdk/io/gcp/storage/GcsFileSystem.java | 34 .../io/gcp/storage/GcsFileSystemRegistrar.java | 42 + .../beam/sdk/io/gcp/storage/package-info.java | 21 +++ .../gcp/storage/GcsFileSystemRegistrarTest.java | 51 ++ sdks/java/io/hdfs/pom.xml | 6 + .../beam/sdk/io/hdfs/HadoopFileSystem.java | 29 .../sdk/io/hdfs/HadoopFileSystemRegistrar.java | 42 + .../io/hdfs/HadoopFileSystemRegistrarTest.java | 52 +++ 16 files changed, 732 insertions(+) --
[2/3] incubator-beam git commit: Better comments and cleanup
Better comments and cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0a1fea3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0a1fea3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0a1fea3 Branch: refs/heads/master Commit: a0a1fea366849dec8c5dba868a71a1ccdce902eb Parents: ae1d2a3 Author: Vikas KedigehalliAuthored: Tue Dec 13 14:16:16 2016 -0800 Committer: Luke Cwik Committed: Tue Dec 13 22:37:23 2016 -0800 -- .../sdk/options/PipelineOptionsFactory.java | 50 ++-- .../sdk/options/PipelineOptionsFactoryTest.java | 1 - 2 files changed, 26 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a1fea3/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 2d013fd..42e1092 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -1652,9 +1652,11 @@ public class PipelineOptionsFactory { return convertedOptions; } + /** - * Returns true if the given type is a SIMPLE_TYPES, enum or any of these types in a - * parameterized ValueProvider. + * Returns true if the given type is one of {@code SIMPLE_TYPES} or an enum, or if the given type + * is a {@link ValueProvider ValueProviderT} and {@code T} is one of {@code SIMPLE_TYPES} + * or an enum. 
*/ private static boolean isSimpleType(Class type, JavaType genericType) { Class unwrappedType = type.equals(ValueProvider.class) @@ -1663,23 +1665,23 @@ public class PipelineOptionsFactory { } /** - * Returns true if the given type is a any Array or Collection of SIMPLE_TYPES or enum, or - * any of these types in a parameterized ValueProvider. + * Returns true if the given type is an array or {@link Collection} of {@code SIMPLE_TYPES} or + * enums, or if the given type is a {@link ValueProvider ValueProviderT} and {@code T} is + * an array or {@link Collection} of {@code SIMPLE_TYPES} or enums. */ private static boolean isCollectionOrArrayOfAllowedTypes(Class type, JavaType genericType) { -Class containerType = type.equals(ValueProvider.class) -? genericType.containedType(0).getRawClass() : type; +JavaType containerType = type.equals(ValueProvider.class) +? genericType.containedType(0) : genericType; // Check if it is an array of simple types or enum. -if (containerType.isArray() && (SIMPLE_TYPES.contains(containerType.getComponentType()) -|| containerType.getComponentType().isEnum())) { +if (containerType.getRawClass().isArray() +&& (SIMPLE_TYPES.contains(containerType.getRawClass().getComponentType()) +|| containerType.getRawClass().getComponentType().isEnum())) { return true; } // Check if it is Collection of simple types or enum. -if (Collection.class.isAssignableFrom(containerType)) { - JavaType innerType = type.equals(ValueProvider.class) - ? genericType.containedType(0).containedType(0) - : genericType.containedType(0); +if (Collection.class.isAssignableFrom(containerType.getRawClass())) { + JavaType innerType = containerType.containedType(0); // Note that raw types are allowed, hence the null check. if (innerType == null || SIMPLE_TYPES.contains(innerType.getRawClass()) || innerType.getRawClass().isEnum()) { @@ -1692,8 +1694,10 @@ public class PipelineOptionsFactory { /** * Ensures that empty string value is allowed for a given type. 
* - * Empty strings are only allowed for String, String Array, Collection of Strings or any of - * these types in a parameterized ValueProvider. + * Empty strings are only allowed for {@link String}, {@link String String[]}, + * {@link Collection CollectionString}, or {@link ValueProvider ValueProviderT} + * and {@code T} is of type {@link String}, {@link String String[]}, + * {@link Collection CollectionString}. * * @param type class object for the type under check. * @param genericType complete type information for the type under check. @@ -1701,16 +1705,14 @@ public class PipelineOptionsFactory { */ private static void checkEmptyStringAllowed(Class type, JavaType genericType, String genericTypeName) { -Class unwrappedType = type.equals(ValueProvider.class) -?
[3/3] incubator-beam git commit: [BEAM-1136, BEAM-1137] Allow empty string value for ValueProvider types.
[BEAM-1136, BEAM-1137] Allow empty string value for ValueProvider types. This closes #1580 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a51ace8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a51ace8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a51ace8 Branch: refs/heads/master Commit: 5a51ace8d44f7e8b75566c806796a31c9bf4f7e7 Parents: f516627 a0a1fea Author: Luke Cwik Authored: Tue Dec 13 22:37:55 2016 -0800 Committer: Luke Cwik Committed: Tue Dec 13 22:37:55 2016 -0800 -- .../sdk/options/PipelineOptionsFactory.java | 110 ++--- .../sdk/options/PipelineOptionsFactoryTest.java | 222 --- 2 files changed, 279 insertions(+), 53 deletions(-) --
incubator-beam git commit: fixup! Fix extraneous brace
Repository: incubator-beam Updated Branches: refs/heads/master 5169e4925 -> 9bab78b55 fixup! Fix extraneous brace Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9bab78b5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9bab78b5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9bab78b5 Branch: refs/heads/master Commit: 9bab78b55fe4661e9a221969441d8a2a4716f7e1 Parents: 5169e49 Author: Luke CwikAuthored: Thu Dec 8 19:20:22 2016 -0800 Committer: Luke Cwik Committed: Thu Dec 8 19:20:22 2016 -0800 -- .../src/test/java/org/apache/beam/sdk/io/CountingInputTest.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bab78b5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java index 063bead..dfc4919 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java @@ -71,7 +71,7 @@ public class CountingInputTest { public void testBoundedInput() { Pipeline p = TestPipeline.create(); long numElements = 1000; -PCollection input = p.apply(CountingInput.upTo(numElements))); +PCollection input = p.apply(CountingInput.upTo(numElements)); addCountingAsserts(input, 0, numElements); p.run();
incubator-beam git commit: fixup! Fix CountingInput naming
Repository: incubator-beam Updated Branches: refs/heads/master ddb59125a -> 5169e4925 fixup! Fix CountingInput naming Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5169e492 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5169e492 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5169e492 Branch: refs/heads/master Commit: 5169e492578a3759e20b50e36ace61bc86636ad2 Parents: ddb5912 Author: Luke CwikAuthored: Thu Dec 8 19:17:21 2016 -0800 Committer: Luke Cwik Committed: Thu Dec 8 19:17:21 2016 -0800 -- .../test/java/org/apache/beam/sdk/io/CountingInputTest.java| 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5169e492/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java index 4349f66..063bead 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java @@ -71,7 +71,7 @@ public class CountingInputTest { public void testBoundedInput() { Pipeline p = TestPipeline.create(); long numElements = 1000; -PCollection input = p.apply(Read.from(CountingSource.upTo(numElements))); +PCollection input = p.apply(CountingInput.upTo(numElements))); addCountingAsserts(input, 0, numElements); p.run(); @@ -79,7 +79,7 @@ public class CountingInputTest { @Test @Category(RunnableOnService.class) - public void testEmptyBoundedSource() { + public void testEmptyBoundedInput() { Pipeline p = TestPipeline.create(); PCollection input = p.apply(CountingInput.upTo(0)); @@ -89,7 +89,7 @@ public class CountingInputTest { @Test @Category(RunnableOnService.class) - public void testEmptyBoundedSourceUsingRange() { + public 
void testEmptyBoundedInputSubrange() { Pipeline p = TestPipeline.create(); PCollection input = p.apply(CountingInput.forSubrange(42, 42));
[2/2] incubator-beam git commit: Add support for having an empty CountingInput/CountingSource
Add support for having an empty CountingInput/CountingSource This closes #1557 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ddb59125 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ddb59125 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ddb59125 Branch: refs/heads/master Commit: ddb59125aeacb809b7695c203fe8b1a40e36aed2 Parents: 40bd276 30ff1ee Author: Luke Cwik Authored: Thu Dec 8 18:41:42 2016 -0800 Committer: Luke Cwik Committed: Thu Dec 8 18:41:42 2016 -0800 -- .../org/apache/beam/sdk/io/CountingInput.java | 12 ++ .../org/apache/beam/sdk/io/CountingSource.java | 12 ++ .../apache/beam/sdk/io/CountingInputTest.java | 23 +++- .../apache/beam/sdk/io/CountingSourceTest.java | 10 + 4 files changed, 48 insertions(+), 9 deletions(-) --
[1/2] incubator-beam git commit: Add support for having an empty CountingInput/CountingSource
Repository: incubator-beam Updated Branches: refs/heads/master 40bd27602 -> ddb59125a Add support for having an empty CountingInput/CountingSource Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30ff1ee1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30ff1ee1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30ff1ee1 Branch: refs/heads/master Commit: 30ff1ee17bb290f2b50fd082d8cb63d48280c5c2 Parents: 40bd276 Author: Luke CwikAuthored: Thu Dec 8 15:22:35 2016 -0800 Committer: Luke Cwik Committed: Thu Dec 8 18:41:17 2016 -0800 -- .../org/apache/beam/sdk/io/CountingInput.java | 12 ++ .../org/apache/beam/sdk/io/CountingSource.java | 12 ++ .../apache/beam/sdk/io/CountingInputTest.java | 23 +++- .../apache/beam/sdk/io/CountingSourceTest.java | 10 + 4 files changed, 48 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java index 3148d8d..ac70aca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java @@ -75,17 +75,21 @@ public class CountingInput { * from {@code 0} to {@code numElements - 1}. */ public static BoundedCountingInput upTo(long numElements) { -checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements); +checkArgument(numElements >= 0, +"numElements (%s) must be greater than or equal to 0", +numElements); return new BoundedCountingInput(numElements); } /** * Creates a {@link BoundedCountingInput} that will produce elements - * starting from {@code startIndex} to {@code endIndex - 1}. 
+ * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive). + * If {@code startIndex == endIndex}, then no elements will be produced. */ public static BoundedCountingInput forSubrange(long startIndex, long endIndex) { -checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)", -endIndex, startIndex); +checkArgument(endIndex >= startIndex, +"endIndex (%s) must be greater than or equal to startIndex (%s)", +endIndex, startIndex); return new BoundedCountingInput(startIndex, endIndex); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index bc7fb78..9752dba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -78,17 +78,21 @@ public class CountingSource { */ @Deprecated public static BoundedSource upTo(long numElements) { -checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements); +checkArgument(numElements >= 0, +"numElements (%s) must be greater than or equal to 0", +numElements); return new BoundedCountingSource(0, numElements); } /** * Creates a {@link BoundedSource} that will produce elements - * from {@code startIndex} to {@code endIndex - 1}. + * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive). + * If {@code startIndex == endIndex}, then no elements will be produced. 
*/ static BoundedSource createSourceForSubrange(long startIndex, long endIndex) { -checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)", -endIndex, startIndex); +checkArgument(endIndex >= startIndex, +"endIndex (%s) must be greater than or equal to startIndex (%s)", +endIndex, startIndex); return new BoundedCountingSource(startIndex, endIndex); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java index
[2/2] incubator-beam git commit: [BEAM-498] Remove misc occurrences of OldDoFn
[BEAM-498] Remove misc occurrences of OldDoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/04a41ee5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/04a41ee5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/04a41ee5 Branch: refs/heads/master Commit: 04a41ee54ced13e7d896b7d31ffa99a3273af2dd Parents: 92ff63d 44e17d1 Author: Luke Cwik Authored: Thu Dec 8 09:04:27 2016 -0800 Committer: Luke Cwik Committed: Thu Dec 8 09:04:27 2016 -0800 -- .../beam/sdk/AggregatorPipelineExtractor.java | 5 ++-- .../sdk/transforms/AggregatorRetriever.java | 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 4 +-- .../apache/beam/sdk/util/ExecutionContext.java | 8 +++--- .../sdk/AggregatorPipelineExtractorTest.java| 20 +++--- .../beam/sdk/transforms/DoFnTesterTest.java | 2 +- .../beam/sdk/transforms/ParDoLifecycleTest.java | 28 ++-- 7 files changed, 35 insertions(+), 34 deletions(-) --
[1/2] incubator-beam git commit: Remove misc occurrences of OldDoFn
Repository: incubator-beam Updated Branches: refs/heads/master 92ff63d3b -> 04a41ee54 Remove misc occurrences of OldDoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44e17d1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44e17d1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44e17d1c Branch: refs/heads/master Commit: 44e17d1c97babd487584cc78690505bdf57704b2 Parents: 92ff63d Author: Kenneth KnowlesAuthored: Wed Dec 7 14:17:01 2016 -0800 Committer: Luke Cwik Committed: Thu Dec 8 09:03:59 2016 -0800 -- .../beam/sdk/AggregatorPipelineExtractor.java | 5 ++-- .../sdk/transforms/AggregatorRetriever.java | 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 4 +-- .../apache/beam/sdk/util/ExecutionContext.java | 8 +++--- .../sdk/AggregatorPipelineExtractorTest.java| 20 +++--- .../beam/sdk/transforms/DoFnTesterTest.java | 2 +- .../beam/sdk/transforms/ParDoLifecycleTest.java | 28 ++-- 7 files changed, 35 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java index d2130d0..ade5978 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java @@ -70,9 +70,10 @@ class AggregatorPipelineExtractor { private Collection getAggregators(PTransform transform) { if (transform != null) { if (transform instanceof ParDo.Bound) { - return AggregatorRetriever.getAggregators(((ParDo.Bound) transform).getFn()); + return AggregatorRetriever.getAggregators(((ParDo.Bound) transform).getNewFn()); } else if (transform instanceof 
ParDo.BoundMulti) { - return AggregatorRetriever.getAggregators(((ParDo.BoundMulti) transform).getFn()); + return AggregatorRetriever.getAggregators( + ((ParDo.BoundMulti) transform).getNewFn()); } } return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java index abed843..ce47e22 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java @@ -30,7 +30,7 @@ public final class AggregatorRetriever { /** * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}. */ - public static Collection getAggregators(OldDoFn fn) { + public static Collection getAggregators(DoFn fn) { return fn.getAggregators(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index be063e2..4127d94 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2392,8 +2392,8 @@ public class Combine { PCollection>> input) { PCollection > output = input.apply(ParDo.of( - new OldDoFn , KV >() { -@Override + new DoFn , KV >() { +@ProcessElement public void processElement(final ProcessContext c) { K key = c.element().getKey(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java -- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
[2/2] incubator-beam git commit: Add experimental warning to datastoreio
Add experimental warning to datastoreio. This closes #1508 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f7118c8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f7118c8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f7118c8a Branch: refs/heads/python-sdk Commit: f7118c8a592a27c8eedb1dced7cbf85a4084c390 Parents: 0d99856 f3dcf6c Author: Luke Cwik Authored: Mon Dec 5 11:39:09 2016 -0800 Committer: Luke Cwik Committed: Mon Dec 5 11:39:09 2016 -0800 -- sdks/python/apache_beam/io/datastore/v1/datastoreio.py | 2 ++ 1 file changed, 2 insertions(+) --
[1/2] incubator-beam git commit: Add experimental warning to datastoreio
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 0d99856f3 -> f7118c8a5 Add experimental warning to datastoreio Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f3dcf6c2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f3dcf6c2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f3dcf6c2 Branch: refs/heads/python-sdk Commit: f3dcf6c2dd32e17998d9e185a326fb725abccf31 Parents: 0d99856 Author: Vikas KedigehalliAuthored: Mon Dec 5 11:23:53 2016 -0800 Committer: Vikas Kedigehalli Committed: Mon Dec 5 11:28:52 2016 -0800 -- sdks/python/apache_beam/io/datastore/v1/datastoreio.py | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3dcf6c2/sdks/python/apache_beam/io/datastore/v1/datastoreio.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index 20466b9..fc3e813 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -86,6 +86,7 @@ class ReadFromDatastore(PTransform): namespace: An optional namespace. num_splits: Number of splits for the query. """ +logging.warning('datastoreio read transform is experimental.') super(ReadFromDatastore, self).__init__() if not project: @@ -309,6 +310,7 @@ class _Mutate(PTransform): """ self._project = project self._mutation_fn = mutation_fn +logging.warning('datastoreio write transform is experimental.') def apply(self, pcoll): return (pcoll
[2/2] incubator-beam git commit: Use more natural class to find class loader in ReflectHelpers
Use more natural class to find class loader in ReflectHelpers. This closes #1427 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5f3e7787 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5f3e7787 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5f3e7787 Branch: refs/heads/master Commit: 5f3e7787d7e724b827af8924b2773ed3b5c2b036 Parents: 3ae9425 09986e9 Author: Luke Cwik Authored: Wed Nov 23 07:42:13 2016 -0800 Committer: Luke Cwik Committed: Wed Nov 23 07:42:13 2016 -0800 -- .../main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Use more natural class to find class loader in ReflectHelpers
Repository: incubator-beam Updated Branches: refs/heads/master 3ae9425b3 -> 5f3e7787d Use more natural class to find class loader in ReflectHelpers Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09986e94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09986e94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09986e94 Branch: refs/heads/master Commit: 09986e9433d49812d5061fe6543dff90d78eba6a Parents: 3ae9425 Author: Kenneth KnowlesAuthored: Tue Nov 22 22:16:29 2016 -0800 Committer: Luke Cwik Committed: Wed Nov 23 07:41:40 2016 -0800 -- .../main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09986e94/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java index 637e8e3..4ec39c1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java @@ -39,7 +39,6 @@ import java.util.Queue; import java.util.ServiceLoader; import javax.annotation.Nonnull; import javax.annotation.Nullable; -import org.apache.beam.sdk.util.IOChannelUtils; /** * Utilities for working with with {@link Class Classes} and {@link Method Methods}. @@ -225,7 +224,7 @@ public class ReflectHelpers { public static ClassLoader findClassLoader() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); if (classLoader == null) { - classLoader = IOChannelUtils.class.getClassLoader(); + classLoader = ReflectHelpers.class.getClassLoader(); } if (classLoader == null) { classLoader = ClassLoader.getSystemClassLoader();
[3/4] incubator-beam git commit: [BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils.
[BEAM-59] Use ServiceLoader to register IOChannelFactories in IOChannelUtils. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa417f9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa417f9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa417f9c Branch: refs/heads/master Commit: fa417f9c2c671626eba3326e82d47741000ec64d Parents: cd1a5e7 Author: Pei HeAuthored: Mon Oct 31 18:02:49 2016 -0700 Committer: Luke Cwik Committed: Tue Nov 22 06:18:55 2016 -0800 -- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../options/DataflowPipelineOptionsTest.java| 6 +- .../runners/dataflow/util/PackageUtilTest.java | 2 +- .../sdk/options/PipelineOptionsFactory.java | 32 + .../apache/beam/sdk/runners/PipelineRunner.java | 2 +- .../apache/beam/sdk/testing/TestPipeline.java | 2 +- .../beam/sdk/util/FileIOChannelFactory.java | 10 +- .../sdk/util/IOChannelFactoryRegistrar.java | 11 +- .../apache/beam/sdk/util/IOChannelUtils.java| 133 ++- .../beam/sdk/util/common/ReflectHelpers.java| 29 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 2 +- .../apache/beam/sdk/io/FileBasedSourceTest.java | 2 +- .../java/org/apache/beam/sdk/io/TextIOTest.java | 2 +- .../sdk/options/PipelineOptionsFactoryTest.java | 34 - .../util/FileIOChannelFactoryRegistrarTest.java | 4 +- .../beam/sdk/util/FileIOChannelFactoryTest.java | 2 +- .../util/GcsIOChannelFactoryRegistrarTest.java | 4 +- .../beam/sdk/util/IOChannelUtilsTest.java | 39 ++ .../sdk/util/common/ReflectHelpersTest.java | 33 + .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 6 +- 20 files changed, 259 insertions(+), 98 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 841b13f..36328e9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -240,7 +240,7 @@ public class DataflowRunner extends PipelineRunner { */ public static DataflowRunner fromOptions(PipelineOptions options) { // (Re-)register standard IO factories. Clobbers any prior credentials. -IOChannelUtils.registerStandardIOFactories(options); +IOChannelUtils.registerIOFactoriesAllowOverride(options); DataflowPipelineOptions dataflowOptions = PipelineOptionsValidator.validate(DataflowPipelineOptions.class, options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa417f9c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index 202d04b..52082e0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -126,7 +126,7 @@ public class DataflowPipelineOptionsTest { @Test public void testStagingLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); -IOChannelUtils.registerStandardIOFactories(options); +IOChannelUtils.registerIOFactoriesAllowOverride(options); options.setTempLocation("file://temp_location"); options.setStagingLocation("gs://staging_location"); 
assertTrue(isNullOrEmpty(options.getGcpTempLocation())); @@ -136,7 +136,7 @@ public class DataflowPipelineOptionsTest { @Test public void testDefaultToTempLocation() { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); -IOChannelUtils.registerStandardIOFactories(options); +IOChannelUtils.registerIOFactoriesAllowOverride(options); options.setPathValidatorClass(NoopPathValidator.class); options.setTempLocation("gs://temp_location");
[4/4] incubator-beam git commit: [BEAM-952] Use ServiceLoader to register IOChannelFactories.
[BEAM-952] Use ServiceLoader to register IOChannelFactories. This closes #1255 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e53d6d45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e53d6d45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e53d6d45 Branch: refs/heads/master Commit: e53d6d45895aee6c61fd8060b20751875352a4ee Parents: 249dbc0 fa417f9 Author: Luke CwikAuthored: Tue Nov 22 06:19:30 2016 -0800 Committer: Luke Cwik Committed: Tue Nov 22 06:19:30 2016 -0800 -- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../options/DataflowPipelineOptionsTest.java| 6 +- .../runners/dataflow/util/PackageUtilTest.java | 2 +- .../sdk/options/PipelineOptionsFactory.java | 32 + .../apache/beam/sdk/runners/PipelineRunner.java | 2 +- .../apache/beam/sdk/testing/TestPipeline.java | 2 +- .../beam/sdk/util/FileIOChannelFactory.java | 11 ++ .../sdk/util/FileIOChannelFactoryRegistrar.java | 38 ++ .../beam/sdk/util/GcsIOChannelFactory.java | 10 +- .../sdk/util/GcsIOChannelFactoryRegistrar.java | 38 ++ .../sdk/util/IOChannelFactoryRegistrar.java | 48 +++ .../apache/beam/sdk/util/IOChannelUtils.java| 134 ++- .../beam/sdk/util/common/ReflectHelpers.java| 29 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 2 +- .../apache/beam/sdk/io/FileBasedSourceTest.java | 2 +- .../java/org/apache/beam/sdk/io/TextIOTest.java | 2 +- .../sdk/options/PipelineOptionsFactoryTest.java | 34 - .../util/FileIOChannelFactoryRegistrarTest.java | 44 ++ .../beam/sdk/util/FileIOChannelFactoryTest.java | 2 +- .../util/GcsIOChannelFactoryRegistrarTest.java | 44 ++ .../beam/sdk/util/GcsIOChannelFactoryTest.java | 2 +- .../beam/sdk/util/IOChannelUtilsTest.java | 39 ++ .../sdk/util/common/ReflectHelpersTest.java | 33 + .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 6 +- 24 files changed, 478 insertions(+), 86 deletions(-) --
[2/4] incubator-beam git commit: [BEAM-59] Drops public constructors and uses Factory methods in Gcs/File/IOChannelFactory.
[BEAM-59] Drops public constructors and uses Factory methods in Gcs/File/IOChannelFactory. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e6fa2ff2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e6fa2ff2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e6fa2ff2 Branch: refs/heads/master Commit: e6fa2ff26e836848fa549c290ed098dd019cc4e1 Parents: 249dbc0 Author: Pei HeAuthored: Mon Oct 31 17:58:31 2016 -0700 Committer: Luke Cwik Committed: Tue Nov 22 06:18:54 2016 -0800 -- .../apache/beam/sdk/util/FileIOChannelFactory.java | 17 + .../apache/beam/sdk/util/GcsIOChannelFactory.java | 10 +- .../org/apache/beam/sdk/util/IOChannelUtils.java | 7 +++ .../beam/sdk/util/FileIOChannelFactoryTest.java| 2 +- .../beam/sdk/util/GcsIOChannelFactoryTest.java | 2 +- 5 files changed, 31 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6fa2ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java index 0eefb77..13591a3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.regex.Matcher; +import org.apache.beam.sdk.options.PipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +54,22 @@ import org.slf4j.LoggerFactory; public class FileIOChannelFactory implements IOChannelFactory { private static final Logger LOG = LoggerFactory.getLogger(FileIOChannelFactory.class); + /** + * Create a {@link FileIOChannelFactory} with the given {@link PipelineOptions}. 
+ */ + public static FileIOChannelFactory fromOptions(PipelineOptions options) { +return create(); + } + + /** + * Create a {@link FileIOChannelFactory}. + */ + public static FileIOChannelFactory create() { +return new FileIOChannelFactory(); + } + + private FileIOChannelFactory() {} + /** * Converts the given file spec to a java {@link File}. If {@code spec} is actually a URI with * the {@code file} scheme, then this function will ensure that the returned {@link File} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6fa2ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java index bd2ec4e..9f99cd6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import org.apache.beam.sdk.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.gcsfs.GcsPath; /** @@ -32,9 +33,16 @@ import org.apache.beam.sdk.util.gcsfs.GcsPath; */ public class GcsIOChannelFactory implements IOChannelFactory { + /** + * Create a {@link GcsIOChannelFactory} with the given {@link PipelineOptions}. 
+ */ + public static GcsIOChannelFactory fromOptions(PipelineOptions options) { +return new GcsIOChannelFactory(options.as(GcsOptions.class)); + } + private final GcsOptions options; - public GcsIOChannelFactory(GcsOptions options) { + private GcsIOChannelFactory(GcsOptions options) { this.options = options; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6fa2ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java index 16a6e95..d221fa9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java @@ -29,7 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.beam.sdk.options.GcsOptions; import
[1/4] incubator-beam git commit: [BEAM-59] Create IOChannelFactoryRegistrar interface and its gcs/file implementations.
Repository: incubator-beam Updated Branches: refs/heads/master 249dbc045 -> e53d6d458 [BEAM-59] Create IOChannelFactoryRegistrar interface and its gcs/file implementations. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd1a5e7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd1a5e7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd1a5e7e Branch: refs/heads/master Commit: cd1a5e7e30a3bd46f822d371333afa975fc7e4af Parents: e6fa2ff Author: Pei HeAuthored: Mon Oct 31 18:01:41 2016 -0700 Committer: Luke Cwik Committed: Tue Nov 22 06:18:54 2016 -0800 -- .../sdk/util/FileIOChannelFactoryRegistrar.java | 38 + .../sdk/util/GcsIOChannelFactoryRegistrar.java | 38 + .../sdk/util/IOChannelFactoryRegistrar.java | 43 +++ .../util/FileIOChannelFactoryRegistrarTest.java | 44 .../util/GcsIOChannelFactoryRegistrarTest.java | 44 5 files changed, 207 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd1a5e7e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrar.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrar.java new file mode 100644 index 000..acc0222 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactoryRegistrar.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * {@link AutoService} registrar for the {@link FileIOChannelFactory}. + */ +@AutoService(IOChannelFactoryRegistrar.class) +public class FileIOChannelFactoryRegistrar implements IOChannelFactoryRegistrar { + + @Override + public IOChannelFactory fromOptions(PipelineOptions options) { +return FileIOChannelFactory.fromOptions(options); + } + + @Override + public String getScheme() { +return "file"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd1a5e7e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java new file mode 100644 index 000..b4c457f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactoryRegistrar.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * {@link AutoService} registrar for the {@link GcsIOChannelFactory}. + */ +@AutoService(IOChannelFactoryRegistrar.class) +public class GcsIOChannelFactoryRegistrar implements
[4/4] incubator-beam git commit: [BEAM-1010] A few improvements to Apache Beam Python's FileIO.
[BEAM-1010] A few improvements to Apache Beam Python's FileIO. This closes #1392 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8e88c7b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8e88c7b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8e88c7b0 Branch: refs/heads/python-sdk Commit: 8e88c7b035e76c6e15d03a79f9751c6e53786859 Parents: c1440f7 6aa50c1 Author: Luke Cwik Authored: Mon Nov 21 11:31:42 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 21 11:31:42 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 14 +++- sdks/python/apache_beam/io/fileio_test.py | 48 +++--- sdks/python/apache_beam/io/textio.py | 6 +--- sdks/python/apache_beam/io/textio_test.py | 26 ++ 4 files changed, 84 insertions(+), 10 deletions(-) --
[2/4] incubator-beam git commit: Handling the 'collision' case for UIDs and also augmenting comments.
Handling the 'collision' case for UIDs and also augmenting comments. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f9c9865 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f9c9865 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f9c9865 Branch: refs/heads/python-sdk Commit: 9f9c986500f769ebeb0baaf32f1a86334a36eec5 Parents: e85f67a Author: Gus KatsiapisAuthored: Sat Nov 19 15:22:47 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 21 11:29:07 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f9c9865/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 1dcd622..cb7f25c 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -862,7 +862,11 @@ class FileSink(iobase.Sink): def open_writer(self, init_result, uid): # A proper suffix is needed for AUTO compression detection. -suffix = os.path.basename(self.file_path_prefix) + self.file_name_suffix +# We also ensure there will be no collisions with uid and a +# (possibly unsharded) file_path_prefix and a (possibly empty) +# file_name_suffix. +suffix = ( + '.' + os.path.basename(self.file_path_prefix) + self.file_name_suffix) return FileSinkWriter(self, os.path.join(init_result, uid) + suffix) def finalize_write(self, init_result, writer_results):
[3/4] incubator-beam git commit: A few improvements to Apache Beam Python's FileIO.
A few improvements to Apache Beam Python's FileIO. - Ensuring that AUTO compression works properly for FileSinks. - Introducing __enter__ and __exit__ in _CompressedFile to allow use of "with", and updating textio accordingly. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e85f67a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e85f67a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e85f67a1 Branch: refs/heads/python-sdk Commit: e85f67a1a467a26259a849bd20c42e89f165828e Parents: c1440f7 Author: Gus KatsiapisAuthored: Fri Nov 18 18:31:20 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 21 11:29:07 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 10 +- sdks/python/apache_beam/io/fileio_test.py | 48 +++--- sdks/python/apache_beam/io/textio.py | 6 +--- sdks/python/apache_beam/io/textio_test.py | 26 ++ 4 files changed, 80 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 4d0eea6..1dcd622 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -749,6 +749,12 @@ class _CompressedFile(object): def seekable(self): return False + def __enter__(self): +return self + + def __exit__(self, exception_type, exception_value, traceback): +self.close() + class FileSink(iobase.Sink): """A sink to a GCS or local files. @@ -855,7 +861,9 @@ class FileSink(iobase.Sink): return tmp_dir def open_writer(self, init_result, uid): -return FileSinkWriter(self, os.path.join(init_result, uid)) +# A proper suffix is needed for AUTO compression detection. 
+suffix = os.path.basename(self.file_path_prefix) + self.file_name_suffix +return FileSinkWriter(self, os.path.join(init_result, uid) + suffix) def finalize_write(self, init_result, writer_results): writer_results = sorted(writer_results) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85f67a1/sdks/python/apache_beam/io/fileio_test.py -- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 9d1e424..098ace1 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -38,10 +38,7 @@ from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher # TODO: Add tests for file patterns (ie not just individual files) for both -# uncompressed - -# TODO: Update code to not use NamedTemporaryFile (or to use it in a way that -# doesn't violate its assumptions). +# compressed and uncompressed files. class TestTextFileSource(unittest.TestCase): @@ -721,6 +718,49 @@ class TestNativeTextFileSink(unittest.TestCase): with bz2.BZ2File(self.path, 'r') as f: self.assertEqual(f.read().splitlines(), []) + def test_write_dataflow(self): +pipeline = beam.Pipeline('DirectPipelineRunner') +pcoll = pipeline | beam.core.Create('Create', self.lines) +pcoll | 'Write' >> beam.Write(fileio.NativeTextFileSink(self.path)) # pylint: disable=expression-not-assigned +pipeline.run() + +read_result = [] +for file_name in glob.glob(self.path + '*'): + with open(file_name, 'r') as f: +read_result.extend(f.read().splitlines()) + +self.assertEqual(read_result, self.lines) + + def test_write_dataflow_auto_compression(self): +pipeline = beam.Pipeline('DirectPipelineRunner') +pcoll = pipeline | beam.core.Create('Create', self.lines) +pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned +fileio.NativeTextFileSink( +self.path, file_name_suffix='.gz')) +pipeline.run() + +read_result = [] +for file_name in glob.glob(self.path 
+ '*'): + with gzip.GzipFile(file_name, 'r') as f: +read_result.extend(f.read().splitlines()) + +self.assertEqual(read_result, self.lines) + + def test_write_dataflow_auto_compression_unsharded(self): +pipeline = beam.Pipeline('DirectPipelineRunner') +pcoll = pipeline | beam.core.Create('Create', self.lines) +pcoll | 'Write' >> beam.Write( # pylint: disable=expression-not-assigned +fileio.NativeTextFileSink( +self.path + '.gz', shard_name_template='')) +pipeline.run() + +read_result = [] +for file_name in glob.glob(self.path + '*'): + with
[1/4] incubator-beam git commit: Fixing lynt warnings related to indentation.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk c1440f7aa -> 8e88c7b03 Fixing lynt warnings related to indentation. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6aa50c1d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6aa50c1d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6aa50c1d Branch: refs/heads/python-sdk Commit: 6aa50c1d5249ac70c37196a9957874aaf26dd54d Parents: 9f9c986 Author: Gus KatsiapisAuthored: Sun Nov 20 12:33:16 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 21 11:29:07 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6aa50c1d/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index cb7f25c..30044c3 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -866,7 +866,7 @@ class FileSink(iobase.Sink): # (possibly unsharded) file_path_prefix and a (possibly empty) # file_name_suffix. suffix = ( - '.' + os.path.basename(self.file_path_prefix) + self.file_name_suffix) +'.' + os.path.basename(self.file_path_prefix) + self.file_name_suffix) return FileSinkWriter(self, os.path.join(init_result, uid) + suffix) def finalize_write(self, init_result, writer_results):
[2/2] incubator-beam git commit: Fixes a few "used but undeclared" dependency problems in starter archetype
Fixes a few "used but undeclared" dependency problems in starter archetype. This closes #1367 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c4089ee7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c4089ee7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c4089ee7 Branch: refs/heads/master Commit: c4089ee7752ef6b5866bd0e124d6f8aeec5b547a Parents: abd9fb3 cbb360f Author: Luke Cwik Authored: Thu Nov 17 05:01:44 2016 -0800 Committer: Luke Cwik Committed: Thu Nov 17 05:01:44 2016 -0800 -- sdks/java/maven-archetypes/examples/pom.xml | 42 sdks/java/maven-archetypes/starter/pom.xml | 6 2 files changed, 48 insertions(+) --
[1/2] incubator-beam git commit: Adds used but undeclared dependencies to archetype
Repository: incubator-beam Updated Branches: refs/heads/master abd9fb3d7 -> c4089ee77 Adds used but undeclared dependencies to archetype Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cbb360fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cbb360fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cbb360fe Branch: refs/heads/master Commit: cbb360fef3f36e2646e7c6751df308035416f78e Parents: 2011102 Author: Eugene KirpichovAuthored: Tue Nov 15 16:01:42 2016 -0800 Committer: Eugene Kirpichov Committed: Wed Nov 16 13:41:52 2016 -0800 -- sdks/java/maven-archetypes/examples/pom.xml | 42 sdks/java/maven-archetypes/starter/pom.xml | 6 2 files changed, 48 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cbb360fe/sdks/java/maven-archetypes/examples/pom.xml -- diff --git a/sdks/java/maven-archetypes/examples/pom.xml b/sdks/java/maven-archetypes/examples/pom.xml index e36c4fc..b8555bc 100644 --- a/sdks/java/maven-archetypes/examples/pom.xml +++ b/sdks/java/maven-archetypes/examples/pom.xml @@ -90,5 +90,47 @@ beam-sdks-java-io-google-cloud-platform runtime + + + org.slf4j + slf4j-api + runtime + + + + joda-time + joda-time + runtime + + + + com.google.apis + google-api-services-bigquery + runtime + + + + com.google.apis + google-api-services-pubsub + runtime + + + + com.google.guava + guava + runtime + + + + com.google.http-client + google-http-client + runtime + + + + com.google.api-client + google-api-client + runtime + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cbb360fe/sdks/java/maven-archetypes/starter/pom.xml -- diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml index c38f80c..0543470 100644 --- a/sdks/java/maven-archetypes/starter/pom.xml +++ b/sdks/java/maven-archetypes/starter/pom.xml @@ -79,5 +79,11 @@ beam-sdks-java-core runtime + + + org.slf4j + slf4j-api 
+ runtime +
[1/2] incubator-beam git commit: fix exception during formatting java.util.MissingFormatArgumentException: Format specifier '%s'
Repository: incubator-beam Updated Branches: refs/heads/master 084a5e8ae -> f802919c2 fix exception during formatting java.util.MissingFormatArgumentException: Format specifier '%s' Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/752a8da3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/752a8da3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/752a8da3 Branch: refs/heads/master Commit: 752a8da377d8bb17a61826bd7e00bd3798c23881 Parents: 084a5e8 Author: Alexey DiominAuthored: Wed Nov 9 23:10:40 2016 +0400 Committer: Alexey Diomin Committed: Wed Nov 9 23:10:40 2016 +0400 -- .../core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/752a8da3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 591f145..b62c5af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -715,7 +715,7 @@ public class AvroCoder extends StandardCoder { } else { // If it was an unknown type encoded as an array, be conservative and assume // that we don't know anything about the order. -reportError(context, "encoding %s as an ARRAY was unexpected"); +reportError(context, "encoding %s as an ARRAY was unexpected", type); return; }
[2/2] incubator-beam git commit: fix exception during formatting
fix exception during formatting This closes #1322 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f802919c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f802919c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f802919c Branch: refs/heads/master Commit: f802919c20c1844b47701a3f621b9d01cffdc21a Parents: 084a5e8 752a8da Author: Luke Cwik Authored: Wed Nov 9 11:26:57 2016 -0800 Committer: Luke Cwik Committed: Wed Nov 9 11:26:57 2016 -0800 -- .../core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/3] incubator-beam git commit: Changes in AvroCoder serialization so it can serialize in Kryo
Repository: incubator-beam Updated Branches: refs/heads/master afa0c31bd -> bfc527d63 Changes in AvroCoder serialization so it can serialize in Kryo Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/06c18468 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/06c18468 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/06c18468 Branch: refs/heads/master Commit: 06c1846860176cc2bd971f8ad7037c97594af866 Parents: afa0c31 Author: Aviem ZurAuthored: Thu Sep 8 11:21:41 2016 +0300 Committer: Luke Cwik Committed: Tue Nov 8 07:47:34 2016 -0800 -- sdks/java/core/pom.xml | 7 ++ .../org/apache/beam/sdk/coders/AvroCoder.java | 126 +++ .../apache/beam/sdk/coders/AvroCoderTest.java | 33 + 3 files changed, 112 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/pom.xml -- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 17ef193..c7b46d8 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -473,5 +473,12 @@ google-cloud-dataflow-java-proto-library-all test + + + com.esotericsoftware.kryo + kryo + 2.21 + test + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/06c18468/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 7894d14..4f0239e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.Serializable; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; @@ -164,7 +163,9 @@ 
public class AvroCoder extends StandardCoder { }; private final Class type; - private final transient Schema schema; + private transient Schema schema; + + private final String schemaStr; private final List nonDeterministicReasons; @@ -174,36 +175,16 @@ public class AvroCoder extends StandardCoder { // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe, // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use // an inner coder. - private final transient ThreadLocal decoder; - private final transient ThreadLocal encoder; - private final transient ThreadLocal writer; - private final transient ThreadLocal reader; + private transient ThreadLocal memoizedDecoder; + private transient ThreadLocal memoizedEncoder; + private transient ThreadLocal memoizedWriter; + private transient ThreadLocal memoizedReader; protected AvroCoder(Class type, Schema schema) { this.type = type; this.schema = schema; - +this.schemaStr = schema.toString(); nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema); - -// Decoder and Encoder start off null for each thread. They are allocated and potentially -// reused inside encode/decode. -this.decoder = new ThreadLocal<>(); -this.encoder = new ThreadLocal<>(); - -// Reader and writer are allocated once per thread and are "final" for thread-local Coder -// instance. -this.reader = new ThreadLocal () { - @Override - public DatumReader initialValue() { -return createDatumReader(); - } -}; -this.writer = new ThreadLocal () { - @Override - public DatumWriter initialValue() { -return createDatumWriter(); - } -}; } /** @@ -246,33 +227,29 @@ public class AvroCoder extends StandardCoder { return type; } - private Object writeReplace() { -// When serialized by Java, instances of AvroCoder should be replaced by -// a SerializedAvroCoderProxy. 
-return new SerializedAvroCoderProxy<>(type, schema.toString()); - } - @Override public void encode(T value, OutputStream outStream, Context context) throws IOException { // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it. +ThreadLocal encoder = getEncoder(); BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream,
[3/3] incubator-beam git commit: [BEAM-626] Changes in AvroCoder serialization so it can serialize in Kryo
[BEAM-626] Changes in AvroCoder serialization so it can serialize in Kryo This closes #1246 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bfc527d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bfc527d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bfc527d6 Branch: refs/heads/master Commit: bfc527d6355e2dfa489658a337f3ae45ade99cb9 Parents: afa0c31 fae3ec5 Author: Luke Cwik Authored: Tue Nov 8 07:49:13 2016 -0800 Committer: Luke Cwik Committed: Tue Nov 8 07:49:13 2016 -0800 -- sdks/java/core/pom.xml | 7 ++ .../org/apache/beam/sdk/coders/AvroCoder.java | 124 --- .../apache/beam/sdk/coders/AvroCoderTest.java | 48 +++ 3 files changed, 134 insertions(+), 45 deletions(-) --
[2/2] incubator-beam git commit: [BEAM-917] ExpectedLogs: clear saved records after each test.
[BEAM-917] ExpectedLogs: clear saved records after each test. This closes #1289 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/339dee95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/339dee95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/339dee95 Branch: refs/heads/master Commit: 339dee9542497d845873dbd939c7868bdd9c0835 Parents: c6d9bf2 6bf729e Author: Luke Cwik Authored: Mon Nov 7 17:38:06 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 7 17:38:06 2016 -0800 -- .../apache/beam/sdk/testing/ExpectedLogs.java | 15 ++ .../beam/sdk/testing/ExpectedLogsTest.java | 30 +++- 2 files changed, 39 insertions(+), 6 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-917] ExpectedLogs: clear saved records after each test.
Repository: incubator-beam Updated Branches: refs/heads/master c6d9bf297 -> 339dee954 [BEAM-917] ExpectedLogs: clear saved records after each test. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6bf729e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6bf729e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6bf729e9 Branch: refs/heads/master Commit: 6bf729e90c38e910138d332c994325223c220abd Parents: c6d9bf2 Author: Pei HeAuthored: Fri Nov 4 18:45:47 2016 -0700 Committer: Luke Cwik Committed: Mon Nov 7 17:36:00 2016 -0800 -- .../apache/beam/sdk/testing/ExpectedLogs.java | 15 ++ .../beam/sdk/testing/ExpectedLogsTest.java | 30 +++- 2 files changed, 39 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6bf729e9/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java index a8e3f94..3e51f34 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogs.java @@ -268,6 +268,7 @@ public class ExpectedLogs extends ExternalResource { protected void after() { log.removeHandler(logSaver); log.setLevel(previousLevel); +logSaver.reset(); } private final Logger log; @@ -285,11 +286,7 @@ public class ExpectedLogs extends ExternalResource { */ @ThreadSafe private static class LogSaver extends Handler { -Collection logRecords = new ConcurrentLinkedDeque<>(); - -public Collection getLogs() { - return logRecords; -} +private final Collection logRecords = new ConcurrentLinkedDeque<>(); @Override public void publish(LogRecord record) { @@ -301,5 +298,13 @@ public class ExpectedLogs extends ExternalResource { @Override public void close() throws 
SecurityException {} + +private Collection getLogs() { + return logRecords; +} + +private void reset() { + logRecords.clear(); +} } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6bf729e9/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java index 84d5584..1762d0d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ExpectedLogsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.testing; import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -30,8 +31,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.Description; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.model.Statement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +42,6 @@ import org.slf4j.LoggerFactory; @RunWith(JUnit4.class) public class ExpectedLogsTest { private static final Logger LOG = LoggerFactory.getLogger(ExpectedLogsTest.class); - private Random random = new Random(); @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(ExpectedLogsTest.class); @@ -146,6 +148,32 @@ public class ExpectedLogsTest { } } + @Test + public void testLogsCleared() throws Throwable { +final String messageUnexpected = "Message prior to ExpectedLogs."; +final String messageExpected = "Message expected."; +LOG.info(messageUnexpected); + +expectedLogs = ExpectedLogs.none(ExpectedLogsTest.class); +final boolean[] evaluateRan = new boolean[1]; + +expectedLogs.apply( +new Statement() { + @Override + public void 
evaluate() throws Throwable { +evaluateRan[0] = true; +expectedLogs.verifyNotLogged(messageUnexpected); +LOG.info(messageExpected); +expectedLogs.verifyInfo(messageExpected); + } +}, +Description.EMPTY).evaluate(); +assertTrue(evaluateRan[0]); +// Verify expectedLogs is cleared. +expectedLogs.verifyNotLogged(messageExpected); +
[1/2] incubator-beam git commit: [BEAM-725] Migrate to use the generic credentials library compatible with Apiary and gRPC instead of the Apiary only credentials library.
Repository: incubator-beam Updated Branches: refs/heads/master b5f847355 -> baa7fb031 [BEAM-725] Migrate to use the generic credentials library compatible with Apiary and gRPC instead of the Apiary only credentials library. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb260ecd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb260ecd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb260ecd Branch: refs/heads/master Commit: bb260ecd34e14a29e7939912a101b3733e379248 Parents: b5f8473 Author: Luke CwikAuthored: Mon Nov 7 09:40:38 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 7 13:08:39 2016 -0800 -- pom.xml | 21 ++ runners/google-cloud-dataflow-java/pom.xml | 15 +- .../dataflow/util/DataflowTransport.java| 9 +- sdks/java/core/pom.xml | 19 +- .../org/apache/beam/sdk/options/GcpOptions.java | 136 ++--- .../beam/sdk/testing/BigqueryMatcher.java | 15 +- .../apache/beam/sdk/util/CredentialFactory.java | 4 +- .../org/apache/beam/sdk/util/Credentials.java | 192 --- .../beam/sdk/util/GcpCredentialFactory.java | 41 +++- .../beam/sdk/util/NoopCredentialFactory.java| 9 +- .../apache/beam/sdk/util/PubsubGrpcClient.java | 18 +- .../apache/beam/sdk/util/PubsubJsonClient.java | 9 +- .../apache/beam/sdk/util/TestCredential.java| 44 +++-- .../org/apache/beam/sdk/util/Transport.java | 9 +- .../beam/sdk/util/PubsubGrpcClientTest.java | 9 +- sdks/java/io/google-cloud-platform/pom.xml | 9 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 25 ++- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 25 ++- 18 files changed, 186 insertions(+), 423 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb260ecd/pom.xml -- diff --git a/pom.xml b/pom.xml index c1bd5c8..bd6037e 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,7 @@ 1.2.0 1.0-rc2 1.1 +0.6.0 1.22.0 1.4.5 0.5.160304 @@ -530,6 +531,26 @@ +com.google.auth +google-auth-library-credentials 
+${google-auth.version} + + + +com.google.auth +google-auth-library-oauth2-http +${google-auth.version} + + + +com.google.guava +guava-jdk5 + + + + + com.google.apis google-api-services-bigquery ${bigquery.version} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb260ecd/runners/google-cloud-dataflow-java/pom.xml -- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 59b1465..6ed41d0 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -171,11 +171,6 @@ - com.google.oauth-client - google-oauth-client - - - com.google.http-client google-http-client @@ -202,6 +197,16 @@ + com.google.auth + google-auth-library-credentials + + + + com.google.auth + google-auth-library-oauth2-http + + + com.google.cloud.bigdataoss util http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bb260ecd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java index 0391594..e0026de 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java @@ -20,10 +20,11 @@ package org.apache.beam.runners.dataflow.util; import static org.apache.beam.sdk.util.Transport.getJsonFactory; import static org.apache.beam.sdk.util.Transport.getTransport; -import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.clouddebugger.v2.Clouddebugger; import com.google.api.services.dataflow.Dataflow; +import com.google.auth.Credentials; +import 
com.google.auth.http.HttpCredentialsAdapter; import
[2/2] incubator-beam git commit: [BEAM-725] Migrate to use the generic Google credentials library
[BEAM-725] Migrate to use the generic Google credentials library This closes #1294 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/baa7fb03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/baa7fb03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/baa7fb03 Branch: refs/heads/master Commit: baa7fb0317afa4a18462ed66ab0a7ba1e8eb2c89 Parents: b5f8473 bb260ec Author: Luke CwikAuthored: Mon Nov 7 13:09:19 2016 -0800 Committer: Luke Cwik Committed: Mon Nov 7 13:09:19 2016 -0800 -- pom.xml | 21 ++ runners/google-cloud-dataflow-java/pom.xml | 15 +- .../dataflow/util/DataflowTransport.java| 9 +- sdks/java/core/pom.xml | 19 +- .../org/apache/beam/sdk/options/GcpOptions.java | 136 ++--- .../beam/sdk/testing/BigqueryMatcher.java | 15 +- .../apache/beam/sdk/util/CredentialFactory.java | 4 +- .../org/apache/beam/sdk/util/Credentials.java | 192 --- .../beam/sdk/util/GcpCredentialFactory.java | 41 +++- .../beam/sdk/util/NoopCredentialFactory.java| 9 +- .../apache/beam/sdk/util/PubsubGrpcClient.java | 18 +- .../apache/beam/sdk/util/PubsubJsonClient.java | 9 +- .../apache/beam/sdk/util/TestCredential.java| 44 +++-- .../org/apache/beam/sdk/util/Transport.java | 9 +- .../beam/sdk/util/PubsubGrpcClientTest.java | 9 +- sdks/java/io/google-cloud-platform/pom.xml | 9 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 25 ++- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 25 ++- 18 files changed, 186 insertions(+), 423 deletions(-) --
[1/2] incubator-beam git commit: Remove @Default from IT options
Repository: incubator-beam Updated Branches: refs/heads/master 99062d103 -> 46fbfe06b Remove @Default from IT options Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1eccd29b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1eccd29b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1eccd29b Branch: refs/heads/master Commit: 1eccd29b72aedc87e69763fbcc828b5694335e1f Parents: 99062d1 Author: Mark LiuAuthored: Thu Nov 3 16:45:43 2016 -0700 Committer: Luke Cwik Committed: Fri Nov 4 14:19:43 2016 -0700 -- .../beam/examples/WindowedWordCountIT.java | 11 --- .../org/apache/beam/examples/WordCountIT.java| 19 +++ .../examples/cookbook/BigQueryTornadoesIT.java | 11 --- 3 files changed, 27 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eccd29b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 6742654..d545ad2 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -17,9 +17,9 @@ */ package org.apache.beam.examples; +import com.google.common.base.Strings; import java.io.IOException; import org.apache.beam.examples.WindowedWordCount.Options; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.BigqueryMatcher; @@ -37,12 +37,13 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class WindowedWordCountIT { + private static final String DEFAULT_OUTPUT_CHECKSUM = "ff54f6f42b2afeb146206c1e8e915deaee0362b4"; + /** * Options for the {@link 
WindowedWordCount} Integration Test. */ public interface WindowedWordCountITOptions extends Options, TestPipelineOptions, StreamingOptions { -@Default.String("ff54f6f42b2afeb146206c1e8e915deaee0362b4") String getChecksum(); void setChecksum(String value); } @@ -66,9 +67,13 @@ public class WindowedWordCountIT { String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word", options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); +String outputChecksum = +Strings.isNullOrEmpty(options.getChecksum()) +? DEFAULT_OUTPUT_CHECKSUM +: options.getChecksum(); options.setOnSuccessMatcher( new BigqueryMatcher( -options.getAppName(), options.getProject(), query, options.getChecksum())); +options.getAppName(), options.getProject(), query, outputChecksum)); WindowedWordCount.main(TestPipeline.convertToArgs(options)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1eccd29b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java index 2f2ea46..8f170af 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java @@ -18,9 +18,9 @@ package org.apache.beam.examples; +import com.google.common.base.Strings; import java.util.Date; import org.apache.beam.examples.WordCount.WordCountOptions; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.FileChecksumMatcher; import org.apache.beam.sdk.testing.TestPipeline; @@ -36,6 +36,8 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class WordCountIT { + private static final String DEFAULT_OUTPUT_CHECKSUM = "8ae94f799f97cfd1cb5e8125951b32dfb52e1f12"; + /** * Options for the WordCount Integration Test. 
* @@ -43,9 +45,8 @@ public class WordCountIT { * with customized input. */ public interface WordCountITOptions extends TestPipelineOptions, WordCountOptions { -@Default.String("c04722202dee29c442b55ead54c6000693e85e77") -String getOutputChecksum(); -void setOutputChecksum(String value); +String getChecksum(); +void setChecksum(String value); } @Test @@ -58,11 +59,13 @@ public class WordCountIT {
[1/2] incubator-beam git commit: [BEAM-790] Validate PipelineOptions Default annotation
Repository: incubator-beam Updated Branches: refs/heads/master 6a05cf4a9 -> bdd71c4bc [BEAM-790] Validate PipelineOptions Default annotation Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f605b02a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f605b02a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f605b02a Branch: refs/heads/master Commit: f605b02a72258aee20d607e54a443113bfb762ad Parents: 6a05cf4 Author: Pei HeAuthored: Thu Oct 20 18:07:35 2016 -0700 Committer: Luke Cwik Committed: Thu Nov 3 11:38:42 2016 -0700 -- .../sdk/options/PipelineOptionsFactory.java | 248 ++- .../sdk/options/ProxyInvocationHandler.java | 5 +- .../beam/sdk/util/common/ReflectHelpers.java| 17 ++ .../sdk/options/PipelineOptionsFactoryTest.java | 239 ++ .../sdk/options/ProxyInvocationHandlerTest.java | 29 ++- .../sdk/util/common/ReflectHelpersTest.java | 25 ++ 6 files changed, 491 insertions(+), 72 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f605b02a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index eefe8c7..304e166 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -1057,56 +1057,130 @@ public class PipelineOptionsFactory { methodNameToAllMethodMap.put(method, method); } -List incompletelyIgnoredGetters = new ArrayList<>(); -List ignoredSetters = new ArrayList<>(); +// Verify that there is no getter with a mixed @JsonIgnore annotation. 
+validateGettersHaveConsistentAnnotation( +methodNameToAllMethodMap, descriptors, AnnotationPredicates.JSON_IGNORE); -for (PropertyDescriptor descriptor : descriptors) { +// Verify that there is no getter with a mixed @Default annotation. +validateGettersHaveConsistentAnnotation( +methodNameToAllMethodMap, descriptors, AnnotationPredicates.DEFAULT_VALUE); + +// Verify that no setter has @JsonIgnore. +validateSettersDoNotHaveAnnotation( +methodNameToAllMethodMap, descriptors, AnnotationPredicates.JSON_IGNORE); + +// Verify that no setter has @Default. +validateSettersDoNotHaveAnnotation( +methodNameToAllMethodMap, descriptors, AnnotationPredicates.DEFAULT_VALUE); + } + + /** + * Validates that getters don't have mixed annotation. + */ + private static void validateGettersHaveConsistentAnnotation( + SortedSetMultimap methodNameToAllMethodMap, + List descriptors, + final AnnotationPredicates annotationPredicates) { +List inconsistentlyAnnotatedGetters = new ArrayList<>(); +for (final PropertyDescriptor descriptor : descriptors) { if (descriptor.getReadMethod() == null - || descriptor.getWriteMethod() == null - || IGNORED_METHODS.contains(descriptor.getReadMethod()) - || IGNORED_METHODS.contains(descriptor.getWriteMethod())) { + || IGNORED_METHODS.contains(descriptor.getReadMethod())) { continue; } - // Verify that there is no getter with a mixed @JsonIgnore annotation and verify - // that no setter has @JsonIgnore. 
+ SortedSet getters = methodNameToAllMethodMap.get(descriptor.getReadMethod()); - SortedSet gettersWithJsonIgnore = Sets.filter(getters, JsonIgnorePredicate.INSTANCE); + SortedSet gettersWithTheAnnotation = + Sets.filter(getters, annotationPredicates.forMethod); + Set distinctAnnotations = Sets.newLinkedHashSet(FluentIterable + .from(gettersWithTheAnnotation) + .transformAndConcat(new Function () { +@Nonnull +@Override +public Iterable apply(@Nonnull Method method) { + return FluentIterable.of(method.getAnnotations()); +} + }) + .filter(annotationPredicates.forAnnotation)); + + + if (distinctAnnotations.size() > 1) { +throw new IllegalArgumentException(String.format( +"Property [%s] is marked with contradictory annotations. Found [%s].", +descriptor.getName(), +FluentIterable.from(gettersWithTheAnnotation) +.transformAndConcat(new Function () { + @Nonnull + @Override + public
[2/2] incubator-beam git commit: [BEAM-790] Validate PipelineOptions Default annotation.
[BEAM-790] Validate PipelineOptions Default annotation. This closes #1159 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bdd71c4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bdd71c4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bdd71c4b Branch: refs/heads/master Commit: bdd71c4bcb6d761c5a71b433e2a27379a10da45d Parents: 6a05cf4 f605b02 Author: Luke CwikAuthored: Thu Nov 3 11:56:22 2016 -0700 Committer: Luke Cwik Committed: Thu Nov 3 11:56:22 2016 -0700 -- .../sdk/options/PipelineOptionsFactory.java | 248 ++- .../sdk/options/ProxyInvocationHandler.java | 5 +- .../beam/sdk/util/common/ReflectHelpers.java| 17 ++ .../sdk/options/PipelineOptionsFactoryTest.java | 239 ++ .../sdk/options/ProxyInvocationHandlerTest.java | 29 ++- .../sdk/util/common/ReflectHelpersTest.java | 25 ++ 6 files changed, 491 insertions(+), 72 deletions(-) --
[1/2] incubator-beam git commit: Move resource filtering later to avoid spurious rebuilds
Repository: incubator-beam Updated Branches: refs/heads/master d8eb8be13 -> 8bc68c59b Move resource filtering later to avoid spurious rebuilds Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24223657 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24223657 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24223657 Branch: refs/heads/master Commit: 2422365719c71cade97e1e74f1fb7f42b264244f Parents: 54a7374 Author: Kenneth KnowlesAuthored: Wed Oct 26 19:56:35 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 31 16:56:43 2016 -0700 -- sdks/java/core/pom.xml | 29 ++--- 1 file changed, 22 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24223657/sdks/java/core/pom.xml -- diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 9937cb8..e171c92 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -40,13 +40,6 @@ - - -src/main/resources -true - - - @@ -146,6 +139,28 @@ org.apache.maven.plugins +maven-resources-plugin + + +resources +compile + + resources + + + + + src/main/resources + true + + + + + + + + +org.apache.maven.plugins maven-jar-plugin
[2/2] incubator-beam git commit: [BEAM-822] Move resource filtering later to avoid spurious rebuilds
[BEAM-822] Move resource filtering later to avoid spurious rebuilds This closes #1204 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bc68c59 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bc68c59 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bc68c59 Branch: refs/heads/master Commit: 8bc68c59b6992890e9f18df277cb3bb9ddeee361 Parents: d8eb8be 2422365 Author: Luke Cwik Authored: Mon Oct 31 17:01:03 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 31 17:01:03 2016 -0700 -- sdks/java/core/pom.xml | 29 ++--- 1 file changed, 22 insertions(+), 7 deletions(-) --
incubator-beam git commit: [BEAM-813] support metadata in Avro sink
Repository: incubator-beam Updated Branches: refs/heads/master 25102f798 -> eba099f56 [BEAM-813] support metadata in Avro sink Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eba099f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eba099f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eba099f5 Branch: refs/heads/master Commit: eba099f564dba3dfbba30ae3533496b9e14f57a7 Parents: 25102f7 Author: Neville LiAuthored: Mon Oct 24 18:56:36 2016 -0400 Committer: Luke Cwik Committed: Wed Oct 26 15:30:50 2016 -0700 -- .../java/org/apache/beam/sdk/io/AvroIO.java | 143 --- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 29 2 files changed, 154 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/eba099f5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index d912ff7..6deca7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -21,11 +21,16 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.io.BaseEncoding; import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.util.Map; import java.util.regex.Pattern; import javax.annotation.Nullable; + import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; @@ -39,6 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PBegin; @@ -455,6 +461,15 @@ public class AvroIO { } /** + * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata. + * + * Supported value types are String, Long, and byte[]. + */ +public static Bound withMetadata(Map metadata) { + return new Bound<>(GenericRecord.class).withMetadata(metadata); +} + +/** * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or * multiple Avro files matching a sharding pattern). * @@ -464,6 +479,8 @@ public class AvroIO { private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX; private static final SerializableAvroCodecFactory DEFAULT_CODEC = new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6)); + // This should be a multiple of 4 to not get a partial encoded byte. + private static final int METADATA_BYTES_MAX_LENGTH = 40; /** The filename to write to. */ @Nullable @@ -486,6 +503,8 @@ public class AvroIO { * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html */ final SerializableAvroCodecFactory codec; + /** Avro file metadata. */ + final ImmutableMap metadata; Bound(Class type) { this( @@ -497,7 +516,8 @@ public class AvroIO { type, null, true, -DEFAULT_CODEC); +DEFAULT_CODEC, +ImmutableMap. 
of()); } Bound( @@ -509,7 +529,8 @@ public class AvroIO { Class type, Schema schema, boolean validate, - SerializableAvroCodecFactory codec) { + SerializableAvroCodecFactory codec, + Map metadata) { super(name); this.filenamePrefix = filenamePrefix; this.filenameSuffix = filenameSuffix; @@ -519,6 +540,18 @@ public class AvroIO { this.schema = schema; this.validate = validate; this.codec = codec; + +Map badKeys = Maps.newLinkedHashMap(); +for (Map.Entry entry : metadata.entrySet()) { + Object v = entry.getValue(); + if (!(v instanceof String || v instanceof Long || v instanceof byte[])) { +
[2/2] incubator-beam git commit: [BEAM-790] Refactor PipelineOptionsFactory validation.
[BEAM-790] Refactor PipelineOptionsFactory validation. This closes #1151 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4c905823 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4c905823 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4c905823 Branch: refs/heads/master Commit: 4c9058236e97434a31596e8e955f09eb8929c43a Parents: ea8583c b809883 Author: Luke Cwik Authored: Fri Oct 21 15:43:26 2016 -0700 Committer: Luke Cwik Committed: Fri Oct 21 15:43:26 2016 -0700 -- .../sdk/options/PipelineOptionsFactory.java | 151 +-- 1 file changed, 105 insertions(+), 46 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-790] Refactor PipelineOptionsFactory validation into separate methods.
Repository: incubator-beam Updated Branches: refs/heads/master ea8583c4f -> 4c9058236 [BEAM-790] Refactor PipelineOptionsFactory validation into separate methods. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b8098838 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b8098838 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b8098838 Branch: refs/heads/master Commit: b809883813a992167bfa4ad59759fb992c0694d2 Parents: ea8583c Author: Pei HeAuthored: Thu Oct 20 16:32:45 2016 -0700 Committer: Luke Cwik Committed: Fri Oct 21 15:43:00 2016 -0700 -- .../sdk/options/PipelineOptionsFactory.java | 151 +-- 1 file changed, 105 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b8098838/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 7206b11..eefe8c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -958,7 +958,7 @@ public class PipelineOptionsFactory { /** * Validates that a given class conforms to the following properties: * - * Any property with the same name must have the same return type for all derived + * Any method with the same name must have the same return type for all derived * interfaces of {@link PipelineOptions}. * Every bean property of any interface derived from {@link PipelineOptions} must have a * getter and setter method. 
@@ -979,22 +979,37 @@ public class PipelineOptionsFactory { private static List validateClass(Class iface, Set validatedPipelineOptionsInterfaces, Class klass) throws IntrospectionException { -Set methods = Sets.newHashSet(IGNORED_METHODS); -// Ignore synthetic methods -for (Method method : klass.getMethods()) { - if (Modifier.isStatic(method.getModifiers()) || method.isSynthetic()) { -methods.add(method); - } -} -// Ignore methods on the base PipelineOptions interface. -try { - methods.add(iface.getMethod("as", Class.class)); - methods.add(iface.getMethod("populateDisplayData", DisplayData.Builder.class)); -} catch (NoSuchMethodException | SecurityException e) { - throw new RuntimeException(e); -} - // Verify that there are no methods with the same name with two different return types. +validateReturnType(iface); + +SortedSet allInterfaceMethods = FluentIterable +.from(ReflectHelpers.getClosureOfMethodsOnInterfaces( +validatedPipelineOptionsInterfaces)) +.append(ReflectHelpers.getClosureOfMethodsOnInterface(iface)) +.filter(NOT_SYNTHETIC_PREDICATE) +.toSortedSet(MethodComparator.INSTANCE); + +List descriptors = getPropertyDescriptors(allInterfaceMethods, iface); + +// Verify that all method annotations are valid. +validateMethodAnnotations(allInterfaceMethods, descriptors); + +// Verify that each property has a matching read and write method. +validateGettersSetters(iface, descriptors); + +// Verify all methods are bean methods or known methods. +validateMethodsAreEitherBeanMethodOrKnownMethod(iface, klass, descriptors); + +return descriptors; + } + + /** + * Validates that any method with the same name must have the same return type for all derived + * interfaces of {@link PipelineOptions}. + * + * @param iface The interface to validate. 
+ */ + private static void validateReturnType(Class iface) { Iterable interfaceMethods = FluentIterable .from(ReflectHelpers.getClosureOfMethodsOnInterface(iface)) .filter(NOT_SYNTHETIC_PREDICATE) @@ -1019,24 +1034,29 @@ public class PipelineOptionsFactory { } } throwForMultipleDefinitions(iface, multipleDefinitions); + } -// Verify that there is no getter with a mixed @JsonIgnore annotation and verify -// that no setter has @JsonIgnore. -SortedSet allInterfaceMethods = -FluentIterable.from( -ReflectHelpers.getClosureOfMethodsOnInterfaces( -validatedPipelineOptionsInterfaces)) -.append(ReflectHelpers.getClosureOfMethodsOnInterface(iface)) -.filter(NOT_SYNTHETIC_PREDICATE) -.toSortedSet(MethodComparator.INSTANCE); + /** + * Validates that a given class conforms to the
[1/2] incubator-beam git commit: Fix typo and dependencies in beam-runners-direct-java
Repository: incubator-beam Updated Branches: refs/heads/master 18e5d128b -> ea8583c4f Fix typo and dependencies in beam-runners-direct-java Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5f946c6c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5f946c6c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5f946c6c Branch: refs/heads/master Commit: 5f946c6c00cfea0b38b64db6da33bcf2d2783fa3 Parents: 18e5d12 Author: Kenneth KnowlesAuthored: Wed Oct 19 19:26:35 2016 -0700 Committer: Luke Cwik Committed: Fri Oct 21 15:25:54 2016 -0700 -- pom.xml| 4 ++-- runners/direct-java/pom.xml| 11 +-- runners/google-cloud-dataflow-java/pom.xml | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f946c6c/pom.xml -- diff --git a/pom.xml b/pom.xml index 1263827..de4bd63 100644 --- a/pom.xml +++ b/pom.xml @@ -691,13 +691,13 @@ org.apache.beam beam-sdks-java-core ${project.version} -test-jar +tests test org.apache.beam -beam-sdks-java-core +beam-runners-core-java ${project.version} tests test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f946c6c/runners/direct-java/pom.xml -- diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 6cb1838..aec0e71 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -83,7 +83,7 @@ true org.apache.beam:beam-sdks-java-core -org.apache.beam:beam-runners-java-core +org.apache.beam:beam-runners-core-java @@ -331,7 +331,14 @@ org.apache.beam beam-sdks-java-core - test-jar + tests + test + + + + org.apache.beam + beam-runners-core-java + tests test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5f946c6c/runners/google-cloud-dataflow-java/pom.xml -- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index b035028..0f2d3b2 100644 --- 
a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -402,7 +402,7 @@ org.apache.beam beam-sdks-java-core - test-jar + tests test
[2/2] incubator-beam git commit: [BEAM-755] Fix typo and dependencies in beam-runners-direct-java
[BEAM-755] Fix typo and dependencies in beam-runners-direct-java This closes #1140 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ea8583c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ea8583c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ea8583c4 Branch: refs/heads/master Commit: ea8583c4f45e56a6ecb4b6dbe7eb255e5f27693d Parents: 18e5d12 5f946c6 Author: Luke Cwik Authored: Fri Oct 21 15:26:17 2016 -0700 Committer: Luke Cwik Committed: Fri Oct 21 15:26:17 2016 -0700 -- pom.xml| 4 ++-- runners/direct-java/pom.xml| 11 +-- runners/google-cloud-dataflow-java/pom.xml | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) --
[1/2] incubator-beam git commit: Drop static from interfaces
Repository: incubator-beam Updated Branches: refs/heads/master 307819327 -> b5e15c23a Drop static from interfaces It's redundant. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54736a80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54736a80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54736a80 Branch: refs/heads/master Commit: 54736a80443b7a66a56507ad5b0f45e674408811 Parents: 3078193 Author: Dan HalperinAuthored: Thu Oct 20 09:34:04 2016 -0700 Committer: Dan Halperin Committed: Thu Oct 20 09:34:08 2016 -0700 -- .../beam/examples/DebuggingWordCount.java | 2 +- .../apache/beam/examples/WindowedWordCount.java | 2 +- .../org/apache/beam/examples/WordCount.java | 2 +- .../beam/examples/complete/AutoComplete.java| 2 +- .../apache/beam/examples/complete/TfIdf.java| 2 +- .../examples/complete/TopWikipediaSessions.java | 2 +- .../examples/cookbook/BigQueryTornadoes.java| 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/DeDupExample.java| 2 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../beam/examples/cookbook/JoinExamples.java| 2 +- .../examples/cookbook/MaxPerKeyExamples.java| 2 +- .../beam/examples/complete/game/GameStats.java | 2 +- .../examples/complete/game/HourlyTeamScore.java | 2 +- .../examples/complete/game/LeaderBoard.java | 2 +- .../beam/examples/complete/game/UserScore.java | 2 +- .../beam/runners/direct/DirectRunner.java | 2 +- .../beam/runners/direct/WatermarkManager.java | 2 +- .../beam/runners/spark/examples/WordCount.java | 2 +- .../apache/beam/sdk/coders/DelegateCoder.java | 2 +- .../apache/beam/sdk/io/CompressedSource.java| 4 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 2 +- .../org/apache/beam/sdk/testing/PAssert.java| 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 2 +- .../sdk/transforms/reflect/DoFnSignatures.java | 2 +- .../CopyOnAccessInMemoryStateInternals.java | 2 
+- .../sdk/options/PipelineOptionsFactoryTest.java | 44 ++-- .../beam/sdk/options/PipelineOptionsTest.java | 6 +-- .../options/PipelineOptionsValidatorTest.java | 18 .../sdk/options/ProxyInvocationHandlerTest.java | 36 .../beam/sdk/options/ValueProviderTest.java | 6 +-- .../apache/beam/sdk/util/ApiSurfaceTest.java| 22 +- .../PipelineOptionsFactoryJava8Test.java| 8 ++-- .../src/main/java/DebuggingWordCount.java | 2 +- .../src/main/java/WindowedWordCount.java| 2 +- .../src/main/java/WordCount.java| 2 +- 36 files changed, 100 insertions(+), 100 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54736a80/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java -- diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index eb38227..90d77b3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -158,7 +158,7 @@ public class DebuggingWordCount { * Inherits standard configuration options and all options defined in * {@link WordCount.WordCountOptions}. */ - public static interface WordCountOptions extends WordCount.WordCountOptions { + public interface WordCountOptions extends WordCount.WordCountOptions { @Description("Regex filter pattern to use in DebuggingWordCount. 
" + "Only words matching this pattern will be counted.") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54736a80/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java -- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index c8bd9d3..4e254bd 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -166,7 +166,7 @@ public class WindowedWordCount { * table, as well as the {@link WordCount.WordCountOptions} support for * specification of the input file. */ - public static interface Options extends WordCount.WordCountOptions, + public interface Options extends WordCount.WordCountOptions, ExampleOptions, ExampleBigQueryTableOptions {
[2/2] incubator-beam git commit: Drop static from interfaces
Drop static from interfaces This closes #1147 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b5e15c23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b5e15c23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b5e15c23 Branch: refs/heads/master Commit: b5e15c23a97a36b6e159610ad6ea7cd92a6c8c3e Parents: 3078193 54736a8 Author: Luke CwikAuthored: Thu Oct 20 09:54:57 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 20 09:54:57 2016 -0700 -- .../beam/examples/DebuggingWordCount.java | 2 +- .../apache/beam/examples/WindowedWordCount.java | 2 +- .../org/apache/beam/examples/WordCount.java | 2 +- .../beam/examples/complete/AutoComplete.java| 2 +- .../apache/beam/examples/complete/TfIdf.java| 2 +- .../examples/complete/TopWikipediaSessions.java | 2 +- .../examples/cookbook/BigQueryTornadoes.java| 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/DeDupExample.java| 2 +- .../beam/examples/cookbook/FilterExamples.java | 2 +- .../beam/examples/cookbook/JoinExamples.java| 2 +- .../examples/cookbook/MaxPerKeyExamples.java| 2 +- .../beam/examples/complete/game/GameStats.java | 2 +- .../examples/complete/game/HourlyTeamScore.java | 2 +- .../examples/complete/game/LeaderBoard.java | 2 +- .../beam/examples/complete/game/UserScore.java | 2 +- .../beam/runners/direct/DirectRunner.java | 2 +- .../beam/runners/direct/WatermarkManager.java | 2 +- .../beam/runners/spark/examples/WordCount.java | 2 +- .../apache/beam/sdk/coders/DelegateCoder.java | 2 +- .../apache/beam/sdk/io/CompressedSource.java| 4 +- .../org/apache/beam/sdk/io/FileBasedSink.java | 2 +- .../org/apache/beam/sdk/testing/PAssert.java| 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 2 +- .../sdk/transforms/reflect/DoFnSignatures.java | 2 +- .../CopyOnAccessInMemoryStateInternals.java | 2 +- .../sdk/options/PipelineOptionsFactoryTest.java | 44 ++-- 
.../beam/sdk/options/PipelineOptionsTest.java | 6 +-- .../options/PipelineOptionsValidatorTest.java | 18 .../sdk/options/ProxyInvocationHandlerTest.java | 36 .../beam/sdk/options/ValueProviderTest.java | 6 +-- .../apache/beam/sdk/util/ApiSurfaceTest.java| 22 +- .../PipelineOptionsFactoryJava8Test.java| 8 ++-- .../src/main/java/DebuggingWordCount.java | 2 +- .../src/main/java/WindowedWordCount.java| 2 +- .../src/main/java/WordCount.java| 2 +- 36 files changed, 100 insertions(+), 100 deletions(-) --
[1/3] incubator-beam git commit: [BEAM-605] Create Bigquery Verifier
Repository: incubator-beam Updated Branches: refs/heads/master c472e1227 -> 6c6f824aa [BEAM-605] Create Bigquery Verifier Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8e225d7c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8e225d7c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8e225d7c Branch: refs/heads/master Commit: 8e225d7c36812cbcc831269d76279700e29131f7 Parents: c472e12 Author: Mark LiuAuthored: Thu Sep 22 14:34:20 2016 -0700 Committer: Luke Cwik Committed: Wed Oct 19 16:22:56 2016 -0700 -- .../examples/cookbook/BigQueryTornadoesIT.java | 14 +- .../beam/sdk/testing/BigqueryMatcher.java | 235 +++ .../beam/sdk/testing/BigqueryMatcherTest.java | 176 ++ 3 files changed, 424 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e225d7c/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java index 8bcab4a..7e15389 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java @@ -18,7 +18,10 @@ package org.apache.beam.examples.cookbook; +import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.BigqueryMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.junit.Test; @@ -35,7 +38,10 @@ public class BigQueryTornadoesIT { * Options for the BigQueryTornadoes Integration Test. 
*/ public interface BigQueryTornadoesITOptions - extends TestPipelineOptions, BigQueryTornadoes.Options { + extends TestPipelineOptions, BigQueryTornadoes.Options, BigQueryOptions { +@Default.String("043e8e6ee32384df0cda4c241b8ab897f2ce0f2f") +String getChecksum(); +void setChecksum(String value); } @Test @@ -46,6 +52,12 @@ public class BigQueryTornadoesIT { options.setOutput(String.format("%s.%s", "BigQueryTornadoesIT", "monthly_tornadoes_" + System.currentTimeMillis())); +String query = +String.format("SELECT month, tornado_count FROM [%s]", options.getOutput()); +options.setOnSuccessMatcher( +new BigqueryMatcher( +options.getAppName(), options.getProject(), query, options.getChecksum())); + BigQueryTornadoes.main(TestPipeline.convertToArgs(options)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8e225d7c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java new file mode 100644 index 000..7646caa --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; +import com.google.api.client.http.HttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import
[2/3] incubator-beam git commit: Add BigQuery Verifier to WindowedWordCountIT
Add BigQuery Verifier to WindowedWordCountIT Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd46523d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd46523d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd46523d Branch: refs/heads/master Commit: dd46523dc2bca4aee11265a2fb065cc137920b1d Parents: 8e225d7 Author: Mark LiuAuthored: Thu Oct 6 14:34:55 2016 -0700 Committer: Luke Cwik Committed: Wed Oct 19 16:22:57 2016 -0700 -- .../apache/beam/examples/WindowedWordCountIT.java | 11 +++ .../beam/examples/cookbook/BigQueryTornadoesIT.java | 2 +- .../apache/beam/sdk/testing/BigqueryMatcher.java| 16 ++-- .../beam/sdk/testing/BigqueryMatcherTest.java | 2 +- 4 files changed, 23 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 379d1b0..6742654 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -19,8 +19,10 @@ package org.apache.beam.examples; import java.io.IOException; import org.apache.beam.examples.WindowedWordCount.Options; +import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.BigqueryMatcher; import org.apache.beam.sdk.testing.StreamingIT; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; @@ -40,6 +42,9 @@ public class WindowedWordCountIT { */ public interface WindowedWordCountITOptions extends Options, TestPipelineOptions, 
StreamingOptions { +@Default.String("ff54f6f42b2afeb146206c1e8e915deaee0362b4") +String getChecksum(); +void setChecksum(String value); } @Test @@ -59,6 +64,12 @@ public class WindowedWordCountIT { TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class); options.setStreaming(isStreaming); +String query = String.format("SELECT word, SUM(count) FROM [%s:%s.%s] GROUP BY word", +options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); +options.setOnSuccessMatcher( +new BigqueryMatcher( +options.getAppName(), options.getProject(), query, options.getChecksum())); + WindowedWordCount.main(TestPipeline.convertToArgs(options)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java index 7e15389..27a5a8f 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java @@ -39,7 +39,7 @@ public class BigQueryTornadoesIT { */ public interface BigQueryTornadoesITOptions extends TestPipelineOptions, BigQueryTornadoes.Options, BigQueryOptions { -@Default.String("043e8e6ee32384df0cda4c241b8ab897f2ce0f2f") +@Default.String("1ab4c7ec460b94bbb3c3885b178bf0e6bed56e1f") String getChecksum(); void setChecksum(String value); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd46523d/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java index 7646caa..95208ce 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java @@ -40,8 +40,10 @@ import com.google.common.hash.Hashing; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; +import javax.annotation.Nonnull; import
[3/3] incubator-beam git commit: [BEAM-605] Create Bigquery Verifier
[BEAM-605] Create Bigquery Verifier This closes #990 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6c6f824a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6c6f824a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6c6f824a Branch: refs/heads/master Commit: 6c6f824aa25b90f094ecec189c5ad6a0ccf365cf Parents: c472e12 dd46523 Author: Luke Cwik Authored: Wed Oct 19 16:23:21 2016 -0700 Committer: Luke Cwik Committed: Wed Oct 19 16:23:21 2016 -0700 -- .../beam/examples/WindowedWordCountIT.java | 11 + .../examples/cookbook/BigQueryTornadoesIT.java | 14 +- .../beam/sdk/testing/BigqueryMatcher.java | 239 +++ .../beam/sdk/testing/BigqueryMatcherTest.java | 176 ++ 4 files changed, 439 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: "Intern" schemas and schema strings to prevent out-of-memory issues when dealing with many, many files in Avro sources.
Repository: incubator-beam Updated Branches: refs/heads/master 6d686288e -> 7f562cc10 "Intern" schemas and schema strings to prevent out of memory issues when dealing with many many files in Avro sources. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ebc62025 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ebc62025 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ebc62025 Branch: refs/heads/master Commit: ebc62025d26af5b5b3e2568e6e0e9e9df9d72546 Parents: 6d68628 Author: Luke CwikAuthored: Fri Oct 14 11:46:37 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 12:50:56 2016 -0700 -- .../java/org/apache/beam/sdk/io/AvroSource.java | 97 +--- .../org/apache/beam/sdk/io/AvroSourceTest.java | 43 + 2 files changed, 125 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ebc62025/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index f7ce3c2..aaf72ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -21,10 +21,13 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.annotations.VisibleForTesting; import java.io.ByteArrayInputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.InvalidObjectException; +import java.io.ObjectStreamException; import java.io.PushbackInputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; @@ -32,6 +35,8 @@ import java.nio.channels.ReadableByteChannel; import 
java.nio.channels.SeekableByteChannel; import java.util.Arrays; import java.util.Collection; +import java.util.Map; +import java.util.WeakHashMap; import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; import javax.annotation.concurrent.GuardedBy; @@ -164,7 +169,7 @@ public class AvroSource extends BlockBasedSource { * to read records of the given type from a file pattern. */ public static Read.Bounded readFromFileWithClass(String filePattern, Class clazz) { -return Read.from(new AvroSource(filePattern, DEFAULT_MIN_BUNDLE_SIZE, +return Read.from(new AvroSource<>(filePattern, DEFAULT_MIN_BUNDLE_SIZE, ReflectData.get().getSchema(clazz).toString(), clazz, null, null)); } @@ -218,14 +223,14 @@ public class AvroSource extends BlockBasedSource { * Does not modify this object. */ public AvroSource withMinBundleSize(long minBundleSize) { -return new AvroSource( +return new AvroSource<>( getFileOrPatternSpec(), minBundleSize, readSchemaString, type, codec, syncMarker); } private AvroSource(String fileNameOrPattern, long minBundleSize, String schema, Class type, String codec, byte[] syncMarker) { super(fileNameOrPattern, minBundleSize); -this.readSchemaString = schema; +this.readSchemaString = internSchemaString(schema); this.codec = codec; this.syncMarker = syncMarker; this.type = type; @@ -235,11 +240,11 @@ public class AvroSource extends BlockBasedSource { private AvroSource(String fileName, long minBundleSize, long startOffset, long endOffset, String schema, Class type, String codec, byte[] syncMarker, String fileSchema) { super(fileName, minBundleSize, startOffset, endOffset); -this.readSchemaString = schema; +this.readSchemaString = internSchemaString(schema); this.codec = codec; this.syncMarker = syncMarker; this.type = type; -this.fileSchemaString = fileSchema; +this.fileSchemaString = internSchemaString(fileSchema); } @Override @@ -277,13 +282,18 @@ public class AvroSource extends BlockBasedSource { readSchemaString = metadata.getSchemaString(); } 
} -return new AvroSource(fileName, getMinBundleSize(), start, end, readSchemaString, type, +// Note that if the fileSchemaString is equivalent to the readSchemaString, "intern"ing +// the string will occur within the constructor and return the same reference as the +// readSchemaString. This allows for Java to have an efficient serialization since it +// will only encode the schema once while just storing pointers to the encoded version +// within this source. +return new
[2/2] incubator-beam git commit: "Intern" schemas and schema strings to prevent out of memory issues
"Intern" schemas and schema strings to prevent out of memory issues This closes #1109 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f562cc1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f562cc1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f562cc1 Branch: refs/heads/master Commit: 7f562cc10753147bdd424e0b237c50b1b33f852c Parents: 6d68628 ebc6202 Author: Luke Cwik Authored: Tue Oct 18 12:51:19 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 12:51:19 2016 -0700 -- .../java/org/apache/beam/sdk/io/AvroSource.java | 97 +--- .../org/apache/beam/sdk/io/AvroSourceTest.java | 43 + 2 files changed, 125 insertions(+), 15 deletions(-) --
[2/2] incubator-beam git commit: Restore trigger-related tests missed in #1083
Restore trigger-related tests missed in #1083 This closes #1127 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6d686288 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6d686288 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6d686288 Branch: refs/heads/master Commit: 6d686288efd5fd64d43ba9802314f2cbbc8df72e Parents: 71c69b3 8d43e8a Author: Luke Cwik Authored: Tue Oct 18 11:01:14 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 11:01:14 2016 -0700 -- .../beam/sdk/util/ExecutableTriggerTest.java| 127 +++ .../sdk/util/FinishedTriggersBitSetTest.java| 55 .../sdk/util/FinishedTriggersProperties.java| 110 .../beam/sdk/util/FinishedTriggersSetTest.java | 60 + 4 files changed, 352 insertions(+) --
[1/2] incubator-beam git commit: Restore trigger-related tests missed in #1083
Repository: incubator-beam Updated Branches: refs/heads/master 71c69b31b -> 6d686288e Restore trigger-related tests missed in #1083 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d43e8aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d43e8aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d43e8aa Branch: refs/heads/master Commit: 8d43e8aa7ccb154e17d6840c25c7a72684c615aa Parents: 71c69b3 Author: Kenneth KnowlesAuthored: Tue Oct 18 10:11:37 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 11:00:47 2016 -0700 -- .../beam/sdk/util/ExecutableTriggerTest.java| 127 +++ .../sdk/util/FinishedTriggersBitSetTest.java| 55 .../sdk/util/FinishedTriggersProperties.java| 110 .../beam/sdk/util/FinishedTriggersSetTest.java | 60 + 4 files changed, 352 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java new file mode 100644 index 000..1e3a1ff --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ExecutableTrigger}. + */ +@RunWith(JUnit4.class) +public class ExecutableTriggerTest { + + @Test + public void testIndexAssignmentLeaf() throws Exception { +StubTrigger t1 = new StubTrigger(); +ExecutableTrigger executable = ExecutableTrigger.create(t1); +assertEquals(0, executable.getTriggerIndex()); + } + + @Test + public void testIndexAssignmentOneLevel() throws Exception { +StubTrigger t1 = new StubTrigger(); +StubTrigger t2 = new StubTrigger(); +StubTrigger t = new StubTrigger(t1, t2); + +ExecutableTrigger executable = ExecutableTrigger.create(t); + +assertEquals(0, executable.getTriggerIndex()); +assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); +assertSame(t1, executable.subTriggers().get(0).getSpec()); +assertEquals(2, executable.subTriggers().get(1).getTriggerIndex()); +assertSame(t2, executable.subTriggers().get(1).getSpec()); + } + + @Test + public void testIndexAssignmentTwoLevel() throws Exception { +StubTrigger t11 = new StubTrigger(); +StubTrigger t12 = new StubTrigger(); +StubTrigger t13 = new StubTrigger(); +StubTrigger t14 = new StubTrigger(); +StubTrigger t21 = new 
StubTrigger(); +StubTrigger t22 = new StubTrigger(); +StubTrigger t1 = new StubTrigger(t11, t12, t13, t14); +StubTrigger t2 = new StubTrigger(t21, t22); +StubTrigger t = new StubTrigger(t1, t2); + +ExecutableTrigger executable = ExecutableTrigger.create(t); + +assertEquals(0, executable.getTriggerIndex()); +assertEquals(1, executable.subTriggers().get(0).getTriggerIndex()); +assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree()); +assertEquals(6, executable.subTriggers().get(1).getTriggerIndex()); + +assertSame(t1, executable.getSubTriggerContaining(1).getSpec()); +assertSame(t2, executable.getSubTriggerContaining(6).getSpec()); +assertSame(t1, executable.getSubTriggerContaining(2).getSpec()); +assertSame(t1,
incubator-beam git commit: [BEAM-764] Remove cloneAs from PipelineOptions
Repository: incubator-beam Updated Branches: refs/heads/master 75f54682a -> 71c69b31b [BEAM-764] Remove cloneAs from PipelineOptions Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/71c69b31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/71c69b31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/71c69b31 Branch: refs/heads/master Commit: 71c69b31b6894064bf8111007f947150ff725528 Parents: 75f5468 Author: Pei HeAuthored: Mon Oct 17 14:13:42 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 10:56:43 2016 -0700 -- .../beam/sdk/options/PipelineOptions.java | 8 .../sdk/options/PipelineOptionsFactory.java | 1 - .../sdk/options/ProxyInvocationHandler.java | 22 -- .../beam/sdk/options/PipelineOptionsTest.java | 43 4 files changed, 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71c69b31/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 3d6cad6..5588543 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -211,14 +211,6 @@ public interface PipelineOptions extends HasDisplayData { T as(Class kls); /** - * Makes a deep clone of this object, and transforms the cloned object into the specified - * type {@code kls}. See {@link #as} for more information about the conversion. - * - * Properties that are marked with {@code @JsonIgnore} will not be cloned. - */ - T cloneAs(Class kls); - - /** * The pipeline runner that will be used to execute the pipeline. * For registered runners, the class name can be specified, otherwise the fully * qualified name needs to be specified. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71c69b31/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index 1c8a835..7206b11 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -989,7 +989,6 @@ public class PipelineOptionsFactory { // Ignore methods on the base PipelineOptions interface. try { methods.add(iface.getMethod("as", Class.class)); - methods.add(iface.getMethod("cloneAs", Class.class)); methods.add(iface.getMethod("populateDisplayData", DisplayData.Builder.class)); } catch (NoSuchMethodException | SecurityException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71c69b31/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index 47d7cee..a77dcc6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -134,10 +134,6 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { @SuppressWarnings("unchecked") Class clazz = (Class) args[0]; return as(clazz); -} else if (args != null && "cloneAs".equals(method.getName()) && args[0] instanceof Class) { - @SuppressWarnings("unchecked") - Class clazz = (Class) args[0]; - return cloneAs(proxy, clazz); } else if (args != null && "populateDisplayData".equals(method.getName()) && args[0] instanceof DisplayData.Builder) { 
@SuppressWarnings("unchecked") @@ -223,24 +219,6 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { } /** - * Backing implementation for {@link PipelineOptions#cloneAs(Class)}. - * - * @return A copy of the PipelineOptions. - */ - synchronized T cloneAs(Object proxy, Class iface) { -PipelineOptions clonedOptions; -try { - clonedOptions =
[2/2] incubator-beam git commit: BEAM-756 fix JavadocPackage checkstyle suppression
BEAM-756 fix JavadocPackage checkstyle suppression This closes #1114 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/006cb127 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/006cb127 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/006cb127 Branch: refs/heads/master Commit: 006cb127e84e045d58f13705e81d4769ae0cbf89 Parents: 4e14ac2 6a69184 Author: Luke Cwik Authored: Tue Oct 18 10:07:59 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 10:07:59 2016 -0700 -- .../src/main/resources/beam/suppressions.xml| 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) --
[1/2] incubator-beam git commit: BEAM-756 fix JavadocPackage checkstyle suppression
Repository: incubator-beam Updated Branches: refs/heads/master 4e14ac2c2 -> 006cb127e BEAM-756 fix JavadocPackage checkstyle suppression Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6a691843 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6a691843 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6a691843 Branch: refs/heads/master Commit: 6a691843460255abec96690778954990246a6b9d Parents: 4e14ac2 Author: Thomas WeiseAuthored: Mon Oct 17 11:39:45 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 10:07:32 2016 -0700 -- .../src/main/resources/beam/suppressions.xml| 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a691843/sdks/java/build-tools/src/main/resources/beam/suppressions.xml -- diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml index 00d6729..9f60c25 100644 --- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml @@ -17,13 +17,13 @@ "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd;> - - - + + + - - - + + +
[2/2] incubator-beam git commit: Restore ReshuffleTriggerTest to SDK
Restore ReshuffleTriggerTest to SDK This closes #1118 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4e14ac2c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4e14ac2c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4e14ac2c Branch: refs/heads/master Commit: 4e14ac2c2444f3d7f6e17286d0a4b9a500244aca Parents: c81376b 619746b Author: Luke Cwik Authored: Tue Oct 18 09:48:05 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 09:48:05 2016 -0700 -- .../beam/sdk/util/ReshuffleTriggerTest.java | 67 1 file changed, 67 insertions(+) --
[1/2] incubator-beam git commit: Restore ReshuffleTriggerTest to SDK
Repository: incubator-beam Updated Branches: refs/heads/master c81376bc7 -> 4e14ac2c2 Restore ReshuffleTriggerTest to SDK Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/619746b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/619746b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/619746b6 Branch: refs/heads/master Commit: 619746b6cdbd46bc108f8b0d0214a9e8f266b6b1 Parents: c81376b Author: Kenneth KnowlesAuthored: Mon Oct 17 12:44:29 2016 -0700 Committer: Kenneth Knowles Committed: Tue Oct 18 09:38:41 2016 -0700 -- .../beam/sdk/util/ReshuffleTriggerTest.java | 67 1 file changed, 67 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/619746b6/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java new file mode 100644 index 000..83077f4 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ReshuffleTrigger}. + */ +@RunWith(JUnit4.class) +public class ReshuffleTriggerTest { + + /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */ + public static ReshuffleTrigger forTest() { +return new ReshuffleTrigger<>(); + } + + @Test + public void testShouldFire() throws Exception { +TriggerTester tester = TriggerTester.forTrigger( +new ReshuffleTrigger(), FixedWindows.of(Duration.millis(100))); +IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(300), new Instant(400)); +assertTrue(tester.shouldFire(arbitraryWindow)); + } + + @Test + public void testOnTimer() throws Exception { +TriggerTester tester = TriggerTester.forTrigger( +new ReshuffleTrigger(), FixedWindows.of(Duration.millis(100))); +IntervalWindow arbitraryWindow = new IntervalWindow(new Instant(100), new Instant(200)); +tester.fireIfShouldFire(arbitraryWindow); +assertFalse(tester.isMarkedFinished(arbitraryWindow)); + } + + @Test + public void testToString() { +Trigger trigger = new ReshuffleTrigger<>(); +assertEquals("ReshuffleTrigger()", trigger.toString()); + } +}
[1/2] incubator-beam git commit: Update Dataflow container image for current version of Apache Beam.
Repository: incubator-beam Updated Branches: refs/heads/master 6b5ff4c4a -> ecc2abfdb Update Dataflow container image for current version of Apache Beam. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/760f30dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/760f30dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/760f30dd Branch: refs/heads/master Commit: 760f30ddaf186f6d7b0a1c84e1a580ee249bd344 Parents: 6b5ff4c Author: Luke CwikAuthored: Mon Oct 17 21:58:26 2016 -0700 Committer: Luke Cwik Committed: Tue Oct 18 08:27:28 2016 -0700 -- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/760f30dd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 55a01f7..5f83788 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -208,9 +208,9 @@ public class DataflowRunner extends PipelineRunner { // Default Docker container images that execute Dataflow worker harness, residing in Google // Container Registry, separately for Batch and Streaming. 
public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE = - "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20160926"; + "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161017"; public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE = - "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20160926"; + "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161017"; // The limit of CreateJob request size. private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
[3/3] incubator-beam git commit: [BEAM-596] Replace DirectResult#awaitCompletion with waitUntilFinish
[BEAM-596] Replace DirectResult#awaitCompletion with waitUntilFinish This closes #1098 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b8e6eea6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b8e6eea6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b8e6eea6 Branch: refs/heads/master Commit: b8e6eea691b48e14c4e2c3e84609d750769e09ee Parents: c2c650a 06bd074 Author: Luke Cwik Authored: Fri Oct 14 17:21:46 2016 -0700 Committer: Luke Cwik Committed: Fri Oct 14 17:21:46 2016 -0700 -- .../beam/runners/direct/CompletionCallback.java | 4 +- .../beam/runners/direct/DirectOptions.java | 4 +- .../beam/runners/direct/DirectRunner.java | 27 +++-- .../direct/ExecutorServiceParallelExecutor.java | 40 ++-- .../beam/runners/direct/PipelineExecutor.java | 2 +- .../beam/runners/direct/TransformExecutor.java | 10 ++--- .../beam/runners/direct/DirectRunnerTest.java | 6 +-- .../runners/direct/TransformExecutorTest.java | 16 8 files changed, 54 insertions(+), 55 deletions(-) --
[2/3] incubator-beam git commit: Stop Catching Errors in the DirectRunner
Stop Catching Errors in the DirectRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5e51c840 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5e51c840 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5e51c840 Branch: refs/heads/master Commit: 5e51c84003c2c9e03d51f94cbc2be07569bf090e Parents: c2c650a Author: Thomas GrohAuthored: Fri Oct 14 10:32:14 2016 -0700 Committer: Luke Cwik Committed: Fri Oct 14 17:21:18 2016 -0700 -- .../beam/runners/direct/CompletionCallback.java | 4 +- .../beam/runners/direct/DirectRunner.java | 14 ++- .../direct/ExecutorServiceParallelExecutor.java | 40 ++-- .../beam/runners/direct/PipelineExecutor.java | 2 +- .../beam/runners/direct/TransformExecutor.java | 10 ++--- .../runners/direct/TransformExecutorTest.java | 16 6 files changed, 39 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java index 8e51d6f..2986df1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java @@ -38,7 +38,7 @@ interface CompletionCallback { void handleEmpty(AppliedPTransform transform); /** - * Handle a result that terminated abnormally due to the provided {@link Throwable}. + * Handle a result that terminated abnormally due to the provided {@link Exception}. 
*/ - void handleThrowable(CommittedBundle inputBundle, Throwable t); + void handleException(CommittedBundle inputBundle, Exception t); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 6ef2472..664a915 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -403,18 +403,10 @@ public class DirectRunner * * See also {@link PipelineExecutor#awaitCompletion()}. */ -public State awaitCompletion() throws Throwable { +public State awaitCompletion() throws Exception { if (!state.isTerminal()) { -try { - executor.awaitCompletion(); - state = State.DONE; -} catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw e; -} catch (Throwable t) { - state = State.FAILED; - throw t; -} +executor.awaitCompletion(); +state = State.DONE; } return state; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5e51c840/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 3274524..e32f671 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -234,7 +234,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { } @Override - public void awaitCompletion() throws Throwable { + public 
void awaitCompletion() throws Exception { VisibleExecutorUpdate update; do { // Get an update; don't block forever if another thread has handled it @@ -243,8 +243,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { // there are no updates to process and no updates will ever be published because the // executor is shutdown return; - } else if (update != null && update.throwable.isPresent()) { -throw update.throwable.get(); + } else if (update != null && update.exception.isPresent()) { +
[1/2] incubator-beam git commit: Perform initial splitting in the DirectRunner
Repository: incubator-beam Updated Branches: refs/heads/master 86d222aab -> 0a413e78e Perform initial splitting in the DirectRunner This allows sources to be read from in parallel and generates initial splits. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f68fea02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f68fea02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f68fea02 Branch: refs/heads/master Commit: f68fea02b63e5844b9ccbd31ff8e02da407f65b7 Parents: 86d222a Author: Thomas GrohAuthored: Wed Oct 5 16:11:21 2016 -0700 Committer: Luke Cwik Committed: Fri Oct 14 13:54:55 2016 -0700 -- .../direct/BoundedReadEvaluatorFactory.java | 40 ++- .../beam/runners/direct/DirectOptions.java | 23 ++ .../beam/runners/direct/DirectRunner.java | 11 +- .../beam/runners/direct/EmptyInputProvider.java | 12 +- .../direct/ExecutorServiceParallelExecutor.java | 15 +- .../beam/runners/direct/RootInputProvider.java | 7 +- .../runners/direct/RootProviderRegistry.java| 5 +- .../direct/TestStreamEvaluatorFactory.java | 4 +- .../direct/TransformEvaluatorRegistry.java | 10 +- .../direct/UnboundedReadEvaluatorFactory.java | 35 ++- .../beam/runners/direct/WatermarkManager.java | 1 + .../direct/BoundedReadEvaluatorFactoryTest.java | 41 ++- .../direct/FlattenEvaluatorFactoryTest.java | 9 +- .../direct/TestStreamEvaluatorFactoryTest.java | 2 +- .../UnboundedReadEvaluatorFactoryTest.java | 55 +++- .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 69 - .../io/gcp/bigquery/BigQueryAvroUtilsTest.java | 132 +++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 292 ++- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 9 +- 19 files changed, 662 insertions(+), 110 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f68fea02/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java -- diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 326a535..843dcd6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -18,28 +18,32 @@ package org.apache.beam.runners.direct; import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; import java.io.IOException; import java.util.Collection; -import java.util.Collections; +import java.util.List; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.StepTransformResult.Builder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Read.Bounded; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} * for the {@link Bounded Read.Bounded} primitive {@link PTransform}. 
*/ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { + private static final Logger LOG = LoggerFactory.getLogger(BoundedReadEvaluatorFactory.class); private final EvaluationContext evaluationContext; BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) { @@ -126,18 +130,32 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { } @Override -public Collection getInitialInputs(AppliedPTransform transform) { - return createInitialSplits((AppliedPTransform) transform); +public Collection getInitialInputs( +AppliedPTransform transform, int targetParallelism) throws Exception { + return createInitialSplits((AppliedPTransform) transform, targetParallelism); } -private Collection createInitialSplits( -AppliedPTransform> transform) { +private +
[2/2] incubator-beam git commit: [BEAM-310] Perform initial splitting in the DirectRunner
[BEAM-310] Perform initial splitting in the DirectRunner This closes #1063 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a413e78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a413e78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a413e78 Branch: refs/heads/master Commit: 0a413e78e8f937ab3c221b78be25037021685e0d Parents: 86d222a f68fea0 Author: Luke Cwik Authored: Fri Oct 14 13:55:24 2016 -0700 Committer: Luke Cwik Committed: Fri Oct 14 13:55:24 2016 -0700 -- .../direct/BoundedReadEvaluatorFactory.java | 40 ++- .../beam/runners/direct/DirectOptions.java | 23 ++ .../beam/runners/direct/DirectRunner.java | 11 +- .../beam/runners/direct/EmptyInputProvider.java | 12 +- .../direct/ExecutorServiceParallelExecutor.java | 15 +- .../beam/runners/direct/RootInputProvider.java | 7 +- .../runners/direct/RootProviderRegistry.java| 5 +- .../direct/TestStreamEvaluatorFactory.java | 4 +- .../direct/TransformEvaluatorRegistry.java | 10 +- .../direct/UnboundedReadEvaluatorFactory.java | 35 ++- .../beam/runners/direct/WatermarkManager.java | 1 + .../direct/BoundedReadEvaluatorFactoryTest.java | 41 ++- .../direct/FlattenEvaluatorFactoryTest.java | 9 +- .../direct/TestStreamEvaluatorFactoryTest.java | 2 +- .../UnboundedReadEvaluatorFactoryTest.java | 55 +++- .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 69 - .../io/gcp/bigquery/BigQueryAvroUtilsTest.java | 132 +++-- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 292 ++- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 9 +- 19 files changed, 662 insertions(+), 110 deletions(-) --
[2/2] incubator-beam git commit: [BEAM-739] Log exceptions full stack trace in IT tests
[BEAM-739] Log exceptions full stack trace in IT tests This closes #1079 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f991fd8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f991fd8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f991fd8 Branch: refs/heads/master Commit: 4f991fd828b0d03556fee7029a1d19657af318ed Parents: 135790b 1affccc Author: Luke Cwik Authored: Wed Oct 12 08:22:12 2016 -0700 Committer: Luke Cwik Committed: Wed Oct 12 08:22:12 2016 -0700 -- examples/java/pom.xml | 2 -- pom.xml| 4 sdks/java/io/google-cloud-platform/pom.xml | 1 - sdks/java/io/kinesis/pom.xml | 1 - 4 files changed, 4 insertions(+), 4 deletions(-) --
[1/2] incubator-beam git commit: Disable trimStackTrace in maven-failsafe-plugin
Repository: incubator-beam Updated Branches: refs/heads/master 135790bc9 -> 4f991fd82 Disable trimStackTrace in maven-failsafe-plugin Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1affcccd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1affcccd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1affcccd Branch: refs/heads/master Commit: 1affcccd4f4dafa0c6c6ec2360c5fbcdf55247ce Parents: 135790b Author: Pei HeAuthored: Tue Oct 11 15:56:58 2016 -0700 Committer: Luke Cwik Committed: Wed Oct 12 08:21:48 2016 -0700 -- examples/java/pom.xml | 2 -- pom.xml| 4 sdks/java/io/google-cloud-platform/pom.xml | 1 - sdks/java/io/kinesis/pom.xml | 1 - 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1affcccd/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 1923366..aa7c22a 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -127,7 +127,6 @@ maven-failsafe-plugin false - true @@ -404,7 +403,6 @@ maven-failsafe-plugin false - true http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1affcccd/pom.xml -- diff --git a/pom.xml b/pom.xml index 7295261..c135f17 100644 --- a/pom.xml +++ b/pom.xml @@ -981,6 +981,10 @@ org.apache.maven.plugins maven-failsafe-plugin 2.19.1 + +false +true +
[1/2] incubator-beam git commit: [BEAM-736] Fix BigQueryTornadoesIT, broken by PR-1039
Repository: incubator-beam Updated Branches: refs/heads/master c7c13157f -> daf69f87e [BEAM-736] Fix BigQueryTornadoesIT, broken by PR-1039 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1d3f31da Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1d3f31da Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1d3f31da Branch: refs/heads/master Commit: 1d3f31da7ee9769200e8a93ec3626cd4cd636f67 Parents: c7c1315 Author: Pei HeAuthored: Mon Oct 10 14:30:30 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 10 17:28:19 2016 -0700 -- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1d3f31da/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java -- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 3d1aba6..5aa952c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -542,11 +542,11 @@ public class BigQueryIO { // For these cases the withoutValidation method can be used to disable the check. if (validate && table != null) { // Check for source table presence for early failure notification. 
- DatasetService datasetService = bigQueryServices.getDatasetService(bqOptions); + DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); verifyDatasetPresence(datasetService, table); verifyTablePresence(datasetService, table); } else if (validate && query != null) { - JobService jobService = bigQueryServices.getJobService(bqOptions); + JobService jobService = getBigQueryServices().getJobService(bqOptions); try { jobService.dryRunQuery( bqOptions.getProject(),
[2/2] incubator-beam git commit: [BEAM-736] Fix BigQueryTornadoesIT, broken by PR-1039
[BEAM-736] Fix BigQueryTornadoesIT, broken by PR-1039 This closes #1078 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/daf69f87 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/daf69f87 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/daf69f87 Branch: refs/heads/master Commit: daf69f87e7271b2620e9bc023a8a360a485f856a Parents: c7c1315 1d3f31d Author: Luke Cwik Authored: Mon Oct 10 17:28:52 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 10 17:28:52 2016 -0700 -- .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) --
[2/2] incubator-beam git commit: Move GcsPathValidatorTest to package matching class under test
Move GcsPathValidatorTest to package matching class under test This closes #1074 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c7c13157 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c7c13157 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c7c13157 Branch: refs/heads/master Commit: c7c13157fae00a0af6bcc3aad2369c34956507ed Parents: 570de74 a37ec0a Author: Luke Cwik Authored: Mon Oct 10 11:49:28 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 10 11:49:28 2016 -0700 -- .../dataflow/util/GcsPathValidatorTest.java | 103 --- .../beam/sdk/util/GcsPathValidatorTest.java | 100 ++ 2 files changed, 100 insertions(+), 103 deletions(-) --
[1/2] incubator-beam git commit: Move GcsPathValidatorTest to package matching class under test
Repository: incubator-beam Updated Branches: refs/heads/master 570de74da -> c7c13157f Move GcsPathValidatorTest to package matching class under test Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a37ec0ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a37ec0ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a37ec0ae Branch: refs/heads/master Commit: a37ec0ae9d07915fe33e932578628b02b3161d02 Parents: 570de74 Author: Kenneth KnowlesAuthored: Mon Oct 10 11:28:11 2016 -0700 Committer: Kenneth Knowles Committed: Mon Oct 10 11:28:11 2016 -0700 -- .../dataflow/util/GcsPathValidatorTest.java | 103 --- .../beam/sdk/util/GcsPathValidatorTest.java | 100 ++ 2 files changed, 100 insertions(+), 103 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a37ec0ae/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java deleted file mode 100644 index adf4fc2..000 --- a/sdks/java/core/src/test/java/org/apache/beam/runners/dataflow/util/GcsPathValidatorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.util; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.util.GcsPathValidator; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.TestCredential; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** Tests for {@link GcsPathValidator}. 
*/ -@RunWith(JUnit4.class) -public class GcsPathValidatorTest { - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @Mock private GcsUtil mockGcsUtil; - private GcsPathValidator validator; - - private class FakeRunner extends PipelineRunner { -@Override -public PipelineResult run(Pipeline pipeline) { - throw new UnsupportedOperationException(); -} - } - - @Before - public void setUp() throws Exception { -MockitoAnnotations.initMocks(this); -when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); -when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); -GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); -options.setRunner(FakeRunner.class); -options.setGcpCredential(new TestCredential()); -options.setGcsUtil(mockGcsUtil); -validator = GcsPathValidator.fromOptions(options); - } - - @Test - public void testValidFilePattern() { -validator.validateInputFilePatternSupported("gs://bucket/path"); - } - - @Test - public void testInvalidFilePattern() { -expectedException.expect(IllegalArgumentException.class); -expectedException.expectMessage( -"FakeRunner expected a valid 'gs://' path but was given '/local/path'"); -validator.validateInputFilePatternSupported("/local/path"); - } - - @Test - public void testWhenBucketDoesNotExist() throws Exception { -when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(false); -expectedException.expect(IllegalArgumentException.class); -
[1/2] incubator-beam git commit: BigQueryIO: port trivial fixes from Dataflow version.
Repository: incubator-beam Updated Branches: refs/heads/master 2492604e4 -> 570de74da BigQueryIO: port trivial fixes from Dataflow version. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/39b9de5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/39b9de5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/39b9de5f Branch: refs/heads/master Commit: 39b9de5feab7be37f88e44e99784375a8ae82bc7 Parents: 2492604 Author: Pei HeAuthored: Mon Oct 3 21:19:37 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 10 09:01:29 2016 -0700 -- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 21 +++- 1 file changed, 7 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/39b9de5f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java -- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 716fe39..3d1aba6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -268,12 +268,6 @@ public class BigQueryIO { private static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP); - // TODO: make this private and remove improper access from BigQueryIOTranslator. - public static final String SET_PROJECT_FROM_OPTIONS_WARNING = - "No project specified for BigQuery table \"%1$s.%2$s\". Assuming it is in \"%3$s\". If the" - + " table is in a different project please specify it as a part of the BigQuery table" - + " definition."; - private static final String RESOURCE_NOT_FOUND_ERROR = "BigQuery %1$s not found for table \"%2$s\" . 
Please create the %1$s before pipeline" + " execution. If the %1$s is created by an earlier stage of the pipeline, this" @@ -614,6 +608,7 @@ public class BigQueryIO { JobReference jobRef = new JobReference() .setProjectId(executingProject) .setJobId(getExtractJobId(jobIdToken)); + Job extractJob = bqServices.getJobService(bqOptions) .getJob(jobRef); @@ -805,8 +800,7 @@ public class BigQueryIO { BigQueryServices bqServices, String executingProject) { super(jobIdToken, extractDestinationDir, bqServices, executingProject); - checkNotNull(table, "table"); - this.jsonTable = toJsonString(table); + this.jsonTable = toJsonString(checkNotNull(table, "table")); this.tableSizeBytes = new AtomicReference<>(); } @@ -960,6 +954,7 @@ public class BigQueryIO { super.populateDisplayData(builder); builder.add(DisplayData.item("query", query)); } + private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { @@ -1755,10 +1750,8 @@ public class BigQueryIO { new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices)); } -TableReference table = fromJsonString(jsonTableRef, TableReference.class); -if (Strings.isNullOrEmpty(table.getProjectId())) { - table.setProjectId(options.getProject()); -} +TableReference table = getTableWithDefaultProject(options); + String jobIdToken = "beam_job_" + randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; @@ -2653,7 +2646,7 @@ public class BigQueryIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.addIfNotNull(DisplayData.item("tableSpec", tableSpec)); + builder.addIfNotNull(DisplayData.item("table", tableSpec)); if (tableRefFunction != null) { builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) .withLabel("Table Reference Function")); @@ -2745,7 +2738,7 @@ public class BigQueryIO { UNKNOWN, } - private static Status 
parseStatus(Job job) { + private static Status parseStatus(@Nullable Job job) { if (job == null) { return Status.UNKNOWN; }
[2/2] incubator-beam git commit: BigQueryIO: port trivial fixes from Dataflow version.
BigQueryIO: port trivial fixes from Dataflow version. This closes #1042 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/570de74d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/570de74d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/570de74d Branch: refs/heads/master Commit: 570de74da05082f1a97b54191356f13c7fea5657 Parents: 2492604 39b9de5 Author: Luke Cwik Authored: Mon Oct 10 09:02:08 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 10 09:02:08 2016 -0700 -- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 21 +++- 1 file changed, 7 insertions(+), 14 deletions(-) --
[1/4] incubator-beam git commit: Forward port Dataflow PR-454 to Beam
Repository: incubator-beam Updated Branches: refs/heads/master ecbc64117 -> 9b71f1636 Forward port Dataflow PR-454 to Beam Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ac0caf2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ac0caf2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ac0caf2 Branch: refs/heads/master Commit: 0ac0caf2f78064a820f8a6ae23624162dcd1419f Parents: cca861b Author: Pei HeAuthored: Mon Oct 3 20:39:32 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 15:33:03 2016 -0700 -- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 34 +++- .../sdk/io/gcp/bigquery/BigQueryServices.java | 2 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 8 ++--- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 11 +-- 4 files changed, 31 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ac0caf2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java -- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 6d20c3f..eb98ea8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -553,7 +553,12 @@ public class BigQueryIO { } else if (validate && query != null) { JobService jobService = bigQueryServices.getJobService(bqOptions); try { -jobService.dryRunQuery(bqOptions.getProject(), query, useLegacySql); +jobService.dryRunQuery( +bqOptions.getProject(), +new JobConfigurationQuery() +.setQuery(query) +.setFlattenResults(flattenResults) +.setUseLegacySql(useLegacySql)); } catch (Exception e) { throw new IllegalArgumentException( 
String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e); @@ -926,10 +931,7 @@ public class BigQueryIO { executeQuery( executingProject, queryJobId, - query, tableToExtract, - flattenResults, - useLegacySql, bqServices.getJobService(bqOptions)); return tableToExtract; } @@ -955,34 +957,29 @@ public class BigQueryIO { private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) throws InterruptedException, IOException { if (dryRunJobStats.get() == null) { -JobStatistics jobStats = -bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query, useLegacySql); +JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery( +executingProject, createBasicQueryConfig()); dryRunJobStats.compareAndSet(null, jobStats); } return dryRunJobStats.get(); } -private static void executeQuery( +private void executeQuery( String executingProject, String jobId, -String query, TableReference destinationTable, -boolean flattenResults, -boolean useLegacySql, JobService jobService) throws IOException, InterruptedException { JobReference jobRef = new JobReference() .setProjectId(executingProject) .setJobId(jobId); - JobConfigurationQuery queryConfig = new JobConfigurationQuery(); - queryConfig - .setQuery(query) + + JobConfigurationQuery queryConfig = createBasicQueryConfig() .setAllowLargeResults(true) .setCreateDisposition("CREATE_IF_NEEDED") .setDestinationTable(destinationTable) - .setFlattenResults(flattenResults) - .setUseLegacySql(useLegacySql) .setPriority("BATCH") .setWriteDisposition("WRITE_EMPTY"); + jobService.startQueryJob(jobRef, queryConfig); Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); if (parseStatus(job) != Status.SUCCEEDED) { @@ -990,6 +987,13 @@ public class BigQueryIO { } } +private JobConfigurationQuery createBasicQueryConfig() { + return new JobConfigurationQuery() + .setQuery(query) + .setFlattenResults(flattenResults) + .setUseLegacySql(useLegacySql); +} + private void readObject(ObjectInputStream in) throws 
ClassNotFoundException, IOException {
[3/4] incubator-beam git commit: Forward port Dataflow PR-453 to Beam
Forward port Dataflow PR-453 to Beam Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f33e869 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f33e869 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f33e869 Branch: refs/heads/master Commit: 9f33e8692fe4852b1825c5464eafa8d9e9786425 Parents: 0ac0caf Author: Pei HeAuthored: Mon Oct 3 21:05:44 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 15:33:04 2016 -0700 -- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 24 +++--- .../sdk/io/gcp/bigquery/BigQueryServices.java | 3 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 19 +++-- .../gcp/bigquery/BigQueryTableRowIterator.java | 2 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 83 5 files changed, 112 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java -- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index eb98ea8..716fe39 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -528,6 +528,7 @@ public class BigQueryIO { checkState( table != null || query != null, "Invalid BigQueryIO.Read: one of table reference and query must be set"); + if (table != null) { checkState( flattenResults == null, @@ -910,21 +911,26 @@ public class BigQueryIO { protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException, InterruptedException { // 1. Find the location of the query. 
- TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions) - .getQuery() - .getReferencedTables() - .get(0); + String location = null; + List referencedTables = + dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables(); DatasetService tableService = bqServices.getDatasetService(bqOptions); - String location = tableService.getTable( - dryRunTempTable.getProjectId(), - dryRunTempTable.getDatasetId(), - dryRunTempTable.getTableId()).getLocation(); + if (referencedTables != null && !referencedTables.isEmpty()) { +TableReference queryTable = referencedTables.get(0); +location = tableService.getTable( +queryTable.getProjectId(), +queryTable.getDatasetId(), +queryTable.getTableId()).getLocation(); + } // 2. Create the temporary dataset in the query location. TableReference tableToExtract = JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class); tableService.createDataset( - tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, ""); + tableToExtract.getProjectId(), + tableToExtract.getDatasetId(), + location, + "Dataset for BigQuery query job temporary table"); // 3. Execute the query. 
String queryJobId = jobIdToken + "-query"; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java -- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 1d9fb28..ca7e491 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -141,7 +141,8 @@ interface BigQueryServices extends Serializable { /** * Create a {@link Dataset} with the given {@code location} and {@code description}. */ -void createDataset(String projectId, String datasetId, String location, String description) +void createDataset( +String projectId, String datasetId, @Nullable String location, @Nullable String description) throws IOException, InterruptedException; /**
[2/4] incubator-beam git commit: Forward port Dataflow PR-431 to Beam
Forward port Dataflow PR-431 to Beam Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cca861ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cca861ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cca861ba Branch: refs/heads/master Commit: cca861ba82a2e6ba6c6af122be0b8a9932d53cc5 Parents: ecbc641 Author: Pei HeAuthored: Mon Oct 3 19:37:02 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 15:33:03 2016 -0700 -- .../org/apache/beam/sdk/util/PropertyNames.java | 1 + .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 129 +-- .../sdk/io/gcp/bigquery/BigQueryServices.java | 5 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 16 ++- .../gcp/bigquery/BigQueryTableRowIterator.java | 14 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 44 +-- .../bigquery/BigQueryTableRowIteratorTest.java | 6 +- 7 files changed, 149 insertions(+), 66 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java index cc9fa5e..b17bcad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PropertyNames.java @@ -30,6 +30,7 @@ public class PropertyNames { public static final String BIGQUERY_TABLE = "table"; public static final String BIGQUERY_QUERY = "bigquery_query"; public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results"; + public static final String BIGQUERY_USE_LEGACY_SQL = "bigquery_use_legacy_sql"; public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition"; public static final String BIGQUERY_EXPORT_FORMAT = "bigquery_export_format"; public static final String BIGQUERY_EXPORT_SCHEMA 
= "bigquery_export_schema"; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cca861ba/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java -- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 91f6073..6d20c3f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -384,6 +384,7 @@ public class BigQueryIO { */ final boolean validate; @Nullable final Boolean flattenResults; + @Nullable final Boolean useLegacySql; @Nullable BigQueryServices bigQueryServices; private static final String QUERY_VALIDATION_FAILURE_ERROR = @@ -397,17 +398,20 @@ public class BigQueryIO { null /* jsonTableRef */, true /* validate */, null /* flattenResults */, +null /* useLegacySql */, null /* bigQueryServices */); } private Bound( String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate, - @Nullable Boolean flattenResults, @Nullable BigQueryServices bigQueryServices) { + @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql, + @Nullable BigQueryServices bigQueryServices) { super(name); this.jsonTableRef = jsonTableRef; this.query = query; this.validate = validate; this.flattenResults = flattenResults; +this.useLegacySql = useLegacySql; this.bigQueryServices = bigQueryServices; } @@ -428,7 +432,8 @@ public class BigQueryIO { */ public Bound from(TableReference table) { return new Bound( -name, query, toJsonString(table), validate, flattenResults, bigQueryServices); +name, query, toJsonString(table), validate, flattenResults, useLegacySql, +bigQueryServices); } /** @@ -440,10 +445,15 @@ public class BigQueryIO { * "flattenResults" in the 
https://cloud.google.com/bigquery/docs/reference/v2/jobs;> * Jobs documentation for more information. To disable flattening, use * {@link BigQueryIO.Read.Bound#withoutResultFlattening}. + * + * By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery +
[3/4] incubator-beam git commit: Remove KeyedResourcePool
Remove KeyedResourcePool This interface is no longer used. Instead, the runner ensures that bundles will be provided containing the appropriate input to the TestStreamEvaluatorFactory. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41fb16f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41fb16f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41fb16f0 Branch: refs/heads/master Commit: 41fb16f014a79d2b9c149c5b369db12b61c4c774 Parents: 7306e16 Author: Thomas GrohAuthored: Wed Oct 5 13:12:48 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 15:14:38 2016 -0700 -- .../direct/BoundedReadEvaluatorFactory.java | 40 +++-- .../beam/runners/direct/DirectRunner.java | 2 + .../beam/runners/direct/EmptyInputProvider.java | 49 ++ .../direct/ExecutorServiceParallelExecutor.java | 27 ++- .../runners/direct/FlattenEvaluatorFactory.java | 18 +- .../beam/runners/direct/KeyedResourcePool.java | 47 -- .../runners/direct/LockedKeyedResourcePool.java | 95 --- .../beam/runners/direct/RootInputProvider.java | 41 + .../runners/direct/RootProviderRegistry.java| 65 .../direct/RootTransformEvaluatorFactory.java | 42 - .../direct/TestStreamEvaluatorFactory.java | 39 +++-- .../direct/TransformEvaluatorRegistry.java | 17 +- .../direct/UnboundedReadEvaluatorFactory.java | 56 --- .../direct/BoundedReadEvaluatorFactoryTest.java | 3 +- .../direct/FlattenEvaluatorFactoryTest.java | 3 +- .../direct/LockedKeyedResourcePoolTest.java | 163 --- .../direct/TestStreamEvaluatorFactoryTest.java | 3 +- .../UnboundedReadEvaluatorFactoryTest.java | 8 +- 18 files changed, 269 insertions(+), 449 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41fb16f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 4936ad9..326a535 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -39,28 +39,13 @@ import org.apache.beam.sdk.values.PCollection; * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} * for the {@link Bounded Read.Bounded} primitive {@link PTransform}. */ -final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory { +final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { private final EvaluationContext evaluationContext; BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) { this.evaluationContext = evaluationContext; } - @Override - public Collection getInitialInputs(AppliedPTransform transform) { -return createInitialSplits((AppliedPTransform) transform); - } - - private Collection createInitialSplits( - AppliedPTransform> transform) { -BoundedSource source = transform.getTransform().getSource(); -return Collections. singleton( -evaluationContext -. 
createRootBundle() - .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source))) -.commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); - } - @SuppressWarnings({"unchecked", "rawtypes"}) @Override @Nullable @@ -132,4 +117,27 @@ final class BoundedReadEvaluatorFactory implements RootTransformEvaluatorFactory abstract BoundedSource getSource(); } + + static class InputProvider implements RootInputProvider { +private final EvaluationContext evaluationContext; + +InputProvider(EvaluationContext evaluationContext) { + this.evaluationContext = evaluationContext; +} + +@Override +public Collection getInitialInputs(AppliedPTransform transform) { + return createInitialSplits((AppliedPTransform) transform); +} + +private Collection createInitialSplits( +AppliedPTransform> transform) { + BoundedSource source = transform.getTransform().getSource(); + return Collections. singleton( + evaluationContext + . createRootBundle() + .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(source))) +
[1/4] incubator-beam git commit: Add RootTransformEvaluatorFactory
Repository: incubator-beam Updated Branches: refs/heads/master 0ddba6d8d -> ecbc64117 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java -- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 77c0bcb..25642dd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.direct; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; @@ -28,16 +28,20 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ContiguousSet; import com.google.common.collect.DiscreteDomain; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import com.google.common.collect.Range; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.NoSuchElementException; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; +import org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory.UnboundedSourceShard; import org.apache.beam.sdk.coders.AtomicCoder; 
import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; @@ -68,16 +72,17 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class UnboundedReadEvaluatorFactoryTest { private PCollection longs; - private TransformEvaluatorFactory factory; + private UnboundedReadEvaluatorFactory factory; private EvaluationContext context; private UncommittedBundle output; private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + private UnboundedSourcesource; + @Before public void setup() { -UnboundedSource source = -CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); +source = CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); TestPipeline p = TestPipeline.create(); longs = p.apply(Read.from(source)); @@ -89,49 +94,36 @@ public class UnboundedReadEvaluatorFactoryTest { @Test public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { -TransformEvaluator evaluator = -factory.forApplication(longs.getProducingTransformInternal(), null); + when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); -TransformResult result = evaluator.finishBundle(); -assertThat( -result.getWatermarkHold(), Matchers.lessThan(DateTime.now().toInstant())); -assertThat( -output.commit(Instant.now()).getElements(), -containsInAnyOrder( -tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), -tgw(0L))); - } +Collection initialInputs = +factory.getInitialInputs(longs.getProducingTransformInternal()); - /** - * Demonstrate that multiple sequential creations will produce additional elements if the source - * can provide them. 
- */ - @Test - public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception { -TransformEvaluator evaluator = -factory.forApplication(longs.getProducingTransformInternal(), null); +CommittedBundle inputShards = Iterables.getOnlyElement(initialInputs); +UnboundedSourceShard inputShard = +(UnboundedSourceShard ) +Iterables.getOnlyElement(inputShards.getElements()).getValue(); +TransformEvaluator> evaluator = +factory.forApplication( +longs.getProducingTransformInternal(), inputShards); +evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements())); TransformResult result = evaluator.finishBundle(); + +WindowedValue residual = Iterables.getOnlyElement(result.getUnprocessedElements()); +assertThat( +residual.getTimestamp(), Matchers.lessThan(DateTime.now().toInstant())); +UnboundedSourceShard residualShard = +
[2/4] incubator-beam git commit: Add RootTransformEvaluatorFactory
Add RootTransformEvaluatorFactory Use for Root Transforms. These transforms generate their own initial inputs, which the Evaluator is responsible for providing back to them to generate elements from the root PCollections. Update ExecutorServiceParallelExecutor to schedule roots based on the provided transforms. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7306e16b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7306e16b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7306e16b Branch: refs/heads/master Commit: 7306e16b7c92b16fac4491dcc07e6b45cd7fff62 Parents: 0ddba6d Author: Thomas GrohAuthored: Fri Sep 30 16:28:35 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 15:14:37 2016 -0700 -- .../examples/complete/game/LeaderBoardTest.java | 5 +- .../direct/BoundedReadEvaluatorFactory.java | 117 .../runners/direct/DirectTimerInternals.java| 2 +- .../beam/runners/direct/EvaluationContext.java | 5 + .../direct/ExecutorServiceParallelExecutor.java | 54 ++-- .../runners/direct/FlattenEvaluatorFactory.java | 35 ++- .../direct/RootTransformEvaluatorFactory.java | 42 +++ .../direct/TestStreamEvaluatorFactory.java | 140 +- .../direct/TransformEvaluatorFactory.java | 2 +- .../direct/TransformEvaluatorRegistry.java | 26 +- .../direct/UnboundedReadEvaluatorFactory.java | 267 ++- .../beam/runners/direct/WatermarkManager.java | 11 + .../direct/BoundedReadEvaluatorFactoryTest.java | 139 +- .../direct/FlattenEvaluatorFactoryTest.java | 14 +- .../direct/TestStreamEvaluatorFactoryTest.java | 196 ++ .../UnboundedReadEvaluatorFactoryTest.java | 204 +++--- 16 files changed, 666 insertions(+), 593 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java -- diff --git 
a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java index 40cac36..9cba704 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/LeaderBoardTest.java @@ -18,7 +18,6 @@ package org.apache.beam.examples.complete.game; -import static org.apache.beam.sdk.testing.PAssert.that; import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertThat; @@ -107,7 +106,7 @@ public class LeaderBoardTest implements Serializable { String blueTeam = TestUser.BLUE_ONE.getTeam(); String redTeam = TestUser.RED_ONE.getTeam(); -that(teamScores) +PAssert.that(teamScores) .inOnTimePane(new IntervalWindow(baseTime, TEAM_WINDOW_DURATION)) .containsInAnyOrder(KV.of(blueTeam, 12), KV.of(redTeam, 4)); @@ -339,7 +338,7 @@ public class LeaderBoardTest implements Serializable { // User scores are emitted in speculative panes in the Global Window - this matcher choice // ensures that panes emitted by the watermark advancing to positive infinity are not included, // as that will not occur outside of tests -that(userScores) +PAssert.that(userScores) .inEarlyGlobalWindowPanes() .containsInAnyOrder(KV.of(TestUser.BLUE_ONE.getUser(), 15), KV.of(TestUser.RED_ONE.getUser(), 7), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7306e16b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 2260135..4936ad9 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -17,16 +17,17 @@ */ package org.apache.beam.runners.direct; +import com.google.auto.value.AutoValue; import java.io.IOException; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; +import java.util.Collection; +import java.util.Collections; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
[4/4] incubator-beam git commit: [BEAM-310] Add RootTransformEvaluatorFactory
[BEAM-310] Add RootTransformEvaluatorFactory This closes #1051 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecbc6411 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecbc6411 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecbc6411 Branch: refs/heads/master Commit: ecbc64117e3bb4e1be35a7539463e62e1a3ba303 Parents: 0ddba6d 41fb16f Author: Luke CwikAuthored: Thu Oct 6 15:15:12 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 15:15:12 2016 -0700 -- .../examples/complete/game/LeaderBoardTest.java | 5 +- .../direct/BoundedReadEvaluatorFactory.java | 123 - .../beam/runners/direct/DirectRunner.java | 2 + .../runners/direct/DirectTimerInternals.java| 2 +- .../beam/runners/direct/EmptyInputProvider.java | 49 .../beam/runners/direct/EvaluationContext.java | 5 + .../direct/ExecutorServiceParallelExecutor.java | 77 -- .../runners/direct/FlattenEvaluatorFactory.java | 17 +- .../beam/runners/direct/KeyedResourcePool.java | 47 .../runners/direct/LockedKeyedResourcePool.java | 95 --- .../beam/runners/direct/RootInputProvider.java | 41 +++ .../runners/direct/RootProviderRegistry.java| 65 + .../direct/TestStreamEvaluatorFactory.java | 137 +- .../direct/TransformEvaluatorFactory.java | 2 +- .../direct/TransformEvaluatorRegistry.java | 9 +- .../direct/UnboundedReadEvaluatorFactory.java | 273 ++- .../beam/runners/direct/WatermarkManager.java | 11 + .../direct/BoundedReadEvaluatorFactoryTest.java | 140 +- .../direct/FlattenEvaluatorFactoryTest.java | 15 +- .../direct/LockedKeyedResourcePoolTest.java | 163 --- .../direct/TestStreamEvaluatorFactoryTest.java | 197 ++--- .../UnboundedReadEvaluatorFactoryTest.java | 204 +++--- 22 files changed, 786 insertions(+), 893 deletions(-) --
[1/2] incubator-beam git commit: Fix Precommit Integration Test Failure
Repository: incubator-beam Updated Branches: refs/heads/master f554398bc -> 0ddba6d8d Fix Precommit Integration Test Failure Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5aa6eb7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5aa6eb7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5aa6eb7 Branch: refs/heads/master Commit: f5aa6eb73cb475714dc5d7b1b959329023fa5672 Parents: f554398 Author: Mark LiuAuthored: Wed Oct 5 17:50:05 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 10:55:21 2016 -0700 -- examples/java/pom.xml | 79 +- 1 file changed, 15 insertions(+), 64 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5aa6eb7/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 0dd8d6d..1923366 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -117,7 +117,6 @@ jenkins-precommit -WordCountIT false true @@ -138,12 +137,17 @@ verify + +WordCountIT.java +WindowedWordCountIT.java + all 4 [ "--project=apache-beam-testing", + "--tempLocation=gs://temp-storage-for-end-to-end-tests", "--tempRoot=gs://temp-storage-for-end-to-end-tests", "--runner=org.apache.beam.runners.direct.DirectRunner" ] @@ -158,6 +162,9 @@ verify + +WordCountIT.java + all 4 @@ -178,6 +185,9 @@ verify + +WordCountIT.java + all 4 @@ -198,6 +208,10 @@ verify + +WordCountIT.java +WindowedWordCountIT.java + all 4 @@ -244,69 +258,6 @@ - - - jenkins-precommit-streaming - -WindowedWordCountIT -false -true - - - - -org.apache.maven.plugins -maven-failsafe-plugin - - false - true - - - -direct-runner-integration-tests - - integration-test - verify - - - all - 4 - - - [ - "--project=apache-beam-testing", - "--tempLocation=gs://temp-storage-for-end-to-end-tests", - "--runner=org.apache.beam.runners.direct.DirectRunner" - ] - - - - - -dataflow-runner-integration-tests - - integration-test - verify - - - all - 4 - - - 
[ - "--project=apache-beam-testing", - "--tempRoot=gs://temp-storage-for-end-to-end-tests", - "--runner=org.apache.beam.runners.dataflow.testing.TestDataflowRunner" - ] - - - - - - - - -
[2/2] incubator-beam git commit: Fix Streaming Integration Test Failure in Precommit
Fix Streaming Integration Test Failure in Precommit This closes #1059 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ddba6d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ddba6d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ddba6d8 Branch: refs/heads/master Commit: 0ddba6d8d2e2d9ea9586615e5ed0d495f12086e5 Parents: f554398 f5aa6eb Author: Luke Cwik Authored: Thu Oct 6 10:55:57 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 10:55:57 2016 -0700 -- examples/java/pom.xml | 79 +- 1 file changed, 15 insertions(+), 64 deletions(-) --
[1/3] incubator-beam git commit: De-pluralize error message expectation in LatestFnTests
Repository: incubator-beam Updated Branches: refs/heads/master 8130bc36f -> f554398bc De-pluralize error message expectation in LatestFnTests Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/60a8aef7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/60a8aef7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/60a8aef7 Branch: refs/heads/master Commit: 60a8aef713925e643cf9ec2bcb5c0b518aaa49e0 Parents: 8130bc3 Author: Kenneth KnowlesAuthored: Tue Oct 4 13:23:17 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 09:49:52 2016 -0700 -- .../test/java/org/apache/beam/sdk/transforms/LatestFnTests.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/60a8aef7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java index 84b5b68..459a966 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java @@ -103,7 +103,7 @@ public class LatestFnTests { @Test public void testAddInputNullAccumulator() { thrown.expect(NullPointerException.class); -thrown.expectMessage("accumulators"); +thrown.expectMessage("accumulator"); fn.addInput(null, TV); }
[3/3] incubator-beam git commit: Forbid files ending in Tests.java as they should end in Test.java
Forbid files ending in Tests.java as they should end in Test.java Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f554398b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f554398b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f554398b Branch: refs/heads/master Commit: f554398bc09df194409fd0dd3577f7e800b709cc Parents: 52e43ac Author: Kenneth KnowlesAuthored: Tue Oct 4 15:09:54 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 09:49:53 2016 -0700 -- sdks/java/build-tools/src/main/resources/beam/checkstyle.xml | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f554398b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml -- diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml index c26f94e..ae4fcba 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml @@ -53,6 +53,11 @@ page at http://checkstyle.sourceforge.net/config.html --> + + + + +
[2/3] incubator-beam git commit: Move LatestFnTests to LatestFnTest
Move LatestFnTests to LatestFnTest Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52e43ac7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52e43ac7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52e43ac7 Branch: refs/heads/master Commit: 52e43ac7b8257ecbcda61eb3b14406c36df08a3b Parents: 60a8aef Author: Kenneth KnowlesAuthored: Tue Oct 4 13:23:37 2016 -0700 Committer: Luke Cwik Committed: Thu Oct 6 09:49:53 2016 -0700 -- .../beam/sdk/transforms/LatestFnTest.java | 233 +++ .../beam/sdk/transforms/LatestFnTests.java | 233 --- 2 files changed, 233 insertions(+), 233 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52e43ac7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java new file mode 100644 index 000..31acb08 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link Latest.LatestFn}. + * */ +@RunWith(JUnit4.class) +public class LatestFnTest { + private static final Instant INSTANT = new Instant(100); + private static final long VALUE = 100 * INSTANT.getMillis(); + + private static final TimestampedValue TV = TimestampedValue.of(VALUE, INSTANT); + private static final TimestampedValue TV_MINUS_TEN = + TimestampedValue.of(VALUE - 10, INSTANT.minus(10)); + private static final TimestampedValue TV_PLUS_TEN = + TimestampedValue.of(VALUE + 10, INSTANT.plus(10)); + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + private final Latest.LatestFn fn = new Latest.LatestFn<>(); + private final Instant baseTimestamp = Instant.now(); + + @Test + public void testDefaultValue() { +assertThat(fn.defaultValue(), nullValue()); + } + + @Test + public void testCreateAccumulator() { +assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator()); + } + + @Test + public void testAddInputInitialAdd() { +TimestampedValue input = TV; +assertEquals(input, fn.addInput(fn.createAccumulator(), input)); + } + + @Test + 
public void testAddInputMinTimestamp() { +TimestampedValue input = TimestampedValue.atMinimumTimestamp(1234L); +assertEquals(input, fn.addInput(fn.createAccumulator(), input)); + } + + @Test + public void testAddInputEarlierValue() { +assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN)); + } + + @Test + public void testAddInputLaterValue() { +assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN)); + } + + @Test + public void testAddInputSameTimestamp() { +TimestampedValue accum = TimestampedValue.of(100L, INSTANT); +TimestampedValue input = TimestampedValue.of(200L, INSTANT); + +assertThat("Latest for values
[2/2] incubator-beam git commit: Static import Assert.assertEquals in DataflowUnboundedReadFromBoundedSourceTest
Static import Assert.assertEquals in DataflowUnboundedReadFromBoundedSourceTest This closes #1056 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f27354f7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f27354f7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f27354f7 Branch: refs/heads/master Commit: f27354f775a2db527cd97e1a4109f063245b44d2 Parents: 5918fed 099fd9c Author: Luke Cwik Authored: Wed Oct 5 14:36:32 2016 -0700 Committer: Luke Cwik Committed: Wed Oct 5 14:36:32 2016 -0700 -- .../internal/DataflowUnboundedReadFromBoundedSourceTest.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) --
[1/2] incubator-beam git commit: Static import Assert.assertEquals
Repository: incubator-beam Updated Branches: refs/heads/master 5918fed8a -> f27354f77 Static import Assert.assertEquals Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/099fd9c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/099fd9c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/099fd9c7 Branch: refs/heads/master Commit: 099fd9c740bac1bed5d3b7227c2f5b895808e99a Parents: 5918fed Author: Scott WegnerAuthored: Wed Oct 5 13:58:40 2016 -0700 Committer: Scott Wegner Committed: Wed Oct 5 13:58:40 2016 -0700 -- .../internal/DataflowUnboundedReadFromBoundedSourceTest.java | 7 --- 1 file changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/099fd9c7/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java index d9e3558..3a88935 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSourceTest.java @@ -18,12 +18,13 @@ package org.apache.beam.runners.dataflow.internal; +import static org.junit.Assert.assertEquals; + import java.io.IOException; import java.util.List; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; 
@@ -38,7 +39,7 @@ public class DataflowUnboundedReadFromBoundedSourceTest { DataflowUnboundedReadFromBoundedSource read = new DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource()); -Assert.assertEquals("Read(NoopNamedSource)", read.getKindString()); +assertEquals("Read(NoopNamedSource)", read.getKindString()); } @Test @@ -47,7 +48,7 @@ public class DataflowUnboundedReadFromBoundedSourceTest { DataflowUnboundedReadFromBoundedSource read = new DataflowUnboundedReadFromBoundedSource<>(anonSource); -Assert.assertEquals("Read(AnonymousSource)", read.getKindString()); +assertEquals("Read(AnonymousSource)", read.getKindString()); } /** Source implementation only useful for its identity. */
[2/2] incubator-beam git commit: [BEAM-695] PipelineOptions display data needs to handle array types
[BEAM-695] PipelineOptions display data needs to handle array types This closes #1034 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8462acbc Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8462acbc Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8462acbc Branch: refs/heads/master Commit: 8462acbcbe464ed51898cf0055ced7405e36efde Parents: 2ee444d bed5000 Author: Luke CwikAuthored: Mon Oct 3 09:02:10 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 3 09:02:10 2016 -0700 -- .../sdk/options/ProxyInvocationHandler.java | 22 +-- .../sdk/options/ProxyInvocationHandlerTest.java | 23 2 files changed, 43 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: PipelineOptions display data needs to handle array types
Repository: incubator-beam Updated Branches: refs/heads/master 2ee444d15 -> 8462acbcb PipelineOptions display data needs to handle array types PipelineOptions generates display data for arbitrary option types using #toString(). For array types, this gives a message like [Ljava.lang.String;@fc25934b]. Instead, we detect array types and use Arrays.deepToString to build a string based on array values. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bed5000a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bed5000a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bed5000a Branch: refs/heads/master Commit: bed5000a0f19312736e74725d186d16a504e5031 Parents: 2ee444d Author: Scott WegnerAuthored: Fri Sep 30 10:05:00 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 3 09:01:36 2016 -0700 -- .../sdk/options/ProxyInvocationHandler.java | 22 +-- .../sdk/options/ProxyInvocationHandlerTest.java | 23 2 files changed, 43 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed5000a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java index 204ad97..aa6f500 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java @@ -292,7 +292,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { builder.add(DisplayData.item(option.getKey(), type, value) .withNamespace(pipelineInterface)); } else { - builder.add(DisplayData.item(option.getKey(), value.toString()) + builder.add(DisplayData.item(option.getKey(), displayDataString(value))
.withNamespace(pipelineInterface)); } } @@ -321,7 +321,7 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { builder.add(DisplayData.item(jsonOption.getKey(), type, value) .withNamespace(spec.getDefiningInterface())); } else { -builder.add(DisplayData.item(jsonOption.getKey(), value.toString()) +builder.add(DisplayData.item(jsonOption.getKey(), displayDataString(value)) .withNamespace(spec.getDefiningInterface())); } } @@ -330,6 +330,24 @@ class ProxyInvocationHandler implements InvocationHandler, HasDisplayData { } /** + * {@link Object#toString()} wrapper to extract display data values for various types. + */ + private String displayDataString(Object value) { +checkNotNull(value, "value cannot be null"); +if (!value.getClass().isArray()) { + return value.toString(); +} +if (!value.getClass().getComponentType().isPrimitive()) { + return Arrays.deepToString((Object[]) value); +} + +// At this point, we have some type of primitive array. Arrays.deepToString(..) requires an +// Object array, but will unwrap nested primitive arrays. +String wrapped = Arrays.deepToString(new Object[] {value}); +return wrapped.substring(1, wrapped.length() - 1); + } + + /** * Marker interface used when the original {@link PipelineOptions} interface is not known at * runtime. This can occur if {@link PipelineOptions} are deserialized from JSON. 
* http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed5000a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index 1ba6b43..5d8ef43 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -854,6 +854,29 @@ public class ProxyInvocationHandlerTest { } @Test + public void testDisplayDataArrayValue() throws Exception { +ArrayOptions options = PipelineOptionsFactory.as(ArrayOptions.class); +options.setDeepArray(new String[][] {new String[] {"a", "b"}, new String[] {"c"}}); +options.setDeepPrimitiveArray(new int[][] {new int[] {1, 2}, new int[] {3}}); + +DisplayData data =
[1/3] incubator-beam git commit: Add default bucket scaffolding.
Repository: incubator-beam Updated Branches: refs/heads/master 202acd1d6 -> 2ee444d15 Add default bucket scaffolding. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d6e7c71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d6e7c71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d6e7c71 Branch: refs/heads/master Commit: 9d6e7c71b6863a3a87eb3b4ad7b4a5ce75707955 Parents: ccd9bad Author: sammcveetyAuthored: Fri Sep 30 21:24:58 2016 -0400 Committer: Luke Cwik Committed: Mon Oct 3 08:20:12 2016 -0700 -- pom.xml | 13 +++ .../DataflowPipelineTranslatorTest.java | 2 +- .../runners/dataflow/DataflowRunnerTest.java| 7 +- sdks/java/core/pom.xml | 5 + .../options/CloudResourceManagerOptions.java| 40 +++ .../apache/beam/sdk/util/GcpProjectUtil.java| 106 ++ .../apache/beam/sdk/util/GcsPathValidator.java | 2 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 67 ++- .../org/apache/beam/sdk/util/Transport.java | 17 +++ .../dataflow/util/GcsPathValidatorTest.java | 4 +- .../apache/beam/sdk/util/ApiSurfaceTest.java| 1 + .../beam/sdk/util/GcpProjectUtilTest.java | 76 + .../org/apache/beam/sdk/util/GcsUtilTest.java | 112 ++- 13 files changed, 413 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/pom.xml -- diff --git a/pom.xml b/pom.xml index 3cd5255..7295261 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,7 @@ 1.8.1 v2-rev295-1.22.0 +v1-rev6-1.22.0 0.1.0 v2-rev8-1.22.0 v1b3-rev36-1.22.0 @@ -483,6 +484,18 @@ com.google.apis +google-api-services-cloudresourcemanager +${cloudresourcemanager.version} + + +com.google.guava +guava-jdk5 + + + + + +com.google.apis google-api-services-pubsub ${pubsub.version} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java -- diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 2b7013d..98d2fb0 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -155,7 +155,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { return ImmutableList.of((GcsPath) invocation.getArguments()[0]); } }); -when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(true); +when(mockGcsUtil.bucketAccessible(any(GcsPath.class))).thenReturn(true); when(mockGcsUtil.isGcsPatternSupported(anyString())).thenCallRealMethod(); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d6e7c71/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 58a01aa..b0ee231 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -187,9 +187,9 @@ public class DataflowRunnerTest { * Build a mock {@link GcsUtil} with return values. * * @param bucketExist first return value - * @param bucketExists next return values + * @param bucketAccessible next return values */ - private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... 
bucketExists) + private GcsUtil buildMockGcsUtil(Boolean bucketExist, Boolean... bucketAccessible) throws IOException { GcsUtil mockGcsUtil = mock(GcsUtil.class);
[2/3] incubator-beam git commit: Add initial bucket stuff.
Add initial bucket stuff. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ccd9bad1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ccd9bad1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ccd9bad1 Branch: refs/heads/master Commit: ccd9bad18dcc0a1df7ebc82f43e89ddec838c037 Parents: 202acd1 Author: sammcveetyAuthored: Sat Sep 17 21:19:53 2016 -0400 Committer: Luke Cwik Committed: Mon Oct 3 08:20:12 2016 -0700 -- .../java/org/apache/beam/sdk/util/GcsUtil.java | 67 +++- 1 file changed, 64 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ccd9bad1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 41c372e..4befb1a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -349,16 +349,45 @@ public class GcsUtil { } /** + * Returns the project number of the project which owns this bucket. + * If the bucket exists, it must be accessible otherwise the permissions + * exception will be propagated. + */ + public long bucketOwner(GcsPath path) throws IOException { +return getBucket( +path, +BACKOFF_FACTORY.backoff(), +Sleeper.DEFAULT).getProjectNumber(); + } + + /** + * Creates a bucket for the provided project or propagates an error. + */ + public void createBucket(GcsPath path, long projectNumber) throws IOException { +return createBucket( +path, +projectNumber, +BACKOFF_FACTORY.backoff(), +Sleeper.DEFAULT); + } + + /** * Returns whether the GCS bucket exists. This will return false if the bucket * is inaccessible due to permissions. 
*/ @VisibleForTesting boolean bucketExists(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { +return getBucket(path, backoff, sleeper) != null; + } + + @VisibleForTesting + @Nullable + Storage.Bucket getBucket(GcsPath path, BackOff backoff, Sleeper sleeper) throws IOException { Storage.Buckets.Get getBucket = storageClient.buckets().get(path.getBucket()); try { -ResilientOperation.retry( +Storage.Bucket bucket = ResilientOperation.retry( ResilientOperation.getGoogleRequestCallable(getBucket), backoff, new RetryDeterminer() { @@ -372,10 +401,11 @@ public class GcsUtil { }, IOException.class, sleeper); -return true; + +return bucket; } catch (GoogleJsonResponseException e) { if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { - return false; + return null; } throw e; } catch (InterruptedException e) { @@ -386,6 +416,37 @@ public class GcsUtil { } } + @VisibleForTesting + void createBucket(GcsPath path, long projectNumber, BackOff backoff, Sleeper sleeper) +throws IOException { +Storage.Buckets.Insert insertBucket = +storageClient.buckets().insert(path.getBucket()); +insertBucket.setProject(String.valueOf(projectNumber)); + +try { + ResilientOperation.retry( +ResilientOperation.getGoogleRequestCallable(insertBucket), +backoff, +new RetryDeterminer() { + @Override +public boolean shouldRetry(IOException e) { +if (errorExtractor.itemNotFound(e) || errorExtractor.accessDenied(e)) { + return false; +} +return RetryDeterminer.SOCKET_ERRORS.shouldRetry(e); + } +}, +IOException.class, +sleeper); + return; +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( +String.format("Error while attempting to create bucket gs://%s for rproject %s", + path.getBucket(), projectNumber), e); +} + } + private static void executeBatches(List batches) throws IOException { ListeningExecutorService executor = MoreExecutors.listeningDecorator( MoreExecutors.getExitingExecutorService(
[3/3] incubator-beam git commit: Add initial scaffolding for default bucket (take 2)
Add initial scaffolding for default bucket (take 2) This closes #1014 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ee444d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ee444d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ee444d1 Branch: refs/heads/master Commit: 2ee444d15a25a1020413196684a0b711d3d7758b Parents: 202acd1 9d6e7c7 Author: Luke CwikAuthored: Mon Oct 3 08:20:39 2016 -0700 Committer: Luke Cwik Committed: Mon Oct 3 08:20:39 2016 -0700 -- pom.xml | 13 +++ .../DataflowPipelineTranslatorTest.java | 2 +- .../runners/dataflow/DataflowRunnerTest.java| 7 +- sdks/java/core/pom.xml | 5 + .../options/CloudResourceManagerOptions.java| 40 +++ .../apache/beam/sdk/util/GcpProjectUtil.java| 106 ++ .../apache/beam/sdk/util/GcsPathValidator.java | 2 +- .../java/org/apache/beam/sdk/util/GcsUtil.java | 94 ++-- .../org/apache/beam/sdk/util/Transport.java | 17 +++ .../dataflow/util/GcsPathValidatorTest.java | 4 +- .../apache/beam/sdk/util/ApiSurfaceTest.java| 1 + .../beam/sdk/util/GcpProjectUtilTest.java | 76 + .../org/apache/beam/sdk/util/GcsUtilTest.java | 112 ++- 13 files changed, 457 insertions(+), 22 deletions(-) --
[2/2] incubator-beam git commit: Forward port PR-411 and PR-420 from Dataflow
Forward port PR-411 and PR-420 from Dataflow This closes #1032 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b237e2f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b237e2f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b237e2f0 Branch: refs/heads/master Commit: b237e2f052b2a80c56b0914133a58ed5fdb9dbcf Parents: 7d46698 8b4550d Author: Luke CwikAuthored: Fri Sep 30 14:14:18 2016 -0700 Committer: Luke Cwik Committed: Fri Sep 30 14:14:18 2016 -0700 -- .../gcp/bigquery/BigQueryTableRowIterator.java | 114 ++- .../bigquery/BigQueryTableRowIteratorTest.java | 143 +++ 2 files changed, 190 insertions(+), 67 deletions(-) --
[1/2] incubator-beam git commit: Forward port PR-411 and PR-420 from Dataflow
Repository: incubator-beam Updated Branches: refs/heads/master 7d46698f2 -> b237e2f05 Forward port PR-411 and PR-420 from Dataflow Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8b4550d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8b4550d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8b4550d2 Branch: refs/heads/master Commit: 8b4550d27920b53d04291dc383f28c9f7f77ca32 Parents: 7d46698 Author: Pei HeAuthored: Thu Sep 29 15:13:28 2016 -0700 Committer: Luke Cwik Committed: Fri Sep 30 14:13:46 2016 -0700 -- .../gcp/bigquery/BigQueryTableRowIterator.java | 114 ++- .../bigquery/BigQueryTableRowIteratorTest.java | 143 +++ 2 files changed, 190 insertions(+), 67 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b4550d2/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java -- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 0ee01d9..64b1dc6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -28,7 +28,6 @@ import com.google.api.client.util.ClassInfo; import com.google.api.client.util.Data; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Jobs.Insert; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.DatasetReference; import com.google.api.services.bigquery.model.ErrorProto; @@ -36,6 +35,7 @@ import 
com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableCell; @@ -137,16 +137,7 @@ class BigQueryTableRowIterator implements AutoCloseable { ref = executeQueryAndWaitForCompletion(); } // Get table schema. -Bigquery.Tables.Get get = -client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); - -Table table = -executeWithBackOff( -get, -"Error opening BigQuery table %s of dataset %s : {}", -ref.getTableId(), -ref.getDatasetId()); -schema = table.getSchema(); +schema = getTable(ref).getSchema(); } public boolean advance() throws IOException, InterruptedException { @@ -168,12 +159,11 @@ class BigQueryTableRowIterator implements AutoCloseable { list.setPageToken(pageToken); } - TableDataList result = - executeWithBackOff( - list, - "Error reading from BigQuery table %s of dataset %s : {}", - ref.getTableId(), - ref.getDatasetId()); + TableDataList result = executeWithBackOff( + list, + String.format( + "Error reading from BigQuery table %s of dataset %s.", + ref.getTableId(), ref.getDatasetId())); pageToken = result.getPageToken(); iteratorOverCurrentBatch = @@ -332,19 +322,36 @@ class BigQueryTableRowIterator implements AutoCloseable { return row; } + // Get the BiqQuery table. 
+ private Table getTable(TableReference ref) throws IOException, InterruptedException { +Bigquery.Tables.Get get = +client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); + +return executeWithBackOff( +get, +String.format( +"Error opening BigQuery table %s of dataset %s.", +ref.getTableId(), +ref.getDatasetId())); + } + // Create a new BigQuery dataset - private void createDataset(String datasetId) throws IOException, InterruptedException { + private void createDataset(String datasetId, @Nullable String location) + throws IOException, InterruptedException { Dataset dataset = new Dataset(); DatasetReference reference = new DatasetReference(); reference.setProjectId(projectId);
[2/2] incubator-beam git commit: Refactor BundleFactory methods
Refactor BundleFactory methods Remove the inputBundle parameter to createBundle and createKeyedBundle. This parameter is unnecessary unless the model is capable of propagating keys between PTransforms. Remove the PCollection parameter to createRootBundle. createRootBundle should be a par Make [Un]CommittedBundle#getPCollection nullable. Bundles are utilized by the runner to control the processing of elements, but may not always belong to a PCollection. Update ImmutabilityCheckingBundleFactory to return an underlying bundle as the result of createRootBundle. Use createBundle in Root transform evaluators. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/759b6cad Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/759b6cad Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/759b6cad Branch: refs/heads/master Commit: 759b6cada9f1b724e32457b900a469f8a113542d Parents: a1ac222 Author: Thomas GrohAuthored: Tue Sep 27 13:06:35 2016 -0700 Committer: Luke Cwik Committed: Wed Sep 28 10:48:23 2016 -0700 -- .../direct/BoundedReadEvaluatorFactory.java | 2 +- .../beam/runners/direct/BundleFactory.java | 15 ++-- .../beam/runners/direct/DirectRunner.java | 8 +- .../beam/runners/direct/EvaluationContext.java | 12 +-- .../direct/ExecutorServiceParallelExecutor.java | 3 +- .../runners/direct/FlattenEvaluatorFactory.java | 2 +- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 7 +- .../ImmutabilityCheckingBundleFactory.java | 18 ++-- .../direct/ImmutableListBundleFactory.java | 72 +--- .../beam/runners/direct/ParDoEvaluator.java | 2 +- .../beam/runners/direct/StructuralKey.java | 88 +--- .../direct/TestStreamEvaluatorFactory.java | 2 +- .../direct/UnboundedReadEvaluatorFactory.java | 2 +- .../direct/UncommittedBundleOutputManager.java | 4 +- .../runners/direct/WindowEvaluatorFactory.java | 8 +- .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++---
.../runners/direct/CommittedResultTest.java | 10 +-- .../EncodabilityEnforcementFactoryTest.java | 6 +- .../runners/direct/EvaluationContextTest.java | 31 ++- .../direct/FlattenEvaluatorFactoryTest.java | 11 ++- .../direct/GroupByKeyEvaluatorFactoryTest.java | 22 ++--- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 31 +++ .../ImmutabilityCheckingBundleFactoryTest.java | 59 +++-- .../ImmutabilityEnforcementFactoryTest.java | 6 +- .../direct/ImmutableListBundleFactoryTest.java | 52 ++-- .../beam/runners/direct/ParDoEvaluatorTest.java | 7 +- .../direct/ParDoMultiEvaluatorFactoryTest.java | 48 +-- .../direct/ParDoSingleEvaluatorFactoryTest.java | 24 +++--- .../runners/direct/StepTransformResultTest.java | 4 +- .../beam/runners/direct/StructuralKeyTest.java | 9 ++ .../direct/TestStreamEvaluatorFactoryTest.java | 15 ++-- .../runners/direct/TransformExecutorTest.java | 12 +-- .../UnboundedReadEvaluatorFactoryTest.java | 28 +++ .../direct/ViewEvaluatorFactoryTest.java| 2 +- .../runners/direct/WatermarkManagerTest.java| 38 - .../direct/WindowEvaluatorFactoryTest.java | 6 +- 36 files changed, 297 insertions(+), 395 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 9c77946..2260135 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -132,7 +132,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { source.createReader(evaluationContext.getPipelineOptions())) { boolean contentsRemaining = reader.start(); UncommittedBundle output = 
-evaluationContext.createRootBundle(transform.getOutput()); +evaluationContext.createBundle(transform.getOutput()); while (contentsRemaining) { output.add( WindowedValue.timestampedValueInGlobalWindow( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/759b6cad/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java -- diff --git
incubator-beam git commit: Upgrade maven-compiler-plugin to fix incremental build
Repository: incubator-beam Updated Branches: refs/heads/master 307d592d2 -> b5853a624 Upgrade maven-compiler-plugin to fix incremental build We are hitting an issue with AutoValue and the current version of maven-compiler-plugin which breaks incremental build. See: https://issues.apache.org/jira/browse/MCOMPILER-236 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b5853a62 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b5853a62 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b5853a62 Branch: refs/heads/master Commit: b5853a624d8e3743ef786498a4e0617db1630360 Parents: 307d592 Author: Scott WegnerAuthored: Tue Sep 27 15:09:46 2016 -0700 Committer: Luke Cwik Committed: Wed Sep 28 09:25:04 2016 -0700 -- pom.xml| 2 +- .../examples/src/main/resources/archetype-resources/pom.xml| 2 +- .../starter/src/main/resources/archetype-resources/pom.xml | 2 +- .../starter/src/test/resources/projects/basic/reference/pom.xml| 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b5853a62/pom.xml -- diff --git a/pom.xml b/pom.xml index 8104e11..3cd5255 100644 --- a/pom.xml +++ b/pom.xml @@ -789,7 +789,7 @@ maven-compiler-plugin - 3.3 + 3.5.1 1.7 1.7 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b5853a62/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml -- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml index d3418ef..147c2dc 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml @@ -31,7 +31,7 @@ org.apache.maven.plugins maven-compiler-plugin -3.3 +3.5.1 ${targetPlatform} ${targetPlatform} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b5853a62/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml -- diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml index abcd0d0..0f7d3de 100644 --- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml +++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ org.apache.maven.plugins maven-compiler-plugin -3.3 +3.5.1 ${targetPlatform} ${targetPlatform} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b5853a62/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml -- diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml index da94713..66091fe 100644 --- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml +++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml @@ -29,7 +29,7 @@ org.apache.maven.plugins maven-compiler-plugin -3.3 +3.5.1 1.7 1.7
[3/5] incubator-beam git commit: [BEAM-604] Use Watermark to Finish Streaming Job in TestDataflowRunner
[BEAM-604] Use Watermark to Finish Streaming Job in TestDataflowRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f6bd47ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f6bd47ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f6bd47ba Branch: refs/heads/master Commit: f6bd47ba79615d90660b4105d8a6c5c276af8551 Parents: 2571cfc Author: Mark LiuAuthored: Fri Sep 2 14:41:11 2016 -0700 Committer: Luke Cwik Committed: Tue Sep 27 17:03:56 2016 -0700 -- .../beam/examples/WindowedWordCountIT.java | 49 ++ .../dataflow/testing/TestDataflowRunner.java| 52 +- .../testing/TestDataflowRunnerTest.java | 158 +++ 3 files changed, 255 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f6bd47ba/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java new file mode 100644 index 000..890ca2b --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.examples; + +import java.io.IOException; +import org.apache.beam.examples.WindowedWordCount.Options; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * End-to-end integration test of {@link WindowedWordCount}. + */ +@RunWith(JUnit4.class) +public class WindowedWordCountIT { + + /** + * Options for the {@link WindowedWordCount} Integration Test. + */ + public interface TestOptions extends Options, TestPipelineOptions { + } + + @Test + public void testWindowedWordCount() throws IOException { +PipelineOptionsFactory.register(TestOptions.class); +TestOptions options = TestPipeline.testingPipelineOptions().as(TestOptions.class); + +WindowedWordCount.main(TestPipeline.convertToArgs(options)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f6bd47ba/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 9be773b..c569cd4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -58,6 +58,8 @@ import org.slf4j.LoggerFactory; */ public class TestDataflowRunner extends PipelineRunner { private static final String TENTATIVE_COUNTER = "tentative"; + private static final String WATERMARK_METRIC_SUFFIX = 
"windmill-data-watermark"; + private static final long MAX_WATERMARK_VALUE = -2L; private static final Logger LOG = LoggerFactory.getLogger(TestDataflowRunner.class); private final TestDataflowPipelineOptions options; @@ -121,11 +123,17 @@ public class TestDataflowRunner extends PipelineRunner { if (result.isPresent()) { return result; } +result = checkMaxWatermark(job); +if (result.isPresent()) { + return result; +} Thread.sleep(1L); }
[1/5] incubator-beam git commit: Add StreamingIT category to integration test
Repository: incubator-beam Updated Branches: refs/heads/master 2571cfcf5 -> db47c63ab Add StreamingIT category to integration test Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fbae96f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fbae96f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fbae96f2 Branch: refs/heads/master Commit: fbae96f208afe3dca3ef620c4a312345a6314e13 Parents: 9f0588a Author: Mark LiuAuthored: Thu Sep 8 10:52:36 2016 -0700 Committer: Luke Cwik Committed: Tue Sep 27 17:03:56 2016 -0700 -- examples/java/pom.xml | 31 .../beam/examples/WindowedWordCountIT.java | 18 ++-- .../apache/beam/sdk/testing/StreamingIT.java| 24 +++ 3 files changed, 71 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbae96f2/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 9a48ec6..31244db 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -216,6 +216,37 @@ + + + + disable-streaming-ITs + false + + + +org.apache.maven.plugins +maven-failsafe-plugin + + + + integration-test + verify + + + org.apache.beam.sdk.testing.StreamingIT + + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbae96f2/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index 890ca2b..cddcd4a 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -21,9 +21,12 @@ package org.apache.beam.examples; import java.io.IOException; import org.apache.beam.examples.WindowedWordCount.Options; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import 
org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.testing.StreamingIT; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -36,13 +39,24 @@ public class WindowedWordCountIT { /** * Options for the {@link WindowedWordCount} Integration Test. */ - public interface TestOptions extends Options, TestPipelineOptions { + public interface TestOptions extends Options, TestPipelineOptions, StreamingOptions{ } @Test - public void testWindowedWordCount() throws IOException { + public void testWindowedWordCountInBatch() throws IOException { +testWindowedWordCountPipeline(false); + } + + @Test + @Category(StreamingIT.class) + public void testWindowedWordCountInStreaming() throws IOException { +testWindowedWordCountPipeline(true); + } + + private void testWindowedWordCountPipeline(boolean isStreaming) throws IOException { PipelineOptionsFactory.register(TestOptions.class); TestOptions options = TestPipeline.testingPipelineOptions().as(TestOptions.class); +options.setStreaming(isStreaming); WindowedWordCount.main(TestPipeline.convertToArgs(options)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fbae96f2/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java new file mode 100644 index 000..b3dd4a0 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StreamingIT.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the
[4/5] incubator-beam git commit: More javadoc, and keep retrying when fetching job metrics throws an exception
More javadoc and keep retry in case of get metrics exception Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/370c1714 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/370c1714 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/370c1714 Branch: refs/heads/master Commit: 370c171450215d3fac4208875850279a796415c9 Parents: fbae96f Author: Mark LiuAuthored: Wed Sep 14 13:18:40 2016 -0700 Committer: Luke Cwik Committed: Tue Sep 27 17:03:57 2016 -0700 -- examples/java/pom.xml | 7 +- .../beam/examples/WindowedWordCountIT.java | 13 +- .../dataflow/testing/TestDataflowRunner.java| 158 +- .../testing/TestDataflowRunnerTest.java | 287 ++- .../apache/beam/sdk/testing/StreamingIT.java| 13 +- 5 files changed, 263 insertions(+), 215 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 31244db..6b1b7ce 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -218,11 +218,8 @@ disable-streaming-ITs http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java -- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java index cddcd4a..379d1b0 100644 --- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java +++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.beam.examples; import java.io.IOException; @@ -39,23 +38,25 @@ public class WindowedWordCountIT { /** * Options for the {@link WindowedWordCount} Integration Test. 
*/ - public interface TestOptions extends Options, TestPipelineOptions, StreamingOptions{ + public interface WindowedWordCountITOptions + extends Options, TestPipelineOptions, StreamingOptions { } @Test public void testWindowedWordCountInBatch() throws IOException { -testWindowedWordCountPipeline(false); +testWindowedWordCountPipeline(false /* isStreaming */); } @Test @Category(StreamingIT.class) public void testWindowedWordCountInStreaming() throws IOException { -testWindowedWordCountPipeline(true); +testWindowedWordCountPipeline(true /* isStreaming */); } private void testWindowedWordCountPipeline(boolean isStreaming) throws IOException { -PipelineOptionsFactory.register(TestOptions.class); -TestOptions options = TestPipeline.testingPipelineOptions().as(TestOptions.class); +PipelineOptionsFactory.register(WindowedWordCountITOptions.class); +WindowedWordCountITOptions options = + TestPipeline.testingPipelineOptions().as(WindowedWordCountITOptions.class); options.setStreaming(isStreaming); WindowedWordCount.main(TestPipeline.convertToArgs(options)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370c1714/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index b8b4eaf..a152505 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import com.google.api.services.dataflow.model.JobMessage; import com.google.api.services.dataflow.model.JobMetrics; import 
com.google.api.services.dataflow.model.MetricUpdate; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Throwables; @@ -31,6 +32,7 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import
[2/5] incubator-beam git commit: Fix checkMaxWatermark causing the batch test to fail
Fix checkMaxWatermark causing batch test failed Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f0588a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f0588a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f0588a2 Branch: refs/heads/master Commit: 9f0588a2d63dd8538675b128a488ea5fa9c491f2 Parents: f6bd47b Author: Mark LiuAuthored: Wed Sep 7 11:59:02 2016 -0700 Committer: Luke Cwik Committed: Tue Sep 27 17:03:56 2016 -0700 -- .../dataflow/testing/TestDataflowRunner.java| 20 +++--- .../testing/TestDataflowRunnerTest.java | 40 ++-- 2 files changed, 33 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f0588a2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index c569cd4..b8b4eaf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -120,11 +120,7 @@ public class TestDataflowRunner extends PipelineRunner { try { for (;;) { Optional result = checkForSuccess(job); -if (result.isPresent()) { - return result; -} -result = checkMaxWatermark(job); -if (result.isPresent()) { +if (result.isPresent() && (!result.get() || checkMaxWatermark(job))) { return result; } Thread.sleep(1L); @@ -217,7 +213,7 @@ public class TestDataflowRunner extends PipelineRunner { + "{} expected assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); return Optional.of(false); - } else if (successes > 0 && 
successes >= expectedNumberOfAssertions) { + } else if (successes >= expectedNumberOfAssertions) { LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " + "{} expected assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); @@ -231,13 +227,7 @@ public class TestDataflowRunner extends PipelineRunner { return Optional.absent(); } - Optional checkMaxWatermark(DataflowPipelineJob job) throws IOException { -State state = job.getState(); -if (state == State.FAILED || state == State.CANCELLED) { - LOG.info("The pipeline {}", state); - return Optional.of(false); -} - + boolean checkMaxWatermark(DataflowPipelineJob job) throws IOException { JobMetrics metrics = options.getDataflowClient().projects().jobs() .getMetrics(job.getProjectId(), job.getJobId()).execute(); @@ -260,10 +250,10 @@ public class TestDataflowRunner extends PipelineRunner { } if (hasMaxWatermark) { LOG.info("All watermarks of job {} reach to max value.", job.getJobId()); -return Optional.of(true); +return true; } } -return Optional.absent(); +return false; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f0588a2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java -- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index 70c4562..3818b35 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -19,8 +19,10 @@ package org.apache.beam.runners.dataflow.testing; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; +import 
static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static
[5/5] incubator-beam git commit: [BEAM-604] Use Watermark to Finish Streaming Job in TestDataflowRunner
[BEAM-604] Use Watermark to Finish Streaming Job in TestDataflowRunner This closes #916 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/db47c63a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/db47c63a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/db47c63a Branch: refs/heads/master Commit: db47c63abe73fce979529ee9e16721eee4d649d7 Parents: 2571cfc 370c171 Author: Luke CwikAuthored: Tue Sep 27 17:04:27 2016 -0700 Committer: Luke Cwik Committed: Tue Sep 27 17:04:27 2016 -0700 -- examples/java/pom.xml | 28 ++ .../beam/examples/WindowedWordCountIT.java | 64 + .../dataflow/testing/TestDataflowRunner.java| 144 +++--- .../testing/TestDataflowRunnerTest.java | 287 +++ .../apache/beam/sdk/testing/StreamingIT.java| 35 +++ 5 files changed, 466 insertions(+), 92 deletions(-) --