This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 21cc2ba  [BEAM-5855] Remove duplicated code files from the Dataflow worker donation (#6821)
21cc2ba is described below

commit 21cc2bab72178f1e292b13356b23ed2609a758ba
Author: Sam sam <rohde.sam...@gmail.com>
AuthorDate: Wed Oct 24 18:47:33 2018 -0700

    [BEAM-5855] Remove duplicated code files from the Dataflow worker donation (#6821)
    
    * Remove duplicated code files from the Dataflow worker donation. Modify
    existing files to use deduplicated code.
---
 .../dataflow/harness/util/ThrowingBiConsumer.java  |  31 ---
 .../dataflow/harness/util/ThrowingBiFunction.java  |  31 ---
 .../dataflow/harness/util/ThrowingConsumer.java    |  31 ---
 .../dataflow/harness/util/ThrowingFunction.java    |  31 ---
 .../dataflow/harness/util/ThrowingRunnable.java    |  29 ---
 .../worker/DataflowBatchWorkerHarness.java         |   8 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |   8 +-
 .../dataflow/worker/WorkerCustomSources.java       |   4 +-
 .../dataflow/worker/util/FluentBackoff.java        | 237 ---------------------
 .../worker/windmill/GrpcWindmillServer.java        |   8 +-
 .../RegisterAndProcessBundleOperationTest.java     |   2 +-
 .../dataflow/worker/util/FluentBackoffTest.java    | 218 -------------------
 12 files changed, 15 insertions(+), 623 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java
deleted file mode 100644
index a6c7e1d..0000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-import java.util.function.BiConsumer;
-
-/**
- * A {@link BiConsumer} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8 
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingBiConsumer<T1, T2> {
-  void accept(T1 t1, T2 t2) throws Exception;
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java
deleted file mode 100644
index 7db62eb..0000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-import java.util.function.BiFunction;
-
-/**
- * A {@link BiFunction} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8 
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingBiFunction<T1, T2, T3> {
-  T3 apply(T1 t1, T2 t2) throws Exception;
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java
deleted file mode 100644
index 9209ad5..0000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-import java.util.function.Consumer;
-
-/**
- * A {@link Consumer} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8 
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingConsumer<T> {
-  void accept(T t) throws Exception;
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java
deleted file mode 100644
index 187bb7f..0000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-import java.util.function.Function;
-
-/**
- * A {@link Function} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8 
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingFunction<T1, T2> {
-  T2 apply(T1 value) throws Exception;
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java
deleted file mode 100644
index d74d157..0000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.harness.util;
-
-/**
- * A {@link Runnable} which can throw {@link Exception}s.
- *
- * <p>Used to expand the allowed set of method references to be used by Java 8 
functional
- * interfaces.
- */
-@FunctionalInterface
-public interface ThrowingRunnable {
-  void run() throws Exception;
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
index 565bbce..6e07e59 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java
@@ -19,9 +19,6 @@ package org.apache.beam.runners.dataflow.worker;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,8 +26,11 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
-import org.apache.beam.runners.dataflow.worker.util.FluentBackoff;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index dd0ee5e..c0f4b44 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -22,9 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment;
 import static 
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
 import com.google.api.services.dataflow.model.CounterStructuredName;
 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.MapTask;
@@ -114,7 +111,6 @@ import 
org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
-import org.apache.beam.runners.dataflow.worker.util.FluentBackoff;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker;
@@ -127,6 +123,10 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWo
 import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.StreamPool;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 128834c..4597018 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -26,7 +26,6 @@ import static 
org.apache.beam.runners.dataflow.util.Structs.getStrings;
 import static 
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 
-import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Base64;
 import com.google.api.services.dataflow.model.ApproximateReportedProgress;
 import com.google.api.services.dataflow.model.ApproximateSplitRequest;
@@ -54,13 +53,14 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.internal.CustomSources;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.worker.util.FluentBackoff;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.ValueWithRecordId;
 import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java
deleted file mode 100644
index 8a1cc1e..0000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.BackOff;
-import com.google.common.base.MoreObjects;
-import org.joda.time.Duration;
-
-/**
- * A fluent builder for {@link BackOff} objects that allows customization of 
the retry algorithm.
- *
- * @see #DEFAULT for the default configuration parameters.
- */
-public final class FluentBackoff {
-
-  private static final double DEFAULT_EXPONENT = 1.5;
-  private static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5;
-  private static final Duration DEFAULT_MIN_BACKOFF = 
Duration.standardSeconds(1);
-  private static final Duration DEFAULT_MAX_BACKOFF = 
Duration.standardDays(1000);
-  private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
-  private static final Duration DEFAULT_MAX_CUM_BACKOFF = 
Duration.standardDays(1000);
-
-  private final double exponent;
-  private final Duration initialBackoff;
-  private final Duration maxBackoff;
-  private final Duration maxCumulativeBackoff;
-  private final int maxRetries;
-
-  /**
-   * By default the {@link BackOff} created by this builder will use 
exponential backoff (base
-   * exponent 1.5) with an initial backoff of 1 second. These parameters can 
be overridden with
-   * {@link #withExponent(double)} and {@link #withInitialBackoff(Duration)}, 
respectively, and the
-   * maximum backoff after exponential increase can be capped using {@link
-   * FluentBackoff#withMaxBackoff(Duration)}.
-   *
-   * <p>The default {@link BackOff} does not limit the number of retries. To 
limit the backoff, the
-   * maximum total number of retries can be set using {@link 
#withMaxRetries(int)}. The total time
-   * spent in backoff can be time-bounded as well by configuring {@link
-   * #withMaxCumulativeBackoff(Duration)}. If either of these limits are 
reached, calls to {@link
-   * BackOff#nextBackOffMillis()} will return {@link BackOff#STOP} to signal 
that no more retries
-   * should continue.
-   */
-  public static final FluentBackoff DEFAULT =
-      new FluentBackoff(
-          DEFAULT_EXPONENT,
-          DEFAULT_MIN_BACKOFF,
-          DEFAULT_MAX_BACKOFF,
-          DEFAULT_MAX_CUM_BACKOFF,
-          DEFAULT_MAX_RETRIES);
-
-  /**
-   * Instantiates a {@link BackOff} that will obey the current configuration.
-   *
-   * @see FluentBackoff
-   */
-  public BackOff backoff() {
-    return new BackoffImpl(this);
-  }
-
-  /**
-   * Returns a copy of this {@link FluentBackoff} that instead uses the 
specified exponent to
-   * control the exponential growth of delay.
-   *
-   * <p>Does not modify this object.
-   *
-   * @see FluentBackoff
-   */
-  public FluentBackoff withExponent(double exponent) {
-    checkArgument(exponent > 0, "exponent %s must be greater than 0", 
exponent);
-    return new FluentBackoff(
-        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, 
maxRetries);
-  }
-
-  /**
-   * Returns a copy of this {@link FluentBackoff} that instead uses the 
specified initial backoff
-   * duration.
-   *
-   * <p>Does not modify this object.
-   *
-   * @see FluentBackoff
-   */
-  public FluentBackoff withInitialBackoff(Duration initialBackoff) {
-    checkArgument(
-        initialBackoff.isLongerThan(Duration.ZERO),
-        "initialBackoff %s must be at least 1 millisecond",
-        initialBackoff);
-    return new FluentBackoff(
-        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, 
maxRetries);
-  }
-
-  /**
-   * Returns a copy of this {@link FluentBackoff} that limits the maximum 
backoff of an individual
-   * attempt to the specified duration.
-   *
-   * <p>Does not modify this object.
-   *
-   * @see FluentBackoff
-   */
-  public FluentBackoff withMaxBackoff(Duration maxBackoff) {
-    checkArgument(
-        maxBackoff.getMillis() > 0, "maxBackoff %s must be at least 1 
millisecond", maxBackoff);
-    return new FluentBackoff(
-        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, 
maxRetries);
-  }
-
-  /**
-   * Returns a copy of this {@link FluentBackoff} that limits the total time 
spent in backoff
-   * returned across all calls to {@link BackOff#nextBackOffMillis()}.
-   *
-   * <p>Does not modify this object.
-   *
-   * @see FluentBackoff
-   */
-  public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) 
{
-    checkArgument(
-        maxCumulativeBackoff.isLongerThan(Duration.ZERO),
-        "maxCumulativeBackoff %s must be at least 1 millisecond",
-        maxCumulativeBackoff);
-    return new FluentBackoff(
-        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, 
maxRetries);
-  }
-
-  /**
-   * Returns a copy of this {@link FluentBackoff} that limits the total number 
of retries, aka the
-   * total number of calls to {@link BackOff#nextBackOffMillis()} before 
returning {@link
-   * BackOff#STOP}.
-   *
-   * <p>Does not modify this object.
-   *
-   * @see FluentBackoff
-   */
-  public FluentBackoff withMaxRetries(int maxRetries) {
-    checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", 
maxRetries);
-    return new FluentBackoff(
-        exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, 
maxRetries);
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(FluentBackoff.class)
-        .add("exponent", exponent)
-        .add("initialBackoff", initialBackoff)
-        .add("maxBackoff", maxBackoff)
-        .add("maxRetries", maxRetries)
-        .add("maxCumulativeBackoff", maxCumulativeBackoff)
-        .toString();
-  }
-
-  private static class BackoffImpl implements BackOff {
-
-    // Customization of this backoff.
-    private final FluentBackoff backoffConfig;
-    // Current state
-    private Duration currentCumulativeBackoff;
-    private int currentRetry;
-
-    @Override
-    public void reset() {
-      currentRetry = 0;
-      currentCumulativeBackoff = Duration.ZERO;
-    }
-
-    @Override
-    public long nextBackOffMillis() {
-      // Maximum number of retries reached.
-      if (currentRetry >= backoffConfig.maxRetries) {
-        return BackOff.STOP;
-      }
-      // Maximum cumulative backoff reached.
-      if 
(currentCumulativeBackoff.compareTo(backoffConfig.maxCumulativeBackoff) >= 0) {
-        return BackOff.STOP;
-      }
-
-      double currentIntervalMillis =
-          Math.min(
-              backoffConfig.initialBackoff.getMillis()
-                  * Math.pow(backoffConfig.exponent, currentRetry),
-              backoffConfig.maxBackoff.getMillis());
-      double randomOffset =
-          (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * 
currentIntervalMillis;
-      long nextBackoffMillis = Math.round(currentIntervalMillis + 
randomOffset);
-      // Cap to limit on cumulative backoff
-      Duration remainingCumulative =
-          backoffConfig.maxCumulativeBackoff.minus(currentCumulativeBackoff);
-      nextBackoffMillis = Math.min(nextBackoffMillis, 
remainingCumulative.getMillis());
-
-      // Update state and return backoff.
-      currentCumulativeBackoff = 
currentCumulativeBackoff.plus(nextBackoffMillis);
-      currentRetry += 1;
-      return nextBackoffMillis;
-    }
-
-    private BackoffImpl(FluentBackoff backoffConfig) {
-      this.backoffConfig = backoffConfig;
-      this.reset();
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(BackoffImpl.class)
-          .add("backoffConfig", backoffConfig)
-          .add("currentRetry", currentRetry)
-          .add("currentCumulativeBackoff", currentCumulativeBackoff)
-          .toString();
-    }
-  }
-
-  private FluentBackoff(
-      double exponent,
-      Duration initialBackoff,
-      Duration maxBackoff,
-      Duration maxCumulativeBackoff,
-      int maxRetries) {
-    this.exponent = exponent;
-    this.initialBackoff = initialBackoff;
-    this.maxBackoff = maxBackoff;
-    this.maxRetries = maxRetries;
-    this.maxCumulativeBackoff = maxCumulativeBackoff;
-  }
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index a3974d6..25b2d78 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -60,7 +57,6 @@ import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
 import 
org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions;
-import org.apache.beam.runners.dataflow.worker.util.FluentBackoff;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
@@ -88,6 +84,10 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWor
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
 import org.apache.beam.vendor.grpc.v1.io.grpc.CallCredentials;
 import org.apache.beam.vendor.grpc.v1.io.grpc.Channel;
 import org.apache.beam.vendor.grpc.v1.io.grpc.StatusRuntimeException;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
index 6bd0d96..3ca9c7d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java
@@ -61,7 +61,6 @@ import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.runners.core.InMemoryMultimapSideInputView;
 import org.apache.beam.runners.core.InMemoryStateInternals;
 import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.runners.dataflow.harness.util.ThrowingRunnable;
 import 
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import 
org.apache.beam.runners.dataflow.worker.DataflowPortabilityPCollectionView;
 import org.apache.beam.runners.dataflow.worker.fn.IdGenerator;
@@ -76,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MoreFutures;
+import org.apache.beam.sdk.util.ThrowingRunnable;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java
deleted file mode 100644
index ad20448..0000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.util;
-
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-import com.google.api.client.util.BackOff;
-import java.io.IOException;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link FluentBackoff}. */
-@RunWith(JUnit4.class)
-public class FluentBackoffTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private final FluentBackoff defaultBackoff = FluentBackoff.DEFAULT;
-
-  @Test
-  public void testInvalidExponent() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("exponent -2.0 must be greater than 0");
-    defaultBackoff.withExponent(-2.0);
-  }
-
-  @Test
-  public void testInvalidInitialBackoff() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("initialBackoff PT0S must be at least 1 millisecond");
-    defaultBackoff.withInitialBackoff(Duration.ZERO);
-  }
-
-  @Test
-  public void testInvalidMaxBackoff() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("maxBackoff PT0S must be at least 1 millisecond");
-    defaultBackoff.withMaxBackoff(Duration.ZERO);
-  }
-
-  @Test
-  public void testInvalidMaxRetries() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("maxRetries -1 cannot be negative");
-    defaultBackoff.withMaxRetries(-1);
-  }
-
-  @Test
-  public void testInvalidCumulativeBackoff() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("maxCumulativeBackoff PT-0.002S must be at least 1 millisecond");
-    defaultBackoff.withMaxCumulativeBackoff(Duration.millis(-2));
-  }
-
-  /** Tests with bounded interval, custom exponent, and unlimited retries. */
-  @Test
-  public void testBoundedIntervalWithReset() throws Exception {
-    BackOff backOff =
-        FluentBackoff.DEFAULT
-            .withInitialBackoff(Duration.millis(500))
-            .withMaxBackoff(Duration.standardSeconds(1))
-            .backoff();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
-    assertThat(
-        backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L)));
-    assertThat(
-        backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L)));
-    assertThat(
-        backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L)));
-    assertThat(
-        backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L)));
-    assertThat(
-        backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L)));
-
-    // Reset, should go back to short times.
-    backOff.reset();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L)));
-    assertThat(
-        backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L)));
-    assertThat(
-        backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L)));
-  }
-
-  /** Tests with bounded interval, custom exponent, limited retries, and a reset. */
-  @Test
-  public void testMaxRetriesWithReset() throws Exception {
-    BackOff backOff =
-        FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(500)).withMaxRetries(1).backoff();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
-    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
-    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
-    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
-
-    backOff.reset();
-    assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L)));
-    assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP));
-  }
-
-  private static long countMaximumBackoff(BackOff backOff) throws IOException {
-    long cumulativeBackoffMillis = 0;
-    long currentBackoffMillis = backOff.nextBackOffMillis();
-    while (currentBackoffMillis != BackOff.STOP) {
-      cumulativeBackoffMillis += currentBackoffMillis;
-      currentBackoffMillis = backOff.nextBackOffMillis();
-    }
-    return cumulativeBackoffMillis;
-  }
-
-  /** Tests with bounded interval, custom exponent, limited cumulative time, and a reset. */
-  @Test
-  public void testBoundedIntervalAndCumTimeWithReset() throws Exception {
-    BackOff backOff =
-        FluentBackoff.DEFAULT
-            .withInitialBackoff(Duration.millis(500))
-            .withMaxBackoff(Duration.standardSeconds(1))
-            .withMaxCumulativeBackoff(Duration.standardMinutes(1))
-            .backoff();
-
-    assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
-
-    backOff.reset();
-    assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
-    // sanity check: should get 0 if we don't reset
-    assertThat(countMaximumBackoff(backOff), equalTo(0L));
-
-    backOff.reset();
-    assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis()));
-  }
-
-  /** Tests with bounded interval, custom exponent, limited cumulative time and retries. */
-  @Test
-  public void testBoundedIntervalAndCumTimeAndRetriesWithReset() throws Exception {
-    BackOff backOff =
-        FluentBackoff.DEFAULT
-            .withInitialBackoff(Duration.millis(500))
-            .withMaxBackoff(Duration.standardSeconds(1))
-            .withMaxCumulativeBackoff(Duration.standardMinutes(1))
-            .backoff();
-
-    long cumulativeBackoffMillis = 0;
-    long currentBackoffMillis = backOff.nextBackOffMillis();
-    while (currentBackoffMillis != BackOff.STOP) {
-      cumulativeBackoffMillis += currentBackoffMillis;
-      currentBackoffMillis = backOff.nextBackOffMillis();
-    }
-    assertThat(cumulativeBackoffMillis, equalTo(Duration.standardMinutes(1).getMillis()));
-  }
-
-  @Test
-  public void testFluentBackoffToString() throws IOException {
-    FluentBackoff config =
-        FluentBackoff.DEFAULT
-            .withExponent(3.4)
-            .withMaxRetries(4)
-            .withInitialBackoff(Duration.standardSeconds(3))
-            .withMaxBackoff(Duration.standardHours(1))
-            .withMaxCumulativeBackoff(Duration.standardDays(1));
-
-    assertEquals(
-        "FluentBackoff{exponent=3.4, initialBackoff=PT3S, maxBackoff=PT3600S,"
-            + " maxRetries=4, maxCumulativeBackoff=PT86400S}",
-        config.toString());
-  }
-
-  @Test
-  public void testBackoffImplToString() throws IOException {
-    FluentBackoff config =
-        FluentBackoff.DEFAULT
-            .withExponent(3.4)
-            .withMaxRetries(4)
-            .withInitialBackoff(Duration.standardSeconds(3))
-            .withMaxBackoff(Duration.standardHours(1))
-            .withMaxCumulativeBackoff(Duration.standardDays(1));
-    BackOff backOff = config.backoff();
-
-    assertEquals(
-        "BackoffImpl{backoffConfig="
-            + config.toString()
-            + ","
-            + " currentRetry=0, currentCumulativeBackoff=PT0S}",
-        backOff.toString());
-
-    // backoff once, ignoring result
-    backOff.nextBackOffMillis();
-
-    // currentRetry is exact, we can test it.
-    assertThat(backOff.toString(), containsString("currentRetry=1"));
-    // currentCumulativeBackoff is not exact; we cannot even check that it's non-zero (randomness).
-  }
-}

Reply via email to