http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java index cafe873..517f968 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java @@ -24,6 +24,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasName import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; + import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.everyItem; @@ -40,7 +41,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -54,7 +55,6 @@ import com.google.common.testing.EqualsTester; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; - import org.hamcrest.CustomTypeSafeMatcher; import org.hamcrest.FeatureMatcher; import org.hamcrest.Matcher; @@ -1053,7 +1053,7 @@ public class DisplayDataTest implements Serializable { private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> { @Override public PCollection<T> apply(PCollection<T> input) { - return input.apply(ParDo.of(new DoFn<T, T>() { + return input.apply(ParDo.of(new OldDoFn<T, T>() { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index 10a2a7e..97667a3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -29,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -85,8 +85,8 @@ public class CoGroupByKeyTest implements Serializable { .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); } return input - .apply("Identity" + name, ParDo.of(new DoFn<KV<Integer, String>, - KV<Integer, String>>() { + .apply("Identity" + name, ParDo.of(new OldDoFn<KV<Integer, String>, + KV<Integer, String>>() { @Override public void processElement(ProcessContext c) { c.output(c.element()); @@ -313,11 +313,11 @@ public class CoGroupByKeyTest implements Serializable { } /** - * A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the + * A OldDoFn used in testCoGroupByKeyWithWindowing(), to test processing the * results of a CoGroupByKey. */ private static class ClickOfPurchaseFn extends - DoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess { + OldDoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess { private final TupleTag<String> clicksTag; private final TupleTag<String> purchasesTag; @@ -347,11 +347,11 @@ public class CoGroupByKeyTest implements Serializable { /** - * A DoFn used in testCoGroupByKeyHandleResults(), to test processing the + * A OldDoFn used in testCoGroupByKeyHandleResults(), to test processing the * results of a CoGroupByKey. */ private static class CorrelatePurchaseCountForAddressesWithoutNamesFn extends - DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> { + OldDoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> { private final TupleTag<String> purchasesTag; private final TupleTag<String> addressesTag; @@ -401,7 +401,7 @@ public class CoGroupByKeyTest implements Serializable { } /** - * Tests that the consuming DoFn + * Tests that the consuming OldDoFn * (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as expected. */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java index fb2b4d5..ed64f84 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat; import org.apache.beam.sdk.util.TriggerTester; import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; import org.apache.beam.sdk.values.TimestampedValue; + import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 76bc038..27d2539 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -36,8 +36,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; @@ -199,7 +199,7 @@ public class WindowTest implements Serializable { .apply(GroupByKey.<Integer, String>create()) .apply( ParDo.of( - new DoFn<KV<Integer, Iterable<String>>, Void>() { + new OldDoFn<KV<Integer, Iterable<String>>, Void>() { @Override public void processElement(ProcessContext c) throws Exception { assertThat( @@ -231,7 +231,7 @@ public class WindowTest implements Serializable { .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) .apply(GroupByKey.<Integer, String>create()) - .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() { + .apply(ParDo.of(new OldDoFn<KV<Integer, Iterable<String>>, Void>() { @Override public void processElement(ProcessContext c) throws Exception { assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index c1e092a..622a277 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -26,9 +26,9 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -59,7 +59,7 @@ public class WindowingTest implements Serializable { private static class WindowedCount extends PTransform<PCollection<String>, PCollection<String>> { private final class FormatCountsDoFn - extends DoFn<KV<String, Long>, String> implements RequiresWindowAccess { + extends OldDoFn<KV<String, Long>, String> implements RequiresWindowAccess { @Override public void processElement(ProcessContext c) { c.output(c.element().getKey() + ":" + c.element().getValue() @@ -234,8 +234,8 @@ public class WindowingTest implements Serializable { p.run(); } - /** A DoFn that tokenizes lines of text into individual words. */ - static class ExtractWordsWithTimestampsFn extends DoFn<String, String> { + /** A OldDoFn that tokenizes lines of text into individual words. */ + static class ExtractWordsWithTimestampsFn extends OldDoFn<String, String> { @Override public void processElement(ProcessContext c) { String[] words = c.element().split("[^a-zA-Z0-9']+"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java index c808b4d..ee5a2b3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java @@ -18,10 +18,12 @@ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.transforms.Combine; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.Combine; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java index 2cbc20e..b95f235 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java @@ -18,10 +18,12 @@ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.transforms.Combine; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; + +import org.apache.beam.sdk.transforms.Combine; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java index d9e7593..30406fc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java @@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java index 6c5d0bd..f6bacc4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java @@ -25,7 +25,6 @@ import static org.apache.beam.sdk.util.Structs.addString; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; - import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java index 7e68df9..e87bbee 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java @@ -59,12 +59,12 @@ public class StringUtilsTest { /** * Inner class for simple name test. */ - private class EmbeddedDoFn { + private class EmbeddedOldDoFn { - private class DeeperEmbeddedDoFn extends EmbeddedDoFn {} + private class DeeperEmbeddedOldDoFn extends EmbeddedOldDoFn {} - private EmbeddedDoFn getEmbedded() { - return new DeeperEmbeddedDoFn(); + private EmbeddedOldDoFn getEmbedded() { + return new DeeperEmbeddedOldDoFn(); } } @@ -93,22 +93,22 @@ public class StringUtilsTest { @Test public void testSimpleName() { assertEquals("Embedded", - StringUtils.approximateSimpleName(EmbeddedDoFn.class)); + StringUtils.approximateSimpleName(EmbeddedOldDoFn.class)); } @Test public void testAnonSimpleName() throws Exception { thrown.expect(IllegalArgumentException.class); - EmbeddedDoFn anon = new EmbeddedDoFn(){}; + EmbeddedOldDoFn anon = new EmbeddedOldDoFn(){}; StringUtils.approximateSimpleName(anon.getClass()); } @Test public void testNestedSimpleName() { - EmbeddedDoFn fn = new EmbeddedDoFn(); - EmbeddedDoFn inner = fn.getEmbedded(); + EmbeddedOldDoFn fn = new EmbeddedOldDoFn(); + EmbeddedOldDoFn inner = fn.getEmbedded(); assertEquals("DeeperEmbedded", StringUtils.approximateSimpleName(inner.getClass())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index b321c8f..4892bbd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; + import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java index fb002de..79f0cb7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.util.common.Counter.CommitState; import org.apache.beam.sdk.util.common.Counter.CounterMean; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java index 9a8ab30..547c778 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -75,7 +75,7 @@ public final class PCollectionTupleTest implements Serializable { .apply(Create.of(inputs)); PCollectionTuple outputs = mainInput.apply(ParDo - .of(new DoFn<Integer, Integer>() { + .of(new OldDoFn<Integer, Integer>() { @Override public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index ba5dffb..c525cf1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.junit.Rule; @@ -44,7 +44,7 @@ public class TypedPValueTest { @Rule public ExpectedException thrown = ExpectedException.none(); - private static class IdentityDoFn extends DoFn<Integer, Integer> { + private static class IdentityDoFn extends OldDoFn<Integer, Integer> { private static final long serialVersionUID = 0; @Override public void processElement(ProcessContext c) throws Exception { @@ -129,7 +129,7 @@ public class TypedPValueTest { static class EmptyClass { } - private static class EmptyClassDoFn extends DoFn<Integer, EmptyClass> { + private static class EmptyClassDoFn extends OldDoFn<Integer, EmptyClass> { private static final long serialVersionUID = 0; @Override public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java index 72abaea..88836f9 100644 --- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.joinlibrary; import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -59,7 +59,7 @@ public class Join { .apply(CoGroupByKey.<K>create()); return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { @Override public void processElement(ProcessContext c) { KV<K, CoGbkResult> e = c.element(); @@ -108,7 +108,7 @@ public class Join { .apply(CoGroupByKey.<K>create()); return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { @Override public void processElement(ProcessContext c) { KV<K, CoGbkResult> e = c.element(); @@ -161,7 +161,7 @@ public class Join { .apply(CoGroupByKey.<K>create()); return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { @Override public void processElement(ProcessContext c) { KV<K, CoGbkResult> e = c.element(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 76f7079..9fccbf9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -44,7 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -324,7 +324,7 @@ public class BigQueryIO { * <p>Each {@link TableRow} contains values indexed by column name. Here is a * sample processing function that processes a "line" column from rows: * <pre>{@code - * static class ExtractWordsFn extends DoFn<TableRow, String> { + * static class ExtractWordsFn extends OldDoFn<TableRow, String> { * public void processElement(ProcessContext c) { * // Get the "line" field of the TableRow object, split it into words, and emit them. * TableRow row = c.element(); @@ -696,7 +696,7 @@ public class BigQueryIO { input.getPipeline() .apply("Create(CleanupOperation)", Create.of(cleanupOperation)) .apply("Cleanup", ParDo.of( - new DoFn<CleanupOperation, Void>() { + new OldDoFn<CleanupOperation, Void>() { @Override public void processElement(ProcessContext c) throws Exception { @@ -707,7 +707,7 @@ public class BigQueryIO { return outputs.get(mainOutput); } - private static class IdentityFn<T> extends DoFn<T, T> { + private static class IdentityFn<T> extends OldDoFn<T, T> { @Override public void processElement(ProcessContext c) { c.output(c.element()); @@ -1262,7 +1262,7 @@ public class BigQueryIO { * <p>Here is a sample transform that produces TableRow values containing * "word" and "count" columns: * <pre>{@code - * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> { + * static class FormatCountsFn extends OldDoFn<KV<String, Long>, TableRow> { * public void processElement(ProcessContext c) { * TableRow row = new TableRow() * .set("word", c.element().getKey()) @@ -2011,11 +2011,11 @@ public class BigQueryIO { ///////////////////////////////////////////////////////////////////////////// /** - * Implementation of DoFn to perform streaming BigQuery write. + * Implementation of OldDoFn to perform streaming BigQuery write. */ @SystemDoFnInternal private static class StreamingWriteFn - extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { + extends OldDoFn<KV<ShardedKey<String>, TableRowInfo>, Void> { /** TableSchema in JSON. Use String to make the class Serializable. */ private final String jsonTableSchema; @@ -2248,8 +2248,8 @@ public class BigQueryIO { * id is created by concatenating this randomUUID with a sequential number. */ private static class TagWithUniqueIdsAndTable - extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> - implements DoFn.RequiresWindowAccess { + extends OldDoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> + implements OldDoFn.RequiresWindowAccess { /** TableSpec to write to. */ private final String tableSpec; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index f4082d4..1f77e3e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -31,7 +31,7 @@ import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -512,7 +512,7 @@ public class BigtableIO { return new BigtableServiceImpl(options); } - private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> { + private class BigtableWriterFn extends OldDoFn<KV<ByteString, Iterable<Mutation>>, Void> { public BigtableWriterFn(String tableId, BigtableService bigtableService) { this.tableId = checkNotNull(tableId, "tableId"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java index bda907a..6f3663a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java @@ -37,9 +37,9 @@ import org.apache.beam.sdk.io.Sink.Writer; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Values; @@ -85,7 +85,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; - import javax.annotation.Nullable; /** @@ -479,11 +478,11 @@ public class V1Beta3 { } /** - * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique keys - * and outputs them as {@link KV}. + * A {@link OldDoFn} that splits a given query into multiple sub-queries, assigns them unique + * keys and outputs them as {@link KV}. */ @VisibleForTesting - static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> { + static class SplitQueryFn extends OldDoFn<Query, KV<Integer, Query>> { private final V1Beta3Options options; // number of splits to make for a given query private final int numSplits; @@ -560,10 +559,10 @@ public class V1Beta3 { } /** - * A {@link DoFn} that reads entities from Datastore for each query. + * A {@link OldDoFn} that reads entities from Datastore for each query. */ @VisibleForTesting - static class ReadFn extends DoFn<Query, Entity> { + static class ReadFn extends OldDoFn<Query, Entity> { private final V1Beta3Options options; private final V1Beta3DatastoreFactory datastoreFactory; // Datastore client http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 00e7891..7d2df62 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -58,7 +58,7 @@ import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -216,7 +216,7 @@ public class BigQueryIOTest implements Serializable { private Object[] pollJobReturns; private String executingProject; // Both counts will be reset back to zeros after serialization. - // This is a work around for DoFn's verifyUnmodified check. + // This is a work around for OldDoFn's verifyUnmodified check. private transient int startJobCallsCount; private transient int pollJobStatusCallsCount; @@ -546,7 +546,7 @@ public class BigQueryIOTest implements Serializable { .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable") .withTestServices(fakeBqServices) .withoutValidation()) - .apply(ParDo.of(new DoFn<TableRow, String>() { + .apply(ParDo.of(new OldDoFn<TableRow, String>() { @Override public void processElement(ProcessContext c) throws Exception { c.output((String) c.element().get("name")); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index a39d7d5..83489a5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -108,7 +108,7 @@ public class BigtableWriteIT implements Serializable { Pipeline p = Pipeline.create(options); p.apply(CountingInput.upTo(numRows)) - .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() { + .apply(ParDo.of(new OldDoFn<Long, KV<ByteString, Iterable<Mutation>>>() { @Override public void processElement(ProcessContext c) { int index = c.element().intValue(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java index 59d91d4..daed1cb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java @@ -27,7 +27,7 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; @@ -109,9 +109,9 @@ class V1Beta3TestUtil { } /** - * A DoFn that creates entity for a long number. + * A OldDoFn that creates entity for a long number. */ - static class CreateEntityFn extends DoFn<Long, Entity> { + static class CreateEntityFn extends OldDoFn<Long, Entity> { private final String kind; @Nullable private final String namespace; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 2de933c..342c4fc 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -453,7 +453,7 @@ public class JmsIO { checkArgument((queue != null || topic != null), "Either queue or topic is required"); } - private static class JmsWriter extends DoFn<String, Void> { + private static class JmsWriter extends OldDoFn<String, Void> { private ConnectionFactory connectionFactory; private String queue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 3b64bd5..eb649a6 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -550,7 +550,7 @@ public class KafkaIO { return typedRead .apply(begin) .apply("Remove Kafka Metadata", - ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() { + ParDo.of(new OldDoFn<KafkaRecord<K, V>, KV<K, V>>() { @Override public void processElement(ProcessContext ctx) { ctx.output(ctx.element().getKV()); @@ -1315,7 +1315,7 @@ public class KafkaIO { public PDone apply(PCollection<V> input) { return input .apply("Kafka values with default key", - ParDo.of(new DoFn<V, KV<Void, V>>() { + ParDo.of(new OldDoFn<V, KV<Void, V>>() { @Override public void processElement(ProcessContext ctx) throws Exception { ctx.output(KV.<Void, V>of(null, ctx.element())); @@ -1326,7 +1326,7 @@ public class KafkaIO { } } - private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> { + private static class KafkaWriter<K, V> extends OldDoFn<KV<K, V>, Void> { @Override public void startBundle(Context c) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index dd93823..d7b1921 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -33,10 +33,10 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.Max; import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -78,7 +78,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - import javax.annotation.Nullable; /** @@ -281,7 +280,7 @@ public class KafkaIOTest { p.run(); } - private static class ElementValueDiff extends DoFn<Long, Long> { + private static class ElementValueDiff extends OldDoFn<Long, Long> { @Override public void processElement(ProcessContext c) throws Exception { c.output(c.element() - c.timestamp().getMillis()); @@ -309,7 +308,7 @@ public class KafkaIOTest { p.run(); } - private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> { + private static class RemoveKafkaMetadata<K, V> extends OldDoFn<KafkaRecord<K, V>, KV<K, V>> { @Override public void processElement(ProcessContext ctx) throws Exception { ctx.output(ctx.element().getKV()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java index fef8d40..1141e88 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java @@ -47,9 +47,9 @@ public class WithTimestampsJava8Test implements Serializable { .apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(yearTwoThousand)))); PCollection<KV<String, Instant>> timestampedVals = - timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() { + timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() { @Override - public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c) + public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c) throws Exception { c.output(KV.of(c.element(), c.timestamp())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java index c0e5b17..bc55c06 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java @@ -25,7 +25,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.KV; @@ -108,7 +108,7 @@ import java.util.regex.Pattern; */ public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ - public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { + public static class FilterTextFn extends OldDoFn<KV<String, Long>, KV<String, Long>> { /** * Concept #1: The logger below uses the fully qualified class name of FilterTextFn * as the logger. All log statements emitted by this logger will be referenced by this name http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java index be32afa..55beb1f 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java @@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; @@ -82,7 +82,7 @@ public class MinimalWordCount { // DoFn (defined in-line) on each element that tokenizes the text line into individual words. // The ParDo returns a PCollection<String>, where each element is an individual word in // Shakespeare's collected texts. - .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { + .apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { @@ -98,7 +98,7 @@ public class MinimalWordCount { .apply(Count.<String>perElement()) // Apply another ParDo transform that formats our PCollection of word counts into a printable // string, suitable for writing to an output file. - .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() { + .apply("FormatResults", ParDo.of(new OldDoFn<KV<String, Long>, String>() { @Override public void processElement(ProcessContext c) { c.output(c.element().getKey() + ": " + c.element().getValue()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java index c2defa7..ffe8b88 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -121,7 +121,7 @@ public class WindowedWordCount { * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a * 2-hour period. */ - static class AddTimestampFn extends DoFn<String, String> { + static class AddTimestampFn extends OldDoFn<String, String> { private static final long RAND_RANGE = 7200000; // 2 hours in ms @Override @@ -137,7 +137,7 @@ public class WindowedWordCount { } /** A DoFn that converts a Word and Count into a BigQuery table row. */ - static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> { + static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> { @Override public void processElement(ProcessContext c) { TableRow row = new TableRow() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java index 803e800..5432036 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -95,7 +95,7 @@ public class WordCount { * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the * pipeline. */ - static class ExtractWordsFn extends DoFn<String, String> { + static class ExtractWordsFn extends OldDoFn<String, String> { private final Aggregator<Long, Long> emptyLines = createAggregator("emptyLines", new Sum.SumLongFn()); @@ -118,7 +118,7 @@ public class WordCount { } /** A DoFn that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends DoFn<KV<String, Long>, String> { + public static class FormatAsTextFn extends OldDoFn<KV<String, Long>, String> { @Override public void processElement(ProcessContext c) { c.output(c.element().getKey() + ": " + c.element().getValue()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java index 5c182b2..9b347da 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.IntraBundleParallelization; import org.apache.beam.sdk.util.Transport; @@ -72,7 +72,7 @@ public class PubsubFileInjector { } /** A DoFn that publishes non-empty lines to Google Cloud PubSub. */ - public static class Bound extends DoFn<String, Void> { + public static class Bound extends OldDoFn<String, Void> { private final String outputTopic; private final String timestampLabelKey; public transient Pubsub pubsub; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java index 9a75bb7..6a1c41b 100644 --- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java @@ -20,7 +20,7 @@ package ${package}; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.slf4j.Logger; @@ -50,13 +50,13 @@ public class StarterPipeline { PipelineOptionsFactory.fromArgs(args).withValidation().create()); p.apply(Create.of("Hello", "World")) - .apply(ParDo.of(new DoFn<String, String>() { + .apply(ParDo.of(new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { c.output(c.element().toUpperCase()); } })) - .apply(ParDo.of(new DoFn<String, Void>() { + .apply(ParDo.of(new OldDoFn<String, Void>() { @Override public void processElement(ProcessContext c) { LOG.info(c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java index 8c71d9d..7c13350 100644 --- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java @@ -20,7 +20,7 @@ package it.pkg; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.slf4j.Logger; @@ -50,13 +50,13 @@ public class StarterPipeline { PipelineOptionsFactory.fromArgs(args).withValidation().create()); p.apply(Create.of("Hello", "World")) - .apply(ParDo.of(new DoFn<String, String>() { + .apply(ParDo.of(new OldDoFn<String, String>() { @Override public void processElement(ProcessContext c) { c.output(c.element().toUpperCase()); } })) - .apply(ParDo.of(new DoFn<String, Void>() { + .apply(ParDo.of(new OldDoFn<String, Void>() { @Override public void processElement(ProcessContext c) { LOG.info(c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java ---------------------------------------------------------------------- diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java index f1dfbb9..0da75f4 100644 --- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java +++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java @@ -20,11 +20,11 @@ package org.apache.beam.sdk.microbenchmarks.transforms; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnReflector; import org.apache.beam.sdk.transforms.DoFnReflector.DoFnInvoker; import org.apache.beam.sdk.transforms.DoFnWithContext; import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowingInternals; @@ -40,7 +40,7 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; /** - * Benchmarks for {@link DoFn} and {@link DoFnWithContext} invocations, specifically + * Benchmarks for {@link OldDoFn} and {@link DoFnWithContext} invocations, specifically * for measuring the overhead of {@link DoFnReflector}. */ @State(Scope.Benchmark) @@ -50,7 +50,7 @@ public class DoFnReflectorBenchmark { private static final String ELEMENT = "some string to use for testing"; - private DoFn<String, String> doFn = new UpperCaseDoFn(); + private OldDoFn<String, String> doFn = new UpperCaseDoFn(); private DoFnWithContext<String, String> doFnWithContext = new UpperCaseDoFnWithContext(); private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT); @@ -71,7 +71,7 @@ public class DoFnReflectorBenchmark { }; private DoFnReflector doFnReflector; - private DoFn<String, String> adaptedDoFnWithContext; + private OldDoFn<String, String> adaptedDoFnWithContext; private DoFnInvoker<String, String> invoker; @@ -100,7 +100,7 @@ public class DoFnReflectorBenchmark { return stubDoFnWithContextContext.output; } - private static class UpperCaseDoFn extends DoFn<String, String> { + private static class UpperCaseDoFn extends OldDoFn<String, String> { @Override public void processElement(ProcessContext c) throws Exception { @@ -116,12 +116,12 @@ public class DoFnReflectorBenchmark { } } - private static class StubDoFnProcessContext extends DoFn<String, String>.ProcessContext { + private static class StubDoFnProcessContext extends OldDoFn<String, String>.ProcessContext { private final String element; private String output; - public StubDoFnProcessContext(DoFn<String, String> fn, String element) { + public StubDoFnProcessContext(OldDoFn<String, String> fn, String element) { fn.super(); this.element = element; }