Repository: incubator-beam
Updated Branches:
  refs/heads/master 3bc0fe669 -> c83e5c48f


Deduplicate Unbounded Reads

This ensures that the DirectRunner drops duplicate elements produced by
unbounded sources that declare they require deduplication (requiresDeduping).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/569228e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/569228e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/569228e4

Branch: refs/heads/master
Commit: 569228e4c5fc2e3a722d8f3089d75cc6fc197d93
Parents: 3bc0fe6
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 15 14:41:09 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed Jun 22 09:57:23 2016 -0700

----------------------------------------------------------------------
 .../direct/UnboundedReadDeduplicator.java       | 102 ++++++++++++++
 .../direct/UnboundedReadEvaluatorFactory.java   |  17 ++-
 .../direct/UnboundedReadDeduplicatorTest.java   | 134 +++++++++++++++++++
 .../UnboundedReadEvaluatorFactoryTest.java      |  50 ++++++-
 4 files changed, 299 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
new file mode 100644
index 0000000..0246236
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import org.joda.time.Duration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Provides methods to determine if a record is a duplicate within the evaluation of a
+ * {@link Unbounded} {@link PTransform}.
+ */
+interface UnboundedReadDeduplicator {
+  /**
+   * Returns true if the record with the provided ID should be output, and false if it should not
+   * be because it is a duplicate.
+   */
+  boolean shouldOutput(byte[] recordId);
+
+  /**
+   * An {@link UnboundedReadDeduplicator} that always returns true. For use with sources that do
+   * not require deduplication.
+   */
+  class NeverDeduplicator implements UnboundedReadDeduplicator {
+    /**
+     * Create a new {@link NeverDeduplicator}.
+     */
+    public static UnboundedReadDeduplicator create() {
+      return new NeverDeduplicator();
+    }
+
+    @Override
+    public boolean shouldOutput(byte[] recordId) {
+      return true;
+    }
+  }
+
+
+  /**
+   * An {@link UnboundedReadDeduplicator} that returns true the first time a record ID is seen and
+   * false for later sightings of that ID while it remains cached.
+   *
+   * <p>An ID is forgotten 10 minutes after it was last checked, or earlier if it is evicted to
+   * keep the cache within {@code MAX_CACHED_IDS} entries; a forgotten ID is treated as new the
+   * next time it is seen.
+   */
+  class CachedIdDeduplicator implements UnboundedReadDeduplicator {
+    private static final ByteArrayCoder RECORD_ID_CODER = ByteArrayCoder.of();
+    private static final long MAX_RETENTION_SINCE_ACCESS =
+        Duration.standardMinutes(10L).getMillis();
+    /** Upper bound on the number of record IDs remembered at once. */
+    private static final long MAX_CACHED_IDS = 100_000L;
+
+    /** Maps each remembered ID to a flag that is true until the ID has been output once. */
+    private final LoadingCache<StructuralKey<byte[]>, AtomicBoolean> ids;
+
+    /**
+     * Create a new {@link CachedIdDeduplicator}.
+     */
+    public static UnboundedReadDeduplicator create() {
+      return new CachedIdDeduplicator();
+    }
+
+    private CachedIdDeduplicator() {
+      ids = CacheBuilder.newBuilder()
+          .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS)
+          .maximumSize(MAX_CACHED_IDS)
+          .build(new TrueBooleanLoader());
+    }
+
+    @Override
+    public boolean shouldOutput(byte[] recordId) {
+      // byte[] compares by identity, so the ID is wrapped in a StructuralKey before lookup
+      // (presumably giving content-based equality via the coder -- confirm against StructuralKey).
+      // getAndSet(false) returns the previous flag: true for the first caller, false afterwards.
+      return ids.getUnchecked(StructuralKey.of(recordId, RECORD_ID_CODER)).getAndSet(false);
+    }
+
+    /** Loads a fresh {@code true} flag for any ID that is not currently in the cache. */
+    private static class TrueBooleanLoader
+        extends CacheLoader<StructuralKey<byte[]>, AtomicBoolean> {
+      @Override
+      public AtomicBoolean load(StructuralKey<byte[]> key) throws Exception {
+        return new AtomicBoolean(true);
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 3fb773e..9a287b7 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -105,9 +105,15 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
         // factory for this transform
         UnboundedSource<OutputT, CheckpointMarkT> source =
             (UnboundedSource<OutputT, CheckpointMarkT>) 
transform.getTransform().getSource();
+        UnboundedReadDeduplicator deduplicator;
+        if (source.requiresDeduping()) {
+          deduplicator = 
UnboundedReadDeduplicator.CachedIdDeduplicator.create();
+        } else {
+          deduplicator = UnboundedReadDeduplicator.NeverDeduplicator.create();
+        }
         UnboundedReadEvaluator<OutputT, CheckpointMarkT> evaluator =
             new UnboundedReadEvaluator<>(
-                transform, evaluationContext, source, evaluatorQueue);
+                transform, evaluationContext, source, deduplicator, 
evaluatorQueue);
         evaluatorQueue.offer(evaluator);
       } else {
         // otherwise return the existing Queue that arrived before us
@@ -142,6 +148,7 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
      * source as derived from {@link #transform} due to splitting.
      */
     private final UnboundedSource<OutputT, CheckpointMarkT> source;
+    private final UnboundedReadDeduplicator deduplicator;
     private UnboundedReader<OutputT> currentReader;
     private CheckpointMarkT checkpointMark;
 
@@ -155,12 +162,14 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
         AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>> 
transform,
         EvaluationContext evaluationContext,
         UnboundedSource<OutputT, CheckpointMarkT> source,
+        UnboundedReadDeduplicator deduplicator,
         ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, 
CheckpointMarkT>> evaluatorQueue) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
       this.evaluatorQueue = evaluatorQueue;
       this.source = source;
       this.currentReader = null;
+      this.deduplicator = deduplicator;
       this.checkpointMark = null;
     }
 
@@ -177,8 +186,10 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
         if (elementAvailable) {
           int numElements = 0;
           do {
-            
output.add(WindowedValue.timestampedValueInGlobalWindow(currentReader.getCurrent(),
-                currentReader.getCurrentTimestamp()));
+            if (deduplicator.shouldOutput(currentReader.getCurrentRecordId())) 
{
+              
output.add(WindowedValue.timestampedValueInGlobalWindow(currentReader.getCurrent(),
+                  currentReader.getCurrentTimestamp()));
+            }
             numElements++;
           } while (numElements < ARBITRARY_MAX_ELEMENTS && 
currentReader.advance());
           watermark = currentReader.getWatermark();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
new file mode 100644
index 0000000..b3c9012
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadDeduplicatorTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import 
org.apache.beam.runners.direct.UnboundedReadDeduplicator.CachedIdDeduplicator;
+import 
org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests for {@link UnboundedReadDeduplicator}.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedReadDeduplicatorTest {
+  /** A {@link NeverDeduplicator} reports every ID as outputtable, including a repeated one. */
+  @Test
+  public void neverDeduplicatorAlwaysTrue() {
+    byte[] id = new byte[] {-1, 2, 4, 22};
+    UnboundedReadDeduplicator dedupper = NeverDeduplicator.create();
+
+    assertThat(dedupper.shouldOutput(id), is(true));
+    assertThat(dedupper.shouldOutput(id), is(true));
+  }
+
+  /** A {@link CachedIdDeduplicator} accepts an ID once and rejects an immediate repeat. */
+  @Test
+  public void cachedIdDeduplicatorTrueForFirstIdThenFalse() {
+    byte[] id = new byte[] {-1, 2, 4, 22};
+    UnboundedReadDeduplicator dedupper = CachedIdDeduplicator.create();
+
+    assertThat(dedupper.shouldOutput(id), is(true));
+    assertThat(dedupper.shouldOutput(id), is(false));
+  }
+
+  /**
+   * Races many threads on a single ID and verifies that exactly one of them is told to output it.
+   */
+  @Test
+  public void cachedIdDeduplicatorMultithreaded() throws InterruptedException {
+    byte[] id = new byte[] {-1, 2, 4, 22};
+    UnboundedReadDeduplicator dedupper = CachedIdDeduplicator.create();
+    final CountDownLatch startSignal = new CountDownLatch(1);
+    int numThreads = 1000;
+    final CountDownLatch readyLatch = new CountDownLatch(numThreads);
+    final CountDownLatch finishLine = new CountDownLatch(numThreads);
+
+    ExecutorService executor = Executors.newCachedThreadPool();
+    AtomicInteger successCount = new AtomicInteger();
+    AtomicInteger failureCount = new AtomicInteger();
+    for (int i = 0; i < numThreads; i++) {
+      executor.submit(new TryOutputIdRunnable(dedupper,
+          id,
+          successCount,
+          failureCount,
+          readyLatch,
+          startSignal,
+          finishLine));
+    }
+
+    // Wait until every worker has started, then release them all at once.
+    readyLatch.await();
+    startSignal.countDown();
+    boolean allFinished = finishLine.await(10L, TimeUnit.SECONDS);
+    // Stop the pool before asserting so a failed assertion does not leave workers behind.
+    executor.shutdownNow();
+
+    // Without this check a timeout would only surface as a confusing count mismatch below.
+    assertThat("workers did not all finish within 10 seconds", allFinished, is(true));
+    assertThat(successCount.get(), equalTo(1));
+    assertThat(failureCount.get(), equalTo(numThreads - 1));
+  }
+
+  /**
+   * Signals readiness, waits for the shared start signal, then makes a single
+   * {@code shouldOutput} call and records whether it returned true or false.
+   */
+  private static class TryOutputIdRunnable implements Runnable {
+    private final UnboundedReadDeduplicator deduplicator;
+    private final byte[] id;
+    private final AtomicInteger successCount;
+    private final AtomicInteger failureCount;
+    private final CountDownLatch readyLatch;
+    private final CountDownLatch startSignal;
+    private final CountDownLatch finishLine;
+
+    public TryOutputIdRunnable(
+        UnboundedReadDeduplicator dedupper,
+        byte[] id,
+        AtomicInteger successCount,
+        AtomicInteger failureCount,
+        CountDownLatch readyLatch,
+        CountDownLatch startSignal,
+        CountDownLatch finishLine) {
+      this.deduplicator = dedupper;
+      this.id = id;
+      this.successCount = successCount;
+      this.failureCount = failureCount;
+      this.readyLatch = readyLatch;
+      this.startSignal = startSignal;
+      this.finishLine = finishLine;
+    }
+
+    @Override
+    public void run() {
+      readyLatch.countDown();
+      try {
+        startSignal.await();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag before failing this worker.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      if (deduplicator.shouldOutput(id)) {
+        successCount.incrementAndGet();
+      } else {
+        failureCount.incrementAndGet();
+      }
+      finishLine.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index e182e8d..839badf 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -138,6 +139,38 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   @Test
+  public void unboundedSourceWithDuplicatesMultipleCalls() throws Exception {
+    // Twenty elements cycling through 0..4, so each value appears four times.
+    Long[] outputs = new Long[20];
+    for (long i = 0L; i < 20L; i++) {
+      outputs[(int) i] = i % 5L;
+    }
+    TestUnboundedSource<Long> source =
+        new TestUnboundedSource<>(BigEndianLongCoder.of(), outputs);
+    source.dedupes = true;
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Long> pcollection = p.apply(Read.from(source));
+    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+
+    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
+    when(context.createRootBundle(pcollection)).thenReturn(output);
+    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
+
+    // First bundle: each distinct value is output exactly once.
+    evaluator.finishBundle();
+    assertThat(
+        output.commit(Instant.now()).getElements(),
+        containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(3L), tgw(0L)));
+
+    // Re-stub the bundle for the source's own output PCollection so the second evaluation
+    // writes into secondOutput; stubbing an unrelated PCollection would leave secondOutput
+    // untouched and make the emptiness assertion below pass vacuously.
+    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(pcollection);
+    when(context.createRootBundle(pcollection)).thenReturn(secondOutput);
+    TransformEvaluator<?> secondEvaluator = factory.forApplication(sourceTransform, null, context);
+    secondEvaluator.finishBundle();
+    // Second bundle: every remaining element repeats an already-seen ID, so nothing is output.
+    assertThat(
+        secondOutput.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<Long>>emptyIterable());
+  }
+
+  @Test
   public void evaluatorClosesReaderAfterOutputCount() throws Exception {
     ContiguousSet<Long> elems = ContiguousSet.create(
         Range.closed(0L, 20L * 
UnboundedReadEvaluatorFactory.MAX_READER_REUSE_COUNT),
@@ -251,6 +284,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     static int readerAdvancedCount;
     private final Coder<T> coder;
     private final List<T> elems;
+    private boolean dedupes = false;
 
     public TestUnboundedSource(Coder<T> coder, T... elems) {
       readerAdvancedCount = 0;
@@ -278,6 +312,11 @@ public class UnboundedReadEvaluatorFactoryTest {
     }
 
     @Override
+    public boolean requiresDeduping() {
+      // Driven by the 'dedupes' field, which is false unless a test switches it on.
+      return dedupes;
+    }
+
+    @Override
     public void validate() {}
 
     @Override
@@ -332,7 +371,16 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return Instant.now();
+        return new Instant(index);
+      }
+
+      @Override
+      public byte[] getCurrentRecordId() {
+        // The record ID is the coder's byte encoding of the current element, so elements with
+        // identical encodings share an ID.
+        try {
+          return CoderUtils.encodeToByteArray(coder, getCurrent());
+        } catch (CoderException e) {
+          // Rethrown unchecked: this method's signature declares no checked exceptions.
+          throw new RuntimeException(e);
+        }
       }
 
       @Override

Reply via email to