Repository: incubator-beam Updated Branches: refs/heads/master 3bc0fe669 -> c83e5c48f
Deduplicate Unbounded Reads This ensures that the DirectRunner deduplicates records read from unbounded sources that report requiresDeduping() and may therefore produce duplicate elements. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/569228e4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/569228e4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/569228e4 Branch: refs/heads/master Commit: 569228e4c5fc2e3a722d8f3089d75cc6fc197d93 Parents: 3bc0fe6 Author: Thomas Groh <tg...@google.com> Authored: Wed Jun 15 14:41:09 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Jun 22 09:57:23 2016 -0700 ---------------------------------------------------------------------- .../direct/UnboundedReadDeduplicator.java | 102 ++++++++++++++ .../direct/UnboundedReadEvaluatorFactory.java | 17 ++- .../direct/UnboundedReadDeduplicatorTest.java | 134 +++++++++++++++++++ .../UnboundedReadEvaluatorFactoryTest.java | 50 ++++++- 4 files changed, 299 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java new file mode 100644 index 0000000..0246236 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.transforms.PTransform; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +import org.joda.time.Duration; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Provides methods to determine if a record is a duplicate within the evaluation of a + * {@link Unbounded} {@link PTransform}. + */ +interface UnboundedReadDeduplicator { + /** + * Returns true if the record with the provided ID should be output, and false if it should not + * be because it is a duplicate. + */ + boolean shouldOutput(byte[] recordId); + + /** + * An {@link UnboundedReadDeduplicator} that always returns true. For use with sources + * that do not require deduplication. + */ + class NeverDeduplicator implements UnboundedReadDeduplicator { + /** + * Create a new {@link NeverDeduplicator}. + */ + public static UnboundedReadDeduplicator create() { + return new NeverDeduplicator(); + } + + @Override + public boolean shouldOutput(byte[] recordId) { + return true; + } + } + + + /** + * An {@link UnboundedReadDeduplicator} that returns true if the record ID has not been seen + * within 10 minutes. 
+ */ + class CachedIdDeduplicator implements UnboundedReadDeduplicator { + private static final ByteArrayCoder RECORD_ID_CODER = ByteArrayCoder.of(); + private static final long MAX_RETENTION_SINCE_ACCESS = + Duration.standardMinutes(10L).getMillis(); + + private final LoadingCache<StructuralKey<byte[]>, AtomicBoolean> ids; + + /** + * Create a new {@link CachedIdDeduplicator}. + */ + public static UnboundedReadDeduplicator create() { + return new CachedIdDeduplicator(); + } + + private CachedIdDeduplicator() { + ids = CacheBuilder.newBuilder() + .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS) + .maximumSize(100_000L) + .build(new TrueBooleanLoader()); + } + + @Override + public boolean shouldOutput(byte[] recordId) { + return ids.getUnchecked(StructuralKey.of(recordId, RECORD_ID_CODER)).getAndSet(false); + } + + private static class TrueBooleanLoader + extends CacheLoader<StructuralKey<byte[]>, AtomicBoolean> { + @Override + public AtomicBoolean load(StructuralKey<byte[]> key) throws Exception { + return new AtomicBoolean(true); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index 3fb773e..9a287b7 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -105,9 +105,15 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // factory for this transform UnboundedSource<OutputT, CheckpointMarkT> source = (UnboundedSource<OutputT, 
CheckpointMarkT>) transform.getTransform().getSource(); + UnboundedReadDeduplicator deduplicator; + if (source.requiresDeduping()) { + deduplicator = UnboundedReadDeduplicator.CachedIdDeduplicator.create(); + } else { + deduplicator = UnboundedReadDeduplicator.NeverDeduplicator.create(); + } UnboundedReadEvaluator<OutputT, CheckpointMarkT> evaluator = new UnboundedReadEvaluator<>( - transform, evaluationContext, source, evaluatorQueue); + transform, evaluationContext, source, deduplicator, evaluatorQueue); evaluatorQueue.offer(evaluator); } else { // otherwise return the existing Queue that arrived before us @@ -142,6 +148,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { * source as derived from {@link #transform} due to splitting. */ private final UnboundedSource<OutputT, CheckpointMarkT> source; + private final UnboundedReadDeduplicator deduplicator; private UnboundedReader<OutputT> currentReader; private CheckpointMarkT checkpointMark; @@ -155,12 +162,14 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> transform, EvaluationContext evaluationContext, UnboundedSource<OutputT, CheckpointMarkT> source, + UnboundedReadDeduplicator deduplicator, ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue) { this.transform = transform; this.evaluationContext = evaluationContext; this.evaluatorQueue = evaluatorQueue; this.source = source; this.currentReader = null; + this.deduplicator = deduplicator; this.checkpointMark = null; } @@ -177,8 +186,10 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { if (elementAvailable) { int numElements = 0; do { - output.add(WindowedValue.timestampedValueInGlobalWindow(currentReader.getCurrent(), - currentReader.getCurrentTimestamp())); + if (deduplicator.shouldOutput(currentReader.getCurrentRecordId())) { + 
output.add(WindowedValue.timestampedValueInGlobalWindow(currentReader.getCurrent(), + currentReader.getCurrentTimestamp())); + } numElements++; } while (numElements < ARBITRARY_MAX_ELEMENTS && currentReader.advance()); watermark = currentReader.getWatermark(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java new file mode 100644 index 0000000..b3c9012 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.direct.UnboundedReadDeduplicator.CachedIdDeduplicator; +import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tests for {@link UnboundedReadDeduplicator}. + */ +@RunWith(JUnit4.class) +public class UnboundedReadDeduplicatorTest { + @Test + public void neverDeduplicatorAlwaysTrue() { + byte[] id = new byte[] {-1, 2, 4, 22}; + UnboundedReadDeduplicator dedupper = NeverDeduplicator.create(); + + assertThat(dedupper.shouldOutput(id), is(true)); + assertThat(dedupper.shouldOutput(id), is(true)); + } + + @Test + public void cachedIdDeduplicatorTrueForFirstIdThenFalse() { + byte[] id = new byte[] {-1, 2, 4, 22}; + UnboundedReadDeduplicator dedupper = CachedIdDeduplicator.create(); + + assertThat(dedupper.shouldOutput(id), is(true)); + assertThat(dedupper.shouldOutput(id), is(false)); + } + + @Test + public void cachedIdDeduplicatorMultithreaded() throws InterruptedException { + byte[] id = new byte[] {-1, 2, 4, 22}; + UnboundedReadDeduplicator dedupper = CachedIdDeduplicator.create(); + final CountDownLatch startSignal = new CountDownLatch(1); + int numThreads = 1000; + final CountDownLatch readyLatch = new CountDownLatch(numThreads); + final CountDownLatch finishLine = new CountDownLatch(numThreads); + + ExecutorService executor = Executors.newCachedThreadPool(); + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failureCount = new AtomicInteger(); + for (int i = 0; i < numThreads; i++) { + 
executor.submit(new TryOutputIdRunnable(dedupper, + id, + successCount, + failureCount, + readyLatch, + startSignal, + finishLine)); + } + + readyLatch.await(); + startSignal.countDown(); + finishLine.await(10L, TimeUnit.SECONDS); + executor.shutdownNow(); + + assertThat(successCount.get(), equalTo(1)); + assertThat(failureCount.get(), equalTo(numThreads - 1)); + } + + private static class TryOutputIdRunnable implements Runnable { + private final UnboundedReadDeduplicator deduplicator; + private final byte[] id; + private final AtomicInteger successCount; + private final AtomicInteger failureCount; + private final CountDownLatch readyLatch; + private final CountDownLatch startSignal; + private final CountDownLatch finishLine; + + public TryOutputIdRunnable( + UnboundedReadDeduplicator dedupper, + byte[] id, + AtomicInteger successCount, + AtomicInteger failureCount, + CountDownLatch readyLatch, + CountDownLatch startSignal, + CountDownLatch finishLine) { + this.deduplicator = dedupper; + this.id = id; + this.successCount = successCount; + this.failureCount = failureCount; + this.readyLatch = readyLatch; + this.startSignal = startSignal; + this.finishLine = finishLine; + } + + @Override + public void run() { + readyLatch.countDown(); + try { + startSignal.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + if (deduplicator.shouldOutput(id)) { + successCount.incrementAndGet(); + } else { + failureCount.incrementAndGet(); + } + finishLine.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index e182e8d..839badf 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -138,6 +139,38 @@ public class UnboundedReadEvaluatorFactoryTest { } @Test + public void unboundedSourceWithDuplicatesMultipleCalls() throws Exception { + Long[] outputs = new Long[20]; + for (long i = 0L; i < 20L; i++) { + outputs[(int) i] = i % 5L; + } + TestUnboundedSource<Long> source = + new TestUnboundedSource<>(BigEndianLongCoder.of(), outputs); + source.dedupes = true; + + TestPipeline p = TestPipeline.create(); + PCollection<Long> pcollection = p.apply(Read.from(source)); + AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + + UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); + when(context.createRootBundle(pcollection)).thenReturn(output); + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + + evaluator.finishBundle(); + assertThat( + output.commit(Instant.now()).getElements(), + containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L))); + + UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); + when(context.createRootBundle(longs)).thenReturn(secondOutput); + TransformEvaluator<?> secondEvaluator = factory.forApplication(sourceTransform, null, context); + secondEvaluator.finishBundle(); 
+ assertThat( + secondOutput.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<Long>>emptyIterable()); + } + + @Test public void evaluatorClosesReaderAfterOutputCount() throws Exception { ContiguousSet<Long> elems = ContiguousSet.create( Range.closed(0L, 20L * UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT), @@ -251,6 +284,7 @@ public class UnboundedReadEvaluatorFactoryTest { static int readerAdvancedCount; private final Coder<T> coder; private final List<T> elems; + private boolean dedupes = false; public TestUnboundedSource(Coder<T> coder, T... elems) { readerAdvancedCount = 0; @@ -278,6 +312,11 @@ public class UnboundedReadEvaluatorFactoryTest { } @Override + public boolean requiresDeduping() { + return dedupes; + } + + @Override public void validate() {} @Override @@ -332,7 +371,16 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public Instant getCurrentTimestamp() throws NoSuchElementException { - return Instant.now(); + return new Instant(index); + } + + @Override + public byte[] getCurrentRecordId() { + try { + return CoderUtils.encodeToByteArray(coder, getCurrent()); + } catch (CoderException e) { + throw new RuntimeException(e); + } } @Override