Deletes CountingInput
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/57eeaae1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/57eeaae1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/57eeaae1 Branch: refs/heads/master Commit: 57eeaae11ea248c8145f467148e799d6c3565402 Parents: 6a9a24c Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Apr 19 15:36:42 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Fri Apr 21 16:53:50 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/CountingInput.java | 283 ------------------- .../apache/beam/sdk/io/CountingInputTest.java | 221 --------------- 2 files changed, 504 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/57eeaae1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java deleted file mode 100644 index ab006d4..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.Optional; -import org.apache.beam.sdk.io.CountingSource.NowTimestampFn; -import org.apache.beam.sdk.io.Read.Unbounded; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * A {@link PTransform} that produces longs. When used to produce a - * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0} - * or starting value, and counts up to a specified maximum. When used to produce an - * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE} - * and then never produces more output. (In practice, this limit should never be reached.) - * - * <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and - * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it - * supports dynamic work rebalancing. - * - * <p>To produce a bounded {@code PCollection<Long>} starting from {@code 0}, - * use {@link CountingInput#upTo(long)}: - * - * <pre>{@code - * Pipeline p = ... - * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000); - * PCollection<Long> bounded = p.apply(producer); - * }</pre> - * - * <p>To produce a bounded {@code PCollection<Long>} starting from {@code startOffset}, - * use {@link CountingInput#forSubrange(long, long)} instead. - * - * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()}, - * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values - * with timestamps other than {@link Instant#now}. - * - * <pre>{@code - * Pipeline p = ... - * - * // To create an unbounded producer that uses processing time as the element timestamp. - * PCollection<Long> unbounded = p.apply(CountingInput.unbounded()); - * // Or, to create an unbounded source that uses a provided function to set the element timestamp. - * PCollection<Long> unboundedWithTimestamps = - * p.apply(CountingInput.unbounded().withTimestampFn(someFn)); - * }</pre> - */ -public class CountingInput { - /** - * Creates a {@link BoundedCountingInput} that will produce the specified number of elements, - * from {@code 0} to {@code numElements - 1}. - */ - public static BoundedCountingInput upTo(long numElements) { - checkArgument(numElements >= 0, - "numElements (%s) must be greater than or equal to 0", - numElements); - return new BoundedCountingInput(numElements); - } - - /** - * Creates a {@link BoundedCountingInput} that will produce elements - * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive). - * If {@code startIndex == endIndex}, then no elements will be produced. - */ - public static BoundedCountingInput forSubrange(long startIndex, long endIndex) { - checkArgument(endIndex >= startIndex, - "endIndex (%s) must be greater than or equal to startIndex (%s)", - endIndex, startIndex); - return new BoundedCountingInput(startIndex, endIndex); - } - - /** - * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up - * to {@link Long#MAX_VALUE}. - * - * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this - * limit should never be reached.) - * - * <p>Elements in the resulting {@link PCollection PCollection<Long>} will by default have - * timestamps corresponding to processing time at element generation, provided by - * {@link Instant#now}. Use the transform returned by - * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output - * timestamps. - */ - public static UnboundedCountingInput unbounded() { - return new UnboundedCountingInput( - new NowTimestampFn(), - 1L /* Elements per period */, - Duration.ZERO /* Period length */, - Optional.<Long>absent() /* Maximum number of records */, - Optional.<Duration>absent() /* Maximum read duration */); - } - - /** - * A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from - * 0. - * - * <pre>{@code - * PCollection<Long> bounded = p.apply(CountingInput.upTo(10L)); - * }</pre> - */ - public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> { - private final long startIndex; - private final long endIndex; - - private BoundedCountingInput(long numElements) { - this.endIndex = numElements; - this.startIndex = 0; - } - - private BoundedCountingInput(long startIndex, long endIndex) { - this.endIndex = endIndex; - this.startIndex = startIndex; - } - - @Override - public PCollection<Long> expand(PBegin begin) { - return begin.apply(Read.from(CountingSource.createSourceForSubrange(startIndex, endIndex))); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - if (startIndex == 0) { - builder.add(DisplayData.item("upTo", endIndex) - .withLabel("Count Up To")); - } else { - builder.add(DisplayData.item("startAt", startIndex).withLabel("Count Starting At")) - .add(DisplayData.item("upTo", endIndex).withLabel("Count Up To")); - } - } - } - - /** - * A {@link PTransform} that will produce numbers starting from {@code 0} up to - * {@link Long#MAX_VALUE}. - * - * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this - * limit should never be reached.) - * - * <p>Elements in the resulting {@link PCollection PCollection<Long>} will by default have - * timestamps corresponding to processing time at element generation, provided by - * {@link Instant#now}. Use the transform returned by - * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output - * timestamps. - */ - public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> { - private final SerializableFunction<Long, Instant> timestampFn; - private final long elementsPerPeriod; - private final Duration period; - private final Optional<Long> maxNumRecords; - private final Optional<Duration> maxReadTime; - - private UnboundedCountingInput( - SerializableFunction<Long, Instant> timestampFn, - long elementsPerPeriod, - Duration period, - Optional<Long> maxNumRecords, - Optional<Duration> maxReadTime) { - this.timestampFn = timestampFn; - this.elementsPerPeriod = elementsPerPeriod; - this.period = period; - this.maxNumRecords = maxNumRecords; - this.maxReadTime = maxReadTime; - } - - /** - * Returns an {@link UnboundedCountingInput} like this one, but where output elements have the - * timestamp specified by the timestampFn. - * - * <p>Note that the timestamps produced by {@code timestampFn} may not decrease. - */ - public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) { - return new UnboundedCountingInput( - timestampFn, elementsPerPeriod, period, maxNumRecords, maxReadTime); - } - - /** - * Returns an {@link UnboundedCountingInput} like this one, but that will read at most the - * specified number of elements. - * - * <p>A bounded amount of elements will be produced by the result transform, and the result - * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}. - */ - public UnboundedCountingInput withMaxNumRecords(long maxRecords) { - checkArgument( - maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords); - return new UnboundedCountingInput( - timestampFn, elementsPerPeriod, period, Optional.of(maxRecords), maxReadTime); - } - - /** - * Returns an {@link UnboundedCountingInput} like this one, but with output production limited - * to an aggregate rate of no more than the number of elements per the period length. - * - * <p>Note that when there are multiple splits, each split outputs independently. This may lead - * to elements not being produced evenly across time, though the aggregate rate will still - * approach the specified rate. - * - * <p>A duration of {@link Duration#ZERO} will produce output as fast as possible. - */ - public UnboundedCountingInput withRate(long numElements, Duration periodLength) { - return new UnboundedCountingInput( - timestampFn, numElements, periodLength, maxNumRecords, maxReadTime); - } - - /** - * Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the - * specified amount of time. - * - * <p>A bounded amount of elements will be produced by the result transform, and the result - * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}. - */ - public UnboundedCountingInput withMaxReadTime(Duration readTime) { - checkNotNull(readTime, "ReadTime cannot be null"); - return new UnboundedCountingInput( - timestampFn, elementsPerPeriod, period, maxNumRecords, Optional.of(readTime)); - } - - @SuppressWarnings("deprecation") - @Override - public PCollection<Long> expand(PBegin begin) { - Unbounded<Long> read = - Read.from( - CountingSource.createUnboundedFrom(0) - .withTimestampFn(timestampFn) - .withRate(elementsPerPeriod, period)); - if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) { - return begin.apply(read); - } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) { - return begin.apply(read.withMaxNumRecords(maxNumRecords.get())); - } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) { - return begin.apply(read.withMaxReadTime(maxReadTime.get())); - } else { - return begin.apply( - read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get())); - } - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - - builder.add(DisplayData.item("timestampFn", timestampFn.getClass()) - .withLabel("Timestamp Function")); - - if (maxReadTime.isPresent()) { - builder.add(DisplayData.item("maxReadTime", maxReadTime.get()) - .withLabel("Maximum Read Time")); - } - - if (maxNumRecords.isPresent()) { - builder.add(DisplayData.item("maxRecords", maxNumRecords.get()) - .withLabel("Maximum Read Records")); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/57eeaae1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java deleted file mode 100644 index e7a6cfd..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Distinct; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.Min; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link CountingInput}. - */ -@RunWith(JUnit4.class) -public class CountingInputTest { - public static void addCountingAsserts(PCollection<Long> input, long start, long end) { - // Count == numElements - PAssert.thatSingleton(input.apply("Count", Count.<Long>globally())) - .isEqualTo(end - start); - // Unique count == numElements - PAssert.thatSingleton( - input - .apply(Distinct.<Long>create()) - .apply("UniqueCount", Count.<Long>globally())) - .isEqualTo(end - start); - // Min == start - PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(start); - // Max == end-1 - PAssert.thatSingleton(input.apply("Max", Max.<Long>globally())) - .isEqualTo(end - 1); - } - - @Rule - public TestPipeline p = TestPipeline.create(); - - @Test - @Category(ValidatesRunner.class) - public void testBoundedInput() { - long numElements = 1000; - PCollection<Long> input = p.apply(CountingInput.upTo(numElements)); - - addCountingAsserts(input, 0, numElements); - p.run(); - } - - @Test - @Category(ValidatesRunner.class) - public void testEmptyBoundedInput() { - PCollection<Long> input = p.apply(CountingInput.upTo(0)); - - PAssert.that(input).empty(); - p.run(); - } - - @Test - @Category(ValidatesRunner.class) - public void testEmptyBoundedInputSubrange() { - PCollection<Long> input = p.apply(CountingInput.forSubrange(42, 42)); - - PAssert.that(input).empty(); - p.run(); - } - - - @Test - @Category(ValidatesRunner.class) - public void testBoundedInputSubrange() { - long start = 10; - long end = 1000; - PCollection<Long> input = p.apply(CountingInput.forSubrange(start, end)); - - addCountingAsserts(input, start, end); - p.run(); - } - - @Test - public void testBoundedDisplayData() { - PTransform<?, ?> input = CountingInput.upTo(1234); - DisplayData displayData = DisplayData.from(input); - assertThat(displayData, hasDisplayItem("upTo", 1234)); - } - - @Test - public void testBoundedDisplayDataSubrange() { - PTransform<?, ?> input = CountingInput.forSubrange(12, 1234); - DisplayData displayData = DisplayData.from(input); - assertThat(displayData, hasDisplayItem("startAt", 12)); - assertThat(displayData, hasDisplayItem("upTo", 1234)); - } - - @Test - @Category(ValidatesRunner.class) - public void testUnboundedInput() { - long numElements = 1000; - - PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements)); - - addCountingAsserts(input, 0, numElements); - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testUnboundedInputRate() { - long numElements = 5000; - - long elemsPerPeriod = 10L; - Duration periodLength = Duration.millis(8); - PCollection<Long> input = - p.apply( - CountingInput.unbounded() - .withRate(elemsPerPeriod, periodLength) - .withMaxNumRecords(numElements)); - - addCountingAsserts(input, 0, numElements); - long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod; - Instant startTime = Instant.now(); - p.run(); - Instant endTime = Instant.now(); - assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true)); - } - - private static class ElementValueDiff extends DoFn<Long, Long> { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - c.output(c.element() - c.timestamp().getMillis()); - } - } - - @Test - @Category(ValidatesRunner.class) - public void testUnboundedInputTimestamps() { - long numElements = 1000; - - PCollection<Long> input = - p.apply( - CountingInput.unbounded() - .withTimestampFn(new ValueAsTimestampFn()) - .withMaxNumRecords(numElements)); - addCountingAsserts(input, 0, numElements); - - PCollection<Long> diffs = - input - .apply("TimestampDiff", ParDo.of(new ElementValueDiff())) - .apply("DistinctTimestamps", Distinct.<Long>create()); - // This assert also confirms that diffs only has one unique value. - PAssert.thatSingleton(diffs).isEqualTo(0L); - - p.run(); - } - - @Test - public void testUnboundedDisplayData() { - Duration maxReadTime = Duration.standardHours(5); - SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() { - @Override - public Instant apply(Long input) { - return Instant.now(); - } - }; - - PTransform<?, ?> input = CountingInput.unbounded() - .withMaxNumRecords(1234) - .withMaxReadTime(maxReadTime) - .withTimestampFn(timestampFn); - - DisplayData displayData = DisplayData.from(input); - - assertThat(displayData, hasDisplayItem("maxRecords", 1234)); - assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); - assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass())); - } - - /** - * A timestamp function that uses the given value as the timestamp. Because the input values will - * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out - * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}. - */ - private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> { - @Override - public Instant apply(Long input) { - return new Instant(input); - } - } -}