http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index 5b9eeff..5e96c46 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -145,9 +145,9 @@ public class SerializationTest { } /** - * A DoFn that tokenizes lines of text into individual words. + * A OldDoFn that tokenizes lines of text into individual words. */ - static class ExtractWordsFn extends DoFn<StringHolder, StringHolder> { + static class ExtractWordsFn extends OldDoFn<StringHolder, StringHolder> { private static final Pattern WORD_BOUNDARY = Pattern.compile("[^a-zA-Z']+"); private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); @@ -173,9 +173,9 @@ public class SerializationTest { } /** - * A DoFn that converts a Word and Count into a printable string. + * A OldDoFn that converts a Word and Count into a printable string. */ - private static class FormatCountsFn extends DoFn<KV<StringHolder, Long>, StringHolder> { + private static class FormatCountsFn extends OldDoFn<KV<StringHolder, Long>, StringHolder> { @Override public void processElement(ProcessContext c) { c.output(new StringHolder(c.element().getKey() + ": " + c.element().getValue()));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java index 60b7f71..5775565 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringDelegateCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.junit.After; @@ -54,7 +54,7 @@ public class SideEffectsTest implements Serializable { pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - pipeline.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() { + pipeline.apply(Create.of("a")).apply(ParDo.of(new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) throws Exception { throw new UserException(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index 904b448..c005f14 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -122,7 +122,7 @@ public class KafkaStreamingTest { EMBEDDED_ZOOKEEPER.shutdown(); } - private static class FormatKVFn extends DoFn<KV<String, String>, String> { + private static class FormatKVFn extends OldDoFn<KV<String, String>, String> { @Override public void processElement(ProcessContext c) { c.output(c.element().getKey() + "," + c.element().getValue()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 873a591..da4db93 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.values.TypeDescriptor; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java index 9db6650..c34ce66 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DurationCoder.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.coders; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import com.fasterxml.jackson.annotation.JsonCreator; - import org.joda.time.Duration; import org.joda.time.ReadableDuration; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java index 693791c..d41bd1f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import com.google.common.base.Converter; import com.fasterxml.jackson.annotation.JsonCreator; - import org.joda.time.Instant; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index ecb1f0a..182fa1f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -709,7 +709,7 @@ public class PubsubIO { * * <p>Public so can be suppressed by runners. */ - public class PubsubBoundedReader extends DoFn<Void, T> { + public class PubsubBoundedReader extends OldDoFn<Void, T> { private static final int DEFAULT_PULL_SIZE = 100; private static final int ACK_TIMEOUT_SEC = 60; @@ -998,7 +998,7 @@ public class PubsubIO { * * <p>Public so can be suppressed by runners. */ - public class PubsubBoundedWriter extends DoFn<T, Void> { + public class PubsubBoundedWriter extends OldDoFn<T, Void> { private static final int MAX_PUBLISH_BATCH_SIZE = 100; private transient List<OutgoingMessage> output; private transient PubsubClient pubsubClient; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index 6f2b3ac..9e9536d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -31,8 +31,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -78,7 +78,7 @@ import javax.annotation.Nullable; * <li>We try to send messages in batches while also limiting send latency. * <li>No stats are logged. Rather some counters are used to keep track of elements and batches. * <li>Though some background threads are used by the underlying netty system all actual Pubsub - * calls are blocking. We rely on the underlying runner to allow multiple {@link DoFn} instances + * calls are blocking. We rely on the underlying runner to allow multiple {@link OldDoFn} instances * to execute concurrently and hide latency. * <li>A failed bundle will cause messages to be resent. Thus we rely on the Pubsub consumer * to dedup messages. @@ -155,7 +155,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { /** * Convert elements to messages and shard them. */ - private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> { + private static class ShardFn<T> extends OldDoFn<T, KV<Integer, OutgoingMessage>> { private final Aggregator<Long, Long> elementCounter = createAggregator("elements", new Sum.SumLongFn()); private final Coder<T> elementCoder; @@ -207,7 +207,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { * Publish messages to Pubsub in batches. */ private static class WriterFn - extends DoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> { + extends OldDoFn<KV<Integer, Iterable<OutgoingMessage>>, Void> { private final PubsubClientFactory pubsubFactory; private final TopicPath topic; private final String timestampLabel; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index 07d355e..d98bd6a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PubsubOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -1107,7 +1107,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> // StatsFn // ================================================================================ - private static class StatsFn<T> extends DoFn<T, T> { + private static class StatsFn<T> extends OldDoFn<T, T> { private final Aggregator<Long, Long> elementCounter = createAggregator("elements", new Sum.SumLongFn()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java index b8902f9..de00035 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java @@ -19,9 +19,9 @@ package org.apache.beam.sdk.io; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; - import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; + import org.joda.time.Instant; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index 42d3c05..3e997b0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.Sink.WriteOperation; import org.apache.beam.sdk.io.Sink.Writer; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -156,7 +156,7 @@ public class Write { * Writes all the elements in a bundle using a {@link Writer} produced by the * {@link WriteOperation} associated with the {@link Sink}. */ - private class WriteBundles<WriteT> extends DoFn<T, WriteT> { + private class WriteBundles<WriteT> extends OldDoFn<T, WriteT> { // Writer that will write the records in this bundle. Lazily // initialized in processElement. private Writer<T, WriteT> writer = null; @@ -182,7 +182,7 @@ public class Write { // Discard write result and close the write. try { writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. + // The writer does not need to be reset, as this OldDoFn cannot be reused. } catch (Exception closeException) { if (closeException instanceof InterruptedException) { // Do not silently ignore interrupted state. @@ -217,7 +217,7 @@ public class Write { * * @see WriteBundles */ - private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> { + private class WriteShardedBundles<WriteT> extends OldDoFn<KV<Integer, Iterable<T>>, WriteT> { private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) { @@ -296,10 +296,11 @@ public class Write { * <p>This singleton collection containing the WriteOperation is then used as a side input to a * ParDo over the PCollection of elements to write. In this bundle-writing phase, * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. - * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and - * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for every - * element in the bundle. The output of this ParDo is a PCollection of <i>writer result</i> - * objects (see {@link Sink} for a description of writer results)-one for each bundle. + * {@link Writer#open} and {@link Writer#close} are called in {@link OldDoFn#startBundle} and + * {@link OldDoFn#finishBundle}, respectively, and {@link Writer#write} method is called for + * every element in the bundle. The output of this ParDo is a PCollection of + * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for + * each bundle. * * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and * the collection of writer results as a side-input. In this ParDo, @@ -333,7 +334,7 @@ public class Write { // Initialize the resource in a do-once ParDo on the WriteOperation. operationCollection = operationCollection .apply("Initialize", ParDo.of( - new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() { + new OldDoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() { @Override public void processElement(ProcessContext c) throws Exception { WriteOperation<T, WriteT> writeOperation = c.element(); @@ -387,7 +388,7 @@ public class Write { // ParDo. There is a dependency between this ParDo and the parallel write (the writer results // collection as a side input), so it will happen after the parallel write. operationCollection - .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() { + .apply("Finalize", ParDo.of(new OldDoFn<WriteOperation<T, WriteT>, Integer>() { @Override public void processElement(ProcessContext c) throws Exception { WriteOperation<T, WriteT> writeOperation = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java index e0a1ef3..b2df96e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java @@ -29,7 +29,6 @@ import com.google.common.base.Strings; import com.google.common.io.Files; import com.fasterxml.jackson.annotation.JsonIgnore; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index e89e5ad..aa9f13e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -22,8 +22,8 @@ import org.apache.beam.sdk.options.GoogleApiDebugOptions.GoogleApiTracer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Deserializer; import org.apache.beam.sdk.options.ProxyInvocationHandler.Serializer; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.Context; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.Context; import org.apache.beam.sdk.transforms.display.HasDisplayData; import com.google.auto.service.AutoService; @@ -52,7 +52,7 @@ import javax.annotation.concurrent.ThreadSafe; * and {@link PipelineOptionsFactory#as(Class)}. They can be created * from command-line arguments with {@link PipelineOptionsFactory#fromArgs(String[])}. * They can be converted to another type by invoking {@link PipelineOptions#as(Class)} and - * can be accessed from within a {@link DoFn} by invoking + * can be accessed from within a {@link OldDoFn} by invoking * {@link Context#getPipelineOptions()}. * * <p>For example: @@ -151,7 +151,7 @@ import javax.annotation.concurrent.ThreadSafe; * {@link PipelineOptionsFactory#withValidation()} is invoked. * * <p>{@link JsonIgnore @JsonIgnore} is used to prevent a property from being serialized and - * available during execution of {@link DoFn}. See the Serialization section below for more + * available during execution of {@link OldDoFn}. See the Serialization section below for more * details. * * <h2>Registration Of PipelineOptions</h2> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java index f21b9b9..67fa2af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java @@ -53,7 +53,6 @@ import com.google.common.collect.TreeMultimap; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java index 815de82..607bdda 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsReflector.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.options; import org.apache.beam.sdk.util.common.ReflectHelpers; + import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java index a42ece2..6f6836e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/AggregatorValues.java @@ -19,14 +19,14 @@ package org.apache.beam.sdk.runners; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import java.util.Collection; import java.util.Map; /** * A collection of values associated with an {@link Aggregator}. Aggregators declared in a - * {@link DoFn} are emitted on a per-{@code DoFn}-application basis. + * {@link OldDoFn} are emitted on a per-{@code OldDoFn}-application basis. * * @param <T> the output type of the aggregator */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index a202ed4..80340c2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -33,11 +33,11 @@ import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -762,7 +762,7 @@ public class PAssert { .apply("RewindowActuals", rewindowActuals.<T>windowActuals()) .apply( ParDo.of( - new DoFn<T, T>() { + new OldDoFn<T, T>() { @Override public void processElement(ProcessContext context) throws CoderException { context.output(CoderUtils.clone(coder, context.element())); @@ -884,7 +884,7 @@ public class PAssert { } } - private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> { + private static final class ConcatFn<T> extends OldDoFn<Iterable<Iterable<T>>, Iterable<T>> { @Override public void processElement(ProcessContext c) throws Exception { c.output(Iterables.concat(c.element())); @@ -995,13 +995,13 @@ public class PAssert { } /** - * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of a + * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of a * {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing. * * <p>The input is ignored, but is {@link Integer} to be usable on runners that do not support * null values. */ - private static class SideInputCheckerDoFn<ActualT> extends DoFn<Integer, Void> { + private static class SideInputCheckerDoFn<ActualT> extends OldDoFn<Integer, Void> { private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1030,13 +1030,13 @@ public class PAssert { } /** - * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of + * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of * the single iterable element of the input {@link PCollection} and adjusts counters and * thrown exceptions for use in testing. * * <p>The singleton property is presumed, not enforced. */ - private static class GroupedValuesCheckerDoFn<ActualT> extends DoFn<ActualT, Void> { + private static class GroupedValuesCheckerDoFn<ActualT> extends OldDoFn<ActualT, Void> { private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1061,14 +1061,14 @@ public class PAssert { } /** - * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of + * A {@link OldDoFn} that runs a checking {@link SerializableFunction} on the contents of * the single item contained within the single iterable on input and * adjusts counters and thrown exceptions for use in testing. * * <p>The singleton property of the input {@link PCollection} is presumed, not enforced. However, * each input element must be a singleton iterable, or this will fail. */ - private static class SingletonCheckerDoFn<ActualT> extends DoFn<Iterable<ActualT>, Void> { + private static class SingletonCheckerDoFn<ActualT> extends OldDoFn<Iterable<ActualT>, Void> { private final SerializableFunction<ActualT, Void> checkerFn; private final Aggregator<Integer, Integer> success = createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn()); @@ -1310,7 +1310,7 @@ public class PAssert { } /** - * A DoFn that filters elements based on their presence in a static collection of windows. + * A OldDoFn that filters elements based on their presence in a static collection of windows. */ private static final class FilterWindows<T> extends PTransform<PCollection<T>, PCollection<T>> { private final StaticWindows windows; @@ -1324,7 +1324,7 @@ public class PAssert { return input.apply("FilterWindows", ParDo.of(new Fn())); } - private class Fn extends DoFn<T, T> implements RequiresWindowAccess { + private class Fn extends OldDoFn<T, T> implements RequiresWindowAccess { @Override public void processElement(ProcessContext c) throws Exception { if (windows.getWindows().contains(c.window())) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java index 45b0592..4e0c0be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SerializableMatchers.java @@ -35,7 +35,6 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collection; import java.util.List; - import javax.annotation.Nullable; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 0de3024..98cdeba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -37,7 +37,6 @@ import com.fasterxml.jackson.core.TreeNode; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; - import org.junit.experimental.categories.Category; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java index ff553ba..c4596c1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; + import org.hamcrest.BaseMatcher; import org.hamcrest.Description; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java index c8aad78..db4ab33 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java @@ -24,8 +24,9 @@ import org.apache.beam.sdk.util.ExecutionContext; * An {@code Aggregator<InputT>} enables monitoring of values of type {@code InputT}, * to be combined across all bundles. * - * <p>Aggregators are created by calling {@link DoFn#createAggregator DoFn.createAggregatorForDoFn}, - * typically from the {@link DoFn} constructor. Elements can be added to the + * <p>Aggregators are created by calling + * {@link OldDoFn#createAggregator OldDoFn.createAggregatorForDoFn}, + * typically from the {@link OldDoFn} constructor. Elements can be added to the * {@code Aggregator} by calling {@link Aggregator#addValue}. * * <p>Aggregators are visible in the monitoring UI, when the pipeline is run @@ -36,7 +37,7 @@ import org.apache.beam.sdk.util.ExecutionContext; * * <p>Example: * <pre> {@code - * class MyDoFn extends DoFn<String, String> { + * class MyDoFn extends OldDoFn<String, String> { * private Aggregator<Integer, Integer> myAggregator; * * public MyDoFn() { @@ -78,8 +79,9 @@ public interface Aggregator<InputT, OutputT> { /** * Create an aggregator with the given {@code name} and {@link CombineFn}. * - * <p>This method is called to create an aggregator for a {@link DoFn}. It receives the class - * of the {@link DoFn} being executed and the context of the step it is being executed in. + * <p>This method is called to create an aggregator for a {@link OldDoFn}. It receives the + * class of the {@link OldDoFn} being executed and the context of the step it is being + * executed in. */ <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( Class<?> fnClass, ExecutionContext.StepContext stepContext, @@ -88,7 +90,7 @@ public interface Aggregator<InputT, OutputT> { // TODO: Consider the following additional API conveniences: // - In addition to createAggregatorForDoFn(), consider adding getAggregator() to - // avoid the need to store the aggregator locally in a DoFn, i.e., create + // avoid the need to store the aggregator locally in a OldDoFn, i.e., create // if not already present. // - Add a shortcut for the most common aggregator: // c.createAggregatorForDoFn("name", new Sum.SumIntegerFn()). http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java index 97961e9..abed843 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.transforms; import java.util.Collection; /** - * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}. + * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}. */ public final class AggregatorRetriever { private AggregatorRetriever() { @@ -28,9 +28,9 @@ public final class AggregatorRetriever { } /** - * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}. + * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}. */ - public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) { + public static Collection<Aggregator<?, ?>> getAggregators(OldDoFn<?, ?> fn) { return fn.getAggregators(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 96c03eb..6fc2324 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1473,9 +1473,9 @@ public class Combine { PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline() .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of())) .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of( - new DoFn<Void, OutputT>() { + new OldDoFn<Void, OutputT>() { @Override - public void processElement(DoFn<Void, OutputT>.ProcessContext c) { + public void processElement(OldDoFn<Void, OutputT>.ProcessContext c) { Iterator<OutputT> combined = c.sideInput(maybeEmptyView).iterator(); if (!combined.hasNext()) { c.output(defaultValue); @@ -2097,7 +2097,7 @@ public class Combine { final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>(); final TupleTag<KV<K, InputT>> cold = new TupleTag<>(); PCollectionTuple split = input.apply("AddNonce", ParDo.of( - new DoFn<KV<K, InputT>, KV<K, InputT>>() { + new OldDoFn<KV<K, InputT>, KV<K, InputT>>() { transient int counter; @Override public void startBundle(Context c) { @@ -2135,8 +2135,8 @@ public class Combine { .setWindowingStrategyInternal(preCombineStrategy) .apply("PreCombineHot", Combine.perKey(hotPreCombine)) .apply("StripNonce", ParDo.of( - new DoFn<KV<KV<K, Integer>, AccumT>, - KV<K, InputOrAccum<InputT, AccumT>>>() { + new OldDoFn<KV<KV<K, Integer>, AccumT>, + KV<K, InputOrAccum<InputT, AccumT>>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of( @@ -2151,7 +2151,7 @@ public class Combine { .get(cold) .setCoder(inputCoder) .apply("PrepareCold", ParDo.of( - new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() { + new OldDoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of(c.element().getKey(), @@ -2359,7 +2359,7 @@ public class Combine { final PerKeyCombineFnRunner<? super K, ? super InputT, ?, OutputT> combineFnRunner = PerKeyCombineFnRunners.create(fn); PCollection<KV<K, OutputT>> output = input.apply(ParDo.of( - new DoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() { + new OldDoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>() { @Override public void processElement(ProcessContext c) { K key = c.element().getKey(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index f2ed5e1..777deba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -90,7 +90,7 @@ public class CombineFns { * * PCollection<T> finalResultCollection = maxAndMean * .apply(ParDo.of( - * new DoFn<KV<K, CoCombineResult>, T>() { + * new OldDoFn<KV<K, CoCombineResult>, T>() { * @Override * public void processElement(ProcessContext c) throws Exception { * KV<K, CoCombineResult> e = c.element(); @@ -133,7 +133,7 @@ public class CombineFns { * * PCollection<T> finalResultCollection = maxAndMean * .apply(ParDo.of( - * new DoFn<CoCombineResult, T>() { + * new OldDoFn<CoCombineResult, T>() { * @Override * public void processElement(ProcessContext c) throws Exception { * CoCombineResult e = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java index 3a0fb5d..7601ffc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java @@ -107,7 +107,7 @@ public class Count { public PCollection<KV<T, Long>> apply(PCollection<T> input) { return input - .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() { + .apply("Init", ParDo.of(new OldDoFn<T, KV<T, Void>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of(c.element(), (Void) null)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index fa645ab..fb7f784 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -486,7 +486,7 @@ public class Create<T> { this.elementCoder = elementCoder; } - private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> { + private static class ConvertTimestamps<T> extends OldDoFn<TimestampedValue<T>, T> { @Override public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java deleted file mode 100644 index 6d5d1ed..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ /dev/null @@ -1,565 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowingInternals; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypeDescriptor; - -import com.google.common.base.MoreObjects; - -import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.UUID; - -/** - * The argument to {@link ParDo} providing the code to use to process - * elements of the input - * {@link org.apache.beam.sdk.values.PCollection}. - * - * <p>See {@link ParDo} for more explanation, examples of use, and - * discussion of constraints on {@code DoFn}s, including their - * serializability, lack of access to global shared mutable state, - * requirements for failure tolerance, and benefits of optimization. - * - * <p>{@code DoFn}s can be tested in the context of a particular - * {@code Pipeline} by running that {@code Pipeline} on sample input - * and then checking its output. Unit testing of a {@code DoFn}, - * separately from any {@code ParDo} transform or {@code Pipeline}, - * can be done via the {@link DoFnTester} harness. - * - * <p>{@link DoFnWithContext} (currently experimental) offers an alternative - * mechanism for accessing {@link ProcessContext#window()} without the need - * to implement {@link RequiresWindowAccess}. - * - * <p>See also {@link #processElement} for details on implementing the transformation - * from {@code InputT} to {@code OutputT}. - * - * @param <InputT> the type of the (main) input elements - * @param <OutputT> the type of the (main) output elements - */ -public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayData { - - /** - * Information accessible to all methods in this {@code DoFn}. - * Used primarily to output elements. - */ - public abstract class Context { - - /** - * Returns the {@code PipelineOptions} specified with the - * {@link org.apache.beam.sdk.runners.PipelineRunner} - * invoking this {@code DoFn}. The {@code PipelineOptions} will - * be the default running via {@link DoFnTester}. - */ - public abstract PipelineOptions getPipelineOptions(); - - /** - * Adds the given element to the main output {@code PCollection}. - * - * <p>Once passed to {@code output} the element should be considered - * immutable and not be modified in any way. It may be cached or retained - * by the Dataflow runtime or later steps in the pipeline, or used in - * other unspecified ways. - * - * <p>If invoked from {@link DoFn#processElement processElement}, the output - * element will have the same timestamp and be in the same windows - * as the input element passed to {@link DoFn#processElement processElement}. - * - * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element. The output element - * will have a timestamp of negative infinity. - */ - public abstract void output(OutputT output); - - /** - * Adds the given element to the main output {@code PCollection}, - * with the given timestamp. - * - * <p>Once passed to {@code outputWithTimestamp} the element should not be - * modified in any way. - * - * <p>If invoked from {@link DoFn#processElement processElement}, the timestamp - * must not be older than the input element's timestamp minus - * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will - * be in the same windows as the input element. - * - * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element except for the - * timestamp. - */ - public abstract void outputWithTimestamp(OutputT output, Instant timestamp); - - /** - * Adds the given element to the side output {@code PCollection} with the - * given tag. - * - * <p>Once passed to {@code sideOutput} the element should not be modified - * in any way. - * - * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to - * specify the tags of side outputs that it consumes. Non-consumed side - * outputs, e.g., outputs for monitoring purposes only, don't necessarily - * need to be specified. - * - * <p>The output element will have the same timestamp and be in the same - * windows as the input element passed to {@link DoFn#processElement processElement}. - * - * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element. The output element - * will have a timestamp of negative infinity. - * - * @see ParDo#withOutputTags - */ - public abstract <T> void sideOutput(TupleTag<T> tag, T output); - - /** - * Adds the given element to the specified side output {@code PCollection}, - * with the given timestamp. - * - * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be - * modified in any way. - * - * <p>If invoked from {@link DoFn#processElement processElement}, the timestamp - * must not be older than the input element's timestamp minus - * {@link DoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will - * be in the same windows as the input element. - * - * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, - * this will attempt to use the - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * of the input {@code PCollection} to determine what windows the element - * should be in, throwing an exception if the {@code WindowFn} attempts - * to access any information about the input element except for the - * timestamp. - * - * @see ParDo#withOutputTags - */ - public abstract <T> void sideOutputWithTimestamp( - TupleTag<T> tag, T output, Instant timestamp); - - /** - * Creates an {@link Aggregator} in the {@link DoFn} context with the - * specified name and aggregation logic specified by {@link CombineFn}. - * - * <p>For internal use only. - * - * @param name the name of the aggregator - * @param combiner the {@link CombineFn} to use in the aggregator - * @return an aggregator for the provided name and {@link CombineFn} in this - * context - */ - @Experimental(Kind.AGGREGATOR) - protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> - createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner); - - /** - * Sets up {@link Aggregator}s created by the {@link DoFn} so they are - * usable within this context. - * - * <p>This method should be called by runners before {@link DoFn#startBundle} - * is executed. - */ - @Experimental(Kind.AGGREGATOR) - protected final void setupDelegateAggregators() { - for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) { - setupDelegateAggregator(aggregator); - } - - aggregatorsAreFinal = true; - } - - private final <AggInputT, AggOutputT> void setupDelegateAggregator( - DelegatingAggregator<AggInputT, AggOutputT> aggregator) { - - Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal( - aggregator.getName(), aggregator.getCombineFn()); - - aggregator.setDelegate(delegate); - } - } - - /** - * Information accessible when running {@link DoFn#processElement}. - */ - public abstract class ProcessContext extends Context { - - /** - * Returns the input element to be processed. - * - * <p>The element should be considered immutable. The Dataflow runtime will not mutate the - * element, so it is safe to cache, etc. The element should not be mutated by any of the - * {@link DoFn} methods, because it may be cached elsewhere, retained by the Dataflow runtime, - * or used in other unspecified ways. - */ - public abstract InputT element(); - - /** - * Returns the value of the side input for the window corresponding to the - * window of the main input element. - * - * <p>See - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow} - * for how this corresponding window is determined. - * - * @throws IllegalArgumentException if this is not a side input - * @see ParDo#withSideInputs - */ - public abstract <T> T sideInput(PCollectionView<T> view); - - /** - * Returns the timestamp of the input element. - * - * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. - */ - public abstract Instant timestamp(); - - /** - * Returns the window into which the input element has been assigned. - * - * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. - * - * @throws UnsupportedOperationException if this {@link DoFn} does - * not implement {@link RequiresWindowAccess}. - */ - public abstract BoundedWindow window(); - - /** - * Returns information about the pane within this window into which the - * input element has been assigned. - * - * <p>Generally all data is in a single, uninteresting pane unless custom - * triggering and/or late data has been explicitly requested. - * See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. - */ - public abstract PaneInfo pane(); - - /** - * Returns the process context to use for implementing windowing. - */ - @Experimental - public abstract WindowingInternals<InputT, OutputT> windowingInternals(); - } - - /** - * Returns the allowed timestamp skew duration, which is the maximum - * duration that timestamps can be shifted backward in - * {@link DoFn.Context#outputWithTimestamp}. - * - * <p>The default value is {@code Duration.ZERO}, in which case - * timestamps can only be shifted forward to future. For infinite - * skew, return {@code Duration.millis(Long.MAX_VALUE)}. - * - * <p> Note that producing an element whose timestamp is less than the - * current timestamp may result in late data, i.e. returning a non-zero - * value here does not impact watermark calculations used for firing - * windows. - * - * @deprecated does not interact well with the watermark. - */ - @Deprecated - public Duration getAllowedTimestampSkew() { - return Duration.ZERO; - } - - /** - * Interface for signaling that a {@link DoFn} needs to access the window the - * element is being processed in, via {@link DoFn.ProcessContext#window}. - */ - @Experimental - public interface RequiresWindowAccess {} - - public DoFn() { - this(new HashMap<String, DelegatingAggregator<?, ?>>()); - } - - DoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) { - this.aggregators = aggregators; - } - - ///////////////////////////////////////////////////////////////////////////// - - private final Map<String, DelegatingAggregator<?, ?>> aggregators; - - /** - * Protects aggregators from being created after initialization. - */ - private boolean aggregatorsAreFinal; - - /** - * Prepares this {@code DoFn} instance for processing a batch of elements. - * - * <p>By default, does nothing. - */ - public void startBundle(Context c) throws Exception { - } - - /** - * Processes one input element. - * - * <p>The current element of the input {@code PCollection} is returned by - * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Dataflow - * runtime will not mutate the element, so it is safe to cache, etc. The element should not be - * mutated by any of the {@link DoFn} methods, because it may be cached elsewhere, retained by the - * Dataflow runtime, or used in other unspecified ways. - * - * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}. - * Once passed to {@code output} the element should be considered immutable and not be modified in - * any way. It may be cached elsewhere, retained by the Dataflow runtime, or used in other - * unspecified ways. - * - * @see ProcessContext - */ - public abstract void processElement(ProcessContext c) throws Exception; - - /** - * Finishes processing this batch of elements. - * - * <p>By default, does nothing. - */ - public void finishBundle(Context c) throws Exception { - } - - /** - * {@inheritDoc} - * - * <p>By default, does not register any display data. Implementors may override this method - * to provide their own display data. - */ - @Override - public void populateDisplayData(DisplayData.Builder builder) { - } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Returns a {@link TypeDescriptor} capturing what is known statically - * about the input type of this {@code DoFn} instance's most-derived - * class. - * - * <p>See {@link #getOutputTypeDescriptor} for more discussion. - */ - protected TypeDescriptor<InputT> getInputTypeDescriptor() { - return new TypeDescriptor<InputT>(getClass()) {}; - } - - /** - * Returns a {@link TypeDescriptor} capturing what is known statically - * about the output type of this {@code DoFn} instance's - * most-derived class. - * - * <p>In the normal case of a concrete {@code DoFn} subclass with - * no generic type parameters of its own (including anonymous inner - * classes), this will be a complete non-generic type, which is good - * for choosing a default output {@code Coder<OutputT>} for the output - * {@code PCollection<OutputT>}. - */ - protected TypeDescriptor<OutputT> getOutputTypeDescriptor() { - return new TypeDescriptor<OutputT>(getClass()) {}; - } - - /** - * Returns an {@link Aggregator} with aggregation logic specified by the - * {@link CombineFn} argument. The name provided must be unique across - * {@link Aggregator}s created within the DoFn. Aggregators can only be created - * during pipeline construction. - * - * @param name the name of the aggregator - * @param combiner the {@link CombineFn} to use in the aggregator - * @return an aggregator for the provided name and combiner in the scope of - * this DoFn - * @throws NullPointerException if the name or combiner is null - * @throws IllegalArgumentException if the given name collides with another - * aggregator in this scope - * @throws IllegalStateException if called during pipeline processing. - */ - protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> - createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) { - checkNotNull(name, "name cannot be null"); - checkNotNull(combiner, "combiner cannot be null"); - checkArgument(!aggregators.containsKey(name), - "Cannot create aggregator with name %s." - + " An Aggregator with that name already exists within this scope.", - name); - - checkState(!aggregatorsAreFinal, "Cannot create an aggregator during DoFn processing." - + " Aggregators should be registered during pipeline construction."); - - DelegatingAggregator<AggInputT, AggOutputT> aggregator = - new DelegatingAggregator<>(name, combiner); - aggregators.put(name, aggregator); - return aggregator; - } - - /** - * Returns an {@link Aggregator} with the aggregation logic specified by the - * {@link SerializableFunction} argument. The name provided must be unique - * across {@link Aggregator}s created within the DoFn. Aggregators can only be - * created during pipeline construction. - * - * @param name the name of the aggregator - * @param combiner the {@link SerializableFunction} to use in the aggregator - * @return an aggregator for the provided name and combiner in the scope of - * this DoFn - * @throws NullPointerException if the name or combiner is null - * @throws IllegalArgumentException if the given name collides with another - * aggregator in this scope - * @throws IllegalStateException if called during pipeline processing. - */ - protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name, - SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) { - checkNotNull(combiner, "combiner cannot be null."); - return createAggregator(name, Combine.IterableCombineFn.of(combiner)); - } - - /** - * Returns the {@link Aggregator Aggregators} created by this {@code DoFn}. - */ - Collection<Aggregator<?, ?>> getAggregators() { - return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values()); - } - - /** - * An {@link Aggregator} that delegates calls to addValue to another - * aggregator. - * - * @param <AggInputT> the type of input element - * @param <AggOutputT> the type of output element - */ - static class DelegatingAggregator<AggInputT, AggOutputT> implements - Aggregator<AggInputT, AggOutputT>, Serializable { - private final UUID id; - - private final String name; - - private final CombineFn<AggInputT, ?, AggOutputT> combineFn; - - private Aggregator<AggInputT, ?> delegate; - - public DelegatingAggregator(String name, - CombineFn<? super AggInputT, ?, AggOutputT> combiner) { - this.id = UUID.randomUUID(); - this.name = checkNotNull(name, "name cannot be null"); - // Safe contravariant cast - @SuppressWarnings("unchecked") - CombineFn<AggInputT, ?, AggOutputT> specificCombiner = - (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null"); - this.combineFn = specificCombiner; - } - - @Override - public void addValue(AggInputT value) { - if (delegate == null) { - throw new IllegalStateException( - "addValue cannot be called on Aggregator outside of the execution of a DoFn."); - } else { - delegate.addValue(value); - } - } - - @Override - public String getName() { - return name; - } - - @Override - public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() { - return combineFn; - } - - /** - * Sets the current delegate of the Aggregator. - * - * @param delegate the delegate to set in this aggregator - */ - public void setDelegate(Aggregator<AggInputT, ?> delegate) { - this.delegate = delegate; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("name", name) - .add("combineFn", combineFn) - .toString(); - } - - @Override - public int hashCode() { - return Objects.hash(id, name, combineFn.getClass()); - } - - /** - * Indicates whether some other object is "equal to" this one. - * - * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their - * CombineFns are the same class, and they have identical IDs. - */ - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (o == null) { - return false; - } - if (o instanceof DelegatingAggregator) { - DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o; - return Objects.equals(this.id, that.id) - && Objects.equals(this.name, that.name) - && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass()); - } - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java index 0616eff..d8d4181 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnReflector.java @@ -194,7 +194,7 @@ public abstract class DoFnReflector { */ public abstract boolean usesSingleWindow(); - /** Create an {@link DoFnInvoker} bound to the given {@link DoFn}. */ + /** Create an {@link DoFnInvoker} bound to the given {@link OldDoFn}. */ public abstract <InputT, OutputT> DoFnInvoker<InputT, OutputT> bindInvoker( DoFnWithContext<InputT, OutputT> fn); @@ -217,9 +217,9 @@ public abstract class DoFnReflector { } /** - * Create a {@link DoFn} that the {@link DoFnWithContext}. + * Create a {@link OldDoFn} that the {@link DoFnWithContext}. */ - public <InputT, OutputT> DoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) { + public <InputT, OutputT> OldDoFn<InputT, OutputT> toDoFn(DoFnWithContext<InputT, OutputT> fn) { if (usesSingleWindow()) { return new WindowDoFnAdapter<InputT, OutputT>(this, fn); } else { @@ -287,7 +287,7 @@ public abstract class DoFnReflector { * <li>Any generics on the extra context arguments match what is expected. Eg., * {@code WindowingInternals<InputT, OutputT>} either matches the * {@code InputT} and {@code OutputT} parameters of the - * {@code DoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc. + * {@code OldDoFn<InputT, OutputT>.ProcessContext}, or it uses a wildcard, etc. * </ol> * * @param m the method to verify @@ -328,7 +328,7 @@ public abstract class DoFnReflector { AdditionalParameter[] contextInfos = new AdditionalParameter[params.length - 1]; // Fill in the generics in the allExtraContextArgs interface from the types in the - // Context or ProcessContext DoFn. + // Context or ProcessContext OldDoFn. ParameterizedType pt = (ParameterizedType) contextToken.getType(); // We actually want the owner, since ProcessContext and Context are owned by DoFnWithContext. pt = (ParameterizedType) pt.getOwnerType(); @@ -364,18 +364,18 @@ public abstract class DoFnReflector { return ImmutableList.copyOf(contextInfos); } - /** Interface for invoking the {@code DoFn} processing methods. */ + /** Interface for invoking the {@code OldDoFn} processing methods. */ public interface DoFnInvoker<InputT, OutputT> { - /** Invoke {@link DoFn#startBundle} on the bound {@code DoFn}. */ + /** Invoke {@link OldDoFn#startBundle} on the bound {@code OldDoFn}. */ void invokeStartBundle( DoFnWithContext<InputT, OutputT>.Context c, ExtraContextFactory<InputT, OutputT> extra); - /** Invoke {@link DoFn#finishBundle} on the bound {@code DoFn}. */ + /** Invoke {@link OldDoFn#finishBundle} on the bound {@code OldDoFn}. */ void invokeFinishBundle( DoFnWithContext<InputT, OutputT>.Context c, ExtraContextFactory<InputT, OutputT> extra); - /** Invoke {@link DoFn#processElement} on the bound {@code DoFn}. */ + /** Invoke {@link OldDoFn#processElement} on the bound {@code OldDoFn}. */ public void invokeProcessElement( DoFnWithContext<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra); @@ -565,10 +565,10 @@ public abstract class DoFnReflector { extends DoFnWithContext<InputT, OutputT>.Context implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> { - private DoFn<InputT, OutputT>.Context context; + private OldDoFn<InputT, OutputT>.Context context; private ContextAdapter( - DoFnWithContext<InputT, OutputT> fn, DoFn<InputT, OutputT>.Context context) { + DoFnWithContext<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) { fn.super(); this.context = context; } @@ -618,11 +618,11 @@ public abstract class DoFnReflector { extends DoFnWithContext<InputT, OutputT>.ProcessContext implements DoFnWithContext.ExtraContextFactory<InputT, OutputT> { - private DoFn<InputT, OutputT>.ProcessContext context; + private OldDoFn<InputT, OutputT>.ProcessContext context; private ProcessContextAdapter( DoFnWithContext<InputT, OutputT> fn, - DoFn<InputT, OutputT>.ProcessContext context) { + OldDoFn<InputT, OutputT>.ProcessContext context) { fn.super(); this.context = context; } @@ -683,7 +683,7 @@ public abstract class DoFnReflector { } } - public static Class<?> getDoFnClass(DoFn<?, ?> fn) { + public static Class<?> getDoFnClass(OldDoFn<?, ?> fn) { if (fn instanceof SimpleDoFnAdapter) { return ((SimpleDoFnAdapter<?, ?>) fn).fn.getClass(); } else { @@ -691,7 +691,7 @@ public abstract class DoFnReflector { } } - private static class SimpleDoFnAdapter<InputT, OutputT> extends DoFn<InputT, OutputT> { + private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> { private final DoFnWithContext<InputT, OutputT> fn; private transient DoFnInvoker<InputT, OutputT> invoker; @@ -703,19 +703,19 @@ public abstract class DoFnReflector { } @Override - public void startBundle(DoFn<InputT, OutputT>.Context c) throws Exception { + public void startBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception { ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c); invoker.invokeStartBundle(adapter, adapter); } @Override - public void finishBundle(DoFn<InputT, OutputT>.Context c) throws Exception { + public void finishBundle(OldDoFn<InputT, OutputT>.Context c) throws Exception { ContextAdapter<InputT, OutputT> adapter = new ContextAdapter<>(fn, c); invoker.invokeFinishBundle(adapter, adapter); } @Override - public void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception { + public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception { ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c); invoker.invokeProcessElement(adapter, adapter); } @@ -743,7 +743,7 @@ public abstract class DoFnReflector { } private static class WindowDoFnAdapter<InputT, OutputT> - extends SimpleDoFnAdapter<InputT, OutputT> implements DoFn.RequiresWindowAccess { + extends SimpleDoFnAdapter<InputT, OutputT> implements OldDoFn.RequiresWindowAccess { private WindowDoFnAdapter(DoFnReflector reflector, DoFnWithContext<InputT, OutputT> fn) { super(reflector, fn); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index a136632..9336e4c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -49,12 +49,12 @@ import java.util.List; import java.util.Map; /** - * A harness for unit-testing a {@link DoFn}. + * A harness for unit-testing a {@link OldDoFn}. * * <p>For example: * * <pre> {@code - * DoFn<InputT, OutputT> fn = ...; + * OldDoFn<InputT, OutputT> fn = ...; * * DoFnTester<InputT, OutputT> fnTester = DoFnTester.of(fn); * @@ -71,22 +71,22 @@ import java.util.Map; * Assert.assertThat(fnTester.processBundle(i1, i2, ...), Matchers.hasItems(...)); * } </pre> * - * @param <InputT> the type of the {@code DoFn}'s (main) input elements - * @param <OutputT> the type of the {@code DoFn}'s (main) output elements + * @param <InputT> the type of the {@code OldDoFn}'s (main) input elements + * @param <OutputT> the type of the {@code OldDoFn}'s (main) output elements */ public class DoFnTester<InputT, OutputT> { /** * Returns a {@code DoFnTester} supporting unit-testing of the given - * {@link DoFn}. + * {@link OldDoFn}. */ @SuppressWarnings("unchecked") - public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) { return new DoFnTester<InputT, OutputT>(fn); } /** * Returns a {@code DoFnTester} supporting unit-testing of the given - * {@link DoFn}. + * {@link OldDoFn}. */ @SuppressWarnings("unchecked") public static <InputT, OutputT> DoFnTester<InputT, OutputT> @@ -96,12 +96,12 @@ public class DoFnTester<InputT, OutputT> { /** * Registers the tuple of values of the side input {@link PCollectionView}s to - * pass to the {@link DoFn} under test. + * pass to the {@link OldDoFn} under test. * * <p>Resets the state of this {@link DoFnTester}. * * <p>If this isn't called, {@code DoFnTester} assumes the - * {@link DoFn} takes no side inputs. + * {@link OldDoFn} takes no side inputs. */ public void setSideInputs(Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) { this.sideInputs = sideInputs; @@ -109,8 +109,8 @@ public class DoFnTester<InputT, OutputT> { } /** - * Registers the values of a side input {@link PCollectionView} to pass to the {@link DoFn} under - * test. + * Registers the values of a side input {@link PCollectionView} to pass to the {@link OldDoFn} + * under test. * * <p>The provided value is the final value of the side input in the specified window, not * the value of the input PCollection in that window. @@ -128,7 +128,7 @@ public class DoFnTester<InputT, OutputT> { } /** - * Whether or not a {@link DoFnTester} should clone the {@link DoFn} under test. + * Whether or not a {@link DoFnTester} should clone the {@link OldDoFn} under test. */ public enum CloningBehavior { CLONE, @@ -136,14 +136,14 @@ public class DoFnTester<InputT, OutputT> { } /** - * Instruct this {@link DoFnTester} whether or not to clone the {@link DoFn} under test. + * Instruct this {@link DoFnTester} whether or not to clone the {@link OldDoFn} under test. */ public void setCloningBehavior(CloningBehavior newValue) { this.cloningBehavior = newValue; } /** - * Indicates whether this {@link DoFnTester} will clone the {@link DoFn} under test. + * Indicates whether this {@link DoFnTester} will clone the {@link OldDoFn} under test. */ public CloningBehavior getCloningBehavior() { return cloningBehavior; @@ -165,7 +165,7 @@ public class DoFnTester<InputT, OutputT> { } /** - * A convenience method for testing {@link DoFn DoFns} with bundles of elements. + * A convenience method for testing {@link OldDoFn DoFns} with bundles of elements. * Logic proceeds as follows: * * <ol> @@ -181,9 +181,9 @@ public class DoFnTester<InputT, OutputT> { } /** - * Calls {@link DoFn#startBundle} on the {@code DoFn} under test. + * Calls {@link OldDoFn#startBundle} on the {@code OldDoFn} under test. * - * <p>If needed, first creates a fresh instance of the DoFn under test. + * <p>If needed, first creates a fresh instance of the OldDoFn under test. */ public void startBundle() throws Exception { resetState(); @@ -195,14 +195,14 @@ public class DoFnTester<InputT, OutputT> { } /** - * Calls {@link DoFn#processElement} on the {@code DoFn} under test, in a - * context where {@link DoFn.ProcessContext#element} returns the + * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a + * context where {@link OldDoFn.ProcessContext#element} returns the * given element. * * <p>Will call {@link #startBundle} automatically, if it hasn't * already been called. * - * @throws IllegalStateException if the {@code DoFn} under test has already + * @throws IllegalStateException if the {@code OldDoFn} under test has already * been finished */ public void processElement(InputT element) throws Exception { @@ -216,12 +216,12 @@ public class DoFnTester<InputT, OutputT> { } /** - * Calls {@link DoFn#finishBundle} of the {@code DoFn} under test. + * Calls {@link OldDoFn#finishBundle} of the {@code OldDoFn} under test. * * <p>Will call {@link #startBundle} automatically, if it hasn't * already been called. * - * @throws IllegalStateException if the {@code DoFn} under test has already + * @throws IllegalStateException if the {@code OldDoFn} under test has already * been finished */ public void finishBundle() throws Exception { @@ -403,18 +403,18 @@ public class DoFnTester<InputT, OutputT> { return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList()); } - private TestContext<InputT, OutputT> createContext(DoFn<InputT, OutputT> fn) { + private TestContext<InputT, OutputT> createContext(OldDoFn<InputT, OutputT> fn) { return new TestContext<>(fn, options, mainOutputTag, outputs, accumulators); } - private static class TestContext<InT, OutT> extends DoFn<InT, OutT>.Context { + private static class TestContext<InT, OutT> extends OldDoFn<InT, OutT>.Context { private final PipelineOptions opts; private final TupleTag<OutT> mainOutputTag; private final Map<TupleTag<?>, List<WindowedValue<?>>> outputs; private final Map<String, Object> accumulators; public TestContext( - DoFn<InT, OutT> fn, + OldDoFn<InT, OutT> fn, PipelineOptions opts, TupleTag<OutT> mainOutputTag, Map<TupleTag<?>, List<WindowedValue<?>>> outputs, @@ -498,7 +498,7 @@ public class DoFnTester<InputT, OutputT> { } private TestProcessContext<InputT, OutputT> createProcessContext( - DoFn<InputT, OutputT> fn, + OldDoFn<InputT, OutputT> fn, InputT elem) { return new TestProcessContext<>(fn, createContext(fn), @@ -507,14 +507,14 @@ public class DoFnTester<InputT, OutputT> { sideInputs); } - private static class TestProcessContext<InT, OutT> extends DoFn<InT, OutT>.ProcessContext { + private static class TestProcessContext<InT, OutT> extends OldDoFn<InT, OutT>.ProcessContext { private final TestContext<InT, OutT> context; private final TupleTag<OutT> mainOutputTag; private final WindowedValue<InT> element; private final Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs; private TestProcessContext( - DoFn<InT, OutT> fn, + OldDoFn<InT, OutT> fn, TestContext<InT, OutT> context, WindowedValue<InT> element, TupleTag<OutT> mainOutputTag, @@ -643,15 +643,15 @@ public class DoFnTester<InputT, OutputT> { protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { throw new IllegalStateException("Aggregators should not be created within ProcessContext. " - + "Instead, create an aggregator at DoFn construction time with createAggregatorForDoFn," - + " and ensure they are set up by the time startBundle is called " - + "with setupDelegateAggregators."); + + "Instead, create an aggregator at OldDoFn construction time with" + + " createAggregatorForDoFn, and ensure they are set up by the time startBundle is" + + " called with setupDelegateAggregators."); } } ///////////////////////////////////////////////////////////////////////////// - /** The possible states of processing a DoFn. */ + /** The possible states of processing a OldDoFn. */ enum State { UNSTARTED, STARTED, @@ -660,35 +660,35 @@ public class DoFnTester<InputT, OutputT> { private final PipelineOptions options = PipelineOptionsFactory.create(); - /** The original DoFn under test. */ - private final DoFn<InputT, OutputT> origFn; + /** The original OldDoFn under test. */ + private final OldDoFn<InputT, OutputT> origFn; /** - * Whether to clone the original {@link DoFn} or just use it as-is. + * Whether to clone the original {@link OldDoFn} or just use it as-is. * - * <p></p>Worker-side {@link DoFn DoFns} may not be serializable, and are not required to be. + * <p></p>Worker-side {@link OldDoFn DoFns} may not be serializable, and are not required to be. */ private CloningBehavior cloningBehavior = CloningBehavior.CLONE; - /** The side input values to provide to the DoFn under test. */ + /** The side input values to provide to the OldDoFn under test. */ private Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs = new HashMap<>(); private Map<String, Object> accumulators; - /** The output tags used by the DoFn under test. */ + /** The output tags used by the OldDoFn under test. */ private TupleTag<OutputT> mainOutputTag = new TupleTag<>(); - /** The original DoFn under test, if started. */ - DoFn<InputT, OutputT> fn; + /** The original OldDoFn under test, if started. */ + OldDoFn<InputT, OutputT> fn; /** The ListOutputManager to examine the outputs. */ private Map<TupleTag<?>, List<WindowedValue<?>>> outputs; - /** The state of processing of the DoFn under test. */ + /** The state of processing of the OldDoFn under test. */ private State state; - private DoFnTester(DoFn<InputT, OutputT> origFn) { + private DoFnTester(OldDoFn<InputT, OutputT> origFn) { this.origFn = origFn; resetState(); } @@ -705,7 +705,7 @@ public class DoFnTester<InputT, OutputT> { if (cloningBehavior.equals(CloningBehavior.DO_NOT_CLONE)) { fn = origFn; } else { - fn = (DoFn<InputT, OutputT>) + fn = (OldDoFn<InputT, OutputT>) SerializableUtils.deserializeFromByteArray( SerializableUtils.serializeToByteArray(origFn), origFn.toString());