Repository: beam Updated Branches: refs/heads/master 5bfd3e049 -> 4682238dc
http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java deleted file mode 100644 index cfaf0a6..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.MoreObjects; -import java.io.Serializable; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * An {@link Aggregator} that delegates calls to {@link #addValue} to another aggregator. - * - * <p>This {@link Aggregator} is designed to be constructed without a delegate, at pipeline - * construction time, and serialized within a {@link DoFn}. The delegate aggregator to which it - * submits values must be provided by the runner at execution time. - * - * @param <AggInputT> the type of input element - * @param <AggOutputT> the type of output element - */ -public class DelegatingAggregator<AggInputT, AggOutputT> - implements Aggregator<AggInputT, AggOutputT>, Serializable { - private static final AtomicInteger ID_GEN = new AtomicInteger(); - private final int id; - - private final String name; - - private final CombineFn<AggInputT, ?, AggOutputT> combineFn; - - private Aggregator<AggInputT, ?> delegate; - - public DelegatingAggregator(String name, - CombineFn<? super AggInputT, ?, AggOutputT> combiner) { - this.id = ID_GEN.getAndIncrement(); - this.name = checkNotNull(name, "name cannot be null"); - // Safe contravariant cast - @SuppressWarnings("unchecked") - CombineFn<AggInputT, ?, AggOutputT> specificCombiner = - (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null"); - this.combineFn = specificCombiner; - } - - @Override - public void addValue(AggInputT value) { - if (delegate == null) { - throw new IllegalStateException( - String.format( - "addValue cannot be called on Aggregator outside of the execution of a %s.", - DoFn.class.getSimpleName())); - } else { - delegate.addValue(value); - } - } - - @Override - public String getName() { - return name; - } - - @Override - public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() { - return combineFn; - } - - /** - * Sets the current delegate of the Aggregator. - * - * @param delegate the delegate to set in this aggregator - */ - public void setDelegate(Aggregator<AggInputT, ?> delegate) { - this.delegate = delegate; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("name", name) - .add("combineFn", combineFn) - .toString(); - } - - @Override - public int hashCode() { - return Objects.hash(id, name, combineFn.getClass()); - } - - /** - * Indicates whether some other object is "equal to" this one. - * - * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their - * CombineFns are the same class, and they have identical IDs. - */ - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (o == null) { - return false; - } - if (o instanceof DelegatingAggregator) { - DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o; - return Objects.equals(this.id, that.id) - && Objects.equals(this.name, that.name) - && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass()); - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java index 4cb1142..d3ebbb7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java @@ -47,8 +47,6 @@ import org.apache.beam.sdk.values.TimestampedValue; * <p>Example 2: track a latest computed value in an aggregator: * <pre>{@code * class MyDoFn extends DoFn<String, String> { - * private Aggregator<TimestampedValue<Double>, Double> latestValue = - * createAggregator("latestValue", new Latest.LatestFn<Double>()); * * {@literal @}ProcessElement * public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 72cba79..f29aeb9 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -43,7 +43,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; -import org.apache.beam.fn.harness.fake.FakeAggregatorFactory; import org.apache.beam.fn.harness.fake.FakeStepContext; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; @@ -316,7 +315,6 @@ public class ProcessBundleHandler { (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()), new ArrayList<>(doFnInfo.getOutputMap().values()), new FakeStepContext(), - new FakeAggregatorFactory(), (WindowingStrategy) doFnInfo.getWindowingStrategy()); return runner; } http://git-wip-us.apache.org/repos/asf/beam/blob/615761a7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java deleted file mode 100644 index b3b7f48..0000000 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeAggregatorFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.fn.harness.fake; - -import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.ExecutionContext.StepContext; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * A fake implementation of an {@link AggregatorFactory} that is to be filled in at a later time. - * The factory returns {@link Aggregator}s that do nothing when a value is added. - */ -public class FakeAggregatorFactory implements AggregatorFactory { - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, - StepContext stepContext, - String aggregatorName, - CombineFn<InputT, AccumT, OutputT> combine) { - return new Aggregator<InputT, OutputT>() { - @Override - public void addValue(InputT value) {} - - @Override - public String getName() { - return aggregatorName; - } - - @Override - public CombineFn<InputT, ?, OutputT> getCombineFn() { - return combine; - } - }; - } -}