Repository: incubator-beam
Updated Branches:
  refs/heads/master 365029863 -> 5a7bd8083


Add CountingInput as a PTransform

This transform produces a bounded or unbounded PCollection containing longs
based on a CountingSource.

Deprecate methods producing a Source in CountingSource.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a7bd808
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a7bd808
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a7bd808

Branch: refs/heads/master
Commit: 5a7bd80832d72ed8a287d4aab1f1f9cfa6d18c8a
Parents: 3650298
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 2 14:03:28 2016 -0800
Committer: Dan Halperin <dhalp...@google.com>
Committed: Thu Mar 3 14:33:01 2016 -0800

----------------------------------------------------------------------
 .../cloud/dataflow/sdk/io/CountingInput.java    | 191 +++++++++++++++++++
 .../cloud/dataflow/sdk/io/CountingSource.java   |  31 ++-
 .../dataflow/sdk/io/CountingInputTest.java      | 122 ++++++++++++
 3 files changed, 334 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a7bd808/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
new file mode 100644
index 0000000..07609ba
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.cloud.dataflow.sdk.io.CountingSource.NowTimestampFn;
+import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
+import com.google.common.base.Optional;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} that produces longs. When used to produce a
+ * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link 
CountingInput} starts at {@code 0}
+ * and counts up to (but not including) a specified maximum. When used to produce an
+ * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to 
{@link Long#MAX_VALUE}
+ * and then never produces more output. (In practice, this limit should never 
be reached.)
+ *
+ * <p>The bounded {@link CountingInput} is implemented based on {@link 
OffsetBasedSource} and
+ * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient 
initial splitting and it
+ * supports dynamic work rebalancing.
+ *
+ * <p>To produce a bounded {@code PCollection<Long>}, use {@link 
CountingInput#upTo(long)}:
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
+ * PCollection<Long> bounded = p.apply(producer);
+ * }</pre>
+ *
+ * <p>To produce an unbounded {@code PCollection<Long>}, use {@link 
CountingInput#unbounded()},
+ * calling {@link 
UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
+ * with timestamps other than {@link Instant#now}.
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ *
+ * // To create an unbounded producer that uses processing time as the element 
timestamp.
+ * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
+ * // Or, to create an unbounded producer that uses a given function to set 
the element timestamp.
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
+ * }</pre>
+ */
+public class CountingInput {
+  /**
+   * Creates a {@link BoundedCountingInput} that will produce the specified 
number of elements,
+   * from {@code 0} to {@code numElements - 1}.
+   */
+  public static BoundedCountingInput upTo(long numElements) {
+    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", 
numElements);
+    return new BoundedCountingInput(numElements);
+  }
+
+  /**
+   * Creates an {@link UnboundedCountingInput} that will produce numbers 
starting from {@code 0} up
+   * to {@link Long#MAX_VALUE}.
+   *
+   * <p>After {@link Long#MAX_VALUE}, the transform never produces more 
output. (In practice, this
+   * limit should never be reached.)
+   *
+   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} 
will by default have
+   * timestamps corresponding to processing time at element generation, 
provided by
+   * {@link Instant#now}. Use the transform returned by
+   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to 
control the output
+   * timestamps.
+   */
+  public static UnboundedCountingInput unbounded() {
+    return new UnboundedCountingInput(
+        new NowTimestampFn(), Optional.<Long>absent(), 
Optional.<Duration>absent());
+  }
+
+  /**
+   * A {@link PTransform} that will produce a specified number of {@link Long 
Longs} starting from
+   * 0.
+   */
+  public static class BoundedCountingInput extends PTransform<PBegin, 
PCollection<Long>> {
+    private final long numElements;
+
+    private BoundedCountingInput(long numElements) {
+      this.numElements = numElements;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public PCollection<Long> apply(PBegin begin) {
+      return begin.apply(Read.from(CountingSource.upTo(numElements)));
+    }
+  }
+
+  /**
+   * A {@link PTransform} that will produce numbers starting from {@code 0} up 
to
+   * {@link Long#MAX_VALUE}.
+   *
+   * <p>After {@link Long#MAX_VALUE}, the transform never produces more 
output. (In practice, this
+   * limit should never be reached.)
+   *
+   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} 
will by default have
+   * timestamps corresponding to processing time at element generation, 
provided by
+   * {@link Instant#now}. Use the transform returned by
+   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to 
control the output
+   * timestamps.
+   */
+  public static class UnboundedCountingInput extends PTransform<PBegin, 
PCollection<Long>> {
+    private final SerializableFunction<Long, Instant> timestampFn;
+    private final Optional<Long> maxNumRecords;
+    private final Optional<Duration> maxReadTime;
+
+    private UnboundedCountingInput(
+        SerializableFunction<Long, Instant> timestampFn,
+        Optional<Long> maxNumRecords,
+        Optional<Duration> maxReadTime) {
+      this.timestampFn = timestampFn;
+      this.maxNumRecords = maxNumRecords;
+      this.maxReadTime = maxReadTime;
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but where 
output elements have the
+     * timestamp specified by the timestampFn.
+     *
+     * <p>Note that the timestamps produced by {@code timestampFn} must not 
decrease.
+     */
+    public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, 
Instant> timestampFn) {
+      return new UnboundedCountingInput(timestampFn, maxNumRecords, 
maxReadTime);
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but that will 
read at most the
+     * specified number of elements.
+     *
+     * <p>A bounded number of elements will be produced by the result 
transform, and the result
+     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
+     */
+    public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
+      checkArgument(
+          maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got 
%s", maxRecords);
+      return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), 
maxReadTime);
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but that will 
read for at most the
+     * specified amount of time.
+     *
+     * <p>A bounded number of elements will be produced by the result 
transform, and the result
+     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
+     */
+    public UnboundedCountingInput withMaxReadTime(Duration readTime) {
+      checkNotNull(readTime, "ReadTime cannot be null");
+      return new UnboundedCountingInput(timestampFn, maxNumRecords, 
Optional.of(readTime));
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public PCollection<Long> apply(PBegin begin) {
+      Unbounded<Long> read = 
Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
+      if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
+        return begin.apply(read);
+      } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
+        return begin.apply(read.withMaxNumRecords(maxNumRecords.get()));
+      } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) {
+        return begin.apply(read.withMaxReadTime(maxReadTime.get()));
+      } else {
+        return begin.apply(
+            
read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a7bd808/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
index 2938534..412f3a7 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
@@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.coders.AvroCoder;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
 import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
+import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
@@ -48,29 +49,33 @@ import java.util.NoSuchElementException;
  *
  * <pre>{@code
  * Pipeline p = ...
- * BoundedSource<Long> source = CountingSource.upTo(1000);
- * PCollection<Long> bounded = p.apply(Read.from(source));
+ * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
+ * PCollection<Long> bounded = p.apply(producer);
  * }</pre>
  *
- * <p>To produce an unbounded {@code PCollection<Long>}, use {@link 
CountingSource#unbounded} or
- * {@link CountingSource#unboundedWithTimestampFn}:
+ * <p>To produce an unbounded {@code PCollection<Long>}, use {@link 
CountingInput#unbounded()},
+ * calling {@link 
UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
+ * with timestamps other than {@link Instant#now}.
  *
  * <pre>{@code
  * Pipeline p = ...
  *
- * // To create an unbounded source that uses processing time as the element 
timestamp.
- * UnboundedSource<Long, CounterMark> source = CountingSource.unbounded();
+ * // To create an unbounded PCollection that uses processing time as the 
element timestamp.
+ * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
  * // Or, to create an unbounded source that uses a provided function to set 
the element timestamp.
- * UnboundedSource<Long, CounterMark> source = 
CountingSource.unboundedWithTimestampFn(someFn);
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
  *
- * PCollection<Long> unbounded = p.apply(Read.from(source));
  * }</pre>
  */
 public class CountingSource {
   /**
    * Creates a {@link BoundedSource} that will produce the specified number of 
elements,
    * from {@code 0} to {@code numElements - 1}.
+   *
+   * @deprecated use {@link CountingInput#upTo(long)} instead
    */
+  @Deprecated
   public static BoundedSource<Long> upTo(long numElements) {
     checkArgument(numElements > 0, "numElements (%s) must be greater than 0", 
numElements);
     return new BoundedCountingSource(0, numElements);
@@ -85,7 +90,10 @@ public class CountingSource {
    *
    * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} 
will have timestamps
    * corresponding to processing time at element generation, provided by 
{@link Instant#now}.
+   *
+   * @deprecated use {@link CountingInput#unbounded()} instead
    */
+  @Deprecated
   public static UnboundedSource<Long, CounterMark> unbounded() {
     return unboundedWithTimestampFn(new NowTimestampFn());
   }
@@ -98,7 +106,11 @@ public class CountingSource {
    * limit should never be reached.)
    *
    * <p>Note that the timestamps produced by {@code timestampFn} may not 
decrease.
+   *
+   * @deprecated use {@link CountingInput#unbounded()} and call
+   *             {@link 
UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead
    */
+  @Deprecated
   public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
       SerializableFunction<Long, Instant> timestampFn) {
     return new UnboundedCountingSource(0, 1, timestampFn);
@@ -109,11 +121,10 @@ public class CountingSource {
   /** Prevent instantiation. */
   private CountingSource() {}
 
-
   /**
    * A function that returns {@link Instant#now} as the timestamp for each 
generated element.
    */
-  private static class NowTimestampFn implements SerializableFunction<Long, 
Instant> {
+  static class NowTimestampFn implements SerializableFunction<Long, Instant> {
     @Override
     public Instant apply(Long input) {
       return Instant.now();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a7bd808/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
new file mode 100644
index 0000000..948a892
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy 
of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Max;
+import com.google.cloud.dataflow.sdk.transforms.Min;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for {@link CountingInput}.
+ */
+public class CountingInputTest {
+  public static void addCountingAsserts(PCollection<Long> input, long 
numElements) {
+    // Count == numElements
+    DataflowAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Unique count == numElements
+    DataflowAssert.thatSingleton(
+            input
+                .apply(RemoveDuplicates.<Long>create())
+                .apply("UniqueCount", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Min == 0
+    DataflowAssert.thatSingleton(input.apply("Min", 
Min.<Long>globally())).isEqualTo(0L);
+    // Max == numElements-1
+    DataflowAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
+        .isEqualTo(numElements - 1);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundedInput() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+    PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedInput() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+
+    PCollection<Long> input = 
p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element() - c.timestamp().getMillis());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedInputTimestamps() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+            CountingInput.unbounded()
+                .withTimestampFn(new ValueAsTimestampFn())
+                .withMaxNumRecords(numElements));
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+        input
+            .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+            .apply("RemoveDuplicateTimestamps", 
RemoveDuplicates.<Long>create());
+    // This assert also confirms that diffs only has one unique value.
+    DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  /**
+   * A timestamp function that uses the given value as the timestamp. Because 
the input values will
+   * not wrap, this function is non-decreasing and meets the timestamp 
function criteria laid out
+   * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}.
+   */
+  private static class ValueAsTimestampFn implements 
SerializableFunction<Long, Instant> {
+    @Override
+    public Instant apply(Long input) {
+      return new Instant(input);
+    }
+  }
+}

Reply via email to