This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 91473c0579a Add GetSize implementation for DetectNewPartitions SDF 
(#23997)
91473c0579a is described below

commit 91473c0579ae4812826b984c4bf02c7e3566b02d
Author: Thiago Nunes <thiagotnu...@google.com>
AuthorDate: Tue Nov 22 05:43:15 2022 +1100

    Add GetSize implementation for DetectNewPartitions SDF (#23997)
    
    * feat: implement GetSize for DetectNewPartitions
    
    Adds the GetSize method for the DetectNewPartitions SDF. This will
    calculate the backlog bytes by combining the throughput of the DoFn with
    the work remaining.
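
    In essence (a simplified sketch with illustrative names, not the exact
    code in this change), the reported backlog is the remaining work in
    seconds multiplied by the observed throughput in bytes per second,
    capped to avoid overflow:

        import java.math.BigDecimal;

        class BacklogSketch {
          // backlog bytes = work remaining (s) * throughput (bytes/s), capped at Double.MAX_VALUE
          static double backlogBytes(double workRemainingSeconds, double throughputBytesPerSecond) {
            return BigDecimal.valueOf(workRemainingSeconds)
                .multiply(BigDecimal.valueOf(throughputBytesPerSecond))
                .min(BigDecimal.valueOf(Double.MAX_VALUE))
                .doubleValue();
          }
        }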
    
    * test: add tests for DetectNewPartitionsAction
    
    * fix: fix checkstyle violation
    
    Use vendored guava
    
    * feat: use avro coder for estimating SDF backlog
    
    Instead of using the UTF-8 encoded length, use the Avro estimated byte size,
    which more accurately reflects the size of a record.
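
    The estimation is coder-based; roughly speaking (an illustrative sketch
    only, the actual SizeEstimator in this change may differ), an element is
    encoded with its Coder and the produced bytes are counted:

        import java.io.ByteArrayOutputStream;
        import org.apache.beam.sdk.coders.Coder;

        class SizeSketch {
          // Encode the element with the given coder and count the resulting bytes.
          static <T> long estimatedSizeBytes(Coder<T> coder, T element) throws Exception {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            coder.encode(element, out);
            return out.size();
          }
        }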
    
    * feat: updates DetectNewPartitions GetSize
    
    Reports the backlog size of the DetectNewPartitions SDF as the number of
    partitions to be scheduled times the average partition size in bytes.
    
    Updates the throughput estimator to use a smaller window and to always
    average the accumulated bytes over the full window size.
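
    The partition backlog part amounts to (simplified sketch; the actual
    method is DetectNewPartitionsDoFn#getSize in the diff below):

        // backlog bytes = partitions still to be scheduled * average partition record size
        static double detectNewPartitionsBacklogBytes(
            long partitionsToSchedule, long averagePartitionBytesSize) {
          return (double) partitionsToSchedule * averagePartitionBytesSize;
        }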
    
    * refactor: refactor bytes calculation for records
    
    Move the bytes calculation into the models
    
    * refactor: fix checkstyle violations
    
    * refactor: address PR comments
    
    Rename local variables, make the throughput estimator window size a
    constructor parameter, and use a better estimate of the partition token
    size.
    
    * refactor: makes model coder static
    
    This improves the performance of the byte estimation significantly.
    
    * feat: update throughput estimator logic
    
    The throughput estimator did not consider bytes from a given second
    until the window slid to the next second. To simplify the logic, all
    reported bytes are now included in the throughput calculation.
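
    A condensed sketch of the resulting window behavior (the real
    BytesThroughputEstimator in the diff below uses Timestamp and BigDecimal;
    the names here are illustrative):

        import java.util.ArrayDeque;
        import java.util.Deque;

        class WindowedBytesSketch {
          // One bucket per second: [0] = second, [1] = accumulated bytes.
          private final Deque<long[]> buckets = new ArrayDeque<>();
          private final int windowSizeSeconds;

          WindowedBytesSketch(int windowSizeSeconds) {
            this.windowSizeSeconds = windowSizeSeconds;
          }

          // Every reported byte count is added to its second's bucket and counted immediately.
          void add(long second, long bytes) {
            if (buckets.isEmpty() || second > buckets.getLast()[0]) {
              buckets.addLast(new long[] {second, bytes});
            } else {
              buckets.getLast()[1] += bytes;
            }
            evictOlderThanWindow(second);
          }

          // Average over the whole window size, not only the elapsed part of it.
          double bytesPerSecond(long nowSeconds) {
            evictOlderThanWindow(nowSeconds);
            long total = 0;
            for (long[] bucket : buckets) {
              total += bucket[1];
            }
            return (double) total / windowSizeSeconds;
          }

          private void evictOlderThanWindow(long nowSeconds) {
            while (!buckets.isEmpty() && buckets.getFirst()[0] < nowSeconds - windowSizeSeconds) {
              buckets.removeFirst();
            }
          }
        }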
    
    * typo: fix typo in comments
    
    * fix: fix throughput estimator data structure ops
    
    * refactor: use ptransform coder for size estimate
    
    Uses the PTransform Coder for estimating the size of DataChangeRecords
    and PartitionMetadata records instead of always using AvroCoders.
    
    * refactor: infer coders from pcollections
    
    Infers the coders from the PCollections, which is a safer way of
    obtaining them.
    
    * refactor: removes StringUtils
    
    In order to avoid a repackaged dependency.
    
    * refactor: refactor logs
    
    * refactor: extract constants
    
    * refactor: use for loop instead of stream api
    
    * tests: add comments to throughput estimator test
---
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  |  36 +++--
 .../changestreams/ChangeStreamsConstants.java      |  30 ++++
 .../changestreams/action/ActionFactory.java        |  15 +-
 .../action/ChildPartitionsRecordAction.java        |  21 +--
 .../action/DataChangeRecordAction.java             |  40 +++--
 .../action/DetectNewPartitionsAction.java          |  16 +-
 .../action/HeartbeatRecordAction.java              |   6 +-
 .../action/QueryChangeStreamAction.java            |  37 ++---
 .../changestreams/dao/PartitionMetadataDao.java    |  25 +++
 .../dofn/DetectNewPartitionsDoFn.java              |  40 ++++-
 .../dofn/PostProcessingMetricsDoFn.java            |   9 +-
 .../dofn/ReadChangeStreamPartitionDoFn.java        |  56 ++++---
 .../estimator/BytesThroughputEstimator.java        | 149 +++++++++++++++++
 .../changestreams/estimator/EncodingException.java |  25 +++
 .../estimator/NullThroughputEstimator.java         |  59 +++++++
 .../changestreams/estimator/SizeEstimator.java     |  68 ++++++++
 .../estimator/ThroughputEstimator.java             |  44 ++++++
 .../changestreams/estimator/package-info.java      |  23 +++
 .../restriction/ThroughputEstimator.java           | 115 --------------
 .../restriction/TimestampRangeTracker.java         |  10 +-
 .../action/DataChangeRecordActionTest.java         |  20 ++-
 .../action/DetectNewPartitionsActionTest.java      | 134 ++++++++++++++++
 .../action/QueryChangeStreamActionTest.java        |  51 ++----
 .../dofn/ReadChangeStreamPartitionDoFnTest.java    |  18 +--
 .../estimator/BytesThroughputEstimatorTest.java    | 176 +++++++++++++++++++++
 .../estimator/NullThroughputEstimatorTest.java     |  38 +++++
 .../restriction/ThroughputEstimatorTest.java       | 155 ------------------
 27 files changed, 959 insertions(+), 457 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index b4dd9f7af9c..d584f303986 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -24,6 +24,7 @@ import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsCons
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_INCLUSIVE_START_AT;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
+import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -66,8 +67,10 @@ import 
org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
 import org.apache.beam.runners.core.metrics.ServiceCallMetric;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
@@ -75,9 +78,11 @@ import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitions
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.PostProcessingMetricsDoFn;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.SizeEstimator;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
-import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -1633,7 +1638,6 @@ public class SpannerIO {
               : getInclusiveEndAt();
       final MapperFactory mapperFactory = new MapperFactory();
       final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
-      final ThroughputEstimator throughputEstimator = new 
ThroughputEstimator();
       final RpcPriority rpcPriority = 
MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
       final DaoFactory daoFactory =
           new DaoFactory(
@@ -1650,8 +1654,7 @@ public class SpannerIO {
       final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
           new DetectNewPartitionsDoFn(daoFactory, mapperFactory, 
actionFactory, metrics);
       final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
-          new ReadChangeStreamPartitionDoFn(
-              daoFactory, mapperFactory, actionFactory, metrics, 
throughputEstimator);
+          new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, 
actionFactory, metrics);
       final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
           new PostProcessingMetricsDoFn(metrics);
 
@@ -1662,19 +1665,34 @@ public class SpannerIO {
           .as(SpannerChangeStreamOptions.class)
           .setMetadataTable(partitionMetadataTableName);
 
-      PCollection<byte[]> impulseOut = input.apply(Impulse.create());
-      PCollection<DataChangeRecord> results =
+      final PCollection<byte[]> impulseOut = input.apply(Impulse.create());
+      final PCollection<PartitionMetadata> partitionsOut =
           impulseOut
               .apply("Initialize the connector", ParDo.of(initializeDoFn))
-              .apply("Detect new partitions", 
ParDo.of(detectNewPartitionsDoFn))
+              .apply("Detect new partitions", 
ParDo.of(detectNewPartitionsDoFn));
+      final Coder<PartitionMetadata> partitionMetadataCoder = 
partitionsOut.getCoder();
+      final SizeEstimator<PartitionMetadata> partitionMetadataSizeEstimator =
+          new SizeEstimator<>(partitionMetadataCoder);
+      final long averagePartitionBytesSize =
+          
partitionMetadataSizeEstimator.sizeOf(ChangeStreamsConstants.SAMPLE_PARTITION);
+      
detectNewPartitionsDoFn.setAveragePartitionBytesSize(averagePartitionBytesSize);
+
+      final PCollection<DataChangeRecord> dataChangeRecordsOut =
+          partitionsOut
               .apply("Read change stream partition", 
ParDo.of(readChangeStreamPartitionDoFn))
               .apply("Gather metrics", ParDo.of(postProcessingMetricsDoFn));
+      final Coder<DataChangeRecord> dataChangeRecordCoder = 
dataChangeRecordsOut.getCoder();
+      final SizeEstimator<DataChangeRecord> dataChangeRecordSizeEstimator =
+          new SizeEstimator<>(dataChangeRecordCoder);
+      final BytesThroughputEstimator<DataChangeRecord> throughputEstimator =
+          new BytesThroughputEstimator<>(THROUGHPUT_WINDOW_SECONDS, 
dataChangeRecordSizeEstimator);
+      
readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);
 
       impulseOut
           .apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
-          .apply(Wait.on(results))
+          .apply(Wait.on(dataChangeRecordsOut))
           .apply(ParDo.of(new CleanUpReadChangeStreamDoFn(daoFactory)));
-      return results;
+      return dataChangeRecordsOut;
     }
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
index c8603dc3d22..7e3d17f44e5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamsConstants.java
@@ -19,6 +19,10 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams;
 
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.Options.RpcPriority;
+import java.util.Collections;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 
 /**
  * Single place for defining the constants used in the {@code 
Spanner.readChangeStreams()}
@@ -45,4 +49,30 @@ public class ChangeStreamsConstants {
 
   /** The default priority for a change stream query is {@link 
RpcPriority#HIGH}. */
   public static final RpcPriority DEFAULT_RPC_PRIORITY = RpcPriority.HIGH;
+
+  /** The sliding window size in seconds for throughput reporting. */
+  public static final int THROUGHPUT_WINDOW_SECONDS = 10;
+
+  /**
+   * We use the following partition token to provide an estimated size of a 
partition token. A typical
+   * partition token has around 140 characters.
+   */
+  private static final String SAMPLE_PARTITION_TOKEN =
+      String.join("", Collections.nCopies(140, "*"));
+  /**
+   * We use a bogus partition here to estimate the average size of a partition 
metadata record.
+   *
+   * <p>The only dynamically allocated size field here is the "parentTokens", 
which is a set and can
+   * expand. In practice, however, partitions have 1 to 2 parents at most.
+   */
+  public static final PartitionMetadata SAMPLE_PARTITION =
+      PartitionMetadata.newBuilder()
+          .setPartitionToken(SAMPLE_PARTITION_TOKEN)
+          .setParentTokens(Sets.newHashSet(SAMPLE_PARTITION_TOKEN))
+          .setStartTimestamp(Timestamp.now())
+          .setHeartbeatMillis(1_000L)
+          .setState(State.CREATED)
+          .setWatermark(Timestamp.now())
+          .setCreatedAt(Timestamp.now())
+          .build();
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index cca8506d0e6..b1f6c3f4aee 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -21,9 +21,10 @@ import java.io.Serializable;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
-import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.joda.time.Duration;
 
 /**
@@ -50,9 +51,10 @@ public class ActionFactory implements Serializable {
    *
    * @return singleton instance of the {@link DataChangeRecordAction}
    */
-  public synchronized DataChangeRecordAction dataChangeRecordAction() {
+  public synchronized DataChangeRecordAction dataChangeRecordAction(
+      ThroughputEstimator<DataChangeRecord> throughputEstimator) {
     if (dataChangeRecordActionInstance == null) {
-      dataChangeRecordActionInstance = new DataChangeRecordAction();
+      dataChangeRecordActionInstance = new 
DataChangeRecordAction(throughputEstimator);
     }
     return dataChangeRecordActionInstance;
   }
@@ -109,7 +111,6 @@ public class ActionFactory implements Serializable {
    * @param childPartitionsRecordAction action class to process {@link
    *     
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s
    * @param metrics metrics gathering class
-   * @param throughputEstimator an estimator to calculate local throughput.
    * @return single instance of the {@link QueryChangeStreamAction}
    */
   public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -120,8 +121,7 @@ public class ActionFactory implements Serializable {
       DataChangeRecordAction dataChangeRecordAction,
       HeartbeatRecordAction heartbeatRecordAction,
       ChildPartitionsRecordAction childPartitionsRecordAction,
-      ChangeStreamMetrics metrics,
-      ThroughputEstimator throughputEstimator) {
+      ChangeStreamMetrics metrics) {
     if (queryChangeStreamActionInstance == null) {
       queryChangeStreamActionInstance =
           new QueryChangeStreamAction(
@@ -132,8 +132,7 @@ public class ActionFactory implements Serializable {
               dataChangeRecordAction,
               heartbeatRecordAction,
               childPartitionsRecordAction,
-              metrics,
-              throughputEstimator);
+              metrics);
     }
     return queryChangeStreamActionInstance;
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
index e39df3e2d44..70286b41778 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
@@ -109,13 +109,12 @@ public class ChildPartitionsRecordAction {
 
     final String token = partition.getPartitionToken();
 
-    LOG.debug("[" + token + "] Processing child partition record " + record);
+    LOG.debug("[{}] Processing child partition record {}", token, record);
 
     final Timestamp startTimestamp = record.getStartTimestamp();
     final Instant startInstant = new 
Instant(startTimestamp.toSqlTimestamp().getTime());
     if (!tracker.tryClaim(startTimestamp)) {
-      LOG.debug(
-          "[" + token + "] Could not claim queryChangeStream(" + 
startTimestamp + "), stopping");
+      LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, 
startTimestamp);
       return Optional.of(ProcessContinuation.stop());
     }
     watermarkEstimator.setWatermark(startInstant);
@@ -124,7 +123,7 @@ public class ChildPartitionsRecordAction {
       processChildPartition(partition, record, childPartition);
     }
 
-    LOG.debug("[" + token + "] Child partitions action completed 
successfully");
+    LOG.debug("[{}] Child partitions action completed successfully", token);
     return Optional.empty();
   }
 
@@ -137,11 +136,7 @@ public class ChildPartitionsRecordAction {
     final String childPartitionToken = childPartition.getToken();
     final boolean isSplit = isSplit(childPartition);
     LOG.debug(
-        "["
-            + partitionToken
-            + "] Processing child partition"
-            + (isSplit ? " split" : " merge")
-            + " event");
+        "[{}] Processing child partition {} event", partitionToken, (isSplit ? 
"split" : "merge"));
 
     final PartitionMetadata row =
         toPartitionMetadata(
@@ -149,7 +144,7 @@ public class ChildPartitionsRecordAction {
             partition.getEndTimestamp(),
             partition.getHeartbeatMillis(),
             childPartition);
-    LOG.debug("[" + partitionToken + "] Inserting child partition token " + 
childPartitionToken);
+    LOG.debug("[{}] Inserting child partition token {}", partitionToken, 
childPartitionToken);
     final Boolean insertedRow =
         partitionMetadataDao
             .runInTransaction(
@@ -168,11 +163,7 @@ public class ChildPartitionsRecordAction {
       metrics.incPartitionRecordMergeCount();
     } else {
       LOG.debug(
-          "["
-              + partitionToken
-              + "] Child token "
-              + childPartitionToken
-              + " already exists, skipping...");
+          "[{}] Child token {} already exists, skipping...", partitionToken, 
childPartitionToken);
     }
   }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
index f806c7fcb74..e391feb6e25 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
@@ -19,29 +19,34 @@ package 
org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 
 import com.google.cloud.Timestamp;
 import java.util.Optional;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
-import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Utf8;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This class is part of the process for {@link
- * 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn}
 SDF. It is
+ * This class is part of the process for {@link ReadChangeStreamPartitionDoFn} 
SDF. It is
  * responsible for processing {@link DataChangeRecord}s. The records will 
simply be emitted to the
  * received output receiver.
  */
 public class DataChangeRecordAction {
   private static final Logger LOG = 
LoggerFactory.getLogger(DataChangeRecordAction.class);
+  private final ThroughputEstimator<DataChangeRecord> throughputEstimator;
+
+  /** @param throughputEstimator an estimator to calculate local throughput of 
this action. */
+  public DataChangeRecordAction(ThroughputEstimator<DataChangeRecord> 
throughputEstimator) {
+    this.throughputEstimator = throughputEstimator;
+  }
 
   /**
    * This is the main processing function for a {@link DataChangeRecord}. It 
returns an {@link
@@ -62,14 +67,10 @@ public class DataChangeRecordAction {
    *
    * @param partition the current partition being processed
    * @param record the change stream data record received
-   * @param tracker the restriction tracker of the {@link
-   *     
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn}
 SDF
-   * @param outputReceiver the output receiver of the {@link
-   *     
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn}
 SDF
-   * @param watermarkEstimator the watermark estimator of the {@link
-   *     
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn}
 SDF
-   * @param throughputEstimator an estimator to calculate local throughput of 
the {@link
-   *     
org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn}.
+   * @param tracker the restriction tracker of the {@link 
ReadChangeStreamPartitionDoFn} SDF
+   * @param outputReceiver the output receiver of the {@link 
ReadChangeStreamPartitionDoFn} SDF
+   * @param watermarkEstimator the watermark estimator of the {@link 
ReadChangeStreamPartitionDoFn}
+   *     SDF
    * @return {@link Optional#empty()} if the caller can continue processing 
more records. A non
    *     empty {@link Optional} with {@link ProcessContinuation#stop()} if 
this function was unable
    *     to claim the {@link ChildPartitionsRecord} timestamp
@@ -80,28 +81,23 @@ public class DataChangeRecordAction {
       DataChangeRecord record,
       RestrictionTracker<TimestampRange, Timestamp> tracker,
       OutputReceiver<DataChangeRecord> outputReceiver,
-      ManualWatermarkEstimator<Instant> watermarkEstimator,
-      ThroughputEstimator throughputEstimator) {
+      ManualWatermarkEstimator<Instant> watermarkEstimator) {
 
     final String token = partition.getPartitionToken();
-    LOG.debug("[" + token + "] Processing data record " + 
record.getCommitTimestamp());
+    LOG.debug("[{}] Processing data record {}", token, 
record.getCommitTimestamp());
 
     final Timestamp commitTimestamp = record.getCommitTimestamp();
     final Instant commitInstant = new 
Instant(commitTimestamp.toSqlTimestamp().getTime());
     if (!tracker.tryClaim(commitTimestamp)) {
-      LOG.debug(
-          "[" + token + "] Could not claim queryChangeStream(" + 
commitTimestamp + "), stopping");
+      LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, 
commitTimestamp);
       return Optional.of(ProcessContinuation.stop());
     }
     outputReceiver.outputWithTimestamp(record, commitInstant);
     watermarkEstimator.setWatermark(commitInstant);
 
-    // The size of a record is represented by the number of bytes needed for 
the
-    // string representation of the record. Here, we only try to achieve an 
estimate
-    // instead of an accurate throughput.
-    throughputEstimator.update(Timestamp.now(), 
Utf8.encodedLength(record.toString()));
+    throughputEstimator.update(Timestamp.now(), record);
 
-    LOG.debug("[" + token + "] Data record action completed successfully");
+    LOG.debug("[{}] Data record action completed successfully", token);
     return Optional.empty();
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
index a22e9b9a538..348e23e366e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
@@ -129,8 +129,7 @@ public class DetectNewPartitionsAction {
         partitions.add(partition);
       }
     }
-    LOG.info(
-        "Found " + partitions.size() + " to be scheduled (readTimestamp = " + 
readTimestamp + ")");
+    LOG.info("Found {} to be scheduled (readTimestamp = {})", 
partitions.size(), readTimestamp);
     return partitions;
   }
 
@@ -180,14 +179,11 @@ public class DetectNewPartitionsAction {
           partition.toBuilder().setScheduledAt(scheduledAt).build();
 
       LOG.info(
-          "["
-              + updatedPartition.getPartitionToken()
-              + "] Scheduled partition at "
-              + updatedPartition.getScheduledAt()
-              + " with start time "
-              + updatedPartition.getStartTimestamp()
-              + " and end time "
-              + updatedPartition.getEndTimestamp());
+          "[{}] Scheduled partition at {} with start time {} and end time {}",
+          updatedPartition.getPartitionToken(),
+          updatedPartition.getScheduledAt(),
+          updatedPartition.getStartTimestamp(),
+          updatedPartition.getEndTimestamp());
 
       receiver.outputWithTimestamp(partition, new 
Instant(minWatermark.toSqlTimestamp()));
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
index a940f57b989..d009f774b3b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java
@@ -75,18 +75,18 @@ public class HeartbeatRecordAction {
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
 
     final String token = partition.getPartitionToken();
-    LOG.debug("[" + token + "] Processing heartbeat record " + record);
+    LOG.debug("[{}] Processing heartbeat record {}", token, record);
 
     final Timestamp timestamp = record.getTimestamp();
     final Instant timestampInstant = new 
Instant(timestamp.toSqlTimestamp().getTime());
     if (!tracker.tryClaim(timestamp)) {
-      LOG.debug("[" + token + "] Could not claim queryChangeStream(" + 
timestamp + "), stopping");
+      LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, 
timestamp);
       return Optional.of(ProcessContinuation.stop());
     }
     metrics.incHeartbeatRecordCount();
     watermarkEstimator.setWatermark(timestampInstant);
 
-    LOG.debug("[" + token + "] Heartbeat record action completed 
successfully");
+    LOG.debug("[{}] Heartbeat record action completed successfully", token);
     return Optional.empty();
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 4265d1356ab..f3b895371a6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -33,7 +33,6 @@ import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRec
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
-import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -73,7 +72,6 @@ public class QueryChangeStreamAction {
   private final HeartbeatRecordAction heartbeatRecordAction;
   private final ChildPartitionsRecordAction childPartitionsRecordAction;
   private final ChangeStreamMetrics metrics;
-  private final ThroughputEstimator throughputEstimator;
 
   /**
    * Constructs an action class for performing a change stream query for a 
given partition.
@@ -88,7 +86,6 @@ public class QueryChangeStreamAction {
    * @param heartbeatRecordAction action class to process {@link 
HeartbeatRecord}s
    * @param childPartitionsRecordAction action class to process {@link 
ChildPartitionsRecord}s
    * @param metrics metrics gathering class
-   * @param throughputEstimator an estimator to calculate local throughput.
    */
   QueryChangeStreamAction(
       ChangeStreamDao changeStreamDao,
@@ -98,8 +95,7 @@ public class QueryChangeStreamAction {
       DataChangeRecordAction dataChangeRecordAction,
       HeartbeatRecordAction heartbeatRecordAction,
       ChildPartitionsRecordAction childPartitionsRecordAction,
-      ChangeStreamMetrics metrics,
-      ThroughputEstimator throughputEstimator) {
+      ChangeStreamMetrics metrics) {
     this.changeStreamDao = changeStreamDao;
     this.partitionMetadataDao = partitionMetadataDao;
     this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -108,7 +104,6 @@ public class QueryChangeStreamAction {
     this.heartbeatRecordAction = heartbeatRecordAction;
     this.childPartitionsRecordAction = childPartitionsRecordAction;
     this.metrics = metrics;
-    this.throughputEstimator = throughputEstimator;
   }
 
   /**
@@ -188,8 +183,7 @@ public class QueryChangeStreamAction {
                     (DataChangeRecord) record,
                     tracker,
                     receiver,
-                    watermarkEstimator,
-                    throughputEstimator);
+                    watermarkEstimator);
           } else if (record instanceof HeartbeatRecord) {
             maybeContinuation =
                 heartbeatRecordAction.run(
@@ -199,12 +193,12 @@ public class QueryChangeStreamAction {
                 childPartitionsRecordAction.run(
                     updatedPartition, (ChildPartitionsRecord) record, tracker, 
watermarkEstimator);
           } else {
-            LOG.error("[" + token + "] Unknown record type " + 
record.getClass());
+            LOG.error("[{}] Unknown record type {}", token, record.getClass());
             throw new IllegalArgumentException("Unknown record type " + 
record.getClass());
           }
 
           if (maybeContinuation.isPresent()) {
-            LOG.debug("[" + token + "] Continuation present, returning " + 
maybeContinuation);
+            LOG.debug("[{}] Continuation present, returning {}", token, 
maybeContinuation);
             bundleFinalizer.afterBundleCommit(
                 Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
                 updateWatermarkCallback(token, watermarkEstimator));
@@ -225,24 +219,21 @@ public class QueryChangeStreamAction {
       */
       if (isTimestampOutOfRange(e)) {
         LOG.debug(
-            "["
-                + token
-                + "] query change stream is out of range for "
-                + startTimestamp
-                + " to "
-                + endTimestamp
-                + ", finishing stream");
+            "[{}] query change stream is out of range for {} to {}, finishing 
stream",
+            token,
+            startTimestamp,
+            endTimestamp);
       } else {
         throw e;
       }
     }
 
-    LOG.debug("[" + token + "] change stream completed successfully");
+    LOG.debug("[{}] change stream completed successfully", token);
     if (tracker.tryClaim(endTimestamp)) {
-      LOG.debug("[" + token + "] Finishing partition");
+      LOG.debug("[{}] Finishing partition", token);
       partitionMetadataDao.updateToFinished(token);
       metrics.decActivePartitionReadCounter();
-      LOG.info("[" + token + "] Partition finished");
+      LOG.info("[{}] Partition finished", token);
     }
     return ProcessContinuation.stop();
   }
@@ -251,15 +242,15 @@ public class QueryChangeStreamAction {
       String token, WatermarkEstimator<Instant> watermarkEstimator) {
     return () -> {
       final Instant watermark = watermarkEstimator.currentWatermark();
-      LOG.debug("[" + token + "] Updating current watermark to " + watermark);
+      LOG.debug("[{}] Updating current watermark to {}", token, watermark);
       try {
         partitionMetadataDao.updateWatermark(
             token, Timestamp.ofTimeMicroseconds(watermark.getMillis() * 
1_000L));
       } catch (SpannerException e) {
         if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
-          LOG.debug("[" + token + "] Unable to update the current watermark, 
partition NOT FOUND");
+          LOG.debug("[{}] Unable to update the current watermark, partition 
NOT FOUND", token);
         } else {
-          LOG.error("[" + token + "] Error updating the current watermark: " + 
e.getMessage(), e);
+          LOG.error("[{}] Error updating the current watermark: {}", token, 
e.getMessage(), e);
         }
       }
     };
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index 1bf5af0b321..0d044b09119 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -171,6 +171,31 @@ public class PartitionMetadataDao {
     return databaseClient.singleUse().executeQuery(statement);
   }
 
+  /**
+   * Counts all partitions with a {@link 
PartitionMetadataAdminDao#COLUMN_CREATED_AT} greater than the
+   * given timestamp.
+   */
+  public long countPartitionsCreatedAfter(Timestamp timestamp) {
+    final Statement statement =
+        Statement.newBuilder(
+                "SELECT COUNT(*) as count FROM "
+                    + metadataTableName
+                    + " WHERE "
+                    + COLUMN_CREATED_AT
+                    + " > @timestamp")
+            .bind("timestamp")
+            .to(timestamp)
+            .build();
+
+    try (ResultSet resultSet = 
databaseClient.singleUse().executeQuery(statement)) {
+      if (resultSet.next()) {
+        return resultSet.getLong("count");
+      } else {
+        return 0;
+      }
+    }
+  }
+
   /**
    * Inserts the partition metadata.
    *
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
index 632a06334cc..f552f12cc7b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
@@ -36,6 +36,8 @@ import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A SplittableDoFn (SDF) that is responsible for scheduling partitions to be 
queried. This
@@ -51,6 +53,7 @@ import org.joda.time.Instant;
 public class DetectNewPartitionsDoFn extends DoFn<PartitionMetadata, 
PartitionMetadata> {
 
   private static final long serialVersionUID = 1523712495885011374L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(DetectNewPartitionsDoFn.class);
   private static final Duration DEFAULT_RESUME_DURATION = 
Duration.millis(100L);
 
   private final Duration resumeDuration;
@@ -58,6 +61,8 @@ public class DetectNewPartitionsDoFn extends 
DoFn<PartitionMetadata, PartitionMe
   private final MapperFactory mapperFactory;
   private final ActionFactory actionFactory;
   private final ChangeStreamMetrics metrics;
+  private long averagePartitionBytesSize;
+  private boolean averagePartitionBytesSizeSet;
   private transient DetectNewPartitionsAction detectNewPartitionsAction;
 
   /**
@@ -83,6 +88,7 @@ public class DetectNewPartitionsDoFn extends 
DoFn<PartitionMetadata, PartitionMe
     this.actionFactory = actionFactory;
     this.metrics = metrics;
     this.resumeDuration = DEFAULT_RESUME_DURATION;
+    this.averagePartitionBytesSizeSet = false;
   }
 
   @GetInitialWatermarkEstimatorState
@@ -109,9 +115,27 @@ public class DetectNewPartitionsDoFn extends 
DoFn<PartitionMetadata, PartitionMe
         TimestampUtils.previous(createdAt), 
com.google.cloud.Timestamp.MAX_VALUE);
   }
 
+  @GetSize
+  public double getSize(@Restriction TimestampRange restriction) {
+    if (!averagePartitionBytesSizeSet) {
+      LOG.warn(
+          "Average partition bytes size has not been initialized, GetSize will 
always return 0, which will interfere with autoscaling.");
+    }
+    final com.google.cloud.Timestamp readTimestamp = restriction.getFrom();
+    final PartitionMetadataDao dao = daoFactory.getPartitionMetadataDao();
+    final long partitionsToSchedule = 
dao.countPartitionsCreatedAfter(readTimestamp);
+    final long sizeEstimate = partitionsToSchedule * averagePartitionBytesSize;
+
+    LOG.debug(
+        "getSize() = {} ({} partitionsToSchedule * {} 
averagePartitionBytesSize)",
+        sizeEstimate,
+        partitionsToSchedule,
+        averagePartitionBytesSize);
+    return sizeEstimate;
+  }
+
   @NewTracker
-  public DetectNewPartitionsRangeTracker restrictionTracker(
-      @Restriction TimestampRange restriction) {
+  public DetectNewPartitionsRangeTracker newTracker(@Restriction 
TimestampRange restriction) {
     return new DetectNewPartitionsRangeTracker(restriction);
   }
 
@@ -137,4 +161,16 @@ public class DetectNewPartitionsDoFn extends 
DoFn<PartitionMetadata, PartitionMe
 
     return detectNewPartitionsAction.run(tracker, receiver, 
watermarkEstimator);
   }
+
+  /**
+   * Sets the average partition bytes size to estimate the backlog of this 
DoFn. Must be called
+   * after the initialization of this DoFn.
+   *
+   * @param averagePartitionBytesSize the estimated average size of a 
partition record used in the
+   *     backlog bytes calculation ({@link 
org.apache.beam.sdk.transforms.DoFn.GetSize})
+   */
+  public void setAveragePartitionBytesSize(long averagePartitionBytesSize) {
+    this.averagePartitionBytesSize = averagePartitionBytesSize;
+    this.averagePartitionBytesSizeSet = true;
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
index 377f4054ca5..3529ba336e3 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
@@ -94,10 +94,9 @@ public class PostProcessingMetricsDoFn extends 
DoFn<DataChangeRecord, DataChange
 
     if (commitedToEmittedMillis > COMMITTED_TO_EMITTED_THRESHOLD_MS) {
       LOG.debug(
-          "Data record took "
-              + commitedToEmittedMillis
-              + "ms to be emitted: "
-              + dataChangeRecord.getMetadata());
+          "Data record took {}ms to be emitted: {}",
+          commitedToEmittedMillis,
+          dataChangeRecord.getMetadata());
     }
   }
 
@@ -111,7 +110,7 @@ public class PostProcessingMetricsDoFn extends 
DoFn<DataChangeRecord, DataChange
     final long streamMillis = streamDuration.getMillis();
 
     if (streamMillis > STREAM_THRESHOLD_MS) {
-      LOG.debug("Data record took " + streamMillis + "ms to be streamed: " + 
metadata);
+      LOG.debug("Data record took {}ms to be streamed: {}", streamMillis, 
metadata);
     }
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index f5cf02fb7d3..61e31f10918 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -28,13 +28,15 @@ import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStream
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.NullThroughputEstimator;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
-import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampUtils;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -62,13 +64,17 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
 
   private static final long serialVersionUID = -7574596218085711975L;
   private static final Logger LOG = 
LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
-  private static final double AUTOSCALING_SIZE_MULTIPLIER = 2.0D;
+  private static final BigDecimal MAX_DOUBLE = 
BigDecimal.valueOf(Double.MAX_VALUE);
 
   private final DaoFactory daoFactory;
   private final MapperFactory mapperFactory;
   private final ActionFactory actionFactory;
   private final ChangeStreamMetrics metrics;
-  private final ThroughputEstimator throughputEstimator;
+  /**
+   * Needs to be set through the {@link
+   * 
ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)} 
call.
+   */
+  private ThroughputEstimator<DataChangeRecord> throughputEstimator;
 
   private transient QueryChangeStreamAction queryChangeStreamAction;
 
@@ -85,19 +91,17 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
    * @param mapperFactory the {@link MapperFactory} to construct {@link 
ChangeStreamRecordMapper}s
    * @param actionFactory the {@link ActionFactory} to construct actions
    * @param metrics the {@link ChangeStreamMetrics} to emit partition related 
metrics
-   * @param throughputEstimator an estimator to calculate local throughput.
    */
   public ReadChangeStreamPartitionDoFn(
       DaoFactory daoFactory,
       MapperFactory mapperFactory,
       ActionFactory actionFactory,
-      ChangeStreamMetrics metrics,
-      ThroughputEstimator throughputEstimator) {
+      ChangeStreamMetrics metrics) {
     this.daoFactory = daoFactory;
     this.mapperFactory = mapperFactory;
     this.actionFactory = actionFactory;
     this.metrics = metrics;
-    this.throughputEstimator = throughputEstimator;
+    this.throughputEstimator = new NullThroughputEstimator<>();
   }
 
   @GetInitialWatermarkEstimatorState
@@ -152,17 +156,15 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
     final BigDecimal timeGapInSeconds =
         BigDecimal.valueOf(newTracker(partition, 
range).getProgress().getWorkRemaining());
     final BigDecimal throughput = 
BigDecimal.valueOf(this.throughputEstimator.get());
+    final double size =
+        timeGapInSeconds
+            .multiply(throughput)
+            // Cap it at Double.MAX_VALUE to avoid an overflow.
+            .min(MAX_DOUBLE)
+            .doubleValue();
     LOG.debug(
-        "Reported getSize() - remaining work: " + timeGapInSeconds + " 
throughput:" + throughput);
-    // Cap it at Double.MAX_VALUE to avoid an overflow.
-    return timeGapInSeconds
-        .multiply(throughput)
-        // The multiplier is required because the job tries to reach the 
minimum number of workers
-        // and this leads to a very high cpu utilization. The multiplier would 
increase the reported
-        // size and help to reduce the cpu usage. In the future, this can 
become a custom parameter.
-        .multiply(BigDecimal.valueOf(AUTOSCALING_SIZE_MULTIPLIER))
-        .min(BigDecimal.valueOf(Double.MAX_VALUE))
-        .doubleValue();
+        "getSize() = {} ({} timeGapInSeconds * {} throughput)", size, 
timeGapInSeconds, throughput);
+    return size;
   }
 
   @NewTracker
@@ -184,7 +186,8 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
     final ChangeStreamRecordMapper changeStreamRecordMapper =
         mapperFactory.changeStreamRecordMapper();
     final PartitionMetadataMapper partitionMetadataMapper = 
mapperFactory.partitionMetadataMapper();
-    final DataChangeRecordAction dataChangeRecordAction = 
actionFactory.dataChangeRecordAction();
+    final DataChangeRecordAction dataChangeRecordAction =
+        actionFactory.dataChangeRecordAction(throughputEstimator);
     final HeartbeatRecordAction heartbeatRecordAction =
         actionFactory.heartbeatRecordAction(metrics);
     final ChildPartitionsRecordAction childPartitionsRecordAction =
@@ -199,8 +202,7 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
             dataChangeRecordAction,
             heartbeatRecordAction,
             childPartitionsRecordAction,
-            metrics,
-            throughputEstimator);
+            metrics);
   }
 
   /**
@@ -228,10 +230,20 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
 
     final String token = partition.getPartitionToken();
 
-    LOG.debug(
-        "[" + token + "] Processing element with restriction " + 
tracker.currentRestriction());
+    LOG.debug("[{}] Processing element with restriction {}", token, 
tracker.currentRestriction());
 
     return queryChangeStreamAction.run(
         partition, tracker, receiver, watermarkEstimator, bundleFinalizer);
   }
+
+  /**
+   * Sets the estimator to calculate the backlog of this function. Must be 
called after the
+   * initialization of this DoFn.
+   *
+   * @param throughputEstimator an estimator to calculate local throughput.
+   */
+  public void setThroughputEstimator(
+      BytesThroughputEstimator<DataChangeRecord> throughputEstimator) {
+    this.throughputEstimator = throughputEstimator;
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimator.java
new file mode 100644
index 00000000000..f253ba32916
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;
+
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+/**
+ * An estimator to provide an estimate on the throughput of the outputted 
elements.
+ *
+ * <p>This estimator will keep track of the bytes of reported records within a 
sliding window. The
+ * window consists of the configured number of seconds and each record's bytes 
will fall into
+ * exactly one given second bucket. When more than window size seconds have 
passed from the current
+ * time, the bytes reported for the seconds that fall outside the window will not 
be considered anymore.
+ * The bytes of the records will be estimated using the configured {@link
+ * org.apache.beam.sdk.coders.Coder}.
+ */
+public class BytesThroughputEstimator<T> implements ThroughputEstimator<T> {
+
+  private static final long serialVersionUID = -3597929310338724800L;
+  private static final BigDecimal MAX_DOUBLE = 
BigDecimal.valueOf(Double.MAX_VALUE);
+
+  /** Keeps track of how many bytes of throughput have been seen in a given 
timestamp. */
+  private static class ThroughputEntry implements Serializable {
+
+    private static final long serialVersionUID = 3752325891215855332L;
+
+    private final Timestamp timestamp;
+    private BigDecimal bytes;
+
+    public ThroughputEntry(Timestamp timestamp, long bytes) {
+      this.timestamp = timestamp;
+      this.bytes = BigDecimal.valueOf(bytes);
+    }
+
+    public Timestamp getTimestamp() {
+      return timestamp;
+    }
+
+    public long getSeconds() {
+      return timestamp.getSeconds();
+    }
+
+    public BigDecimal getBytes() {
+      return bytes;
+    }
+
+    public void addBytes(long bytesToAdd) {
+      bytes = bytes.add(BigDecimal.valueOf(bytesToAdd));
+    }
+  }
+
+  // The deque holds a number of windows in the past in order to calculate
+  // a rolling windowing throughput.
+  private final Deque<ThroughputEntry> deque;
+  // The number of seconds to be accounted for when calculating the throughput
+  private final int windowSizeSeconds;
+  // Estimates the size in bytes of throughput elements
+  private final SizeEstimator<T> sizeEstimator;
+
+  public BytesThroughputEstimator(int windowSizeSeconds, SizeEstimator<T> 
sizeEstimator) {
+    this.deque = new ArrayDeque<>();
+    this.windowSizeSeconds = windowSizeSeconds;
+    this.sizeEstimator = sizeEstimator;
+  }
+
+  /**
+   * Updates the estimator with the bytes of records.
+   *
+   * @param timeOfRecords the committed timestamp of the records
+   * @param element the element to estimate the byte size of
+   */
+  @SuppressWarnings("nullness") // queue is never null, nor the peeked element
+  @Override
+  public void update(Timestamp timeOfRecords, T element) {
+    long bytes = sizeEstimator.sizeOf(element);
+    synchronized (deque) {
+      if (deque.isEmpty() || timeOfRecords.getSeconds() > 
deque.getLast().getSeconds()) {
+        deque.addLast(new ThroughputEntry(timeOfRecords, bytes));
+      } else {
+        deque.getLast().addBytes(bytes);
+      }
+      cleanQueue(deque.getLast().getTimestamp());
+    }
+  }
+
+  /** Returns the estimated throughput bytes for now. */
+  @Override
+  public double get() {
+    return getFrom(Timestamp.now());
+  }
+
+  /**
+   * Returns the estimated throughput bytes for a specified time.
+   *
+   * @param time the specified timestamp to check throughput
+   */
+  @Override
+  public double getFrom(Timestamp time) {
+    synchronized (deque) {
+      cleanQueue(time);
+      if (deque.size() == 0) {
+        return 0D;
+      }
+      BigDecimal throughput = BigDecimal.ZERO;
+      for (ThroughputEntry entry : deque) {
+        throughput = throughput.add(entry.getBytes());
+      }
+      return throughput
+          // Prevents negative values
+          .max(BigDecimal.ZERO)
+          .divide(BigDecimal.valueOf(windowSizeSeconds), 
MathContext.DECIMAL128)
+          // Cap it to Double.MAX_VALUE
+          .min(MAX_DOUBLE)
+          .doubleValue();
+    }
+  }
+
+  private void cleanQueue(Timestamp time) {
+    while (deque.size() > 0) {
+      final ThroughputEntry entry = deque.getFirst();
+      if (entry != null && entry.getSeconds() >= time.getSeconds() - 
windowSizeSeconds) {
+        break;
+      }
+      // Remove the element if the timestamp of the first element is beyond
+      // the time range to look backward.
+      deque.removeFirst();
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/EncodingException.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/EncodingException.java
new file mode 100644
index 00000000000..b53a04b541c
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/EncodingException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;
+
+/** Represents an error during encoding (serializing) a class. */
+public class EncodingException extends RuntimeException {
+  public EncodingException(Throwable e) {
+    super(e);
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/NullThroughputEstimator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/NullThroughputEstimator.java
new file mode 100644
index 00000000000..9f3c33123d0
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/NullThroughputEstimator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;
+
+import com.google.cloud.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NoOp implementation of a throughput estimator. This will always return 0 as the throughput and it
+ * will warn users that this is being used (it should not be used in production).
+ */
+public class NullThroughputEstimator<T> implements ThroughputEstimator<T> {
+
+  private static final long serialVersionUID = -4487552302910442742L;
+  private static final Logger LOG = LoggerFactory.getLogger(NullThroughputEstimator.class);
+
+  /**
+   * NoOp.
+   *
+   * @param timeOfRecords ignored
+   * @param element ignored
+   */
+  @Override
+  public void update(Timestamp timeOfRecords, T element) {
+    LOG.warn(
+        "Trying to update throughput using {}, this operation will have no 
effect",
+        this.getClass().getSimpleName());
+  }
+
+  /**
+   * Always returns 0.
+   *
+   * @param time ignored
+   * @return 0
+   */
+  @Override
+  public double getFrom(Timestamp time) {
+    LOG.warn(
+        "Trying to retrieve throughput using {}, this operation will always 
return 0",
+        this.getClass().getSimpleName());
+    return 0;
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/SizeEstimator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/SizeEstimator.java
new file mode 100644
index 00000000000..9ef23dcbad4
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/SizeEstimator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+
+/**
+ * This class is used to estimate the size in bytes of a given element. It uses the given {@link
+ * Coder} to calculate the size of the element.
+ */
+public class SizeEstimator<T> implements Serializable {
+
+  private static final long serialVersionUID = 8422268027457798184L;
+
+  private static class SizeEstimatorObserver extends ElementByteSizeObserver
+      implements Serializable {
+
+    private static final long serialVersionUID = 4569562919962045617L;
+    private long observedBytes;
+
+    @Override
+    protected void reportElementSize(long elementByteSize) {
+      observedBytes = elementByteSize;
+    }
+  }
+
+  private final Coder<T> coder;
+  private final SizeEstimatorObserver observer;
+
+  public SizeEstimator(Coder<T> coder) {
+    this.coder = coder;
+    this.observer = new SizeEstimatorObserver();
+  }
+
+  /**
+   * Estimates the size in bytes of the given element with the configured {@link Coder}.
+   *
+   * @param element the element instance to be estimated
+   * @return the estimated size in bytes of the given element
+   */
+  public long sizeOf(T element) {
+    try {
+      coder.registerByteSizeObserver(element, observer);
+      observer.advance();
+
+      return observer.observedBytes;
+    } catch (Exception e) {
+      throw new EncodingException(e);
+    }
+  }
+}
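
Illustrative sketch (not part of the diff): estimating the byte size of a single element with the SizeEstimator above, assuming Beam's StringUtf8Coder; the element value is hypothetical.

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.SizeEstimator;

    public class SizeEstimatorSketch {
      public static void main(String[] args) {
        // Uses the coder's byte-size observer, as wired up in SizeEstimator#sizeOf above.
        SizeEstimator<String> sizeEstimator = new SizeEstimator<>(StringUtf8Coder.of());
        long bytes = sizeEstimator.sizeOf("partition-token-1");
        System.out.println(bytes);
      }
    }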
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/ThroughputEstimator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/ThroughputEstimator.java
new file mode 100644
index 00000000000..9ec8e08acb1
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/ThroughputEstimator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;
+
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+
+/** An estimator to calculate the throughput of the outputted elements from a DoFn. */
+public interface ThroughputEstimator<T> extends Serializable {
+  /**
+   * Updates the estimator with the size of the records.
+   *
+   * @param timeOfRecords the committed timestamp of the records
+   * @param element the element to estimate the byte size of
+   */
+  void update(Timestamp timeOfRecords, T element);
+
+  /** Returns the estimated throughput for now. */
+  default double get() {
+    return getFrom(Timestamp.now());
+  }
+
+  /**
+   * Returns the estimated throughput for a specified time.
+   *
+   * @param time the specified timestamp to check throughput
+   */
+  double getFrom(Timestamp time);
+}
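
Illustrative sketch (not part of the diff): a hypothetical, trivial implementation of the interface above, just to show the contract; only getFrom must be supplied, since get() has a default that delegates to it.

    import com.google.cloud.Timestamp;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator;

    /** Hypothetical estimator that ignores updates and always reports a fixed rate. */
    class FixedRateThroughputEstimator<T> implements ThroughputEstimator<T> {
      private static final long serialVersionUID = 1L;
      private final double bytesPerSecond;

      FixedRateThroughputEstimator(double bytesPerSecond) {
        this.bytesPerSecond = bytesPerSecond;
      }

      @Override
      public void update(Timestamp timeOfRecords, T element) {
        // A real implementation would record the element's byte size here.
      }

      @Override
      public double getFrom(Timestamp time) {
        // get() falls back to this via the interface's default method.
        return bytesPerSecond;
      }
    }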
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/package-info.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/package-info.java
new file mode 100644
index 00000000000..a099ae5bc20
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Classes related to estimating the throughput of the change streams SDFs. */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
deleted file mode 100644
index 70484c83ebc..00000000000
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
-
-import com.google.cloud.Timestamp;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.math.MathContext;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
-
-/** An estimator to provide an estimate on the throughput of the outputted elements. */
-public class ThroughputEstimator implements Serializable {
-
-  // The number of seconds to look in the past.
-  public static final int WINDOW_SIZE_SECONDS = 60;
-
-  private static final long serialVersionUID = -3597929310338724800L;
-  // The start time of each per-second window.
-  private Timestamp startTimeOfCurrentWindow;
-  // The bytes of the current window.
-  private BigDecimal bytesInCurrentWindow;
-  // The total bytes of all windows in the queue.
-  private BigDecimal bytesInQueue;
-  // The queue holds a number of windows in the past in order to calculate
-  // a rolling windowing throughput.
-  private final Queue<ImmutablePair<Timestamp, BigDecimal>> queue;
-
-  public ThroughputEstimator() {
-    queue = new ArrayDeque<>();
-    startTimeOfCurrentWindow = Timestamp.MIN_VALUE;
-    bytesInCurrentWindow = BigDecimal.valueOf(0L);
-    bytesInQueue = BigDecimal.valueOf(0L);
-  }
-
-  /**
-   * Updates the estimator with the bytes of records.
-   *
-   * @param timeOfRecords the committed timestamp of the records
-   * @param bytes the total bytes of the records
-   */
-  public void update(Timestamp timeOfRecords, long bytes) {
-    synchronized (queue) {
-      BigDecimal bytesNum = BigDecimal.valueOf(bytes);
-      if (startTimeOfCurrentWindow.equals(Timestamp.MIN_VALUE)) {
-        bytesInCurrentWindow = bytesNum;
-        startTimeOfCurrentWindow = timeOfRecords;
-        return;
-      }
-
-      if (timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1) {
-        bytesInCurrentWindow = bytesInCurrentWindow.add(bytesNum);
-      } else {
-        queue.add(new ImmutablePair<>(startTimeOfCurrentWindow, bytesInCurrentWindow));
-        bytesInQueue = bytesInQueue.add(bytesInCurrentWindow);
-
-        bytesInCurrentWindow = bytesNum;
-        startTimeOfCurrentWindow = timeOfRecords;
-      }
-      cleanQueue(startTimeOfCurrentWindow);
-    }
-  }
-
-  /** Returns the estimated throughput for now. */
-  public double get() {
-    return getFrom(Timestamp.now());
-  }
-
-  /**
-   * Returns the estimated throughput for a specified time.
-   *
-   * @param time the specified timestamp to check throughput
-   */
-  public double getFrom(Timestamp time) {
-    synchronized (queue) {
-      cleanQueue(time);
-      if (queue.size() == 0) {
-        return 0D;
-      }
-      return bytesInQueue
-          .divide(BigDecimal.valueOf(queue.size()), MathContext.DECIMAL128)
-          .max(BigDecimal.ZERO)
-          .doubleValue();
-    }
-  }
-
-  private void cleanQueue(Timestamp time) {
-    while (queue.size() > 0) {
-      ImmutablePair<Timestamp, BigDecimal> peek = queue.peek();
-      if (peek != null && peek.getLeft().getSeconds() >= time.getSeconds() - WINDOW_SIZE_SECONDS) {
-        break;
-      }
-      // Remove the element if the timestamp of the first element is beyond
-      // the time range to look backward.
-      ImmutablePair<Timestamp, BigDecimal> pair = queue.remove();
-      bytesInQueue = bytesInQueue.subtract(pair.getRight()).max(BigDecimal.ZERO);
-    }
-  }
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
index a3f3e95ae6a..e2ac2773741 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
@@ -224,12 +224,10 @@ public class TimestampRangeTracker extends 
RestrictionTracker<TimestampRange, Ti
     final BigDecimal workRemaining = end.subtract(current).max(BigDecimal.ONE);
 
     LOG.debug(
-        "Reported progress - current:"
-            + current.doubleValue()
-            + " end:"
-            + end.doubleValue()
-            + " workRemaining:"
-            + workRemaining.doubleValue());
+        "Reported progress current: {}, end: {}, workRemaining: {}",
+        current.doubleValue(),
+        end.doubleValue(),
+        workRemaining.doubleValue());
 
     return Progress.from(current.doubleValue(), workRemaining.doubleValue());
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
index 97cd7c647b9..ac8d4872529 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
@@ -19,7 +19,7 @@ package 
org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -27,9 +27,9 @@ import static org.mockito.Mockito.when;
 
 import com.google.cloud.Timestamp;
 import java.util.Optional;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
-import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
@@ -46,16 +46,16 @@ public class DataChangeRecordActionTest {
   private RestrictionTracker<TimestampRange, Timestamp> tracker;
   private OutputReceiver<DataChangeRecord> outputReceiver;
   private ManualWatermarkEstimator<Instant> watermarkEstimator;
-  private ThroughputEstimator throughputEstimator;
+  private BytesThroughputEstimator<DataChangeRecord> throughputEstimator;
 
   @Before
   public void setUp() {
-    action = new DataChangeRecordAction();
+    throughputEstimator = mock(BytesThroughputEstimator.class);
+    action = new DataChangeRecordAction(throughputEstimator);
     partition = mock(PartitionMetadata.class);
     tracker = mock(RestrictionTracker.class);
     outputReceiver = mock(OutputReceiver.class);
     watermarkEstimator = mock(ManualWatermarkEstimator.class);
-    throughputEstimator = mock(ThroughputEstimator.class);
   }
 
   @Test
@@ -69,13 +69,12 @@ public class DataChangeRecordActionTest {
     when(partition.getPartitionToken()).thenReturn(partitionToken);
 
     final Optional<ProcessContinuation> maybeContinuation =
-        action.run(
-            partition, record, tracker, outputReceiver, watermarkEstimator, 
throughputEstimator);
+        action.run(partition, record, tracker, outputReceiver, 
watermarkEstimator);
 
     assertEquals(Optional.empty(), maybeContinuation);
     verify(outputReceiver).outputWithTimestamp(record, instant);
     verify(watermarkEstimator).setWatermark(instant);
-    verify(throughputEstimator).update(any(Timestamp.class), anyLong());
+    verify(throughputEstimator).update(any(), eq(record));
   }
 
   @Test
@@ -88,12 +87,11 @@ public class DataChangeRecordActionTest {
     when(partition.getPartitionToken()).thenReturn(partitionToken);
 
     final Optional<ProcessContinuation> maybeContinuation =
-        action.run(
-            partition, record, tracker, outputReceiver, watermarkEstimator, 
throughputEstimator);
+        action.run(partition, record, tracker, outputReceiver, 
watermarkEstimator);
 
     assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
     verify(outputReceiver, never()).outputWithTimestamp(any(), any());
     verify(watermarkEstimator, never()).setWatermark(any());
-    verify(throughputEstimator, never()).update(any(Timestamp.class), 
anyLong());
+    verify(throughputEstimator, never()).update(any(), any());
   }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsActionTest.java
new file mode 100644
index 00000000000..e86bd1bf423
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsActionTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.ResultSet;
+import java.util.Arrays;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DetectNewPartitionsActionTest {
+
+  private PartitionMetadataDao dao;
+  private PartitionMetadataMapper mapper;
+  private ChangeStreamMetrics metrics;
+  private Duration resumeDuration;
+  private RestrictionTracker<TimestampRange, Timestamp> tracker;
+  private TimestampRange restriction;
+  private OutputReceiver<PartitionMetadata> receiver;
+  private ManualWatermarkEstimator<Instant> watermarkEstimator;
+  private DetectNewPartitionsAction action;
+
+  @Before
+  public void setUp() throws Exception {
+    dao = mock(PartitionMetadataDao.class);
+    mapper = mock(PartitionMetadataMapper.class);
+    metrics = mock(ChangeStreamMetrics.class);
+    resumeDuration = Duration.standardSeconds(1);
+    tracker = mock(RestrictionTracker.class);
+    restriction = mock(TimestampRange.class);
+    receiver = mock(OutputReceiver.class);
+    watermarkEstimator = mock(ManualWatermarkEstimator.class);
+
+    action = new DetectNewPartitionsAction(dao, mapper, metrics, 
resumeDuration);
+
+    when(tracker.currentRestriction()).thenReturn(restriction);
+  }
+
+  @Test
+  public void testSchedulesPartitionsAndResumesWhenPartitionsWereCreated() {
+    final Timestamp from = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp minWatermark = Timestamp.ofTimeMicroseconds(20L);
+    final Instant minWatermarkInstant = new 
Instant(minWatermark.toSqlTimestamp());
+    final ResultSet resultSet = mock(ResultSet.class);
+    final Timestamp partitionCreatedAt = Timestamp.ofTimeMicroseconds(15L);
+    final Timestamp partitionScheduledAt = Timestamp.ofTimeMicroseconds(30L);
+    final PartitionMetadata partition1 = mock(PartitionMetadata.class, 
RETURNS_DEEP_STUBS);
+    final PartitionMetadata partition2 = mock(PartitionMetadata.class, 
RETURNS_DEEP_STUBS);
+    when(partition1.getPartitionToken()).thenReturn("token1");
+    when(partition1.getCreatedAt()).thenReturn(partitionCreatedAt);
+    when(partition2.getPartitionToken()).thenReturn("token2");
+    when(partition2.getCreatedAt()).thenReturn(partitionCreatedAt);
+    when(restriction.getFrom()).thenReturn(from);
+    when(dao.getUnfinishedMinWatermark()).thenReturn(minWatermark);
+    when(dao.getAllPartitionsCreatedAfter(from)).thenReturn(resultSet);
+    when(dao.updateToScheduled(Arrays.asList("token1", 
"token2"))).thenReturn(partitionScheduledAt);
+    when(resultSet.next()).thenReturn(true, true, false);
+    when(mapper.from(any())).thenReturn(partition1, partition2);
+    when(tracker.tryClaim(partitionCreatedAt)).thenReturn(true);
+
+    final ProcessContinuation continuation = action.run(tracker, receiver, 
watermarkEstimator);
+
+    assertEquals(ProcessContinuation.resume().withResumeDelay(resumeDuration), 
continuation);
+    verify(watermarkEstimator).setWatermark(minWatermarkInstant);
+    verify(receiver, times(2)).outputWithTimestamp(any(), 
eq(minWatermarkInstant));
+  }
+
+  @Test
+  public void testDoesNothingWhenNoPartitionsWereCreated() {
+    final Timestamp from = Timestamp.ofTimeMicroseconds(10L);
+    final Timestamp minWatermark = Timestamp.ofTimeMicroseconds(20L);
+    final Instant minWatermarkInstant = new 
Instant(minWatermark.toSqlTimestamp());
+    final ResultSet resultSet = mock(ResultSet.class);
+    when(restriction.getFrom()).thenReturn(from);
+    when(dao.getUnfinishedMinWatermark()).thenReturn(minWatermark);
+    when(dao.getAllPartitionsCreatedAfter(from)).thenReturn(resultSet);
+    when(resultSet.next()).thenReturn(false);
+
+    final ProcessContinuation continuation = action.run(tracker, receiver, 
watermarkEstimator);
+
+    assertEquals(ProcessContinuation.resume().withResumeDelay(resumeDuration), 
continuation);
+    verify(watermarkEstimator).setWatermark(minWatermarkInstant);
+    verify(receiver, never()).outputWithTimestamp(any(), any());
+  }
+
+  @Test
+  public void testTerminatesWhenAllPartitionsAreFinished() {
+    final Timestamp from = Timestamp.ofTimeMicroseconds(10L);
+    when(restriction.getFrom()).thenReturn(from);
+    when(dao.getUnfinishedMinWatermark()).thenReturn(null);
+
+    final ProcessContinuation continuation = action.run(tracker, receiver, 
watermarkEstimator);
+
+    assertEquals(ProcessContinuation.stop(), continuation);
+    verify(watermarkEstimator, never()).setWatermark(any());
+    verify(receiver, never()).outputWithTimestamp(any(), any());
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 1d9f7831061..38e00e519ac 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -20,7 +20,6 @@ package 
org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -41,7 +40,6 @@ import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRec
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
-import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -66,7 +64,6 @@ public class QueryChangeStreamActionTest {
   private PartitionMetadataDao partitionMetadataDao;
   private PartitionMetadata partition;
   private ChangeStreamMetrics metrics;
-  private ThroughputEstimator throughputEstimator;
   private TimestampRange restriction;
   private RestrictionTracker<TimestampRange, Timestamp> restrictionTracker;
   private OutputReceiver<DataChangeRecord> outputReceiver;
@@ -89,7 +86,6 @@ public class QueryChangeStreamActionTest {
     heartbeatRecordAction = mock(HeartbeatRecordAction.class);
     childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
     metrics = mock(ChangeStreamMetrics.class);
-    throughputEstimator = mock(ThroughputEstimator.class);
 
     action =
         new QueryChangeStreamAction(
@@ -100,8 +96,7 @@ public class QueryChangeStreamActionTest {
             dataChangeRecordAction,
             heartbeatRecordAction,
             childPartitionsRecordAction,
-            metrics,
-            throughputEstimator);
+            metrics);
     final Struct row = mock(Struct.class);
     partition =
         PartitionMetadata.newBuilder()
@@ -149,20 +144,10 @@ public class QueryChangeStreamActionTest {
     when(changeStreamRecordMapper.toChangeStreamRecords(partition, 
rowAsStruct, resultSetMetadata))
         .thenReturn(Arrays.asList(record1, record2));
     when(dataChangeRecordAction.run(
-            partition,
-            record1,
-            restrictionTracker,
-            outputReceiver,
-            watermarkEstimator,
-            throughputEstimator))
+            partition, record1, restrictionTracker, outputReceiver, 
watermarkEstimator))
         .thenReturn(Optional.empty());
     when(dataChangeRecordAction.run(
-            partition,
-            record2,
-            restrictionTracker,
-            outputReceiver,
-            watermarkEstimator,
-            throughputEstimator))
+            partition, record2, restrictionTracker, outputReceiver, 
watermarkEstimator))
         .thenReturn(Optional.of(ProcessContinuation.stop()));
     when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
 
@@ -172,28 +157,14 @@ public class QueryChangeStreamActionTest {
 
     assertEquals(ProcessContinuation.stop(), result);
     verify(dataChangeRecordAction)
-        .run(
-            partition,
-            record1,
-            restrictionTracker,
-            outputReceiver,
-            watermarkEstimator,
-            throughputEstimator);
+        .run(partition, record1, restrictionTracker, outputReceiver, 
watermarkEstimator);
     verify(dataChangeRecordAction)
-        .run(
-            partition,
-            record2,
-            restrictionTracker,
-            outputReceiver,
-            watermarkEstimator,
-            throughputEstimator);
+        .run(partition, record2, restrictionTracker, outputReceiver, 
watermarkEstimator);
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any());
     verify(restrictionTracker, never()).tryClaim(any());
-    // This is done in the DataChangeRecordAction, but not here
-    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   @Test
@@ -232,10 +203,9 @@ public class QueryChangeStreamActionTest {
     verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, 
watermarkEstimator);
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any());
     verify(restrictionTracker, never()).tryClaim(any());
-    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   @Test
@@ -278,10 +248,9 @@ public class QueryChangeStreamActionTest {
         .run(partition, record2, restrictionTracker, watermarkEstimator);
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(restrictionTracker, never()).tryClaim(any());
-    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   @Test
@@ -325,10 +294,9 @@ public class QueryChangeStreamActionTest {
         .run(partition, record2, restrictionTracker, watermarkEstimator);
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(restrictionTracker, never()).tryClaim(any());
-    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   @Test
@@ -352,10 +320,9 @@ public class QueryChangeStreamActionTest {
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, 
WATERMARK_TIMESTAMP);
     verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any());
-    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   private static class BundleFinalizerStub implements BundleFinalizer {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 91f87c07154..d9af6cecd95 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -35,12 +35,12 @@ import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStream
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
-import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -80,7 +80,8 @@ public class ReadChangeStreamPartitionDoFnTest {
     final DaoFactory daoFactory = mock(DaoFactory.class);
     final MapperFactory mapperFactory = mock(MapperFactory.class);
     final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
-    final ThroughputEstimator throughputEstimator = 
mock(ThroughputEstimator.class);
+    final BytesThroughputEstimator<DataChangeRecord> throughputEstimator =
+        mock(BytesThroughputEstimator.class);
     final ActionFactory actionFactory = mock(ActionFactory.class);
     final PartitionMetadataDao partitionMetadataDao = 
mock(PartitionMetadataDao.class);
     final ChangeStreamDao changeStreamDao = mock(ChangeStreamDao.class);
@@ -91,9 +92,8 @@ public class ReadChangeStreamPartitionDoFnTest {
     childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
     queryChangeStreamAction = mock(QueryChangeStreamAction.class);
 
-    doFn =
-        new ReadChangeStreamPartitionDoFn(
-            daoFactory, mapperFactory, actionFactory, metrics, 
throughputEstimator);
+    doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, 
actionFactory, metrics);
+    doFn.setThroughputEstimator(throughputEstimator);
 
     partition =
         PartitionMetadata.newBuilder()
@@ -118,7 +118,8 @@ public class ReadChangeStreamPartitionDoFnTest {
     
when(mapperFactory.changeStreamRecordMapper()).thenReturn(changeStreamRecordMapper);
     
when(mapperFactory.partitionMetadataMapper()).thenReturn(partitionMetadataMapper);
 
-    
when(actionFactory.dataChangeRecordAction()).thenReturn(dataChangeRecordAction);
+    when(actionFactory.dataChangeRecordAction(throughputEstimator))
+        .thenReturn(dataChangeRecordAction);
     
when(actionFactory.heartbeatRecordAction(metrics)).thenReturn(heartbeatRecordAction);
     when(actionFactory.childPartitionsRecordAction(partitionMetadataDao, 
metrics))
         .thenReturn(childPartitionsRecordAction);
@@ -130,8 +131,7 @@ public class ReadChangeStreamPartitionDoFnTest {
             dataChangeRecordAction,
             heartbeatRecordAction,
             childPartitionsRecordAction,
-            metrics,
-            throughputEstimator))
+            metrics))
         .thenReturn(queryChangeStreamAction);
 
     doFn.setup();
@@ -149,7 +149,7 @@ public class ReadChangeStreamPartitionDoFnTest {
     verify(queryChangeStreamAction)
         .run(partition, tracker, receiver, watermarkEstimator, 
bundleFinalizer);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), 
any());
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), 
any());
     verify(tracker, never()).tryClaim(any());
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimatorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimatorTest.java
new file mode 100644
index 00000000000..c72cd874277
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/BytesThroughputEstimatorTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import 
org.apache.beam.repackaged.core.org.apache.commons.compress.utils.IOUtils;
+import 
org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.beam.sdk.coders.Coder;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BytesThroughputEstimatorTest {
+  private static final double DELTA = 1e-10;
+  private static final int WINDOW_SIZE_SECONDS = 10;
+  private BytesThroughputEstimator<byte[]> estimator;
+
+  @Before
+  public void setup() {
+    final SizeEstimator<byte[]> sizeEstimator = new SizeEstimator<>(new TestCoder());
+    estimator = new BytesThroughputEstimator<>(WINDOW_SIZE_SECONDS, sizeEstimator);
+  }
+
+  @Test
+  public void testThroughputIsZeroWhenNothingsBeenRegistered() {
+    assertEquals(0D, estimator.get(), DELTA);
+    assertEquals(0D, estimator.getFrom(Timestamp.now()), DELTA);
+  }
+
+  @Test
+  public void testThroughputCalculation() {
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(2, 0), new byte[10]);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(3, 0), new byte[20]);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(5, 0), new byte[30]);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(10, 0), new byte[40]);
+    // (10 + 20 + 30 + 40) / 10 sec window = 10
+    assertEquals(10D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(11, 0)), DELTA);
+
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(20, 0), new byte[10]);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(21, 0), new byte[20]);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(21, 0), new byte[10]);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(29, 0), new byte[40]);
+    // (10 + 20 + 10 + 40) / 10 sec window = 8
+    assertEquals(8D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(30, 0)), DELTA);
+
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(31, 0), new byte[10]);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(35, 0), new byte[40]);
+    // (10 + 40) / 10 sec window = 5
+    assertEquals(5D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(41, 0)), DELTA);
+
+    // No values in the past 10 seconds
+    assertEquals(0D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(50, 0)), DELTA);
+  }
+
+  @Test
+  public void testThroughputIsAccumulatedWithin60SecondsWindow() {
+    List<ImmutablePair<Timestamp, byte[]>> pairs = generateTestData(100, 0, 
10);
+    pairs.sort(Comparator.comparing(ImmutablePair::getLeft));
+    final Timestamp lastUpdateTimestamp = pairs.get(pairs.size() - 
1).getLeft();
+
+    BigDecimal sum = BigDecimal.valueOf(0L);
+    for (ImmutablePair<Timestamp, byte[]> pair : pairs) {
+      sum = sum.add(BigDecimal.valueOf(pair.getRight().length));
+    }
+    final BigDecimal want =
+        sum.divide(BigDecimal.valueOf(WINDOW_SIZE_SECONDS), 
MathContext.DECIMAL128);
+
+    for (ImmutablePair<Timestamp, byte[]> pair : pairs) {
+      estimator.update(pair.getLeft(), pair.getRight());
+    }
+
+    double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(10, 0));
+    assertEquals(want.doubleValue(), actual, DELTA);
+
+    // After window without updates the throughput should be zero
+    final Timestamp afterWindowTimestamp =
+        Timestamp.ofTimeSecondsAndNanos(
+            lastUpdateTimestamp.getSeconds() + WINDOW_SIZE_SECONDS + 1,
+            lastUpdateTimestamp.getNanos());
+    assertEquals(0D, estimator.getFrom(afterWindowTimestamp), DELTA);
+  }
+
+  @Test
+  public void testThroughputIsAccumulatedWithin50SecondsWindow() {
+    final List<ImmutablePair<Timestamp, byte[]>> excludedPairs = 
generateTestData(300, 0, 10);
+    final List<ImmutablePair<Timestamp, byte[]>> expectedPairs = 
generateTestData(50, 10, 20);
+    final List<ImmutablePair<Timestamp, byte[]>> pairs =
+        Stream.concat(excludedPairs.stream(), expectedPairs.stream())
+            .sorted(Comparator.comparing(ImmutablePair::getLeft))
+            .collect(Collectors.toList());
+    final Timestamp lastUpdateTimestamp = pairs.get(pairs.size() - 
1).getLeft();
+
+    BigDecimal sum = BigDecimal.valueOf(0L);
+    for (ImmutablePair<Timestamp, byte[]> pair : expectedPairs) {
+      sum = sum.add(BigDecimal.valueOf(pair.getRight().length));
+    }
+    final BigDecimal want =
+        sum.divide(BigDecimal.valueOf(WINDOW_SIZE_SECONDS), 
MathContext.DECIMAL128);
+    for (ImmutablePair<Timestamp, byte[]> pair : pairs) {
+      estimator.update(pair.getLeft(), pair.getRight());
+    }
+
+    double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(20, 0));
+    assertEquals(want.doubleValue(), actual, DELTA);
+
+    // After window without updates the throughput should be zero
+    final Timestamp afterWindowTimestamp =
+        Timestamp.ofTimeSecondsAndNanos(
+            lastUpdateTimestamp.getSeconds() + WINDOW_SIZE_SECONDS + 1,
+            lastUpdateTimestamp.getNanos());
+    assertEquals(0D, estimator.getFrom(afterWindowTimestamp), DELTA);
+  }
+
+  private List<ImmutablePair<Timestamp, byte[]>> generateTestData(
+      int size, int startSeconds, int endSeconds) {
+    ThreadLocalRandom random = ThreadLocalRandom.current();
+    List<ImmutablePair<Timestamp, byte[]>> pairs = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      int seconds = random.nextInt(endSeconds - startSeconds) + startSeconds;
+      pairs.add(
+          new ImmutablePair<>(
+              Timestamp.ofTimeSecondsAndNanos(seconds, 0), new 
byte[random.nextInt(100)]));
+    }
+    return pairs;
+  }
+
+  private static class TestCoder extends Coder<byte[]> {
+    @Override
+    public void encode(byte[] value, OutputStream outStream) throws 
IOException {
+      outStream.write(value);
+    }
+
+    @Override
+    public byte[] decode(InputStream inStream) throws IOException {
+      return IOUtils.toByteArray(inStream);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public void verifyDeterministic() {
+      // NoOp
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/NullThroughputEstimatorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/NullThroughputEstimatorTest.java
new file mode 100644
index 00000000000..86d82cfdcca
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/estimator/NullThroughputEstimatorTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import org.junit.Test;
+
+public class NullThroughputEstimatorTest {
+  private static final double DELTA = 1e-10;
+
+  @Test
+  public void alwaysReturns0AsEstimatedThroughput() {
+    final NullThroughputEstimator<byte[]> estimator = new 
NullThroughputEstimator<>();
+    assertEquals(estimator.get(), 0D, DELTA);
+
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(1, 0), new byte[10]);
+    assertEquals(estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(1, 0)), 0D, 
DELTA);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(2, 0), new byte[20]);
+    assertEquals(estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(2, 0)), 0D, 
DELTA);
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
deleted file mode 100644
index 3ecd7b6c6c3..00000000000
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
-
-import static 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator.WINDOW_SIZE_SECONDS;
-import static org.junit.Assert.assertEquals;
-
-import com.google.cloud.Timestamp;
-import java.math.BigDecimal;
-import java.math.MathContext;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import 
org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ThroughputEstimatorTest {
-  private static final double DELTA = 1e-10;
-  private ThroughputEstimator estimator;
-
-  @Before
-  public void setup() {
-    estimator = new ThroughputEstimator();
-  }
-
-  @Test
-  public void testThroughputCalculation() {
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(20, 0), 10);
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(30, 0), 20);
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(59, 0), 30);
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(60, 0), 40); // Exclusive
-    assertEquals(20D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(61, 
0)), DELTA);
-
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(100, 0), 10);
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(110, 0), 20);
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(110, 0), 10);
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(140, 0), 40); // Exclusive
-    assertEquals(20D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(141, 
0)), DELTA);
-
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(201, 0), 10);
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(250, 0), 40); // Exclusive
-    assertEquals(10D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(261, 
0)), DELTA);
-
-    // Does not take the update of 250s in account, because the window is not 
closed (no update on
-    // 251s)
-    assertEquals(0D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(262, 
0)), DELTA);
-  }
-
-  @Test
-  public void testThroughputIsAccumulatedWithin60SecondsWindow() {
-    final List<ImmutablePair<Timestamp, Long>> pairs = generateTestData(100, 
0, 60, Long.MAX_VALUE);
-    pairs.sort(Comparator.comparing(ImmutablePair::getLeft));
-    final Timestamp lastUpdateTimestamp = pairs.get(pairs.size() - 
1).getLeft();
-
-    final long count = 
pairs.stream().map(ImmutablePair::getLeft).distinct().count();
-    BigDecimal sum = BigDecimal.valueOf(0L);
-    for (ImmutablePair<Timestamp, Long> pair : pairs) {
-      sum = sum.add(BigDecimal.valueOf(pair.getRight()));
-    }
-    final BigDecimal want = sum.divide(BigDecimal.valueOf(count), 
MathContext.DECIMAL128);
-
-    for (ImmutablePair<Timestamp, Long> pair : pairs) {
-      estimator.update(pair.getLeft(), pair.getRight());
-    }
-
-    // This is needed to push the current window into the queue.
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(60, 0), 10);
-    final double actual = 
estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(60, 0));
-    assertEquals(want.doubleValue(), actual, DELTA);
-
-    // After window without updates the throughput should be zero
-    final Timestamp afterWindowTimestamp =
-        Timestamp.ofTimeSecondsAndNanos(
-            lastUpdateTimestamp.getSeconds() + WINDOW_SIZE_SECONDS + 1,
-            lastUpdateTimestamp.getNanos());
-    assertEquals(0D, estimator.getFrom(afterWindowTimestamp), DELTA);
-  }
-
-  @Test
-  public void testThroughputIsAccumulatedWithin300SecondsWindow() {
-    final List<ImmutablePair<Timestamp, Long>> excludedPairs =
-        generateTestData(300, 0, 240, Long.MAX_VALUE);
-    final List<ImmutablePair<Timestamp, Long>> expectedPairs =
-        generateTestData(50, 240, 300, Long.MAX_VALUE);
-    final List<ImmutablePair<Timestamp, Long>> pairs =
-        Stream.concat(excludedPairs.stream(), expectedPairs.stream())
-            .sorted(Comparator.comparing(ImmutablePair::getLeft))
-            .collect(Collectors.toList());
-    final Timestamp lastUpdateTimestamp = pairs.get(pairs.size() - 
1).getLeft();
-
-    final long count = 
expectedPairs.stream().map(ImmutablePair::getLeft).distinct().count();
-    BigDecimal sum = BigDecimal.valueOf(0L);
-    for (ImmutablePair<Timestamp, Long> pair : expectedPairs) {
-      sum = sum.add(BigDecimal.valueOf(pair.getRight()));
-    }
-    final BigDecimal want = sum.divide(BigDecimal.valueOf(count), 
MathContext.DECIMAL128);
-    for (ImmutablePair<Timestamp, Long> pair : pairs) {
-      estimator.update(pair.getLeft(), pair.getRight());
-    }
-
-    // This is needed to push the current window into the queue.
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(300, 0), 10);
-    double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(300, 0));
-    assertEquals(want.doubleValue(), actual, DELTA);
-
-    // After window without updates the throughput should be zero
-    final Timestamp afterWindowTimestamp =
-        Timestamp.ofTimeSecondsAndNanos(
-            lastUpdateTimestamp.getSeconds() + WINDOW_SIZE_SECONDS + 1,
-            lastUpdateTimestamp.getNanos());
-    assertEquals(0D, estimator.getFrom(afterWindowTimestamp), DELTA);
-  }
-
-  @Test
-  public void testThroughputShouldNotBeNegative() {
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(0, 0), -10);
-    estimator.update(Timestamp.ofTimeSecondsAndNanos(1, 0), 10);
-    double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(0, 0));
-    assertEquals(0D, actual, DELTA);
-  }
-
-  private List<ImmutablePair<Timestamp, Long>> generateTestData(
-      int size, int startSeconds, int endSeconds, long maxBytes) {
-    Random random = new Random();
-    List<ImmutablePair<Timestamp, Long>> pairs = new ArrayList<>();
-    for (int i = 0; i < size; i++) {
-      int seconds = random.nextInt(endSeconds - startSeconds) + startSeconds;
-      pairs.add(
-          new ImmutablePair<>(
-              Timestamp.ofTimeSecondsAndNanos(seconds, 0),
-              ThreadLocalRandom.current().nextLong(maxBytes)));
-    }
-    return pairs;
-  }
-}
