Repository: flink
Updated Branches:
  refs/heads/master 256c9c4da -> 123be2276


[FLINK-4085][Kinesis] Set Flink-specific user agent

This closes #2175


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/123be227
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/123be227
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/123be227

Branch: refs/heads/master
Commit: 123be22765ebf32a26e3b74dbbd00a36999a5862
Parents: 256c9c4
Author: Robert Metzger <rmetz...@apache.org>
Authored: Tue Jun 28 13:54:12 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Jun 29 13:43:04 2016 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java     | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/123be227/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index b4ae3f5..d035c03 100644
--- 
a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
@@ -29,6 +31,7 @@ import 
com.amazonaws.services.kinesis.model.LimitExceededException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import 
org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
@@ -58,9 +61,6 @@ public class KinesisProxy {
        /** The actual Kinesis client from the AWS SDK that we will be using to 
make calls */
        private final AmazonKinesisClient kinesisClient;
 
-       /** The AWS region that this proxy will be making calls to */
-       private final String regionId;
-
        /** Configuration properties of this Flink Kinesis Connector */
        private final Properties configProps;
 
@@ -72,9 +72,14 @@ public class KinesisProxy {
        public KinesisProxy(Properties configProps) {
                this.configProps = checkNotNull(configProps);
 
-               this.regionId = 
configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION);
-               AmazonKinesisClient client = new 
AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials());
-               
client.setRegion(Region.getRegion(Regions.fromName(this.regionId)));
+               /* The AWS region that this proxy will be making calls to */
+               String regionId = 
configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION);
+               // set Flink as a user agent
+               ClientConfiguration config = new 
ClientConfigurationFactory().getConfig();
+               config.setUserAgent("Apache Flink " + 
EnvironmentInformation.getVersion() + " (" + 
EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis 
Connector");
+               AmazonKinesisClient client = new 
AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(),
 config);
+
+               client.setRegion(Region.getRegion(Regions.fromName(regionId)));
 
                this.kinesisClient = client;
        }

Reply via email to