Repository: beam Updated Branches: refs/heads/master 25b9c35a9 -> 7db0f13a1
[BEAM-2455] Backlog size retrieval for Kinesis source Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7cb3dda1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7cb3dda1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7cb3dda1 Branch: refs/heads/master Commit: 7cb3dda1bc712f3cac5296e5d7cae80142bd8b10 Parents: 25b9c35 Author: Pawel Kaczmarczyk <p.kaczmarc...@ocado.com> Authored: Wed Jul 12 17:16:33 2017 +0200 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Fri Sep 29 16:46:45 2017 -0700 ---------------------------------------------------------------------- sdks/java/io/kinesis/pom.xml | 6 ++ .../beam/sdk/io/kinesis/AWSClientsProvider.java | 36 +++++++ .../sdk/io/kinesis/KinesisClientProvider.java | 33 ------ .../apache/beam/sdk/io/kinesis/KinesisIO.java | 107 +++++++++++++------ .../beam/sdk/io/kinesis/KinesisReader.java | 50 ++++++++- .../beam/sdk/io/kinesis/KinesisSource.java | 40 ++++--- .../sdk/io/kinesis/SimplifiedKinesisClient.java | 97 +++++++++++++++-- .../io/kinesis/TransientKinesisException.java | 4 +- .../beam/sdk/io/kinesis/AmazonKinesisMock.java | 12 ++- .../sdk/io/kinesis/KinesisMockReadTest.java | 5 +- .../beam/sdk/io/kinesis/KinesisReaderIT.java | 5 +- .../beam/sdk/io/kinesis/KinesisReaderTest.java | 34 +++++- .../io/kinesis/SimplifiedKinesisClientTest.java | 107 +++++++++++++++++++ 13 files changed, 434 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml index 872c590..2a54cc1 100644 --- a/sdks/java/io/kinesis/pom.xml +++ b/sdks/java/io/kinesis/pom.xml @@ -73,6 +73,12 @@ <dependency> <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-cloudwatch</artifactId> + 
<version>${aws.version}</version> + </dependency> + + <dependency> + <groupId>com.amazonaws</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>1.6.1</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java new file mode 100644 index 0000000..c82e4b1 --- /dev/null +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kinesis; + +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.kinesis.AmazonKinesis; + +import java.io.Serializable; + +/** + * Provides instances of AWS clients. + * + * <p>Please note, that any instance of {@link AWSClientsProvider} must be + * {@link Serializable} to ensure it can be sent to worker machines. 
+ */ +public interface AWSClientsProvider extends Serializable { + + AmazonKinesis getKinesisClient(); + + AmazonCloudWatch getCloudWatchClient(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java deleted file mode 100644 index c48f9cc..0000000 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.kinesis; - -import com.amazonaws.services.kinesis.AmazonKinesis; - -import java.io.Serializable; - -/** - * Provides instances of {@link AmazonKinesis} interface. - * - * <p>Please note, that any instance of {@link KinesisClientProvider} must be - * {@link Serializable} to ensure it can be sent to worker machines. 
- */ -public interface KinesisClientProvider extends Serializable { - - AmazonKinesis get(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index ef39a91..96f7a04 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -23,6 +23,8 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.internal.StaticCredentialsProvider; import com.amazonaws.regions.Regions; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; @@ -46,8 +48,9 @@ import org.joda.time.Instant; * * <pre>{@code * p.apply(KinesisIO.read() - * .from("streamName", InitialPositionInStream.LATEST) - * .withClientProvider("AWS_KEY", _"AWS_SECRET", STREAM_REGION) + * .withStreamName("streamName") + * .withInitialPositionInStream(InitialPositionInStream.LATEST) + * .withAWSClientsProvider("AWS_KEY", "AWS_SECRET", STREAM_REGION) * .apply( ... 
) // other transformations * }</pre> * @@ -60,23 +63,28 @@ import org.joda.time.Instant; * <li>{@link InitialPositionInStream#TRIM_HORIZON} - reading will begin at * the very beginning of the stream</li> * </ul></li> - * <li>data used to initialize {@link AmazonKinesis} client: + * <li>data used to initialize {@link AmazonKinesis} and {@link AmazonCloudWatch} clients: * <ul> * <li>credentials (aws key, aws secret)</li> * <li>region where the stream is located</li> * </ul></li> * </ul> * - * <p>In case when you want to set up {@link AmazonKinesis} client by your own - * (for example if you're using more sophisticated authorization methods like Amazon STS, etc.) - * you can do it by implementing {@link KinesisClientProvider} class: + * <p>In case when you want to set up {@link AmazonKinesis} or {@link AmazonCloudWatch} client by + * your own (for example if you're using more sophisticated authorization methods like Amazon + * STS, etc.) you can do it by implementing {@link AWSClientsProvider} class: * * <pre>{@code - * public class MyCustomKinesisClientProvider implements KinesisClientProvider { + * public class MyCustomKinesisClientProvider implements AWSClientsProvider { * {@literal @}Override - * public AmazonKinesis get() { + * public AmazonKinesis getKinesisClient() { * // set up your client here * } + * + * public AmazonCloudWatch getCloudWatchClient() { + * // set up your client here + * } + * * } * }</pre> * @@ -84,8 +92,9 @@ import org.joda.time.Instant; * * <pre>{@code * p.apply(KinesisIO.read() - * .from("streamName", InitialPositionInStream.LATEST) - * .withClientProvider(new MyCustomKinesisClientProvider()) + * .withStreamName("streamName") + * .withInitialPositionInStream(InitialPositionInStream.LATEST) + * .withAWSClientsProvider(new MyCustomKinesisClientProvider()) * .apply( ... 
) // other transformations * }</pre> * @@ -94,8 +103,9 @@ import org.joda.time.Instant; * * <pre>{@code * p.apply(KinesisIO.read() - * .from("streamName", instant) - * .withClientProvider(new MyCustomKinesisClientProvider()) + * .withStreamName("streamName") + * .withInitialTimestampInStream(instant) + * .withAWSClientsProvider(new MyCustomKinesisClientProvider()) * .apply( ... ) // other transformations * }</pre> * @@ -105,7 +115,10 @@ public final class KinesisIO { /** Returns a new {@link Read} transform for reading from Kinesis. */ public static Read read() { - return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build(); + return new AutoValue_KinesisIO_Read.Builder() + .setMaxNumRecords(-1) + .setUpToDateThreshold(Duration.ZERO) + .build(); } /** Implementation of {@link #read}. */ @@ -119,13 +132,15 @@ public final class KinesisIO { abstract StartingPoint getInitialPosition(); @Nullable - abstract KinesisClientProvider getClientProvider(); + abstract AWSClientsProvider getAWSClientsProvider(); abstract int getMaxNumRecords(); @Nullable abstract Duration getMaxReadTime(); + abstract Duration getUpToDateThreshold(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -135,54 +150,61 @@ public final class KinesisIO { abstract Builder setInitialPosition(StartingPoint startingPoint); - abstract Builder setClientProvider(KinesisClientProvider clientProvider); + abstract Builder setAWSClientsProvider(AWSClientsProvider clientProvider); abstract Builder setMaxNumRecords(int maxNumRecords); abstract Builder setMaxReadTime(Duration maxReadTime); + abstract Builder setUpToDateThreshold(Duration upToDateThreshold); + abstract Read build(); } /** - * Specify reading from streamName at some initial position. + * Specify reading from streamName. + */ + public Read withStreamName(String streamName) { + return toBuilder().setStreamName(streamName).build(); + } + + /** + * Specify reading from some initial position in stream. 
*/ - public Read from(String streamName, InitialPositionInStream initialPosition) { + public Read withInitialPositionInStream(InitialPositionInStream initialPosition) { return toBuilder() - .setStreamName(streamName) .setInitialPosition(new StartingPoint(initialPosition)) .build(); } /** - * Specify reading from streamName beginning at given {@link Instant}. + * Specify reading beginning at given {@link Instant}. * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}. */ - public Read from(String streamName, Instant initialTimestamp) { + public Read withInitialTimestampInStream(Instant initialTimestamp) { return toBuilder() - .setStreamName(streamName) .setInitialPosition(new StartingPoint(initialTimestamp)) .build(); } /** - * Allows to specify custom {@link KinesisClientProvider}. - * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later - * used for communication with Kinesis. - * You should use this method if {@link Read#withClientProvider(String, String, Regions)} + * Allows to specify custom {@link AWSClientsProvider}. + * {@link AWSClientsProvider} provides {@link AmazonKinesis} and {@link AmazonCloudWatch} + * instances which are later used for communication with Kinesis. + * You should use this method if {@link Read#withAWSClientsProvider(String, String, Regions)} * does not suit your needs. */ - public Read withClientProvider(KinesisClientProvider kinesisClientProvider) { - return toBuilder().setClientProvider(kinesisClientProvider).build(); + public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) { + return toBuilder().setAWSClientsProvider(awsClientsProvider).build(); } /** * Specify credential details and region to be used to read from Kinesis. * If you need more sophisticated credential protocol, then you should look at - * {@link Read#withClientProvider(KinesisClientProvider)}. + * {@link Read#withAWSClientsProvider(AWSClientsProvider)}. 
*/ - public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) { - return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region)); + public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) { + return withAWSClientsProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region)); } /** Specifies to read at most a given number of records. */ @@ -198,11 +220,23 @@ public final class KinesisIO { return toBuilder().setMaxReadTime(maxReadTime).build(); } + /** + * Specifies how late records consumed by this source can be to still be considered on time. + * When this limit is exceeded the actual backlog size will be evaluated and the runner might + * decide to scale the amount of resources allocated to the pipeline in order to + * speed up ingestion. + */ + public Read withUpToDateThreshold(Duration upToDateThreshold) { + checkArgument(upToDateThreshold != null, "upToDateThreshold can not be null"); + return toBuilder().setUpToDateThreshold(upToDateThreshold).build(); + } + @Override public PCollection<KinesisRecord> expand(PBegin input) { org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read = org.apache.beam.sdk.io.Read.from( - new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition())); + new KinesisSource(getAWSClientsProvider(), getStreamName(), + getInitialPosition(), getUpToDateThreshold())); if (getMaxNumRecords() > 0) { BoundedReadFromUnboundedSource<KinesisRecord> bounded = read.withMaxNumRecords(getMaxNumRecords()); @@ -216,7 +250,7 @@ public final class KinesisIO { } } - private static final class BasicKinesisProvider implements KinesisClientProvider { + private static final class BasicKinesisProvider implements AWSClientsProvider { private final String accessKey; private final String secretKey; @@ -240,11 +274,18 @@ public final class KinesisIO { } @Override - public AmazonKinesis get() { + public AmazonKinesis getKinesisClient() { 
AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider()); client.withRegion(region); return client; } + + @Override + public AmazonCloudWatch getCloudWatchClient() { + AmazonCloudWatchClient client = new AmazonCloudWatchClient(getCredentialsProvider()); + client.withRegion(region); + return client; + } } } } http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java index 1abcd98..8095150 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java @@ -62,25 +62,41 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> { private static final int MIN_WATERMARK_SPREAD = 2; private final SimplifiedKinesisClient kinesis; - private final UnboundedSource<KinesisRecord, ?> source; + private final KinesisSource source; private final CheckpointGenerator initialCheckpointGenerator; private RoundRobin<ShardRecordsIterator> shardIterators; private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent(); private MovingFunction minReadTimestampMsSinceEpoch; private Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; + private long lastBacklogBytes; + private Instant backlogBytesLastCheckTime = new Instant(0L); + private Duration upToDateThreshold; + private Duration backlogBytesCheckThreshold; - public KinesisReader(SimplifiedKinesisClient kinesis, + KinesisReader(SimplifiedKinesisClient kinesis, CheckpointGenerator initialCheckpointGenerator, - UnboundedSource<KinesisRecord, ?> source) { + KinesisSource source, + Duration upToDateThreshold) { + 
this(kinesis, initialCheckpointGenerator, source, upToDateThreshold, + Duration.standardSeconds(30)); + } + + KinesisReader(SimplifiedKinesisClient kinesis, + CheckpointGenerator initialCheckpointGenerator, + KinesisSource source, + Duration upToDateThreshold, + Duration backlogBytesCheckThreshold) { this.kinesis = checkNotNull(kinesis, "kinesis"); - this.initialCheckpointGenerator = - checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator"); + this.initialCheckpointGenerator = checkNotNull(initialCheckpointGenerator, + "initialCheckpointGenerator"); this.source = source; this.minReadTimestampMsSinceEpoch = new MovingFunction(SAMPLE_PERIOD.getMillis(), SAMPLE_UPDATE.getMillis(), MIN_WATERMARK_SPREAD, MIN_WATERMARK_MESSAGES, Min.ofLongs()); + this.upToDateThreshold = upToDateThreshold; + this.backlogBytesCheckThreshold = backlogBytesCheckThreshold; } /** @@ -181,4 +197,28 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> { return source; } + /** + * Returns total size of all records that remain in Kinesis stream after current watermark. + * When currently processed record is not further behind than {@link #upToDateThreshold} + * then this method returns 0. 
+ */ + @Override + public long getTotalBacklogBytes() { + Instant watermark = getWatermark(); + if (watermark.plus(upToDateThreshold).isAfterNow()) { + return 0L; + } + if (backlogBytesLastCheckTime.plus(backlogBytesCheckThreshold).isAfterNow()) { + return lastBacklogBytes; + } + try { + lastBacklogBytes = kinesis.getBacklogBytes(source.getStreamName(), watermark); + backlogBytesLastCheckTime = Instant.now(); + } catch (TransientKinesisException e) { + LOG.warn("Transient exception occurred.", e); + } + LOG.info("Total backlog bytes for {} stream with {} watermark: {}", source.getStreamName(), + watermark, lastBacklogBytes); + return lastBacklogBytes; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java index 144bd80..b1a6c19 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,18 +37,24 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class); - private final KinesisClientProvider kinesis; + private final AWSClientsProvider awsClientsProvider; + private final String streamName; + private final Duration upToDateThreshold; private CheckpointGenerator initialCheckpointGenerator; - 
public KinesisSource(KinesisClientProvider kinesis, String streamName, - StartingPoint startingPoint) { - this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint)); + KinesisSource(AWSClientsProvider awsClientsProvider, String streamName, + StartingPoint startingPoint, Duration upToDateThreshold) { + this(awsClientsProvider, new DynamicCheckpointGenerator(streamName, startingPoint), streamName, + upToDateThreshold); } - private KinesisSource(KinesisClientProvider kinesisClientProvider, - CheckpointGenerator initialCheckpoint) { - this.kinesis = kinesisClientProvider; + private KinesisSource(AWSClientsProvider awsClientsProvider, + CheckpointGenerator initialCheckpoint, String streamName, + Duration upToDateThreshold) { + this.awsClientsProvider = awsClientsProvider; this.initialCheckpointGenerator = initialCheckpoint; + this.streamName = streamName; + this.upToDateThreshold = upToDateThreshold; validate(); } @@ -60,14 +67,16 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi public List<KinesisSource> split(int desiredNumSplits, PipelineOptions options) throws Exception { KinesisReaderCheckpoint checkpoint = - initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis)); + initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(awsClientsProvider)); List<KinesisSource> sources = newArrayList(); for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) { sources.add(new KinesisSource( - kinesis, - new StaticCheckpointGenerator(partition))); + awsClientsProvider, + new StaticCheckpointGenerator(partition), + streamName, + upToDateThreshold)); } return sources; } @@ -90,9 +99,10 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi LOG.info("Creating new reader using {}", checkpointGenerator); return new KinesisReader( - SimplifiedKinesisClient.from(kinesis), + SimplifiedKinesisClient.from(awsClientsProvider), checkpointGenerator, - this); + 
this, + upToDateThreshold); } @Override @@ -102,7 +112,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi @Override public void validate() { - checkNotNull(kinesis); + checkNotNull(awsClientsProvider); checkNotNull(initialCheckpointGenerator); } @@ -110,4 +120,8 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi public Coder<KinesisRecord> getOutputCoder() { return KinesisRecordCoder.of(); } + + String getStreamName() { + return streamName; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java index e83fc8b..74605e5 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java @@ -17,7 +17,15 @@ */ package org.apache.beam.sdk.io.kinesis; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.cloudwatch.model.Datapoint; +import com.amazonaws.services.cloudwatch.model.Dimension; +import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest; +import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; @@ -31,11 +39,13 @@ import 
com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.StreamDescription; import com.google.common.collect.Lists; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import org.joda.time.Instant; +import org.joda.time.Minutes; /** * Wraps {@link AmazonKinesis} class providing much simpler interface and @@ -43,14 +53,22 @@ import org.joda.time.Instant; */ class SimplifiedKinesisClient { + private static final String KINESIS_NAMESPACE = "AWS/Kinesis"; + private static final String INCOMING_RECORDS_METRIC = "IncomingBytes"; + private static final int PERIOD_GRANULARITY_IN_SECONDS = 60; + private static final String SUM_STATISTIC = "Sum"; + private static final String STREAM_NAME_DIMENSION = "StreamName"; private final AmazonKinesis kinesis; + private final AmazonCloudWatch cloudWatch; - public SimplifiedKinesisClient(AmazonKinesis kinesis) { - this.kinesis = kinesis; + public SimplifiedKinesisClient(AmazonKinesis kinesis, AmazonCloudWatch cloudWatch) { + this.kinesis = checkNotNull(kinesis, "kinesis"); + this.cloudWatch = checkNotNull(cloudWatch, "cloudWatch"); } - public static SimplifiedKinesisClient from(KinesisClientProvider provider) { - return new SimplifiedKinesisClient(provider.get()); + public static SimplifiedKinesisClient from(AWSClientsProvider provider) { + return new SimplifiedKinesisClient(provider.getKinesisClient(), + provider.getCloudWatchClient()); } public String getShardIterator(final String streamName, final String shardId, @@ -133,13 +151,69 @@ class SimplifiedKinesisClient { } /** + * Gets total size in bytes of all events that remain in Kinesis stream after specified instant. 
+ * + * @return total size in bytes of all Kinesis events after specified instant + */ + public long getBacklogBytes(String streamName, Instant countSince) + throws TransientKinesisException { + return getBacklogBytes(streamName, countSince, new Instant()); + } + + /** + * Gets total size in bytes of all events that remain in Kinesis stream between specified + * instants. + * + * @return total size in bytes of all Kinesis events between specified instants + */ + public long getBacklogBytes(final String streamName, final Instant countSince, + final Instant countTo) throws TransientKinesisException { + return wrapExceptions(new Callable<Long>() { + + @Override + public Long call() throws Exception { + Minutes period = Minutes.minutesBetween(countSince, countTo); + if (period.isLessThan(Minutes.ONE)) { + return 0L; + } + + GetMetricStatisticsRequest request = createMetricStatisticsRequest(streamName, + countSince, countTo, period); + + long totalSizeInBytes = 0; + GetMetricStatisticsResult result = cloudWatch.getMetricStatistics(request); + for (Datapoint point : result.getDatapoints()) { + totalSizeInBytes += point + .getSum() + .longValue(); + } + return totalSizeInBytes; + } + }); + } + + GetMetricStatisticsRequest createMetricStatisticsRequest(String streamName, Instant countSince, + Instant countTo, Minutes period) { + return new GetMetricStatisticsRequest() + .withNamespace(KINESIS_NAMESPACE) + .withMetricName(INCOMING_RECORDS_METRIC) + .withPeriod(period.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS) + .withStartTime(countSince.toDate()) + .withEndTime(countTo.toDate()) + .withStatistics(Collections.singletonList(SUM_STATISTIC)) + .withDimensions(Collections.singletonList(new Dimension() + .withName(STREAM_NAME_DIMENSION) + .withValue(streamName))); + } + + /** * Wraps Amazon specific exceptions into more friendly format. * - * @throws TransientKinesisException - in case of recoverable situation, i.e. 
- * the request rate is too high, Kinesis remote service - * failed, network issue, etc. - * @throws ExpiredIteratorException - if iterator needs to be refreshed - * @throws RuntimeException - in all other cases + * @throws TransientKinesisException - in case of recoverable situation, i.e. + * the request rate is too high, Kinesis remote service + * failed, network issue, etc. + * @throws ExpiredIteratorException - if iterator needs to be refreshed + * @throws RuntimeException - in all other cases */ private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException { try { @@ -155,6 +229,11 @@ class SimplifiedKinesisClient { "Kinesis backend failed. Wait some time and retry.", e); } throw new RuntimeException("Kinesis client side failure", e); + } catch (AmazonClientException e) { + if (e.isRetryable()) { + throw new TransientKinesisException("Retryable client failure", e); + } + throw new RuntimeException("Not retryable client failure", e); } catch (Exception e) { throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java index 68ca0d7..0ea37ec 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java @@ -17,14 +17,14 @@ */ package org.apache.beam.sdk.io.kinesis; -import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonClientException; /** * A transient exception thrown by Kinesis. 
*/ class TransientKinesisException extends Exception { - public TransientKinesisException(String s, AmazonServiceException e) { + public TransientKinesisException(String s, AmazonClientException e) { super(s, e); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java index fe257ad..d6e8817 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java @@ -26,6 +26,7 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.ResponseMetadata; import com.amazonaws.regions.Region; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest; import com.amazonaws.services.kinesis.model.AddTagsToStreamResult; @@ -74,6 +75,7 @@ import javax.annotation.Nullable; import org.apache.commons.lang.builder.EqualsBuilder; import org.joda.time.Instant; +import org.mockito.Mockito; /** * Mock implemenation of {@link AmazonKinesis} for testing. 
@@ -117,7 +119,7 @@ class AmazonKinesisMock implements AmazonKinesis { } } - static class Provider implements KinesisClientProvider { + static class Provider implements AWSClientsProvider { private final List<List<TestData>> shardedData; private final int numberOfRecordsPerGet; @@ -128,7 +130,7 @@ class AmazonKinesisMock implements AmazonKinesis { } @Override - public AmazonKinesis get() { + public AmazonKinesis getKinesisClient() { return new AmazonKinesisMock(transform(shardedData, new Function<List<TestData>, List<Record>>() { @@ -143,6 +145,12 @@ class AmazonKinesisMock implements AmazonKinesis { }); } }), numberOfRecordsPerGet); + + } + + @Override + public AmazonCloudWatch getCloudWatchClient() { + return Mockito.mock(AmazonCloudWatch.class); } } http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java index 44ad67d..73554bb 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java @@ -51,8 +51,9 @@ public class KinesisMockReadTest { PCollection<AmazonKinesisMock.TestData> result = p .apply( KinesisIO.read() - .from("stream", InitialPositionInStream.TRIM_HORIZON) - .withClientProvider(new AmazonKinesisMock.Provider(testData, 10)) + .withStreamName("stream") + .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) + .withAWSClientsProvider(new AmazonKinesisMock.Provider(testData, 10)) .withMaxNumRecords(noOfShards * noOfEventsPerShard)) .apply(ParDo.of(new KinesisRecordToTestData())); PAssert.that(result).containsInAnyOrder(Iterables.concat(testData)); 
http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java index 5781033..7126594 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java @@ -84,8 +84,9 @@ public class KinesisReaderIT { PCollection<String> result = p. apply(KinesisIO.read() - .from(options.getAwsKinesisStream(), Instant.now()) - .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(), + .withStreamName(options.getAwsKinesisStream()) + .withInitialTimestampInStream(Instant.now()) + .withAWSClientsProvider(options.getAwsAccessKey(), options.getAwsSecretKey(), Regions.fromName(options.getAwsKinesisRegion())) .withMaxReadTime(Duration.standardMinutes(3)) ). 
http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java index 1af74b6..22d8bce 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java @@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.kinesis; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -26,6 +28,7 @@ import java.io.IOException; import java.util.NoSuchElementException; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -50,6 +53,8 @@ public class KinesisReaderTest { private ShardRecordsIterator firstIterator, secondIterator; @Mock private KinesisRecord a, b, c, d; + @Mock + private KinesisSource kinesisSource; private KinesisReader reader; @@ -67,7 +72,7 @@ public class KinesisReaderTest { when(c.getApproximateArrivalTimestamp()).thenReturn(Instant.now()); when(d.getApproximateArrivalTimestamp()).thenReturn(Instant.now()); - reader = new KinesisReader(kinesis, generator, null); + reader = new KinesisReader(kinesis, generator, kinesisSource, Duration.ZERO, Duration.ZERO); } @Test @@ -195,4 +200,31 @@ public class KinesisReaderTest { return record; } + @Test + public void getTotalBacklogBytesShouldReturnLastSeenValueWhenKinesisExceptionsOccur() + throws TransientKinesisException { + 
when(kinesisSource.getStreamName()).thenReturn("stream1"); + when(kinesis.getBacklogBytes(eq("stream1"), any(Instant.class))) + .thenReturn(10L) + .thenThrow(TransientKinesisException.class) + .thenReturn(20L); + + assertThat(reader.getTotalBacklogBytes()).isEqualTo(10); + assertThat(reader.getTotalBacklogBytes()).isEqualTo(10); + assertThat(reader.getTotalBacklogBytes()).isEqualTo(20); + } + + @Test + public void getTotalBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently() + throws TransientKinesisException { + KinesisReader backlogCachingReader = new KinesisReader(kinesis, generator, kinesisSource, + Duration.ZERO, Duration.standardSeconds(30)); + when(kinesisSource.getStreamName()).thenReturn("stream1"); + when(kinesis.getBacklogBytes(eq("stream1"), any(Instant.class))) + .thenReturn(10L) + .thenReturn(20L); + + assertThat(backlogCachingReader.getTotalBacklogBytes()).isEqualTo(10); + assertThat(backlogCachingReader.getTotalBacklogBytes()).isEqualTo(10); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java index 2f8757c..75c0ae0 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java @@ -21,9 +21,14 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verifyZeroInteractions; import 
com.amazonaws.AmazonServiceException; import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.cloudwatch.AmazonCloudWatch; +import com.amazonaws.services.cloudwatch.model.Datapoint; +import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest; +import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult; import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; @@ -38,6 +43,7 @@ import com.amazonaws.services.kinesis.model.StreamDescription; import java.util.List; import org.joda.time.Instant; +import org.joda.time.Minutes; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; @@ -58,6 +64,8 @@ public class SimplifiedKinesisClientTest { @Mock private AmazonKinesis kinesis; + @Mock + private AmazonCloudWatch cloudWatch; @InjectMocks private SimplifiedKinesisClient underTest; @@ -219,6 +227,105 @@ public class SimplifiedKinesisClientTest { } } + @Test + public void shouldCountBytesWhenSingleDataPointReturned() throws Exception { + Instant countSince = new Instant("2017-04-06T10:00:00.000Z"); + Instant countTo = new Instant("2017-04-06T11:00:00.000Z"); + Minutes periodTime = Minutes.minutesBetween(countSince, countTo); + GetMetricStatisticsRequest metricStatisticsRequest = + underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime); + GetMetricStatisticsResult result = new GetMetricStatisticsResult() + .withDatapoints(new Datapoint().withSum(1.0)); + + given(cloudWatch.getMetricStatistics(metricStatisticsRequest)).willReturn(result); + + long backlogBytes = underTest.getBacklogBytes(STREAM, countSince, countTo); + + assertThat(backlogBytes).isEqualTo(1L); + } + + @Test + public void shouldCountBytesWhenMultipleDataPointsReturned() throws Exception { + Instant countSince = new Instant("2017-04-06T10:00:00.000Z"); + Instant 
countTo = new Instant("2017-04-06T11:00:00.000Z"); + Minutes periodTime = Minutes.minutesBetween(countSince, countTo); + GetMetricStatisticsRequest metricStatisticsRequest = + underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime); + GetMetricStatisticsResult result = new GetMetricStatisticsResult() + .withDatapoints( + new Datapoint().withSum(1.0), + new Datapoint().withSum(3.0), + new Datapoint().withSum(2.0) + ); + + given(cloudWatch.getMetricStatistics(metricStatisticsRequest)).willReturn(result); + + long backlogBytes = underTest.getBacklogBytes(STREAM, countSince, countTo); + + assertThat(backlogBytes).isEqualTo(6L); + } + + @Test + public void shouldNotCallCloudWatchWhenSpecifiedPeriodTooShort() throws Exception { + Instant countSince = new Instant("2017-04-06T10:00:00.000Z"); + Instant countTo = new Instant("2017-04-06T10:00:02.000Z"); + + long backlogBytes = underTest.getBacklogBytes(STREAM, countSince, countTo); + + assertThat(backlogBytes).isEqualTo(0L); + verifyZeroInteractions(cloudWatch); + } + + @Test + public void shouldHandleLimitExceededExceptionForGetBacklogBytes() { + shouldHandleGetBacklogBytesError(new LimitExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() { + shouldHandleGetBacklogBytesError(new ProvisionedThroughputExceededException(""), + TransientKinesisException.class); + } + + @Test + public void shouldHandleServiceErrorForGetBacklogBytes() { + shouldHandleGetBacklogBytesError(newAmazonServiceException(ErrorType.Service), + TransientKinesisException.class); + } + + @Test + public void shouldHandleClientErrorForGetBacklogBytes() { + shouldHandleGetBacklogBytesError(newAmazonServiceException(ErrorType.Client), + RuntimeException.class); + } + + @Test + public void shouldHandleUnexpectedExceptionForGetBacklogBytes() { + shouldHandleGetBacklogBytesError(new NullPointerException(), + RuntimeException.class); + 
} + + private void shouldHandleGetBacklogBytesError( + Exception thrownException, + Class<? extends Exception> expectedExceptionClass) { + Instant countSince = new Instant("2017-04-06T10:00:00.000Z"); + Instant countTo = new Instant("2017-04-06T11:00:00.000Z"); + Minutes periodTime = Minutes.minutesBetween(countSince, countTo); + GetMetricStatisticsRequest metricStatisticsRequest = + underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime); + + given(cloudWatch.getMetricStatistics(metricStatisticsRequest)).willThrow(thrownException); + try { + underTest.getBacklogBytes(STREAM, countSince, countTo); + failBecauseExceptionWasNotThrown(expectedExceptionClass); + } catch (Exception e) { + assertThat(e).isExactlyInstanceOf(expectedExceptionClass); + } finally { + reset(kinesis); + } + } + private AmazonServiceException newAmazonServiceException(ErrorType errorType) { AmazonServiceException exception = new AmazonServiceException(""); exception.setErrorType(errorType);