This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3e8e44d59563532ba770c5b26426f66cee40a937 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Jan 19 14:04:13 2018 +0100 CAMEL-12164 - Camel-AWS Kinesis: Expose options to avoid a required client in the registry --- .../src/main/docs/aws-kinesis-component.adoc | 9 +++- .../component/aws/kinesis/KinesisComponent.java | 1 - .../aws/kinesis/KinesisConfiguration.java | 53 ++++++++++++++++++++-- .../component/aws/kinesis/KinesisEndpoint.java | 52 +++++++++++++++++++-- .../KinesisComponentConfigurationTest.java} | 37 ++++++--------- .../KinesisConsumerClosedShardWithFailTest.java | 2 + .../KinesisConsumerClosedShardWithSilentTest.java | 1 + .../component/aws/kinesis/KinesisConsumerTest.java | 1 + .../component/aws/kinesis/KinesisEndpointTest.java | 8 ++-- 9 files changed, 127 insertions(+), 37 deletions(-) diff --git a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc index 5ec4b7d..559a89c 100644 --- a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc +++ b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc @@ -52,12 +52,15 @@ with the following path and query parameters: | *streamName* | *Required* Name of the stream | | String |=== -==== Query Parameters (25 parameters): +==== Query Parameters (30 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type -| *amazonKinesisClient* (common) | *Required* Amazon Kinesis client to use for all requests for this endpoint | | AmazonKinesis +| *amazonKinesisClient* (common) | Amazon Kinesis client to use for all requests for this endpoint | | AmazonKinesis +| *proxyHost* (common) | To define a proxy host when instantiating the DDBStreams client | | String +| *proxyPort* (common) | To define a proxy port when instantiating the DDBStreams client | | Integer +| *region* (common) | The region in which Kinesis client needs to work | | String | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN or ERROR level and ignored. | false | boolean | *iteratorType* (consumer) | Defines where in the Kinesis stream to start getting records | TRIM_HORIZON | ShardIteratorType | *maxResultsPerRequest* (consumer) | Maximum number of records that will be fetched in each poll | 1 | int @@ -82,6 +85,8 @@ with the following path and query parameters: | *startScheduler* (scheduler) | Whether the scheduler should be auto started. | true | boolean | *timeUnit* (scheduler) | Time unit for initialDelay and delay options. | MILLISECONDS | TimeUnit | *useFixedDelay* (scheduler) | Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details. | true | boolean +| *accessKey* (security) | Amazon AWS Access Key | | String +| *secretKey* (security) | Amazon AWS Secret Key | | String |=== // endpoint options: END diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java index a264d3c..34789af 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java @@ -39,7 +39,6 @@ public class KinesisComponent extends DefaultComponent { setProperties(configuration, parameters); KinesisEndpoint endpoint = new KinesisEndpoint(uri, configuration, this); - setProperties(endpoint, parameters); return endpoint; } } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java index c42e5ef..a6d38e3 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java @@ -29,8 +29,13 @@ public class KinesisConfiguration { @UriPath(description = "Name of the stream") @Metadata(required = "true") private String streamName; + @UriParam(label = "security", secret = true, description = "Amazon AWS Access Key") + private String accessKey; + @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key") + private String secretKey; + @UriParam(description = "The region in which Kinesis client needs to work") + private String region; @UriParam(description = "Amazon Kinesis client to use for all requests for this endpoint") - @Metadata(required = "true") private AmazonKinesis amazonKinesisClient; @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll", defaultValue = "1") private int maxResultsPerRequest = 1; @@ -45,8 +50,11 @@ public class KinesisConfiguration { + "in case of silent there will be no logging and the consumer will start from the beginning," + "in case of fail a ReachedClosedStateException will be raised") private KinesisShardClosedStrategyEnum shardClosed; - - // required for injection. + @UriParam(description = "To define a proxy host when instantiating the DDBStreams client") + private String proxyHost; + @UriParam(description = "To define a proxy port when instantiating the DDBStreams client") + private Integer proxyPort; + public AmazonKinesis getAmazonKinesisClient() { return amazonKinesisClient; } @@ -102,5 +110,44 @@ public class KinesisConfiguration { public void setShardClosed(KinesisShardClosedStrategyEnum shardClosed) { this.shardClosed = shardClosed; } + + public String getAccessKey() { + return accessKey; + } + + public void setAccessKey(String accessKey) { + this.accessKey = accessKey; + } + + public String getSecretKey() { + return secretKey; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getProxyHost() { + return proxyHost; + } + + public void setProxyHost(String proxyHost) { + this.proxyHost = proxyHost; + } + + public Integer getProxyPort() { + return proxyPort; + } + public void setProxyPort(Integer proxyPort) { + this.proxyPort = proxyPort; + } } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java index 1a7e79b..127cdd6 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java @@ -16,7 +16,13 @@ */ package org.apache.camel.component.aws.kinesis; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ShardIteratorType; @@ -27,6 +33,7 @@ import org.apache.camel.Producer; import org.apache.camel.impl.ScheduledPollEndpoint; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; +import org.apache.camel.util.ObjectHelper; /** * The aws-kinesis component is for consuming and producing records from Amazon @@ -37,7 +44,9 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { @UriParam private KinesisConfiguration configuration; - + + private AmazonKinesis kinesisClient; + public KinesisEndpoint(String uri, KinesisConfiguration configuration, KinesisComponent component) { super(uri, component); this.configuration = configuration; @@ -45,11 +54,15 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { @Override protected void doStart() throws Exception { + super.doStart(); + kinesisClient = configuration.getAmazonKinesisClient() != null ? configuration.getAmazonKinesisClient() + : createKinesisClient(); + + if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && configuration.getSequenceNumber().isEmpty()) { throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER"); } - super.doStart(); } @Override @@ -80,10 +93,43 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { } public AmazonKinesis getClient() { - return configuration.getAmazonKinesisClient(); + return kinesisClient; } public KinesisConfiguration getConfiguration() { return configuration; } + + AmazonKinesis createKinesisClient() { + AmazonKinesis client = null; + ClientConfiguration clientConfiguration = null; + AmazonKinesisClientBuilder clientBuilder = null; + boolean isClientConfigFound = false; + if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) { + clientConfiguration = new ClientConfiguration(); + clientConfiguration.setProxyHost(configuration.getProxyHost()); + clientConfiguration.setProxyPort(configuration.getProxyPort()); + isClientConfigFound = true; + } + if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) { + AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey()); + AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials); + if (isClientConfigFound) { + clientBuilder = AmazonKinesisClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider); + } else { + clientBuilder = AmazonKinesisClientBuilder.standard().withCredentials(credentialsProvider); + } + } else { + if (isClientConfigFound) { + clientBuilder = AmazonKinesisClientBuilder.standard(); + } else { + clientBuilder = AmazonKinesisClientBuilder.standard().withClientConfiguration(clientConfiguration); + } + } + if (ObjectHelper.isNotEmpty(configuration.getRegion())) { + clientBuilder = clientBuilder.withRegion(configuration.getRegion()); + } + client = clientBuilder.build(); + return client; + } } diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java similarity index 52% copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java copy to components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java index a264d3c..0e45169 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java @@ -16,30 +16,19 @@ */ package org.apache.camel.component.aws.kinesis; -import java.util.Map; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; -import org.apache.camel.CamelContext; -import org.apache.camel.Endpoint; -import org.apache.camel.impl.DefaultComponent; - -public class KinesisComponent extends DefaultComponent { - - public KinesisComponent() { - this(null); - } - - public KinesisComponent(CamelContext context) { - super(context); - } - - @Override - protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { - KinesisConfiguration configuration = new KinesisConfiguration(); - configuration.setStreamName(remaining); - setProperties(configuration, parameters); +public class KinesisComponentConfigurationTest extends CamelTestSupport { + + @Test + public void createEndpointWithAccessAndSecretKey() throws Exception { + KinesisComponent component = new KinesisComponent(context); + KinesisEndpoint endpoint = (KinesisEndpoint)component.createEndpoint("aws-kinesis://some_stream_name?accessKey=xxxxx&secretKey=yyyyy"); - KinesisEndpoint endpoint = new KinesisEndpoint(uri, configuration, this); - setProperties(endpoint, parameters); - return endpoint; + assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName()); + assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey()); + assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey()); } -} + +} \ No newline at end of file diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java index 6f0b7e9..eb1d24f 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java @@ -67,7 +67,9 @@ public class KinesisConsumerClosedShardWithFailTest { configuration.setShardClosed(KinesisShardClosedStrategyEnum.fail); configuration.setStreamName("streamName"); KinesisEndpoint endpoint = new KinesisEndpoint(null, configuration, component); + endpoint.start(); undertest = new KinesisConsumer(endpoint, processor); + SequenceNumberRange range = new SequenceNumberRange().withEndingSequenceNumber("20"); Shard shard = new Shard().withShardId("shardId").withSequenceNumberRange(range); diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java index 91e5b7c..7815002 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java @@ -71,6 +71,7 @@ public class KinesisConsumerClosedShardWithSilentTest { configuration.setShardClosed(KinesisShardClosedStrategyEnum.silent); configuration.setStreamName("streamName"); KinesisEndpoint endpoint = new KinesisEndpoint(null, configuration, component); + endpoint.start(); undertest = new KinesisConsumer(endpoint, processor); SequenceNumberRange range = new SequenceNumberRange().withEndingSequenceNumber("20"); diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java index 6653025..9f990da 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java @@ -72,6 +72,7 @@ public class KinesisConsumerTest { configuration.setShardClosed(KinesisShardClosedStrategyEnum.silent); configuration.setStreamName("streamName"); KinesisEndpoint endpoint = new KinesisEndpoint(null, configuration, component); + endpoint.start(); undertest = new KinesisConsumer(endpoint, processor); SequenceNumberRange range = new SequenceNumberRange().withEndingSequenceNumber(null); diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java index a49e074..4f02ec4 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java @@ -55,7 +55,7 @@ public class KinesisEndpointTest { + "&sequenceNumber=123" ); - assertThat(endpoint.getClient(), is(amazonKinesisClient)); + assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); assertThat(endpoint.getConfiguration().getIteratorType(), is(ShardIteratorType.LATEST)); assertThat(endpoint.getConfiguration().getMaxResultsPerRequest(), is(101)); @@ -69,7 +69,7 @@ public class KinesisEndpointTest { + "?amazonKinesisClient=#kinesisClient" ); - assertThat(endpoint.getClient(), is(amazonKinesisClient)); + assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); assertThat(endpoint.getConfiguration().getIteratorType(), is(ShardIteratorType.TRIM_HORIZON)); assertThat(endpoint.getConfiguration().getMaxResultsPerRequest(), is(1)); @@ -84,7 +84,7 @@ public class KinesisEndpointTest { + "&sequenceNumber=123" ); - assertThat(endpoint.getClient(), is(amazonKinesisClient)); + assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); assertThat(endpoint.getConfiguration().getIteratorType(), is(ShardIteratorType.AFTER_SEQUENCE_NUMBER)); assertThat(endpoint.getConfiguration().getShardId(), is("abc")); @@ -100,7 +100,7 @@ public class KinesisEndpointTest { + "&sequenceNumber=123" ); - assertThat(endpoint.getClient(), is(amazonKinesisClient)); + assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient)); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name")); assertThat(endpoint.getConfiguration().getIteratorType(), is(ShardIteratorType.AT_SEQUENCE_NUMBER)); assertThat(endpoint.getConfiguration().getShardId(), is("abc")); -- To stop receiving notification emails like this one, please contact "commits@camel.apache.org" <commits@camel.apache.org>.