Repository: incubator-beam Updated Branches: refs/heads/master 92ff63d3b -> 04a41ee54
Remove misc occurrences of OldDoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44e17d1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44e17d1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44e17d1c Branch: refs/heads/master Commit: 44e17d1c97babd487584cc78690505bdf57704b2 Parents: 92ff63d Author: Kenneth Knowles <k...@google.com> Authored: Wed Dec 7 14:17:01 2016 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Thu Dec 8 09:03:59 2016 -0800 ---------------------------------------------------------------------- .../beam/sdk/AggregatorPipelineExtractor.java | 5 ++-- .../sdk/transforms/AggregatorRetriever.java | 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 4 +-- .../apache/beam/sdk/util/ExecutionContext.java | 8 +++--- .../sdk/AggregatorPipelineExtractorTest.java | 20 +++++++------- .../beam/sdk/transforms/DoFnTesterTest.java | 2 +- .../beam/sdk/transforms/ParDoLifecycleTest.java | 28 ++++++++++---------- 7 files changed, 35 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java index d2130d0..ade5978 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java @@ -70,9 +70,10 @@ class AggregatorPipelineExtractor { private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) { if (transform != null) { if (transform instanceof ParDo.Bound) { - return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn()); + return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getNewFn()); } else if (transform instanceof ParDo.BoundMulti) { - return AggregatorRetriever.getAggregators(((ParDo.BoundMulti<?, ?>) transform).getFn()); + return AggregatorRetriever.getAggregators( + ((ParDo.BoundMulti<?, ?>) transform).getNewFn()); } } return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java index abed843..ce47e22 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java @@ -30,7 +30,7 @@ public final class AggregatorRetriever { /** * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}. */ - public static Collection<Aggregator<?, ?>> getAggregators(OldDoFn<?, ?> fn) { + public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) { return fn.getAggregators(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index be063e2..4127d94 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2392,8 +2392,8 @@ public class Combine { PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) { PCollection<KV<K, OutputT>> output = input.apply(ParDo.of( - new OldDoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() { - @Override + new DoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() { + @ProcessElement public void processElement(final ProcessContext c) { K key = c.element().getKey(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java index f2a79bd..4429d76 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java @@ -41,14 +41,14 @@ public interface ExecutionContext { /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#output} + * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} * is called. */ void noteOutput(WindowedValue<?> output); /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#sideOutput} + * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} * is called. */ void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output); @@ -70,14 +70,14 @@ public interface ExecutionContext { /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#output} + * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} * is called. */ void noteOutput(WindowedValue<?> output); /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.OldDoFn.Context#sideOutput} + * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} * is called. */ void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java index b4de768..c4e9b8a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java @@ -33,9 +33,9 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -68,7 +68,7 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("rawtypes") ParDo.Bound bound = mock(ParDo.Bound.class, "Bound"); AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); + when(bound.getNewFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn()); Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new Min.MinIntegerFn()); @@ -96,7 +96,7 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("rawtypes") ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti"); AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); + when(bound.getNewFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Max.MaxLongFn()); Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn()); @@ -126,8 +126,8 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("rawtypes") ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound"); AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); - when(otherBound.getFn()).thenReturn(fn); + when(bound.getNewFn()).thenReturn(fn); + when(otherBound.getNewFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn()); Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new Min.MinDoubleFn()); @@ -162,7 +162,7 @@ public class AggregatorPipelineExtractorTest { AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>(); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new Sum.SumLongFn()); - when(bound.getFn()).thenReturn(fn); + when(bound.getNewFn()).thenReturn(fn); @SuppressWarnings("rawtypes") ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound"); @@ -170,7 +170,7 @@ public class AggregatorPipelineExtractorTest { AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>(); Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(new Sum.SumDoubleFn()); - when(otherBound.getFn()).thenReturn(otherFn); + when(otherBound.getNewFn()).thenReturn(otherFn); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); when(transformNode.getTransform()).thenReturn(bound); @@ -208,7 +208,7 @@ public class AggregatorPipelineExtractorTest { } } - private static class AggregatorProvidingDoFn<InT, OuT> extends OldDoFn<InT, OuT> { + private static class AggregatorProvidingDoFn<InT, OuT> extends DoFn<InT, OuT> { public <InputT, OutT> Aggregator<InputT, OutT> addAggregator( CombineFn<InputT, ?, OutT> combiner) { return createAggregator(randomName(), combiner); @@ -218,8 +218,8 @@ public class AggregatorPipelineExtractorTest { return UUID.randomUUID().toString(); } - @Override - public void processElement(OldDoFn<InT, OuT>.ProcessContext c) throws Exception { + @ProcessElement + public void processElement(DoFn<InT, OuT>.ProcessContext c) throws Exception { fail(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index b47465e..2dafa27 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -399,7 +399,7 @@ public class DoFnTesterTest { /** * A {@link DoFn} that adds values to an aggregator and converts input to String in - * {@link OldDoFn#processElement}. + * {@link DoFn.ProcessElement @ProcessElement}. */ private static class CounterDoFn extends DoFn<Long, String> { Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/44e17d1c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java index f69c867..9bc8a64 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java @@ -50,7 +50,7 @@ public class ParDoLifecycleTest implements Serializable { PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) .and(p.apply("Polite", Create.of(3, 5, 6, 7))) .apply(Flatten.<Integer>pCollections()) - .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>())); + .apply(ParDo.of(new CallSequenceEnforcingDoFn<Integer>())); p.run(); } @@ -62,19 +62,19 @@ public class ParDoLifecycleTest implements Serializable { PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) .and(p.apply("Polite", Create.of(3, 5, 6, 7))) .apply(Flatten.<Integer>pCollections()) - .apply(ParDo.of(new CallSequenceEnforcingOldFn<Integer>()) + .apply(ParDo.of(new CallSequenceEnforcingDoFn<Integer>()) .withOutputTags(new TupleTag<Integer>() {}, TupleTagList.empty())); p.run(); } - private static class CallSequenceEnforcingOldFn<T> extends OldDoFn<T, T> { + private static class CallSequenceEnforcingDoFn<T> extends DoFn<T, T> { private boolean setupCalled = false; private int startBundleCalls = 0; private int finishBundleCalls = 0; private boolean teardownCalled = false; - @Override + @Setup public void setup() { assertThat("setup should not be called twice", setupCalled, is(false)); assertThat("setup should be called before startBundle", startBundleCalls, equalTo(0)); @@ -83,7 +83,7 @@ public class ParDoLifecycleTest implements Serializable { setupCalled = true; } - @Override + @StartBundle public void startBundle(Context c) { assertThat("setup should have been called", setupCalled, is(true)); assertThat( @@ -94,7 +94,7 @@ public class ParDoLifecycleTest implements Serializable { startBundleCalls++; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); assertThat( @@ -104,7 +104,7 @@ public class ParDoLifecycleTest implements Serializable { assertThat("teardown should not have been called", teardownCalled, is(false)); } - @Override + @FinishBundle public void finishBundle(Context c) { assertThat("startBundle should have been called", startBundleCalls, greaterThan(0)); assertThat( @@ -115,7 +115,7 @@ public class ParDoLifecycleTest implements Serializable { finishBundleCalls++; } - @Override + @Teardown public void teardown() { assertThat(setupCalled, is(true)); assertThat(startBundleCalls, anyOf(equalTo(finishBundleCalls))); @@ -345,7 +345,7 @@ public class ParDoLifecycleTest implements Serializable { } } - private static class ExceptionThrowingOldFn extends OldDoFn<Object, Object> { + private static class ExceptionThrowingOldFn extends DoFn<Object, Object> { static AtomicBoolean teardownCalled = new AtomicBoolean(false); private final MethodForException toThrow; @@ -355,22 +355,22 @@ public class ParDoLifecycleTest implements Serializable { this.toThrow = toThrow; } - @Override + @Setup public void setup() throws Exception { throwIfNecessary(MethodForException.SETUP); } - @Override + @StartBundle public void startBundle(Context c) throws Exception { throwIfNecessary(MethodForException.START_BUNDLE); } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { throwIfNecessary(MethodForException.PROCESS_ELEMENT); } - @Override + @FinishBundle public void finishBundle(Context c) throws Exception { throwIfNecessary(MethodForException.FINISH_BUNDLE); } @@ -382,7 +382,7 @@ public class ParDoLifecycleTest implements Serializable { } } - @Override + @Teardown public void teardown() { if (!thrown) { fail("Excepted to have a processing method throw an exception");