Repository: beam
Updated Branches:
  refs/heads/master 25b9c35a9 -> 7db0f13a1


[BEAM-2455] Backlog size retrieval for Kinesis source


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7cb3dda1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7cb3dda1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7cb3dda1

Branch: refs/heads/master
Commit: 7cb3dda1bc712f3cac5296e5d7cae80142bd8b10
Parents: 25b9c35
Author: Pawel Kaczmarczyk <p.kaczmarc...@ocado.com>
Authored: Wed Jul 12 17:16:33 2017 +0200
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri Sep 29 16:46:45 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/kinesis/pom.xml                    |   6 ++
 .../beam/sdk/io/kinesis/AWSClientsProvider.java |  36 +++++++
 .../sdk/io/kinesis/KinesisClientProvider.java   |  33 ------
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   | 107 +++++++++++++------
 .../beam/sdk/io/kinesis/KinesisReader.java      |  50 ++++++++-
 .../beam/sdk/io/kinesis/KinesisSource.java      |  40 ++++---
 .../sdk/io/kinesis/SimplifiedKinesisClient.java |  97 +++++++++++++++--
 .../io/kinesis/TransientKinesisException.java   |   4 +-
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  |  12 ++-
 .../sdk/io/kinesis/KinesisMockReadTest.java     |   5 +-
 .../beam/sdk/io/kinesis/KinesisReaderIT.java    |   5 +-
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  |  34 +++++-
 .../io/kinesis/SimplifiedKinesisClientTest.java | 107 +++++++++++++++++++
 13 files changed, 434 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
index 872c590..2a54cc1 100644
--- a/sdks/java/io/kinesis/pom.xml
+++ b/sdks/java/io/kinesis/pom.xml
@@ -73,6 +73,12 @@
 
     <dependency>
       <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-cloudwatch</artifactId>
