This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 21cc2ba [BEAM-5855] Remove duplicated code files from the Dataflow worker donation (#6821) 21cc2ba is described below commit 21cc2bab72178f1e292b13356b23ed2609a758ba Author: Sam sam <rohde.sam...@gmail.com> AuthorDate: Wed Oct 24 18:47:33 2018 -0700 [BEAM-5855] Remove duplicated code files from the Dataflow worker donation (#6821) * Remove duplicated code files from the Dataflow worker donation. Modify existing files to use deduplicated code. --- .../dataflow/harness/util/ThrowingBiConsumer.java | 31 --- .../dataflow/harness/util/ThrowingBiFunction.java | 31 --- .../dataflow/harness/util/ThrowingConsumer.java | 31 --- .../dataflow/harness/util/ThrowingFunction.java | 31 --- .../dataflow/harness/util/ThrowingRunnable.java | 29 --- .../worker/DataflowBatchWorkerHarness.java | 8 +- .../dataflow/worker/StreamingDataflowWorker.java | 8 +- .../dataflow/worker/WorkerCustomSources.java | 4 +- .../dataflow/worker/util/FluentBackoff.java | 237 --------------------- .../worker/windmill/GrpcWindmillServer.java | 8 +- .../RegisterAndProcessBundleOperationTest.java | 2 +- .../dataflow/worker/util/FluentBackoffTest.java | 218 ------------------- 12 files changed, 15 insertions(+), 623 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java deleted file mode 100644 index a6c7e1d..0000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiConsumer.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.harness.util; - -import java.util.function.BiConsumer; - -/** - * A {@link BiConsumer} which can throw {@link Exception}s. - * - * <p>Used to expand the allowed set of method references to be used by Java 8 functional - * interfaces. - */ -@FunctionalInterface -public interface ThrowingBiConsumer<T1, T2> { - void accept(T1 t1, T2 t2) throws Exception; -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java deleted file mode 100644 index 7db62eb..0000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingBiFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.harness.util; - -import java.util.function.BiFunction; - -/** - * A {@link BiFunction} which can throw {@link Exception}s. - * - * <p>Used to expand the allowed set of method references to be used by Java 8 functional - * interfaces. - */ -@FunctionalInterface -public interface ThrowingBiFunction<T1, T2, T3> { - T3 apply(T1 t1, T2 t2) throws Exception; -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java deleted file mode 100644 index 9209ad5..0000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingConsumer.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.harness.util; - -import java.util.function.Consumer; - -/** - * A {@link Consumer} which can throw {@link Exception}s. - * - * <p>Used to expand the allowed set of method references to be used by Java 8 functional - * interfaces. - */ -@FunctionalInterface -public interface ThrowingConsumer<T> { - void accept(T t) throws Exception; -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java deleted file mode 100644 index 187bb7f..0000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.harness.util; - -import java.util.function.Function; - -/** - * A {@link Function} which can throw {@link Exception}s. - * - * <p>Used to expand the allowed set of method references to be used by Java 8 functional - * interfaces. - */ -@FunctionalInterface -public interface ThrowingFunction<T1, T2> { - T2 apply(T1 value) throws Exception; -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java deleted file mode 100644 index d74d157..0000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/harness/util/ThrowingRunnable.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.harness.util; - -/** - * A {@link Runnable} which can throw {@link Exception}s. - * - * <p>Used to expand the allowed set of method references to be used by Java 8 functional - * interfaces. - */ -@FunctionalInterface -public interface ThrowingRunnable { - void run() throws Exception; -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java index 565bbce..6e07e59 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowBatchWorkerHarness.java @@ -19,9 +19,6 @@ package org.apache.beam.runners.dataflow.worker; import static com.google.common.base.Preconditions.checkArgument; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.ArrayList; @@ -29,8 +26,11 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; -import org.apache.beam.runners.dataflow.worker.util.FluentBackoff; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index dd0ee5e..c0f4b44 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -22,9 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment; import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.model.CounterStructuredName; import com.google.api.services.dataflow.model.CounterUpdate; import com.google.api.services.dataflow.model.MapTask; @@ -114,7 +111,6 @@ import org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider; import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; -import org.apache.beam.runners.dataflow.worker.util.FluentBackoff; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutionStateTracker; @@ -127,6 +123,10 @@ import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.GetWo import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.StreamPool; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.util.Transport; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index 128834c..4597018 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -26,7 +26,6 @@ import static org.apache.beam.runners.dataflow.util.Structs.getStrings; import static org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; -import com.google.api.client.util.BackOff; import com.google.api.client.util.Base64; import com.google.api.services.dataflow.model.ApproximateReportedProgress; import com.google.api.services.dataflow.model.ApproximateSplitRequest; @@ -54,13 +53,14 @@ import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.internal.CustomSources; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.CloudObject; -import org.apache.beam.runners.dataflow.worker.util.FluentBackoff; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java deleted file mode 100644 index 8a1cc1e..0000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoff.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.client.util.BackOff; -import com.google.common.base.MoreObjects; -import org.joda.time.Duration; - -/** - * A fluent builder for {@link BackOff} objects that allows customization of the retry algorithm. - * - * @see #DEFAULT for the default configuration parameters. - */ -public final class FluentBackoff { - - private static final double DEFAULT_EXPONENT = 1.5; - private static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; - private static final Duration DEFAULT_MIN_BACKOFF = Duration.standardSeconds(1); - private static final Duration DEFAULT_MAX_BACKOFF = Duration.standardDays(1000); - private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; - private static final Duration DEFAULT_MAX_CUM_BACKOFF = Duration.standardDays(1000); - - private final double exponent; - private final Duration initialBackoff; - private final Duration maxBackoff; - private final Duration maxCumulativeBackoff; - private final int maxRetries; - - /** - * By default the {@link BackOff} created by this builder will use exponential backoff (base - * exponent 1.5) with an initial backoff of 1 second. These parameters can be overridden with - * {@link #withExponent(double)} and {@link #withInitialBackoff(Duration)}, respectively, and the - * maximum backoff after exponential increase can be capped using {@link - * FluentBackoff#withMaxBackoff(Duration)}. - * - * <p>The default {@link BackOff} does not limit the number of retries. To limit the backoff, the - * maximum total number of retries can be set using {@link #withMaxRetries(int)}. The total time - * spent in backoff can be time-bounded as well by configuring {@link - * #withMaxCumulativeBackoff(Duration)}. If either of these limits are reached, calls to {@link - * BackOff#nextBackOffMillis()} will return {@link BackOff#STOP} to signal that no more retries - * should continue. - */ - public static final FluentBackoff DEFAULT = - new FluentBackoff( - DEFAULT_EXPONENT, - DEFAULT_MIN_BACKOFF, - DEFAULT_MAX_BACKOFF, - DEFAULT_MAX_CUM_BACKOFF, - DEFAULT_MAX_RETRIES); - - /** - * Instantiates a {@link BackOff} that will obey the current configuration. - * - * @see FluentBackoff - */ - public BackOff backoff() { - return new BackoffImpl(this); - } - - /** - * Returns a copy of this {@link FluentBackoff} that instead uses the specified exponent to - * control the exponential growth of delay. - * - * <p>Does not modify this object. - * - * @see FluentBackoff - */ - public FluentBackoff withExponent(double exponent) { - checkArgument(exponent > 0, "exponent %s must be greater than 0", exponent); - return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); - } - - /** - * Returns a copy of this {@link FluentBackoff} that instead uses the specified initial backoff - * duration. - * - * <p>Does not modify this object. - * - * @see FluentBackoff - */ - public FluentBackoff withInitialBackoff(Duration initialBackoff) { - checkArgument( - initialBackoff.isLongerThan(Duration.ZERO), - "initialBackoff %s must be at least 1 millisecond", - initialBackoff); - return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); - } - - /** - * Returns a copy of this {@link FluentBackoff} that limits the maximum backoff of an individual - * attempt to the specified duration. - * - * <p>Does not modify this object. - * - * @see FluentBackoff - */ - public FluentBackoff withMaxBackoff(Duration maxBackoff) { - checkArgument( - maxBackoff.getMillis() > 0, "maxBackoff %s must be at least 1 millisecond", maxBackoff); - return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); - } - - /** - * Returns a copy of this {@link FluentBackoff} that limits the total time spent in backoff - * returned across all calls to {@link BackOff#nextBackOffMillis()}. - * - * <p>Does not modify this object. - * - * @see FluentBackoff - */ - public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { - checkArgument( - maxCumulativeBackoff.isLongerThan(Duration.ZERO), - "maxCumulativeBackoff %s must be at least 1 millisecond", - maxCumulativeBackoff); - return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); - } - - /** - * Returns a copy of this {@link FluentBackoff} that limits the total number of retries, aka the - * total number of calls to {@link BackOff#nextBackOffMillis()} before returning {@link - * BackOff#STOP}. - * - * <p>Does not modify this object. - * - * @see FluentBackoff - */ - public FluentBackoff withMaxRetries(int maxRetries) { - checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", maxRetries); - return new FluentBackoff( - exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(FluentBackoff.class) - .add("exponent", exponent) - .add("initialBackoff", initialBackoff) - .add("maxBackoff", maxBackoff) - .add("maxRetries", maxRetries) - .add("maxCumulativeBackoff", maxCumulativeBackoff) - .toString(); - } - - private static class BackoffImpl implements BackOff { - - // Customization of this backoff. - private final FluentBackoff backoffConfig; - // Current state - private Duration currentCumulativeBackoff; - private int currentRetry; - - @Override - public void reset() { - currentRetry = 0; - currentCumulativeBackoff = Duration.ZERO; - } - - @Override - public long nextBackOffMillis() { - // Maximum number of retries reached. - if (currentRetry >= backoffConfig.maxRetries) { - return BackOff.STOP; - } - // Maximum cumulative backoff reached. - if (currentCumulativeBackoff.compareTo(backoffConfig.maxCumulativeBackoff) >= 0) { - return BackOff.STOP; - } - - double currentIntervalMillis = - Math.min( - backoffConfig.initialBackoff.getMillis() - * Math.pow(backoffConfig.exponent, currentRetry), - backoffConfig.maxBackoff.getMillis()); - double randomOffset = - (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis; - long nextBackoffMillis = Math.round(currentIntervalMillis + randomOffset); - // Cap to limit on cumulative backoff - Duration remainingCumulative = - backoffConfig.maxCumulativeBackoff.minus(currentCumulativeBackoff); - nextBackoffMillis = Math.min(nextBackoffMillis, remainingCumulative.getMillis()); - - // Update state and return backoff. - currentCumulativeBackoff = currentCumulativeBackoff.plus(nextBackoffMillis); - currentRetry += 1; - return nextBackoffMillis; - } - - private BackoffImpl(FluentBackoff backoffConfig) { - this.backoffConfig = backoffConfig; - this.reset(); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(BackoffImpl.class) - .add("backoffConfig", backoffConfig) - .add("currentRetry", currentRetry) - .add("currentCumulativeBackoff", currentCumulativeBackoff) - .toString(); - } - } - - private FluentBackoff( - double exponent, - Duration initialBackoff, - Duration maxBackoff, - Duration maxCumulativeBackoff, - int maxRetries) { - this.exponent = exponent; - this.initialBackoff = initialBackoff; - this.maxBackoff = maxBackoff; - this.maxRetries = maxRetries; - this.maxCumulativeBackoff = maxCumulativeBackoff; - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index a3974d6..25b2d78 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -17,9 +17,6 @@ */ package org.apache.beam.runners.dataflow.worker.windmill; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; @@ -60,7 +57,6 @@ import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; import org.apache.beam.runners.dataflow.worker.options.StreamingDataflowWorkerOptions; -import org.apache.beam.runners.dataflow.worker.util.FluentBackoff; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse; @@ -88,6 +84,10 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWor import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetWorkResponseChunk; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.vendor.grpc.v1.io.grpc.CallCredentials; import org.apache.beam.vendor.grpc.v1.io.grpc.Channel; import org.apache.beam.vendor.grpc.v1.io.grpc.StatusRuntimeException; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java index 6bd0d96..3ca9c7d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/RegisterAndProcessBundleOperationTest.java @@ -61,7 +61,6 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; import org.apache.beam.runners.core.InMemoryMultimapSideInputView; import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.core.SideInputReader; -import org.apache.beam.runners.dataflow.harness.util.ThrowingRunnable; import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; import org.apache.beam.runners.dataflow.worker.DataflowPortabilityPCollectionView; import org.apache.beam.runners.dataflow.worker.fn.IdGenerator; @@ -76,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MoreFutures; +import org.apache.beam.sdk.util.ThrowingRunnable; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java deleted file mode 100644 index ad20448..0000000 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/FluentBackoffTest.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.util; - -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import com.google.api.client.util.BackOff; -import java.io.IOException; -import org.joda.time.Duration; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link FluentBackoff}. */ -@RunWith(JUnit4.class) -public class FluentBackoffTest { - - @Rule public ExpectedException thrown = ExpectedException.none(); - private final FluentBackoff defaultBackoff = FluentBackoff.DEFAULT; - - @Test - public void testInvalidExponent() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("exponent -2.0 must be greater than 0"); - defaultBackoff.withExponent(-2.0); - } - - @Test - public void testInvalidInitialBackoff() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("initialBackoff PT0S must be at least 1 millisecond"); - defaultBackoff.withInitialBackoff(Duration.ZERO); - } - - @Test - public void testInvalidMaxBackoff() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("maxBackoff PT0S must be at least 1 millisecond"); - defaultBackoff.withMaxBackoff(Duration.ZERO); - } - - @Test - public void testInvalidMaxRetries() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("maxRetries -1 cannot be negative"); - defaultBackoff.withMaxRetries(-1); - } - - @Test - public void testInvalidCumulativeBackoff() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("maxCumulativeBackoff PT-0.002S must be at least 1 millisecond"); - defaultBackoff.withMaxCumulativeBackoff(Duration.millis(-2)); - } - - /** Tests with bounded interval, custom exponent, and unlimited retries. */ - @Test - public void testBoundedIntervalWithReset() throws Exception { - BackOff backOff = - FluentBackoff.DEFAULT - .withInitialBackoff(Duration.millis(500)) - .withMaxBackoff(Duration.standardSeconds(1)) - .backoff(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); - assertThat( - backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L))); - assertThat( - backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L))); - assertThat( - backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L))); - assertThat( - backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L))); - assertThat( - backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L))); - - // Reset, should go back to short times. - backOff.reset(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); - assertThat( - backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L))); - assertThat( - backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), lessThanOrEqualTo(1500L))); - } - - /** Tests with bounded interval, custom exponent, limited retries, and a reset. */ - @Test - public void testMaxRetriesWithReset() throws Exception { - BackOff backOff = - FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(500)).withMaxRetries(1).backoff(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); - assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); - assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); - assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); - - backOff.reset(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); - } - - private static long countMaximumBackoff(BackOff backOff) throws IOException { - long cumulativeBackoffMillis = 0; - long currentBackoffMillis = backOff.nextBackOffMillis(); - while (currentBackoffMillis != BackOff.STOP) { - cumulativeBackoffMillis += currentBackoffMillis; - currentBackoffMillis = backOff.nextBackOffMillis(); - } - return cumulativeBackoffMillis; - } - - /** Tests with bounded interval, custom exponent, limited cumulative time, and a reset. */ - @Test - public void testBoundedIntervalAndCumTimeWithReset() throws Exception { - BackOff backOff = - FluentBackoff.DEFAULT - .withInitialBackoff(Duration.millis(500)) - .withMaxBackoff(Duration.standardSeconds(1)) - .withMaxCumulativeBackoff(Duration.standardMinutes(1)) - .backoff(); - - assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis())); - - backOff.reset(); - assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis())); - // sanity check: should get 0 if we don't reset - assertThat(countMaximumBackoff(backOff), equalTo(0L)); - - backOff.reset(); - assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis())); - } - - /** Tests with bounded interval, custom exponent, limited cumulative time and retries. */ - @Test - public void testBoundedIntervalAndCumTimeAndRetriesWithReset() throws Exception { - BackOff backOff = - FluentBackoff.DEFAULT - .withInitialBackoff(Duration.millis(500)) - .withMaxBackoff(Duration.standardSeconds(1)) - .withMaxCumulativeBackoff(Duration.standardMinutes(1)) - .backoff(); - - long cumulativeBackoffMillis = 0; - long currentBackoffMillis = backOff.nextBackOffMillis(); - while (currentBackoffMillis != BackOff.STOP) { - cumulativeBackoffMillis += currentBackoffMillis; - currentBackoffMillis = backOff.nextBackOffMillis(); - } - assertThat(cumulativeBackoffMillis, equalTo(Duration.standardMinutes(1).getMillis())); - } - - @Test - public void testFluentBackoffToString() throws IOException { - FluentBackoff config = - FluentBackoff.DEFAULT - .withExponent(3.4) - .withMaxRetries(4) - .withInitialBackoff(Duration.standardSeconds(3)) - .withMaxBackoff(Duration.standardHours(1)) - .withMaxCumulativeBackoff(Duration.standardDays(1)); - - assertEquals( - "FluentBackoff{exponent=3.4, initialBackoff=PT3S, maxBackoff=PT3600S," - + " maxRetries=4, maxCumulativeBackoff=PT86400S}", - config.toString()); - } - - @Test - public void testBackoffImplToString() throws IOException { - FluentBackoff config = - FluentBackoff.DEFAULT - .withExponent(3.4) - .withMaxRetries(4) - .withInitialBackoff(Duration.standardSeconds(3)) - .withMaxBackoff(Duration.standardHours(1)) - .withMaxCumulativeBackoff(Duration.standardDays(1)); - BackOff backOff = config.backoff(); - - assertEquals( - "BackoffImpl{backoffConfig=" - + config.toString() - + "," - + " currentRetry=0, currentCumulativeBackoff=PT0S}", - backOff.toString()); - - // backoff once, ignoring result - backOff.nextBackOffMillis(); - - // currentRetry is exact, we can test it. - assertThat(backOff.toString(), containsString("currentRetry=1")); - // currentCumulativeBackoff is not exact; we cannot even check that it's non-zero (randomness). - } -}