This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 8f3818e [BEAM-8254] add workerRegion and workerZone options to Java SDK new e7f1539 Merge pull request #9961 from ibzib/java-worker 8f3818e is described below commit 8f3818e9a256e3a24129ba7762e7f7c3ddd3f783 Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Thu Oct 31 16:50:35 2019 -0700 [BEAM-8254] add workerRegion and workerZone options to Java SDK --- .../beam/runners/dataflow/DataflowRunner.java | 34 +++++++++++++++ .../beam/runners/dataflow/DataflowRunnerTest.java | 50 ++++++++++++++++++++++ .../sdk/extensions/gcp/options/GcpOptions.java | 31 ++++++++++++++ 3 files changed, 115 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9f7394f..51ee59c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -90,6 +90,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileBasedSink; @@ -158,6 +159,7 @@ import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Utf8; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; @@ -253,6 +255,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { + "https://cloud.google.com/compute/docs/regions-zones/regions-zones"); } + validateWorkerSettings(PipelineOptionsValidator.validate(GcpOptions.class, options)); + PathValidator validator = dataflowOptions.getPathValidator(); String gcpTempLocation; try { @@ -358,6 +362,36 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } @VisibleForTesting + static void validateWorkerSettings(GcpOptions gcpOptions) { + Preconditions.checkArgument( + gcpOptions.getZone() == null || gcpOptions.getWorkerRegion() == null, + "Cannot use option zone with workerRegion. Prefer either workerZone or workerRegion."); + Preconditions.checkArgument( + gcpOptions.getZone() == null || gcpOptions.getWorkerZone() == null, + "Cannot use option zone with workerZone. Prefer workerZone."); + Preconditions.checkArgument( + gcpOptions.getWorkerRegion() == null || gcpOptions.getWorkerZone() == null, + "workerRegion and workerZone options are mutually exclusive."); + + DataflowPipelineOptions dataflowOptions = gcpOptions.as(DataflowPipelineOptions.class); + boolean hasExperimentWorkerRegion = false; + if (dataflowOptions.getExperiments() != null) { + for (String experiment : dataflowOptions.getExperiments()) { + if (experiment.startsWith("worker_region")) { + hasExperimentWorkerRegion = true; + break; + } + } + } + Preconditions.checkArgument( + !hasExperimentWorkerRegion || gcpOptions.getWorkerRegion() == null, + "Experiment worker_region and option workerRegion are mutually exclusive."); + Preconditions.checkArgument( + !hasExperimentWorkerRegion || gcpOptions.getWorkerZone() == null, + "Experiment worker_region and option workerZone are mutually exclusive."); + } + + @VisibleForTesting protected DataflowRunner(DataflowPipelineOptions options) { this.options = options; this.dataflowClient = DataflowClient.create(options); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index f46d034..4438e14 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -34,6 +34,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.junit.Assume.assumeFalse; @@ -93,6 +94,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; import org.apache.beam.sdk.extensions.gcp.util.GcsUtil; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; @@ -103,6 +105,7 @@ import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.WriteFilesResult; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -538,6 +541,53 @@ public class DataflowRunnerTest implements Serializable { } @Test + public void testZoneAndWorkerRegionMutuallyExclusive() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setZone("us-east1-b"); + options.setWorkerRegion("us-east1"); + assertThrows( + IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options)); + } + + @Test + public void testZoneAndWorkerZoneMutuallyExclusive() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setZone("us-east1-b"); + options.setWorkerZone("us-east1-c"); + assertThrows( + IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options)); + } + + @Test + public void testExperimentRegionAndWorkerRegionMutuallyExclusive() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1"); + options.setWorkerRegion("us-east1"); + assertThrows( + IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options)); + } + + @Test + public void testExperimentRegionAndWorkerZoneMutuallyExclusive() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + ExperimentalOptions.addExperiment(dataflowOptions, "worker_region=us-west1"); + options.setWorkerZone("us-east1-b"); + assertThrows( + IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options)); + } + + @Test + public void testWorkerRegionAndWorkerZoneMutuallyExclusive() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setWorkerRegion("us-east1"); + options.setWorkerZone("us-east1-b"); + assertThrows( + IllegalArgumentException.class, () -> DataflowRunner.validateWorkerSettings(options)); + } + + @Test public void testRun() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); Pipeline p = buildDataflowPipeline(options); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index 37cf6bc..cb11569 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -101,6 +101,37 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { void setZone(String value); /** + * The Compute Engine region (https://cloud.google.com/compute/docs/regions-zones/regions-zones) + * in which worker processing should occur, e.g. "us-west1". Mutually exclusive with {@link + * #getWorkerZone()}. If neither workerRegion nor workerZone is specified, default to same value + * as region. + */ + @Description( + "The Compute Engine region " + + "(https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker " + + "processing should occur, e.g. \"us-west1\". Mutually exclusive with workerZone. If " + + "neither workerRegion nor workerZone is specified, default to same value as region.") + String getWorkerRegion(); + + void setWorkerRegion(String workerRegion); + + /** + * The Compute Engine zone (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in + * which worker processing should occur, e.g. "us-west1-a". Mutually exclusive with {@link + * #getWorkerRegion()}. If neither workerRegion nor workerZone is specified, the Dataflow service + * will choose a zone in region based on available capacity. + */ + @Description( + "The Compute Engine zone " + + "(https://cloud.google.com/compute/docs/regions-zones/regions-zones) in which worker " + + "processing should occur, e.g. \"us-west1-a\". Mutually exclusive with workerRegion. " + + "If neither workerRegion nor workerZone is specified, the Dataflow service will choose " + + "a zone in region based on available capacity.") + String getWorkerZone(); + + void setWorkerZone(String workerZone); + + /** * The class of the credential factory that should be created and used to create credentials. If * gcpCredential has not been set explicitly, an instance of this class will be constructed and * used as a credential factory.