+      <version>${aws.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
       <artifactId>amazon-kinesis-client</artifactId>
       <version>1.6.1</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java
new file mode 100644
index 0000000..c82e4b1
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/AWSClientsProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+
+import java.io.Serializable;
+
+/**
+ * Provides instances of AWS clients.
+ *
+ * <p>Please note, that any instance of {@link AWSClientsProvider} must be
+ * {@link Serializable} to ensure it can be sent to worker machines.
+ */
+public interface AWSClientsProvider extends Serializable {
+
+  AmazonKinesis getKinesisClient();
+
+  AmazonCloudWatch getCloudWatchClient();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
deleted file mode 100644
index c48f9cc..0000000
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.kinesis;
-
-import com.amazonaws.services.kinesis.AmazonKinesis;
-
-import java.io.Serializable;
-
-/**
- * Provides instances of {@link AmazonKinesis} interface.
- *
- * <p>Please note, that any instance of {@link KinesisClientProvider} must be
- * {@link Serializable} to ensure it can be sent to worker machines.
- */
-public interface KinesisClientProvider extends Serializable {
-
-  AmazonKinesis get();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index ef39a91..96f7a04 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -23,6 +23,8 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.internal.StaticCredentialsProvider;
 import com.amazonaws.regions.Regions;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
@@ -46,8 +48,9 @@ import org.joda.time.Instant;
  *
  * <pre>{@code
  * p.apply(KinesisIO.read()
- *     .from("streamName", InitialPositionInStream.LATEST)
- *     .withClientProvider("AWS_KEY", _"AWS_SECRET", STREAM_REGION)
+ *     .withStreamName("streamName")
+ *     .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *     .withAWSClientsProvider("AWS_KEY", _"AWS_SECRET", STREAM_REGION)
  *  .apply( ... ) // other transformations
  * }</pre>
  *
@@ -60,23 +63,28 @@ import org.joda.time.Instant;
  *     <li>{@link InitialPositionInStream#TRIM_HORIZON} - reading will begin at
  *        the very beginning of the stream</li>
  *   </ul></li>
- *   <li>data used to initialize {@link AmazonKinesis} client:
+ *   <li>data used to initialize {@link AmazonKinesis} and {@link AmazonCloudWatch} clients:
  *   <ul>
  *     <li>credentials (aws key, aws secret)</li>
  *    <li>region where the stream is located</li>
  *   </ul></li>
  * </ul>
  *
- * <p>In case when you want to set up {@link AmazonKinesis} client by your own
- * (for example if you're using more sophisticated authorization methods like Amazon STS, etc.)
- * you can do it by implementing {@link KinesisClientProvider} class:
+ * <p>In case when you want to set up {@link AmazonKinesis} or {@link AmazonCloudWatch} client by
+ * your own (for example if you're using more sophisticated authorization methods like Amazon
+ * STS, etc.) you can do it by implementing {@link AWSClientsProvider} class:
  *
  * <pre>{@code
- * public class MyCustomKinesisClientProvider implements KinesisClientProvider {
+ * public class MyCustomKinesisClientProvider implements AWSClientsProvider {
  *   {@literal @}Override
- *   public AmazonKinesis get() {
+ *   public AmazonKinesis getKinesisClient() {
  *     // set up your client here
  *   }
+ *
+ *   public AmazonCloudWatch getCloudWatchClient() {
+ *     // set up your client here
+ *   }
+ *
  * }
  * }</pre>
  *
@@ -84,8 +92,9 @@ import org.joda.time.Instant;
  *
  * <pre>{@code
  * p.apply(KinesisIO.read()
- *    .from("streamName", InitialPositionInStream.LATEST)
- *    .withClientProvider(new MyCustomKinesisClientProvider())
+ *    .withStreamName("streamName")
+ *    .withInitialPositionInStream(InitialPositionInStream.LATEST)
+ *    .withAWSClientsProvider(new MyCustomKinesisClientProvider())
  *  .apply( ... ) // other transformations
  * }</pre>
  *
@@ -94,8 +103,9 @@ import org.joda.time.Instant;
  *
  * <pre>{@code
  * p.apply(KinesisIO.read()
- *     .from("streamName", instant)
- *     .withClientProvider(new MyCustomKinesisClientProvider())
+ *     .withStreamName("streamName")
+ *     .withInitialTimestampInStream(instant)
+ *     .withAWSClientsProvider(new MyCustomKinesisClientProvider())
  *  .apply( ... ) // other transformations
  * }</pre>
  *
@@ -105,7 +115,10 @@ public final class KinesisIO {
 
   /** Returns a new {@link Read} transform for reading from Kinesis. */
   public static Read read() {
-    return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+    return new AutoValue_KinesisIO_Read.Builder()
+        .setMaxNumRecords(-1)
+        .setUpToDateThreshold(Duration.ZERO)
+        .build();
   }
 
   /** Implementation of {@link #read}. */
@@ -119,13 +132,15 @@ public final class KinesisIO {
     abstract StartingPoint getInitialPosition();
 
     @Nullable
-    abstract KinesisClientProvider getClientProvider();
+    abstract AWSClientsProvider getAWSClientsProvider();
 
     abstract int getMaxNumRecords();
 
     @Nullable
     abstract Duration getMaxReadTime();
 
+    abstract Duration getUpToDateThreshold();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -135,54 +150,61 @@ public final class KinesisIO {
 
       abstract Builder setInitialPosition(StartingPoint startingPoint);
 
-      abstract Builder setClientProvider(KinesisClientProvider clientProvider);
+      abstract Builder setAWSClientsProvider(AWSClientsProvider 
clientProvider);
 
       abstract Builder setMaxNumRecords(int maxNumRecords);
 
       abstract Builder setMaxReadTime(Duration maxReadTime);
 
+      abstract Builder setUpToDateThreshold(Duration upToDateThreshold);
+
       abstract Read build();
     }
 
     /**
-     * Specify reading from streamName at some initial position.
+     * Specify reading from streamName.
+     */
+    public Read withStreamName(String streamName) {
+      return toBuilder().setStreamName(streamName).build();
+    }
+
+    /**
+     * Specify reading from some initial position in stream.
      */
-    public Read from(String streamName, InitialPositionInStream 
initialPosition) {
+    public Read withInitialPositionInStream(InitialPositionInStream 
initialPosition) {
       return toBuilder()
-          .setStreamName(streamName)
           .setInitialPosition(new StartingPoint(initialPosition))
           .build();
     }
 
     /**
-     * Specify reading from streamName beginning at given {@link Instant}.
+     * Specify reading beginning at given {@link Instant}.
      * This {@link Instant} must be in the past, i.e. before {@link 
Instant#now()}.
      */
-    public Read from(String streamName, Instant initialTimestamp) {
+    public Read withInitialTimestampInStream(Instant initialTimestamp) {
       return toBuilder()
-          .setStreamName(streamName)
           .setInitialPosition(new StartingPoint(initialTimestamp))
           .build();
     }
 
     /**
-     * Allows to specify custom {@link KinesisClientProvider}.
-     * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
-     * used for communication with Kinesis.
-     * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
+     * Allows to specify custom {@link AWSClientsProvider}.
+     * {@link AWSClientsProvider} provides {@link AmazonKinesis} and {@link AmazonCloudWatch}
+     * instances which are later used for communication with Kinesis.
+     * You should use this method if {@link Read#withAWSClientsProvider(String, String, Regions)}
      * does not suit your needs.
      */
-    public Read withClientProvider(KinesisClientProvider 
kinesisClientProvider) {
-      return toBuilder().setClientProvider(kinesisClientProvider).build();
+    public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) {
+      return toBuilder().setAWSClientsProvider(awsClientsProvider).build();
     }
 
     /**
      * Specify credential details and region to be used to read from Kinesis.
      * If you need more sophisticated credential protocol, then you should 
look at
-     * {@link Read#withClientProvider(KinesisClientProvider)}.
+     * {@link Read#withAWSClientsProvider(AWSClientsProvider)}.
      */
-    public Read withClientProvider(String awsAccessKey, String awsSecretKey, 
Regions region) {
-      return withClientProvider(new BasicKinesisProvider(awsAccessKey, 
awsSecretKey, region));
+    public Read withAWSClientsProvider(String awsAccessKey, String 
awsSecretKey, Regions region) {
+      return withAWSClientsProvider(new BasicKinesisProvider(awsAccessKey, 
awsSecretKey, region));
     }
 
     /** Specifies to read at most a given number of records. */
@@ -198,11 +220,23 @@ public final class KinesisIO {
       return toBuilder().setMaxReadTime(maxReadTime).build();
     }
 
+    /**
+     * Specifies how late records consumed by this source can be to still be considered on time.
+     * When this limit is exceeded the actual backlog size will be evaluated and the runner might
+     * decide to scale the amount of resources allocated to the pipeline in order to
+     * speed up ingestion.
+     */
+    public Read withUpToDateThreshold(Duration upToDateThreshold) {
+      checkArgument(upToDateThreshold != null, "upToDateThreshold can not be 
null");
+      return toBuilder().setUpToDateThreshold(upToDateThreshold).build();
+    }
+
     @Override
     public PCollection<KinesisRecord> expand(PBegin input) {
       org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
           org.apache.beam.sdk.io.Read.from(
-              new KinesisSource(getClientProvider(), getStreamName(), 
getInitialPosition()));
+              new KinesisSource(getAWSClientsProvider(), getStreamName(),
+                  getInitialPosition(), getUpToDateThreshold()));
       if (getMaxNumRecords() > 0) {
         BoundedReadFromUnboundedSource<KinesisRecord> bounded =
             read.withMaxNumRecords(getMaxNumRecords());
@@ -216,7 +250,7 @@ public final class KinesisIO {
       }
     }
 
-    private static final class BasicKinesisProvider implements 
KinesisClientProvider {
+    private static final class BasicKinesisProvider implements 
AWSClientsProvider {
 
       private final String accessKey;
       private final String secretKey;
@@ -240,11 +274,18 @@ public final class KinesisIO {
       }
 
       @Override
-      public AmazonKinesis get() {
+      public AmazonKinesis getKinesisClient() {
         AmazonKinesisClient client = new 
AmazonKinesisClient(getCredentialsProvider());
         client.withRegion(region);
         return client;
       }
+
+      @Override
+      public AmazonCloudWatch getCloudWatchClient() {
+        AmazonCloudWatchClient client = new 
AmazonCloudWatchClient(getCredentialsProvider());
+        client.withRegion(region);
+        return client;
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 1abcd98..8095150 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -62,25 +62,41 @@ class KinesisReader extends 
UnboundedSource.UnboundedReader<KinesisRecord> {
   private static final int MIN_WATERMARK_SPREAD = 2;
 
   private final SimplifiedKinesisClient kinesis;
-  private final UnboundedSource<KinesisRecord, ?> source;
+  private final KinesisSource source;
   private final CheckpointGenerator initialCheckpointGenerator;
   private RoundRobin<ShardRecordsIterator> shardIterators;
   private CustomOptional<KinesisRecord> currentRecord = 
CustomOptional.absent();
   private MovingFunction minReadTimestampMsSinceEpoch;
   private Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  private long lastBacklogBytes;
+  private Instant backlogBytesLastCheckTime = new Instant(0L);
+  private Duration upToDateThreshold;
+  private Duration backlogBytesCheckThreshold;
 
-  public KinesisReader(SimplifiedKinesisClient kinesis,
+  KinesisReader(SimplifiedKinesisClient kinesis,
       CheckpointGenerator initialCheckpointGenerator,
-      UnboundedSource<KinesisRecord, ?> source) {
+      KinesisSource source,
+      Duration upToDateThreshold) {
+    this(kinesis, initialCheckpointGenerator, source, upToDateThreshold,
+        Duration.standardSeconds(30));
+  }
+
+  KinesisReader(SimplifiedKinesisClient kinesis,
+      CheckpointGenerator initialCheckpointGenerator,
+      KinesisSource source,
+      Duration upToDateThreshold,
+      Duration backlogBytesCheckThreshold) {
     this.kinesis = checkNotNull(kinesis, "kinesis");
-    this.initialCheckpointGenerator =
-        checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+    this.initialCheckpointGenerator = checkNotNull(initialCheckpointGenerator,
+        "initialCheckpointGenerator");
     this.source = source;
     this.minReadTimestampMsSinceEpoch = new 
MovingFunction(SAMPLE_PERIOD.getMillis(),
         SAMPLE_UPDATE.getMillis(),
         MIN_WATERMARK_SPREAD,
         MIN_WATERMARK_MESSAGES,
         Min.ofLongs());
+    this.upToDateThreshold = upToDateThreshold;
+    this.backlogBytesCheckThreshold = backlogBytesCheckThreshold;
   }
 
   /**
@@ -181,4 +197,28 @@ class KinesisReader extends 
UnboundedSource.UnboundedReader<KinesisRecord> {
     return source;
   }
 
+  /**
+   * Returns total size of all records that remain in Kinesis stream after current watermark.
+   * When currently processed record is not further behind than {@link #upToDateThreshold}
+   * then this method returns 0.
+   */
+  @Override
+  public long getTotalBacklogBytes() {
+    Instant watermark = getWatermark();
+    if (watermark.plus(upToDateThreshold).isAfterNow()) {
+      return 0L;
+    }
+    if 
(backlogBytesLastCheckTime.plus(backlogBytesCheckThreshold).isAfterNow()) {
+      return lastBacklogBytes;
+    }
+    try {
+      lastBacklogBytes = kinesis.getBacklogBytes(source.getStreamName(), 
watermark);
+      backlogBytesLastCheckTime = Instant.now();
+    } catch (TransientKinesisException e) {
+      LOG.warn("Transient exception occurred.", e);
+    }
+    LOG.info("Total backlog bytes for {} stream with {} watermark: {}", 
source.getStreamName(),
+        watermark, lastBacklogBytes);
+    return lastBacklogBytes;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 144bd80..b1a6c19 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,18 +37,24 @@ class KinesisSource extends UnboundedSource<KinesisRecord, 
KinesisReaderCheckpoi
 
   private static final Logger LOG = 
LoggerFactory.getLogger(KinesisSource.class);
 
-  private final KinesisClientProvider kinesis;
+  private final AWSClientsProvider awsClientsProvider;
+  private final String streamName;
+  private final Duration upToDateThreshold;
   private CheckpointGenerator initialCheckpointGenerator;
 
-  public KinesisSource(KinesisClientProvider kinesis, String streamName,
-      StartingPoint startingPoint) {
-    this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+  KinesisSource(AWSClientsProvider awsClientsProvider, String streamName,
+      StartingPoint startingPoint, Duration upToDateThreshold) {
+    this(awsClientsProvider, new DynamicCheckpointGenerator(streamName, 
startingPoint), streamName,
+        upToDateThreshold);
   }
 
-  private KinesisSource(KinesisClientProvider kinesisClientProvider,
-      CheckpointGenerator initialCheckpoint) {
-    this.kinesis = kinesisClientProvider;
+  private KinesisSource(AWSClientsProvider awsClientsProvider,
+      CheckpointGenerator initialCheckpoint, String streamName,
+      Duration upToDateThreshold) {
+    this.awsClientsProvider = awsClientsProvider;
     this.initialCheckpointGenerator = initialCheckpoint;
+    this.streamName = streamName;
+    this.upToDateThreshold = upToDateThreshold;
     validate();
   }
 
@@ -60,14 +67,16 @@ class KinesisSource extends UnboundedSource<KinesisRecord, 
KinesisReaderCheckpoi
   public List<KinesisSource> split(int desiredNumSplits,
       PipelineOptions options) throws Exception {
     KinesisReaderCheckpoint checkpoint =
-        
initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
+        
initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(awsClientsProvider));
 
     List<KinesisSource> sources = newArrayList();
 
     for (KinesisReaderCheckpoint partition : 
checkpoint.splitInto(desiredNumSplits)) {
       sources.add(new KinesisSource(
-          kinesis,
-          new StaticCheckpointGenerator(partition)));
+          awsClientsProvider,
+          new StaticCheckpointGenerator(partition),
+          streamName,
+          upToDateThreshold));
     }
     return sources;
   }
@@ -90,9 +99,10 @@ class KinesisSource extends UnboundedSource<KinesisRecord, 
KinesisReaderCheckpoi
     LOG.info("Creating new reader using {}", checkpointGenerator);
 
     return new KinesisReader(
-        SimplifiedKinesisClient.from(kinesis),
+        SimplifiedKinesisClient.from(awsClientsProvider),
         checkpointGenerator,
-        this);
+        this,
+        upToDateThreshold);
   }
 
   @Override
@@ -102,7 +112,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, 
KinesisReaderCheckpoi
 
   @Override
   public void validate() {
-    checkNotNull(kinesis);
+    checkNotNull(awsClientsProvider);
     checkNotNull(initialCheckpointGenerator);
   }
 
@@ -110,4 +120,8 @@ class KinesisSource extends UnboundedSource<KinesisRecord, 
KinesisReaderCheckpoi
   public Coder<KinesisRecord> getOutputCoder() {
     return KinesisRecordCoder.of();
   }
+
+  String getStreamName() {
+    return streamName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index e83fc8b..74605e5 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -17,7 +17,15 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.model.Datapoint;
+import com.amazonaws.services.cloudwatch.model.Dimension;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
@@ -31,11 +39,13 @@ import 
com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.google.common.collect.Lists;
 
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Callable;
 
 import org.joda.time.Instant;
+import org.joda.time.Minutes;
 
 /**
  * Wraps {@link AmazonKinesis} class providing much simpler interface and
@@ -43,14 +53,22 @@ import org.joda.time.Instant;
  */
 class SimplifiedKinesisClient {
 
+  private static final String KINESIS_NAMESPACE = "AWS/Kinesis";
+  private static final String INCOMING_RECORDS_METRIC = "IncomingBytes";
+  private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
+  private static final String SUM_STATISTIC = "Sum";
+  private static final String STREAM_NAME_DIMENSION = "StreamName";
   private final AmazonKinesis kinesis;
+  private final AmazonCloudWatch cloudWatch;
 
-  public SimplifiedKinesisClient(AmazonKinesis kinesis) {
-    this.kinesis = kinesis;
+  public SimplifiedKinesisClient(AmazonKinesis kinesis, AmazonCloudWatch 
cloudWatch) {
+    this.kinesis = checkNotNull(kinesis, "kinesis");
+    this.cloudWatch = checkNotNull(cloudWatch, "cloudWatch");
   }
 
-  public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
-    return new SimplifiedKinesisClient(provider.get());
+  public static SimplifiedKinesisClient from(AWSClientsProvider provider) {
+    return new SimplifiedKinesisClient(provider.getKinesisClient(),
+        provider.getCloudWatchClient());
   }
 
   public String getShardIterator(final String streamName, final String shardId,
@@ -133,13 +151,69 @@ class SimplifiedKinesisClient {
   }
 
   /**
+   * Gets total size in bytes of all events that remain in Kinesis stream after specified instant.
+   *
+   * @return total size in bytes of all Kinesis events after specified instant
+   */
+  public long getBacklogBytes(String streamName, Instant countSince)
+      throws TransientKinesisException {
+    return getBacklogBytes(streamName, countSince, new Instant());
+  }
+
+  /**
+   * Gets total size in bytes of all events that remain in Kinesis stream between specified
+   * instants.
+   *
+   * @return total size in bytes of all Kinesis events after specified instant
+   */
+  public long getBacklogBytes(final String streamName, final Instant 
countSince,
+      final Instant countTo) throws TransientKinesisException {
+    return wrapExceptions(new Callable<Long>() {
+
+      @Override
+      public Long call() throws Exception {
+        Minutes period = Minutes.minutesBetween(countSince, countTo);
+        if (period.isLessThan(Minutes.ONE)) {
+          return 0L;
+        }
+
+        GetMetricStatisticsRequest request = 
createMetricStatisticsRequest(streamName,
+            countSince, countTo, period);
+
+        long totalSizeInBytes = 0;
+        GetMetricStatisticsResult result = 
cloudWatch.getMetricStatistics(request);
+        for (Datapoint point : result.getDatapoints()) {
+          totalSizeInBytes += point
+              .getSum()
+              .longValue();
+        }
+        return totalSizeInBytes;
+      }
+    });
+  }
+
+  GetMetricStatisticsRequest createMetricStatisticsRequest(String streamName, 
Instant countSince,
+      Instant countTo, Minutes period) {
+    return new GetMetricStatisticsRequest()
+        .withNamespace(KINESIS_NAMESPACE)
+        .withMetricName(INCOMING_RECORDS_METRIC)
+        .withPeriod(period.getMinutes() * PERIOD_GRANULARITY_IN_SECONDS)
+        .withStartTime(countSince.toDate())
+        .withEndTime(countTo.toDate())
+        .withStatistics(Collections.singletonList(SUM_STATISTIC))
+        .withDimensions(Collections.singletonList(new Dimension()
+            .withName(STREAM_NAME_DIMENSION)
+            .withValue(streamName)));
+  }
+
+  /**
    * Wraps Amazon specific exceptions into more friendly format.
    *
-   * @throws TransientKinesisException              - in case of recoverable 
situation, i.e.
-   *                                  the request rate is too high, Kinesis 
remote service
-   *                                  failed, network issue, etc.
-   * @throws ExpiredIteratorException - if iterator needs to be refreshed
-   * @throws RuntimeException         - in all other cases
+   * @throws TransientKinesisException - in case of recoverable situation, i.e.
+   *                                   the request rate is too high, Kinesis remote service
+   *                                   failed, network issue, etc.
+   * @throws ExpiredIteratorException  - if iterator needs to be refreshed
+   * @throws RuntimeException          - in all other cases
    */
   private <T> T wrapExceptions(Callable<T> callable) throws 
TransientKinesisException {
     try {
@@ -155,6 +229,11 @@ class SimplifiedKinesisClient {
             "Kinesis backend failed. Wait some time and retry.", e);
       }
       throw new RuntimeException("Kinesis client side failure", e);
+    } catch (AmazonClientException e) {
+      if (e.isRetryable()) {
+        throw new TransientKinesisException("Retryable client failure", e);
+      }
+      throw new RuntimeException("Not retryable client failure", e);
     } catch (Exception e) {
       throw new RuntimeException("Unknown kinesis failure, when trying to 
reach kinesis", e);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
index 68ca0d7..0ea37ec 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -17,14 +17,14 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonClientException;
 
 /**
  * A transient exception thrown by Kinesis.
  */
 class TransientKinesisException extends Exception {
 
-  public TransientKinesisException(String s, AmazonServiceException e) {
+  public TransientKinesisException(String s, AmazonClientException e) {
     super(s, e);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index fe257ad..d6e8817 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -26,6 +26,7 @@ import static 
org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode
 import com.amazonaws.AmazonWebServiceRequest;
 import com.amazonaws.ResponseMetadata;
 import com.amazonaws.regions.Region;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
 import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
@@ -74,6 +75,7 @@ import javax.annotation.Nullable;
 
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.joda.time.Instant;
+import org.mockito.Mockito;
 
 /**
  * Mock implemenation of {@link AmazonKinesis} for testing.
@@ -117,7 +119,7 @@ class AmazonKinesisMock implements AmazonKinesis {
     }
   }
 
-  static class Provider implements KinesisClientProvider {
+  static class Provider implements AWSClientsProvider {
 
     private final List<List<TestData>> shardedData;
     private final int numberOfRecordsPerGet;
@@ -128,7 +130,7 @@ class AmazonKinesisMock implements AmazonKinesis {
     }
 
     @Override
-    public AmazonKinesis get() {
+    public AmazonKinesis getKinesisClient() {
       return new AmazonKinesisMock(transform(shardedData,
           new Function<List<TestData>, List<Record>>() {
 
@@ -143,6 +145,12 @@ class AmazonKinesisMock implements AmazonKinesis {
               });
             }
           }), numberOfRecordsPerGet);
+
+    }
+
+    @Override
+    public AmazonCloudWatch getCloudWatchClient() {
+      return Mockito.mock(AmazonCloudWatch.class); // unstubbed Mockito mock
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 44ad67d..73554bb 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -51,8 +51,9 @@ public class KinesisMockReadTest {
     PCollection<AmazonKinesisMock.TestData> result = p
         .apply(
             KinesisIO.read()
-                .from("stream", InitialPositionInStream.TRIM_HORIZON)
-                .withClientProvider(new AmazonKinesisMock.Provider(testData, 
10))
+                .withStreamName("stream")
+                
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
+                .withAWSClientsProvider(new 
AmazonKinesisMock.Provider(testData, 10))
                 .withMaxNumRecords(noOfShards * noOfEventsPerShard))
         .apply(ParDo.of(new KinesisRecordToTestData()));
     PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 5781033..7126594 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -84,8 +84,9 @@ public class KinesisReaderIT {
 
     PCollection<String> result = p.
         apply(KinesisIO.read()
-            .from(options.getAwsKinesisStream(), Instant.now())
-            .withClientProvider(options.getAwsAccessKey(), 
options.getAwsSecretKey(),
+            .withStreamName(options.getAwsKinesisStream())
+            .withInitialTimestampInStream(Instant.now())
+            .withAWSClientsProvider(options.getAwsAccessKey(), 
options.getAwsSecretKey(),
                 Regions.fromName(options.getAwsKinesisRegion()))
             .withMaxReadTime(Duration.standardMinutes(3))
         ).

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 1af74b6..22d8bce 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.kinesis;
 
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -26,6 +28,7 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,6 +53,8 @@ public class KinesisReaderTest {
   private ShardRecordsIterator firstIterator, secondIterator;
   @Mock
   private KinesisRecord a, b, c, d;
+  @Mock
+  private KinesisSource kinesisSource;
 
   private KinesisReader reader;
 
@@ -67,7 +72,7 @@ public class KinesisReaderTest {
     when(c.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
     when(d.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
 
-    reader = new KinesisReader(kinesis, generator, null);
+    reader = new KinesisReader(kinesis, generator, kinesisSource, 
Duration.ZERO, Duration.ZERO);
   }
 
   @Test
@@ -195,4 +200,31 @@ public class KinesisReaderTest {
     return record;
   }
 
+  @Test
+  public void 
getTotalBacklogBytesShouldReturnLastSeenValueWhenKinesisExceptionsOccur()
+      throws TransientKinesisException {
+    when(kinesisSource.getStreamName()).thenReturn("stream1");
+    when(kinesis.getBacklogBytes(eq("stream1"), any(Instant.class)))
+        .thenReturn(10L) // 1st lookup succeeds
+        .thenThrow(TransientKinesisException.class) // 2nd lookup fails transiently
+        .thenReturn(20L); // 3rd lookup succeeds again
+
+    assertThat(reader.getTotalBacklogBytes()).isEqualTo(10);
+    assertThat(reader.getTotalBacklogBytes()).isEqualTo(10); // failed lookup: last seen value
+    assertThat(reader.getTotalBacklogBytes()).isEqualTo(20);
+  }
+
+  @Test
+  public void 
getTotalBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently()
+      throws TransientKinesisException {
+    KinesisReader backlogCachingReader = new KinesisReader(kinesis, generator, 
kinesisSource,
+        Duration.ZERO, Duration.standardSeconds(30)); // presumably 30s backlog cache - TODO confirm
+    when(kinesisSource.getStreamName()).thenReturn("stream1");
+    when(kinesis.getBacklogBytes(eq("stream1"), any(Instant.class)))
+        .thenReturn(10L)
+        .thenReturn(20L); // must not be observed by the assertions below
+
+    assertThat(backlogCachingReader.getTotalBacklogBytes()).isEqualTo(10);
+    assertThat(backlogCachingReader.getTotalBacklogBytes()).isEqualTo(10); // still 10, not 20
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7cb3dda1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 2f8757c..75c0ae0 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -21,9 +21,14 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.model.Datapoint;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
@@ -38,6 +43,7 @@ import com.amazonaws.services.kinesis.model.StreamDescription;
 import java.util.List;
 
 import org.joda.time.Instant;
+import org.joda.time.Minutes;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
@@ -58,6 +64,8 @@ public class SimplifiedKinesisClientTest {
 
   @Mock
   private AmazonKinesis kinesis;
+  @Mock
+  private AmazonCloudWatch cloudWatch;
   @InjectMocks
   private SimplifiedKinesisClient underTest;
 
@@ -219,6 +227,105 @@ public class SimplifiedKinesisClientTest {
     }
   }
 
+  @Test
+  public void shouldCountBytesWhenSingleDataPointReturned() throws Exception {
+    Instant countSince = new Instant("2017-04-06T10:00:00.000Z");
+    Instant countTo = new Instant("2017-04-06T11:00:00.000Z");
+    Minutes periodTime = Minutes.minutesBetween(countSince, countTo); // the full one-hour window
+    GetMetricStatisticsRequest metricStatisticsRequest =
+        underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, 
periodTime);
+    GetMetricStatisticsResult result = new GetMetricStatisticsResult()
+        .withDatapoints(new Datapoint().withSum(1.0));
+
+    
given(cloudWatch.getMetricStatistics(metricStatisticsRequest)).willReturn(result);
+
+    long backlogBytes = underTest.getBacklogBytes(STREAM, countSince, countTo);
+
+    assertThat(backlogBytes).isEqualTo(1L); // the single datapoint's sum
+  }
+
+  @Test
+  public void shouldCountBytesWhenMultipleDataPointsReturned() throws 
Exception {
+    Instant countSince = new Instant("2017-04-06T10:00:00.000Z");
+    Instant countTo = new Instant("2017-04-06T11:00:00.000Z");
+    Minutes periodTime = Minutes.minutesBetween(countSince, countTo); // the full one-hour window
+    GetMetricStatisticsRequest metricStatisticsRequest =
+        underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, 
periodTime);
+    GetMetricStatisticsResult result = new GetMetricStatisticsResult()
+        .withDatapoints(
+            new Datapoint().withSum(1.0),
+            new Datapoint().withSum(3.0),
+            new Datapoint().withSum(2.0)
+        );
+
+    
given(cloudWatch.getMetricStatistics(metricStatisticsRequest)).willReturn(result);
+
+    long backlogBytes = underTest.getBacklogBytes(STREAM, countSince, countTo);
+
+    assertThat(backlogBytes).isEqualTo(6L); // 1.0 + 3.0 + 2.0 across all datapoints
+  }
+
+  @Test
+  public void shouldNotCallCloudWatchWhenSpecifiedPeriodTooShort() throws 
Exception {
+    Instant countSince = new Instant("2017-04-06T10:00:00.000Z");
+    Instant countTo = new Instant("2017-04-06T10:00:02.000Z"); // only 2 seconds after countSince
+
+    long backlogBytes = underTest.getBacklogBytes(STREAM, countSince, countTo);
+
+    assertThat(backlogBytes).isEqualTo(0L);
+    verifyZeroInteractions(cloudWatch); // no CloudWatch call may have been made
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForGetBacklogBytes() {
+    shouldHandleGetBacklogBytesError(new LimitExceededException(""),
+        TransientKinesisException.class); // expected to be reported as transient
+  }
+
+  @Test
+  public void 
shouldHandleProvisionedThroughputExceededExceptionForGetBacklogBytes() {
+    shouldHandleGetBacklogBytesError(new 
ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class); // expected to be reported as transient
+  }
+
+  @Test
+  public void shouldHandleServiceErrorForGetBacklogBytes() {
+    
shouldHandleGetBacklogBytesError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class); // service-side error: expected as transient
+  }
+
+  @Test
+  public void shouldHandleClientErrorForGetBacklogBytes() {
+    
shouldHandleGetBacklogBytesError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class); // client-side error: expected as plain RuntimeException
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForGetBacklogBytes() {
+    shouldHandleGetBacklogBytesError(new NullPointerException(),
+        RuntimeException.class); // non-AWS exception: expected as plain RuntimeException
+  }
+
+  private void shouldHandleGetBacklogBytesError(
+      Exception thrownException, // what the stubbed CloudWatch call throws
+      Class<? extends Exception> expectedExceptionClass) { // exact type getBacklogBytes must throw
+    Instant countSince = new Instant("2017-04-06T10:00:00.000Z");
+    Instant countTo = new Instant("2017-04-06T11:00:00.000Z");
+    Minutes periodTime = Minutes.minutesBetween(countSince, countTo);
+    GetMetricStatisticsRequest metricStatisticsRequest =
+        underTest.createMetricStatisticsRequest(STREAM, countSince, countTo, periodTime);
+
+    given(cloudWatch.getMetricStatistics(metricStatisticsRequest)).willThrow(thrownException);
+    try {
+      underTest.getBacklogBytes(STREAM, countSince, countTo);
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(cloudWatch); // reset the mock this helper stubbed (was: kinesis, which is untouched here)
+    }
+  }
+
   private AmazonServiceException newAmazonServiceException(ErrorType 
errorType) {
     AmazonServiceException exception = new AmazonServiceException("");
     exception.setErrorType(errorType);

Reply via email to