Repository: flink
Updated Branches:
  refs/heads/master 256c9c4da -> 123be2276


[FLINK-4085][Kinesis] Set Flink-specific user agent

This closes #2175


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/123be227
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/123be227
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/123be227

Branch: refs/heads/master
Commit: 123be22765ebf32a26e3b74dbbd00a36999a5862
Parents: 256c9c4
Author: Robert Metzger <rmetz...@apache.org>
Authored: Tue Jun 28 13:54:12 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Jun 29 13:43:04 2016 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/123be227/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index b4ae3f5..d035c03 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
@@ -29,6 +31,7 @@ import com.amazonaws.services.kinesis.model.LimitExceededException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -58,9 +61,6 @@ public class KinesisProxy {
 	/** The actual Kinesis client from the AWS SDK that we will be using to make calls */
 	private final AmazonKinesisClient kinesisClient;
 
-	/** The AWS region that this proxy will be making calls to */
-	private final String regionId;
-
 	/** Configuration properties of this Flink Kinesis Connector */
 	private final Properties configProps;
 
@@ -72,9 +72,14 @@ public class KinesisProxy {
 	public KinesisProxy(Properties configProps) {
 		this.configProps = checkNotNull(configProps);
 
-		this.regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION);
-		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
-		client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
+		/* The AWS region that this proxy will be making calls to */
+		String regionId = configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION);
+		// set Flink as a user agent
+		ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		config.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() + " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+		AmazonKinesisClient client = new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(), config);
+
+		client.setRegion(Region.getRegion(Regions.fromName(regionId)));
 
 		this.kinesisClient = client;
 	}