Repository: beam Updated Branches: refs/heads/master 66d7b6f84 -> ba5bee668
[BEAM-2958] Adding user agent string to PipelineOptions. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a83cae5b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a83cae5b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a83cae5b Branch: refs/heads/master Commit: a83cae5b3e9535134507b0306355300a9a19dfa1 Parents: 66d7b6f Author: Daniel Oliveira <daniel.o.program...@gmail.com> Authored: Wed Sep 27 12:24:17 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Tue Oct 3 11:46:31 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 6 ++++ .../runners/dataflow/DataflowRunnerTest.java | 12 +++++++ .../beam/sdk/options/PipelineOptions.java | 36 ++++++++++++++++++++ .../beam/sdk/options/PipelineOptionsTest.java | 11 ++++++ .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 26 +++----------- 5 files changed, 70 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d5a9845..4cd3db0 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -297,6 +297,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT); } + DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo(); + String userAgent = String + .format("%s/%s", dataflowRunnerInfo.getName(), dataflowRunnerInfo.getVersion()) + .replace(" ", "_"); + dataflowOptions.setUserAgent(userAgent); + return new DataflowRunner(dataflowOptions); } http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index f1e3805..0e3c266 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -341,6 +341,18 @@ public class DataflowRunnerTest implements Serializable { } @Test + public void testFromOptionsUserAgentFromPipelineInfo() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + DataflowRunner.fromOptions(options); + + String expectedName = DataflowRunnerInfo.getDataflowRunnerInfo().getName().replace(" ", "_"); + assertThat(options.getUserAgent(), containsString(expectedName)); + + String expectedVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getVersion(); + assertThat(options.getUserAgent(), containsString(expectedVersion)); + } + + @Test public void testRun() throws IOException { DataflowPipelineOptions options = buildPipelineOptions(); Pipeline p = buildDataflowPipeline(options); http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 5cc0b3f..77117b6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.util.ReleaseInfo; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -353,4 +354,39 @@ public interface PipelineOptions extends HasDisplayData { return NEXT_ID.getAndIncrement(); } } + + /** + * A user agent string as per RFC2616, describing the pipeline to external services. + * + * <p>https://www.ietf.org/rfc/rfc2616.txt + * + * <p>It should follow the BNF Form: + * <pre><code> + * user agent = 1*(product | comment) + * product = token ["/" product-version] + * product-version = token + * </code></pre> + * Where a token is a series of characters without a separator. + * + * <p>The string defaults to {@code [name]/[version]} based on the properties of the Apache Beam + * release. + */ + @Description("A user agent string describing the pipeline to external services." + + " The format should follow RFC2616. This option defaults to \"[name]/[version]\"" + + " where name and version are properties of the Apache Beam release.") + @Default.InstanceFactory(UserAgentFactory.class) + String getUserAgent(); + void setUserAgent(String userAgent); + + /** + * Returns a user agent string constructed from {@link ReleaseInfo#getName()} and + * {@link ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}. + */ + class UserAgentFactory implements DefaultValueFactory<String> { + @Override + public String create(PipelineOptions options) { + ReleaseInfo info = ReleaseInfo.getReleaseInfo(); + return String.format("%s/%s", info.getName(), info.getVersion()).replace(" ", "_"); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java index 5e3211f..7f80c0c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.options; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -36,6 +37,8 @@ import org.junit.runners.JUnit4; /** Unit tests for {@link PipelineOptions}. */ @RunWith(JUnit4.class) public class PipelineOptionsTest { + private static final String DEFAULT_USER_AGENT_NAME = "Apache_Beam_SDK_for_Java"; + @Rule public ExpectedException expectedException = ExpectedException.none(); /** Interfaces used for testing that {@link PipelineOptions#as(Class)} functions. */ @@ -106,4 +109,12 @@ public class PipelineOptionsTest { } } } + + @Test + public void testUserAgentFactory() { + PipelineOptions options = PipelineOptionsFactory.create(); + String userAgent = options.getUserAgent(); + assertNotNull(userAgent); + assertTrue(userAgent.contains(DEFAULT_USER_AGENT_NAME)); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/a83cae5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 252f6c5..47efa08 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -289,10 +288,9 @@ public class BigtableIO { BigtableOptions.Builder clonedBuilder = options.toBuilder() .setUseCachedDataPool(true); - BigtableOptions optionsWithAgent = - clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build(); + BigtableOptions clonedOptions = clonedBuilder.build(); - return toBuilder().setBigtableOptions(optionsWithAgent).build(); + return toBuilder().setBigtableOptions(clonedOptions).build(); } /** @@ -498,9 +496,8 @@ public class BigtableIO { .setUseBulkApi(true) .build()) .setUseCachedDataPool(true); - BigtableOptions optionsWithAgent = - clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build(); - return toBuilder().setBigtableOptions(optionsWithAgent).build(); + BigtableOptions clonedOptions = clonedBuilder.build(); + return toBuilder().setBigtableOptions(clonedOptions).build(); } /** Disables validation that the table being written to exists. */ @@ -595,6 +592,7 @@ public class BigtableIO { return getBigtableService(); } BigtableOptions.Builder clonedOptions = getBigtableOptions().toBuilder(); + clonedOptions.setUserAgent(pipelineOptions.getUserAgent()); if (getBigtableOptions().getCredentialOptions() .getCredentialType() == CredentialType.DefaultCredentials) { clonedOptions.setCredentialOptions( @@ -1100,18 +1098,4 @@ public class BigtableIO { cause); } } - - /** - * A helper function to produce a Cloud Bigtable user agent string. This need only include - * information about the Apache Beam SDK itself, because Bigtable will automatically append - * other relevant system and Bigtable client-specific version information. - * - * @see com.google.cloud.bigtable.config.BigtableVersionInfo - */ - private static String getBeamSdkPartOfUserAgent() { - ReleaseInfo info = ReleaseInfo.getReleaseInfo(); - return - String.format("%s/%s", info.getName(), info.getVersion()) - .replace(" ", "_"); - } }