http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index 77c857c..7917aec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -23,8 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.VarInt; import com.google.common.base.MoreObjects; @@ -38,8 +38,8 @@ import java.util.Objects; /** * Provides information about the pane an element belongs to. Every pane is implicitly associated * with a window. Panes are observable only via the - * {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext#pane} method of the context - * passed to a {@link DoFn#processElement} overridden method. + * {@link OldDoFn.ProcessContext#pane} method of the context + * passed to a {@link OldDoFn#processElement} overridden method. * * <p>Note: This does not uniquely identify a pane, and should not be used for comparisons. */ @@ -74,8 +74,8 @@ public final class PaneInfo { * definitions: * <ol> * <li>We'll call a pipeline 'simple' if it does not use - * {@link org.apache.beam.sdk.transforms.DoFn.Context#outputWithTimestamp} in - * any {@code DoFn}, and it uses the same + * {@link OldDoFn.Context#outputWithTimestamp} in + * any {@code OldDoFn}, and it uses the same * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound#withAllowedLateness} * argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}). * <li>We'll call an element 'locally late', from the point of view of a computation on a
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index fe8b66f..03ff481 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -21,8 +21,8 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -645,7 +645,7 @@ public class Window { // We first apply a (trivial) transform to the input PCollection to produce a new // PCollection. This ensures that we don't modify the windowing strategy of the input // which may be used elsewhere. - .apply("Identity", ParDo.of(new DoFn<T, T>() { + .apply("Identity", ParDo.of(new OldDoFn<T, T>() { @Override public void processElement(ProcessContext c) { c.output(c.element()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java index a62444f..dd36367 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java @@ -107,7 +107,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} + * {@link OldDoFn.Context#output} * is called. */ @Override @@ -115,7 +115,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} + * {@link OldDoFn.Context#sideOutput} * is called. */ @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java index ce35c24..e14aec8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkState; import org.apache.beam.sdk.transforms.Combine; + import java.util.HashMap; import java.util.Map; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java index f73fae3..149d276 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.values.PCollectionView; @@ -49,9 +49,9 @@ public class CombineContextFactory { } /** - * Returns a {@code Combine.Context} that wraps a {@code DoFn.ProcessContext}. + * Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */ - public static Context createFromProcessContext(final DoFn<?, ?>.ProcessContext c) { + public static Context createFromProcessContext(final OldDoFn<?, ?>.ProcessContext c) { return new Context() { @Override public PipelineOptions getPipelineOptions() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java index 01bde82..1c2f554 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java @@ -42,14 +42,14 @@ public interface ExecutionContext { /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} + * {@link OldDoFn.Context#output} * is called. */ void noteOutput(WindowedValue<?> output); /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} + * {@link OldDoFn.Context#sideOutput} * is called. */ void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output); @@ -71,14 +71,14 @@ public interface ExecutionContext { /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#output} + * {@link OldDoFn.Context#output} * is called. */ void noteOutput(WindowedValue<?> output); /** * Hook for subclasses to implement that will be called whenever - * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput} + * {@link OldDoFn.Context#sideOutput} * is called. */ void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java index 96802ae..eb0a91a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java @@ -21,6 +21,7 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.sdk.transforms.Combine; + import java.util.Arrays; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java index 9dc4f68..ae3d391 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import java.io.Serializable; @@ -43,62 +43,62 @@ public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Seria ///////////////////////////////////////////////////////////////////////////// /** - * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link DoFn}. + * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}. * - * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext} + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} * if it is required. */ - public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c); + public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c); /** - * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link DoFn}. + * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}. * - * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext} + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} * if it is required. */ - public AccumT addInput(K key, AccumT accumulator, InputT input, DoFn<?, ?>.ProcessContext c); + public AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c); /** - * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link DoFn}. + * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}. * - * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext} + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} * if it is required. */ public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c); + K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c); /** - * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link DoFn}. + * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}. * - * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext} + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} * if it is required. */ - public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c); + public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c); /** - * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link DoFn}. + * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link OldDoFn}. * - * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext} + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} * if it is required. */ - public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c); + public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c); /** * Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and extract output - * in a {@link DoFn}. + * in a {@link OldDoFn}. * - * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext} + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} * if it is required. */ - public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c); + public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c); /** - * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link DoFn}. + * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link OldDoFn}. * - * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext} + * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} * if it is required. */ - public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c); + public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c); ///////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java index 2d28682..87870a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import com.google.common.collect.Iterables; @@ -69,39 +69,39 @@ public class PerKeyCombineFnRunners { } @Override - public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c) { + public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFn.createAccumulator(key); } @Override public AccumT addInput( - K key, AccumT accumulator, InputT input, DoFn<?, ?>.ProcessContext c) { + K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFn.addInput(key, accumulator, input); } @Override public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c) { + K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFn.mergeAccumulators(key, accumulators); } @Override - public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) { + public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFn.extractOutput(key, accumulator); } @Override - public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) { + public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFn.compact(key, accumulator); } @Override - public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c) { + public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFn.apply(key, inputs); } @Override - public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) { + public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) { AccumT accum = keyedCombineFn.createAccumulator(key); for (InputT input : inputs) { accum = keyedCombineFn.addInput(key, accum, input); @@ -165,45 +165,45 @@ public class PerKeyCombineFnRunners { } @Override - public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c) { + public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFnWithContext.createAccumulator(key, CombineContextFactory.createFromProcessContext(c)); } @Override public AccumT addInput( - K key, AccumT accumulator, InputT value, DoFn<?, ?>.ProcessContext c) { + K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFnWithContext.addInput(key, accumulator, value, CombineContextFactory.createFromProcessContext(c)); } @Override public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c) { + K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFnWithContext.mergeAccumulators( key, accumulators, CombineContextFactory.createFromProcessContext(c)); } @Override - public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) { + public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFnWithContext.extractOutput(key, accumulator, CombineContextFactory.createFromProcessContext(c)); } @Override - public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) { + public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFnWithContext.compact(key, accumulator, CombineContextFactory.createFromProcessContext(c)); } @Override - public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c) { + public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) { return keyedCombineFnWithContext.apply(key, inputs, CombineContextFactory.createFromProcessContext(c)); } @Override - public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) { + public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) { CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c); AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext); for (InputT input : inputs) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java index 36c4a9f..9e6c7d2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java @@ -34,6 +34,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; + import javax.annotation.Nullable; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java index 9fa0380..88ae6cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; + import javax.annotation.Nullable; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java index c2273f5..2808ca9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java @@ -17,11 +17,11 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.values.KV; /** - * DoFn that makes timestamps and window assignments explicit in the value part of each key/value + * OldDoFn that makes timestamps and window assignments explicit in the value part of each key/value * pair. * * @param <K> the type of the keys of the input and output {@code PCollection}s @@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.KV; */ @SystemDoFnInternal public class ReifyTimestampAndWindowsDoFn<K, V> - extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> { + extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>> { @Override public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index 6c58689..66c7cc0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -17,8 +17,8 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -70,7 +70,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti // set allowed lateness. .setWindowingStrategyInternal(originalStrategy) .apply("ExpandIterable", ParDo.of( - new DoFn<KV<K, Iterable<V>>, KV<K, V>>() { + new OldDoFn<KV<K, Iterable<V>>, KV<K, V>>() { @Override public void processElement(ProcessContext c) { K key = c.element().getKey(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java index 45f6c4a..1e70aaf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java @@ -105,7 +105,7 @@ public class SerializableUtils { */ public static CloudObject ensureSerializable(Coder<?> coder) { // Make sure that Coders are java serializable as well since - // they are regularly captured within DoFn's. + // they are regularly captured within OldDoFn's. Coder<?> copy = (Coder<?>) ensureSerializable((Serializable) coder); CloudObject cloudObject = copy.asCloudObject(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java index 53201a4..bb59373 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java @@ -98,7 +98,7 @@ public class StringUtils { } private static final String[] STANDARD_NAME_SUFFIXES = - new String[]{"DoFn", "Fn"}; + new String[]{"OldDoFn", "Fn"}; /** * Pattern to match a non-anonymous inner class. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java index 9a42b23..b8a5cd4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -26,10 +26,10 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** - * Annotation to mark {@link DoFn DoFns} as an internal component of the Dataflow SDK. + * Annotation to mark {@link OldDoFn DoFns} as an internal component of the Dataflow SDK. * * <p>Currently, the only effect of this is to mark any aggregators reported by an annotated - * {@code DoFn} as a system counter (as opposed to a user counter). + * {@code OldDoFn} as a system counter (as opposed to a user counter). * * <p>This is internal to the Dataflow SDK. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java index c03ab4d..3212d64 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java @@ -33,7 +33,6 @@ import com.google.common.base.MoreObjects; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.joda.time.Instant; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java index e724349..f0e4812 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java @@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import com.google.common.base.MoreObjects; @@ -139,8 +139,8 @@ public class ValueWithRecordId<ValueT> { ByteArrayCoder idCoder; } - /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */ - public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> { + /** {@link OldDoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */ + public static class StripIdsDoFn<T> extends OldDoFn<ValueWithRecordId<T>, T> { @Override public void processElement(ProcessContext c) { c.output(c.element().getValue()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 676848c..9d341a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -38,7 +38,6 @@ import com.google.common.collect.ImmutableList; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.joda.time.Instant; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java index 149c497..3a1b654 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java @@ -30,8 +30,8 @@ import java.io.IOException; import java.util.Collection; /** - * Interface that may be required by some (internal) {@code DoFn}s to implement windowing. It should - * not be necessary for general user code to interact with this at all. + * Interface that may be required by some (internal) {@code OldDoFn}s to implement windowing. It + * should not be necessary for general user code to interact with this at all. * * <p>This interface should be provided by runner implementors to support windowing on their runner. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java index 75b8ad8..6db532e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.util.common; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; + import static java.util.Arrays.asList; import com.google.common.base.Function; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java index b60a53e..69bf77d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.PropertyNames; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.joda.time.Instant; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index 8abfb05..5137031 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.UserCodeException; @@ -146,9 +146,9 @@ public class PipelineTest { private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix( final String suffix) { - return ParDo.of(new DoFn<String, String>() { + return ParDo.of(new OldDoFn<String, String>() { @Override - public void processElement(DoFn<String, String>.ProcessContext c) { + public void processElement(OldDoFn<String, String>.ProcessContext c) { c.output(c.element() + suffix); } }); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 54f7ec1..41d0932 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.SerializableUtils; @@ -134,7 +134,7 @@ public class AvroCoderTest { } } - private static class GetTextFn extends DoFn<Pojo, String> { + private static class GetTextFn extends OldDoFn<Pojo, String> { @Override public void processElement(ProcessContext c) { c.output(c.element().text); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index 817ea20..35ec6c6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; @@ -366,7 +366,7 @@ public class CoderRegistryTest { private static class PTransformOutputingMySerializableGeneric extends PTransform<PCollection<String>, PCollection<KV<String, MySerializableGeneric<String>>>> { - private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<String>>> { + private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<String>>> { @Override public void processElement(ProcessContext c) { } } @@ -430,7 +430,7 @@ public class CoderRegistryTest { PCollection<String>, PCollection<KV<String, MySerializableGeneric<T>>>> { - private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<T>>> { + private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<T>>> { @Override public void processElement(ProcessContext c) { } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java index d6423e5..3e7fd50 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.CoderUtils; @@ -82,14 +82,14 @@ public class SerializableCoderTest implements Serializable { } } - static class StringToRecord extends DoFn<String, MyRecord> { + static class StringToRecord extends OldDoFn<String, MyRecord> { @Override public void processElement(ProcessContext c) { c.output(new MyRecord(c.element())); } } - static class RecordToString extends DoFn<MyRecord, String> { + static class RecordToString extends OldDoFn<MyRecord, String> { @Override public void processElement(ProcessContext c) { c.output(c.element().value); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index c7153f8..09405ab 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java index cabfc21..fe9415b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index 8fbed94..01e5fe5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; + import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java index c5f7478..95f7454 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -28,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; @@ -119,7 +120,7 @@ public class CountingInputTest { assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true)); } - private static class ElementValueDiff extends DoFn<Long, Long> { + private static class ElementValueDiff extends OldDoFn<Long, Long> { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element() - c.timestamp().getMillis()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java index 321f066..45f636f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java @@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -159,7 +159,7 @@ public class CountingSourceTest { p.run(); } - private static class ElementValueDiff extends DoFn<Long, Long> { + private static class ElementValueDiff extends OldDoFn<Long, Long> { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element() - c.timestamp().getMillis()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java index 7009023..f689f51 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive; import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index 9c75972..f8592c9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.PubsubClient; import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage; @@ -58,7 +58,7 @@ public class PubsubUnboundedSinkTest { private static final String ID_LABEL = "id"; private static final int NUM_SHARDS = 10; - private static class Stamp extends DoFn<String, String> { + private static class Stamp extends OldDoFn<String, String> { @Override public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java index 237c025..a47ddf2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java @@ -37,6 +37,7 @@ import org.junit.runners.JUnit4; import java.io.IOException; import java.io.Serializable; import java.util.List; + import javax.annotation.Nullable; /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index a1f1f70..6ec3a71 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.startsWith; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index 0af0744..4b6e749 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; + import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -40,9 +41,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOption import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -101,14 +102,14 @@ public class WriteTest { this.window = window; } - private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> { + private static class AddArbitraryKey<T> extends OldDoFn<T, KV<Integer, T>> { @Override public void processElement(ProcessContext c) throws Exception { c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element())); } } - private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> { + private static class RemoveArbitraryKey<T> extends OldDoFn<KV<Integer, Iterable<T>>, T> { @Override public void processElement(ProcessContext c) throws Exception { for (T s : c.element().getValue()) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java index 98aee4e..ea0db73 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java @@ -46,6 +46,7 @@ import java.nio.channels.WritableByteChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.List; + import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlType; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java index 22359dc..ec2902e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.options; import static com.google.common.base.Strings.isNullOrEmpty; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java index 546fe7d..8e1439b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java @@ -29,7 +29,6 @@ import com.google.api.services.bigquery.Bigquery.Datasets.Delete; import com.google.api.services.storage.Storage; import com.fasterxml.jackson.databind.ObjectMapper; - import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java index 8b8337e..0c1b596 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java @@ -43,7 +43,6 @@ import com.google.common.collect.ListMultimap; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; - import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java index 687271c..b2efa61 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java @@ -28,7 +28,6 @@ import com.google.common.collect.Sets; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.ObjectMapper; - import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index 110f30a..c4c5c1c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.hasItem; @@ -43,7 +44,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; - import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java index 74cc5e0..13476e2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java @@ -27,9 +27,9 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -211,7 +211,7 @@ public class AggregatorPipelineExtractorTest { } } - private static class AggregatorProvidingDoFn<InT, OuT> extends DoFn<InT, OuT> { + private static class AggregatorProvidingDoFn<InT, OuT> extends OldDoFn<InT, OuT> { public <InputT, OutT> Aggregator<InputT, OutT> addAggregator( CombineFn<InputT, ?, OutT> combiner) { return createAggregator(randomName(), combiner); @@ -222,7 +222,7 @@ public class AggregatorPipelineExtractorTest { } @Override - public void processElement(DoFn<InT, OuT>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<InT, OuT>.ProcessContext c) throws Exception { fail(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index 1070dab..acc2b48 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -39,7 +39,6 @@ import org.apache.beam.sdk.values.TimestampedValue; import com.google.common.collect.Iterables; import com.fasterxml.jackson.annotation.JsonCreator; - import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 043c06c..0bd7893 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import com.fasterxml.jackson.databind.ObjectMapper; - import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java index 8c2451b..fc10d4b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.TestUtils.checkCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java index 1a42947..5c8732f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -53,7 +54,7 @@ import java.util.List; */ @RunWith(JUnit4.class) public class ApproximateUniqueTest implements Serializable { - // implements Serializable just to make it easy to use anonymous inner DoFn subclasses + // implements Serializable just to make it easy to use anonymous inner OldDoFn subclasses @Test public void testEstimationErrorToSampleSize() { @@ -222,7 +223,7 @@ public class ApproximateUniqueTest implements Serializable { .apply(View.<Long>asSingleton()); PCollection<KV<Long, Long>> approximateAndExact = approximate - .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() { + .apply(ParDo.of(new OldDoFn<Long, KV<Long, Long>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of(c.element(), c.sideInput(exact))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index 486c738..d6bf826 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -461,7 +461,7 @@ public class CombineFnsTest { } private static class ExtractResultDoFn - extends DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>>{ + extends OldDoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> { private final TupleTag<Integer> maxIntTag; private final TupleTag<UserString> concatStringTag; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index b453089..cb9928e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -25,6 +25,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.include import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; + import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -73,7 +74,6 @@ import com.google.common.collect.Sets; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.Test; @@ -117,7 +117,7 @@ public class CombineTest implements Serializable { 1, 1, 2, 3, 5, 8, 13, 21, 34, 55 }; - @Mock private DoFn<?, ?>.ProcessContext processContext; + @Mock private OldDoFn<?, ?>.ProcessContext processContext; PCollection<KV<String, Integer>> createInput(Pipeline p, KV<String, Integer>[] table) { @@ -372,7 +372,7 @@ public class CombineTest implements Serializable { pipeline.run(); } - private static class FormatPaneInfo extends DoFn<Integer, String> { + private static class FormatPaneInfo extends OldDoFn<Integer, String> { @Override public void processElement(ProcessContext c) { c.output(c.element() + ": " + c.pane().isLast()); @@ -560,7 +560,7 @@ public class CombineTest implements Serializable { pipeline.run(); } - private static class GetLast extends DoFn<Integer, Integer> { + private static class GetLast extends OldDoFn<Integer, Integer> { @Override public void processElement(ProcessContext c) { if (c.pane().isLast()) { @@ -653,7 +653,7 @@ public class CombineTest implements Serializable { PCollection<Integer> output = pipeline .apply("CreateVoidMainInput", Create.of((Void) null)) - .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() { + .apply("OutputSideInput", ParDo.of(new OldDoFn<Void, Integer>() { @Override public void processElement(ProcessContext c) { c.output(c.sideInput(view)); @@ -1176,7 +1176,7 @@ public class CombineTest implements Serializable { } private static <T> PCollection<T> copy(PCollection<T> pc, final int n) { - return pc.apply(ParDo.of(new DoFn<T, T>() { + return pc.apply(ParDo.of(new OldDoFn<T, T>() { @Override public void processElement(ProcessContext c) throws Exception { for (int i = 0; i < n; i++) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 07ba002..cf65423 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -229,7 +229,7 @@ public class CreateTest { p.run(); } - private static class PrintTimestamps extends DoFn<String, String> { + private static class PrintTimestamps extends OldDoFn<String, String> { @Override public void processElement(ProcessContext c) { c.output(c.element() + ":" + c.timestamp().getMillis()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnContextTest.java deleted file mode 100644 index 2e588c7..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnContextTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link DoFn.Context}. - */ -@RunWith(JUnit4.class) -public class DoFnContextTest { - - @Mock - private Aggregator<Long, Long> agg; - - private DoFn<Object, Object> fn; - private DoFn<Object, Object>.Context context; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - - // Need to be real objects to call the constructor, and to reference the - // outer instance of DoFn - NoOpDoFn<Object, Object> noOpFn = new NoOpDoFn<>(); - DoFn<Object, Object>.Context noOpContext = noOpFn.context(); - - fn = spy(noOpFn); - context = spy(noOpContext); - } - - @Test - public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() { - Sum.SumLongFn combiner = new Sum.SumLongFn(); - Aggregator<Long, Long> delegateAggregator = - fn.createAggregator("test", combiner); - - when(context.createAggregatorInternal("test", combiner)).thenReturn(agg); - - context.setupDelegateAggregators(); - delegateAggregator.addValue(1L); - - verify(agg).addValue(1L); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java index bf9899c..2488042 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java @@ -24,7 +24,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn.DelegatingAggregator; +import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator; import org.junit.Before; import org.junit.Rule; @@ -36,7 +36,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; /** - * Tests for DoFn.DelegatingAggregator. + * Tests for OldDoFn.DelegatingAggregator. */ @RunWith(JUnit4.class) public class DoFnDelegatingAggregatorTest { @@ -54,7 +54,7 @@ public class DoFnDelegatingAggregatorTest { @Test public void testAddValueWithoutDelegateThrowsException() { - DoFn<Void, Void> doFn = new NoOpDoFn<>(); + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); String name = "agg"; CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); @@ -64,7 +64,7 @@ public class DoFnDelegatingAggregatorTest { thrown.expect(IllegalStateException.class); thrown.expectMessage("cannot be called"); - thrown.expectMessage("DoFn"); + thrown.expectMessage("OldDoFn"); aggregator.addValue(21.2); } @@ -74,7 +74,7 @@ public class DoFnDelegatingAggregatorTest { String name = "agg"; CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class); - DoFn<Void, Void> doFn = new NoOpDoFn<>(); + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); DelegatingAggregator<Long, Long> aggregator = (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner); @@ -91,7 +91,7 @@ public class DoFnDelegatingAggregatorTest { String name = "agg"; CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); - DoFn<Void, Void> doFn = new NoOpDoFn<>(); + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); DelegatingAggregator<Double, Double> aggregator = (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); @@ -114,7 +114,7 @@ public class DoFnDelegatingAggregatorTest { String name = "agg"; CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); - DoFn<Void, Void> doFn = new NoOpDoFn<>(); + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); DelegatingAggregator<Double, Double> aggregator = (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); @@ -127,7 +127,7 @@ public class DoFnDelegatingAggregatorTest { String name = "agg"; CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); - DoFn<Void, Void> doFn = new NoOpDoFn<>(); + OldDoFn<Void, Void> doFn = new NoOpDoFn<>(); DelegatingAggregator<Double, Double> aggregator = (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java index 3238f2c..0cb3d7b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java @@ -158,7 +158,7 @@ public class DoFnReflectorTest { @Test public void testDoFnInvokersReused() throws Exception { - // Ensures that we don't create a new Invoker class for every instance of the DoFn. + // Ensures that we don't create a new Invoker class for every instance of the OldDoFn. IdentityParent fn1 = new IdentityParent(); IdentityParent fn2 = new IdentityParent(); DoFnReflector reflector1 = underTest(fn1); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java deleted file mode 100644 index 9242ece..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.isA; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.runners.AggregatorValues; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Max.MaxIntegerFn; -import org.apache.beam.sdk.transforms.Sum.SumIntegerFn; -import org.apache.beam.sdk.transforms.display.DisplayData; - -import com.google.common.collect.ImmutableMap; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.Map; - -/** - * Tests for DoFn. - */ -@RunWith(JUnit4.class) -public class DoFnTest implements Serializable { - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - @Test - public void testCreateAggregatorWithCombinerSucceeds() { - String name = "testAggregator"; - Sum.SumLongFn combiner = new Sum.SumLongFn(); - - DoFn<Void, Void> doFn = new NoOpDoFn<>(); - - Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner); - - assertEquals(name, aggregator.getName()); - assertEquals(combiner, aggregator.getCombineFn()); - } - - @Test - public void testCreateAggregatorWithNullNameThrowsException() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("name cannot be null"); - - DoFn<Void, Void> doFn = new NoOpDoFn<>(); - - doFn.createAggregator(null, new Sum.SumLongFn()); - } - - @Test - public void testCreateAggregatorWithNullCombineFnThrowsException() { - CombineFn<Object, Object, Object> combiner = null; - - thrown.expect(NullPointerException.class); - thrown.expectMessage("combiner cannot be null"); - - DoFn<Void, Void> doFn = new NoOpDoFn<>(); - - doFn.createAggregator("testAggregator", combiner); - } - - @Test - public void testCreateAggregatorWithNullSerializableFnThrowsException() { - SerializableFunction<Iterable<Object>, Object> combiner = null; - - thrown.expect(NullPointerException.class); - thrown.expectMessage("combiner cannot be null"); - - DoFn<Void, Void> doFn = new NoOpDoFn<>(); - - doFn.createAggregator("testAggregator", combiner); - } - - @Test - public void testCreateAggregatorWithSameNameThrowsException() { - String name = "testAggregator"; - CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn(); - - DoFn<Void, Void> doFn = new NoOpDoFn<>(); - - doFn.createAggregator(name, combiner); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Cannot create"); - thrown.expectMessage(name); - thrown.expectMessage("already exists"); - - doFn.createAggregator(name, combiner); - } - - @Test - public void testCreateAggregatorsWithDifferentNamesSucceeds() { - String nameOne = "testAggregator"; - String nameTwo = "aggregatorPrime"; - CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn(); - - DoFn<Void, Void> doFn = new NoOpDoFn<>(); - - Aggregator<Double, Double> aggregatorOne = - doFn.createAggregator(nameOne, combiner); - Aggregator<Double, Double> aggregatorTwo = - doFn.createAggregator(nameTwo, combiner); - - assertNotEquals(aggregatorOne, aggregatorTwo); - } - - @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInStartBundleThrows() { - TestPipeline p = createTestPipeline(new DoFn<String, String>() { - @Override - public void startBundle(DoFn<String, String>.Context c) throws Exception { - createAggregator("anyAggregate", new MaxIntegerFn()); - } - - @Override - public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {} - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInProcessElementThrows() { - TestPipeline p = createTestPipeline(new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) throws Exception { - createAggregator("anyAggregate", new MaxIntegerFn()); - } - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testCreateAggregatorInFinishBundleThrows() { - TestPipeline p = createTestPipeline(new DoFn<String, String>() { - @Override - public void finishBundle(DoFn<String, String>.Context c) throws Exception { - createAggregator("anyAggregate", new MaxIntegerFn()); - } - - @Override - public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {} - }); - - thrown.expect(PipelineExecutionException.class); - thrown.expectCause(isA(IllegalStateException.class)); - - p.run(); - } - - /** - * Initialize a test pipeline with the specified {@link DoFn}. - */ - private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) { - TestPipeline pipeline = TestPipeline.create(); - pipeline.apply(Create.of((InputT) null)) - .apply(ParDo.of(fn)); - - return pipeline; - } - - @Test - public void testPopulateDisplayDataDefaultBehavior() { - DoFn<String, String> usesDefault = - new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) throws Exception {} - }; - - DisplayData data = DisplayData.from(usesDefault); - assertThat(data.items(), empty()); - } - - @Test - @Category(NeedsRunner.class) - public void testAggregators() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - CountOddsFn countOdds = new CountOddsFn(); - pipeline - .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100)) - .apply(ParDo.of(countOdds)); - PipelineResult result = pipeline.run(); - - AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator); - assertThat(values.getValuesAtSteps(), - equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4))); - } - - private static class CountOddsFn extends DoFn<Integer, Void> { - @Override - public void processElement(ProcessContext c) throws Exception { - if (c.element() % 2 == 1) { - aggregator.addValue(1); - } - } - - Aggregator<Integer, Integer> aggregator = - createAggregator("odds", new SumIntegerFn()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 8460a7c..e379f11 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -235,7 +235,7 @@ public class DoFnTesterTest { final PCollectionView<Integer> value = PCollectionViews.singletonView( TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); - DoFn<Integer, Integer> fn = new SideInputDoFn(value); + OldDoFn<Integer, Integer> fn = new SideInputDoFn(value); DoFnTester<Integer, Integer> tester = DoFnTester.of(fn); @@ -251,7 +251,7 @@ public class DoFnTesterTest { final PCollectionView<Integer> value = PCollectionViews.singletonView( TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); - DoFn<Integer, Integer> fn = new SideInputDoFn(value); + OldDoFn<Integer, Integer> fn = new SideInputDoFn(value); DoFnTester<Integer, Integer> tester = DoFnTester.of(fn); tester.setSideInput(value, GlobalWindow.INSTANCE, -2); @@ -264,7 +264,7 @@ public class DoFnTesterTest { assertThat(tester.peekOutputElements(), containsInAnyOrder(-2, -2, -2, -2)); } - private static class SideInputDoFn extends DoFn<Integer, Integer> { + private static class SideInputDoFn extends OldDoFn<Integer, Integer> { private final PCollectionView<Integer> value; private SideInputDoFn(PCollectionView<Integer> value) { @@ -278,9 +278,9 @@ public class DoFnTesterTest { } /** - * A DoFn that adds values to an aggregator and converts input to String in processElement. + * A OldDoFn that adds values to an aggregator and converts input to String in processElement. */ - private static class CounterDoFn extends DoFn<Long, String> { + private static class CounterDoFn extends OldDoFn<Long, String> { Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn()); private final long startBundleVal; private final long finishBundleVal;