Repository: incubator-beam
Updated Branches:
  refs/heads/master 3c731707b -> 93d2e374c


KafkaIO reader should set consumedOffset even before reading the first record.

* Distinguish between uninitialized consumed offset and consumed offset for
  an empty partition (see the sketch below).

* Add test to verify we handle empty partitions better.

* Fix how we were using MockConsumer. The checkpoint test was not actually
  testing what it was supposed to.

* Avoid CPU spinning in case of a test failure
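
A minimal sketch of the first point, condensed from the KafkaIO.java change below
(PartitionState, UNINITIALIZED_OFFSET and the consumer are all defined there):

    for (PartitionState p : partitionStates) {
      if (p.nextOffset != UNINITIALIZED_OFFSET) {
        // Resuming from a checkpoint: nextOffset is exactly where to continue.
        consumer.seek(p.topicPartition, p.nextOffset);
      } else {
        // Not checkpointed yet (e.g. an empty partition): record the consumer's current
        // position immediately, so a checkpoint taken before the first record is read is
        // still accurate.
        p.nextOffset = consumer.position(p.topicPartition);
      }
    }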


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08c2f1c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08c2f1c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08c2f1c3

Branch: refs/heads/master
Commit: 08c2f1c361eb17f47794e805df910c3dad6a9d43
Parents: 3c73170
Author: Raghu Angadi <rang...@google.com>
Authored: Fri Oct 7 17:18:25 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Thu Oct 13 17:21:52 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  |   8 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  60 +++----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 162 +++++++++++++++----
 3 files changed, 165 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 664bb6f..4f9e96f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -56,19 +56,19 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Seri
    */
   public static class PartitionMark implements Serializable {
     private final TopicPartition topicPartition;
-    private final long offset;
+    private final long nextOffset;
 
     public PartitionMark(TopicPartition topicPartition, long offset) {
       this.topicPartition = topicPartition;
-      this.offset = offset;
+      this.nextOffset = offset;
     }
 
     public TopicPartition getTopicPartition() {
       return topicPartition;
     }
 
-    public long getOffset() {
-      return offset;
+    public long getNextOffset() {
+      return nextOffset;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e26f7c5..2030789 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -782,6 +782,8 @@ public class KafkaIO {
         Executors.newSingleThreadScheduledExecutor();
     private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
 
+    private static final long UNINITIALIZED_OFFSET = -1;
+
     /** watermark before any records have been read. */
     private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
 
@@ -792,7 +794,7 @@ public class KafkaIO {
    // maintains state of each assigned partition (buffered records, consumed offset, etc)
     private static class PartitionState {
       private final TopicPartition topicPartition;
-      private long consumedOffset;
+      private long nextOffset;
       private long latestOffset;
      private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
@@ -800,15 +802,15 @@ public class KafkaIO {
       private double avgRecordSize = 0;
      private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
 
-      PartitionState(TopicPartition partition, long offset) {
+      PartitionState(TopicPartition partition, long nextOffset) {
         this.topicPartition = partition;
-        this.consumedOffset = offset;
-        this.latestOffset = -1;
+        this.nextOffset = nextOffset;
+        this.latestOffset = UNINITIALIZED_OFFSET;
       }
 
       // update consumedOffset and avgRecordSize
       void recordConsumed(long offset, int size) {
-        consumedOffset = offset;
+        nextOffset = offset + 1;
 
        // this is always updated from single thread. probably not worth making it an AtomicDouble
         if (avgRecordSize <= 0) {
@@ -825,14 +827,10 @@ public class KafkaIO {
 
       synchronized long approxBacklogInBytes() {
         // Note that is an an estimate of uncompressed backlog.
-        // Messages on Kafka might be comressed.
-        if (latestOffset < 0 || consumedOffset < 0) {
+        if (latestOffset < 0 || nextOffset < 0) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        if (latestOffset <= consumedOffset || consumedOffset < 0) {
-          return 0;
-        }
-        return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize);
+        return Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));
       }
     }
 
@@ -846,7 +844,7 @@ public class KafkaIO {
      partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
           new Function<TopicPartition, PartitionState>() {
             public PartitionState apply(TopicPartition tp) {
-              return new PartitionState(tp, -1L);
+              return new PartitionState(tp, UNINITIALIZED_OFFSET);
             }
         }));
 
@@ -866,7 +864,7 @@ public class KafkaIO {
               "checkpointed partition %s and assigned partition %s don't 
match",
               ckptMark.getTopicPartition(), assigned);
 
-          partitionStates.get(i).consumedOffset = ckptMark.getOffset();
+          partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
         }
       }
     }
@@ -925,18 +923,22 @@ public class KafkaIO {
       consumer = source.consumerFactoryFn.apply(source.consumerConfig);
       consumer.assign(source.assignedPartitions);
 
-      // seek to consumedOffset + 1 if it is set
       for (PartitionState p : partitionStates) {
-        if (p.consumedOffset >= 0) {
-          LOG.info("{}: resuming {} at {}", name, p.topicPartition, 
p.consumedOffset + 1);
-          consumer.seek(p.topicPartition, p.consumedOffset + 1);
+        if (p.nextOffset != UNINITIALIZED_OFFSET) {
+          consumer.seek(p.topicPartition, p.nextOffset);
         } else {
-          LOG.info("{}: resuming {} at default offset", name, 
p.topicPartition);
+          // nextOffset is uninitialized here, meaning start reading from the latest record as of
+          // now ('latest' is the default, and is configurable). Remember the current position
+          // without waiting until the first record is read. This ensures the checkpoint is
+          // accurate even if the reader is closed before reading any records.
+          p.nextOffset = consumer.position(p.topicPartition);
         }
+
+        LOG.info("{}: reading from {} starting at offset {}", name, 
p.topicPartition, p.nextOffset);
       }
 
-      // start consumer read loop.
-      // Note that consumer is not thread safe, should not accessed out side consumerPollLoop()
+      // Start consumer read loop.
+      // Note that consumer is not thread safe, should not be accessed outside consumerPollLoop().
       consumerPollThread.submit(
           new Runnable() {
             public void run() {
@@ -989,10 +991,10 @@ public class KafkaIO {
           }
 
           ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
-          long consumed = pState.consumedOffset;
+          long expected = pState.nextOffset;
           long offset = rawRecord.offset();
 
-          if (consumed >= 0 && offset <= consumed) { // -- (a)
+          if (offset < expected) { // -- (a)
            // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
            // should we check if the offset is way off from consumedOffset (say > 1M)?
             LOG.warn("{}: ignoring already consumed offset {} for {}",
@@ -1001,9 +1003,9 @@ public class KafkaIO {
           }
 
           // sanity check
-          if (consumed >= 0 && (offset - consumed) != 1) {
-            LOG.warn("{}: gap in offsets for {} after {}. {} records missing.",
-                this, pState.topicPartition, consumed, offset - consumed - 1);
+          if (offset != expected) {
+            LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
+                this, pState.topicPartition, expected, offset - expected);
           }
 
           if (curRecord == null) {
@@ -1059,11 +1061,11 @@ public class KafkaIO {
           p.setLatestOffset(offset);
         } catch (Exception e) {
           LOG.warn("{}: exception while fetching latest offsets. ignored.",  
this, e);
-          p.setLatestOffset(-1L); // reset
+          p.setLatestOffset(UNINITIALIZED_OFFSET); // reset
         }
 
-        LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, 
avg record size {})",
-            this, p.topicPartition, p.latestOffset, p.consumedOffset, 
p.avgRecordSize);
+        LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, 
avg record size {})",
+            this, p.topicPartition, p.latestOffset, p.nextOffset, 
p.avgRecordSize);
       }
 
       LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
@@ -1086,7 +1088,7 @@ public class KafkaIO {
           Lists.transform(partitionStates,
               new Function<PartitionState, PartitionMark>() {
                 public PartitionMark apply(PartitionState p) {
-                  return new PartitionMark(p.topicPartition, p.consumedOffset);
+                  return new PartitionMark(p.topicPartition, p.nextOffset);
                 }
               }
           )));
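
A rough worked example of the simplified backlog estimate above (illustrative numbers
only, not taken from the patch):

    long latestOffset = 1200;     // latest offset reported for the partition
    long nextOffset = 1000;       // next offset the reader will consume
    double avgRecordSize = 64.0;  // moving average of observed record sizes, in bytes

    // 200 remaining records * ~64 bytes each ~= 12800 bytes of (uncompressed) backlog.
    // The Math.max() clamp returns 0 once the reader has caught up, and BACKLOG_UNKNOWN
    // is returned earlier while either offset is still UNINITIALIZED_OFFSET.
    long backlog = Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));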

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 772efe1..67aa675 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
@@ -35,6 +36,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
@@ -69,6 +72,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -78,7 +82,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests of {@link KafkaSource}.
+ * Tests of {@link KafkaIO}.
  */
 @RunWith(JUnit4.class)
 public class KafkaIOTest {
@@ -96,7 +100,8 @@ public class KafkaIOTest {
  // Update mock consumer with records distributed among the given topics, each with given number
  // of partitions. Records are assigned in round-robin order among the partitions.
   private static MockConsumer<byte[], byte[]> mkMockConsumer(
-      List<String> topics, int partitionsPerTopic, int numElements) {
+      List<String> topics, int partitionsPerTopic, int numElements,
+      OffsetResetStrategy offsetResetStrategy) {
 
     final List<TopicPartition> partitions = new ArrayList<>();
    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
@@ -105,8 +110,10 @@ public class KafkaIOTest {
     for (String topic : topics) {
       List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
       for (int i = 0; i < partitionsPerTopic; i++) {
-        partitions.add(new TopicPartition(topic, i));
+        TopicPartition tp = new TopicPartition(topic, i);
+        partitions.add(tp);
         partIds.add(new PartitionInfo(topic, i, null, null, null));
+        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
       }
       partitionMap.put(topic, partIds);
     }
@@ -118,30 +125,28 @@ public class KafkaIOTest {
       int pIdx = i % numPartitions;
       TopicPartition tp = partitions.get(pIdx);
 
-      if (!records.containsKey(tp)) {
-        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
-      }
       records.get(tp).add(
-          new ConsumerRecord<byte[], byte[]>(
+          new ConsumerRecord<>(
               tp.topic(),
               tp.partition(),
               offsets[pIdx]++,
-              ByteBuffer.wrap(new byte[8]).putInt(i).array(),    // key is 4 byte record id
+              ByteBuffer.wrap(new byte[4]).putInt(i).array(),    // key is 4 byte record id
               ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id
     }
 
-    MockConsumer<byte[], byte[]> consumer =
-        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-          // override assign() to add records that belong to the assigned partitions.
-          public void assign(List<TopicPartition> assigned) {
+    // This is updated when reader assigns partitions.
+    final AtomicReference<List<TopicPartition>> assignedPartitions =
+        new AtomicReference<>(Collections.<TopicPartition>emptyList());
+
+    final MockConsumer<byte[], byte[]> consumer =
+        new MockConsumer<byte[], byte[]>(offsetResetStrategy) {
+          // override assign() in order to set offset limits & to save assigned partitions.
+          public void assign(final List<TopicPartition> assigned) {
             super.assign(assigned);
+            assignedPartitions.set(ImmutableList.copyOf(assigned));
             for (TopicPartition tp : assigned) {
-              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
-                addRecord(r);
-              }
               updateBeginningOffsets(ImmutableMap.of(tp, 0L));
              updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
-              seek(tp, 0);
             }
           }
         };
@@ -150,6 +155,29 @@ public class KafkaIOTest {
       consumer.updatePartitions(topic, partitionMap.get(topic));
     }
 
+    // MockConsumer does not maintain any relationship between partition seek position and the
+    // records added. e.g. if we add 10 records to a partition and then seek to end of the
+    // partition, MockConsumer is still going to return the 10 records in next poll. It is
+    // our responsibility to make sure currently enqueued records sync with partition offsets.
+    // The following task will be called inside each invocation to MockConsumer.poll().
+    // We enqueue only the records with the offset >= partition's current position.
+    Runnable recordEnqueuerTask = new Runnable() {
+      @Override
+      public void run() {
+        // add all the records with offset >= current partition position.
+        for (TopicPartition tp : assignedPartitions.get()) {
+          long curPos = consumer.position(tp);
+          for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+            if (r.offset() >= curPos) {
+              consumer.addRecord(r);
+            }
+          }
+        }
+        consumer.schedulePollTask(this);
+      }
+    };
+
+    consumer.schedulePollTask(recordEnqueuerTask);
     return consumer;
   }
 
@@ -158,15 +186,20 @@ public class KafkaIOTest {
     private final List<String> topics;
     private final int partitionsPerTopic;
     private final int numElements;
+    private final OffsetResetStrategy offsetResetStrategy;
 
-    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
+    public ConsumerFactoryFn(List<String> topics,
+                             int partitionsPerTopic,
+                             int numElements,
+                             OffsetResetStrategy offsetResetStrategy) {
       this.topics = topics;
       this.partitionsPerTopic = partitionsPerTopic;
       this.numElements = numElements;
+      this.offsetResetStrategy = offsetResetStrategy;
     }
 
     public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-      return mkMockConsumer(topics, partitionsPerTopic, numElements);
+      return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy);
     }
   }
 
@@ -183,7 +216,8 @@ public class KafkaIOTest {
     KafkaIO.Read<Integer, Long> reader = KafkaIO.read()
         .withBootstrapServers("none")
         .withTopics(topics)
-        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
         .withKeyCoder(BigEndianIntegerCoder.of())
         .withValueCoder(BigEndianLongCoder.of())
         .withMaxNumRecords(numElements);
@@ -257,7 +291,8 @@ public class KafkaIOTest {
     KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
         .withBootstrapServers("none")
         .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
-        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
         .withValueCoder(BigEndianLongCoder.of())
         .withMaxNumRecords(numElements / 10);
 
@@ -357,7 +392,14 @@ public class KafkaIOTest {
  private static void advanceOnce(UnboundedReader<?> reader) throws IOException {
     while (!reader.advance()) {
       // very rarely will there be more than one attempts.
-      // in case of a bug we might end up looping forever, and test will fail with a timeout.
+      // In case of a bug we might end up looping forever, and the test will fail with a timeout.
+
+      // Avoid hard CPU spinning in case of a test failure.
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
 
@@ -373,14 +415,11 @@ public class KafkaIOTest {
           .get(0);
 
    UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
-    final int numToSkip = 3;
+    final int numToSkip = 20; // one from each partition.
 
     // advance numToSkip elements
-    if (!reader.start()) {
-      advanceOnce(reader);
-    }
-
-    for (long l = 0; l < numToSkip - 1; ++l) {
+    reader.start();
+    for (int l = 1; l < numToSkip; ++l) {
       advanceOnce(reader);
     }
 
@@ -396,10 +435,7 @@ public class KafkaIOTest {
     // Confirm that we get the next elements in sequence.
    // This also confirms that Reader interleaves records from each partitions by the reader.
 
-    if (!reader.start()) {
-      advanceOnce(reader);
-    }
-
+    reader.start();
     for (int i = numToSkip; i < numElements; i++) {
       assertEquals(i, (long) reader.getCurrent().getKV().getValue());
       assertEquals(i, reader.getCurrentTimestamp().getMillis());
@@ -410,6 +446,68 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
+    // Similar to testUnboundedSourceCheckpointMark(), but verifies that source resumes
+    // properly from empty partitions, without missing messages added since checkpoint.
+
+    // Initialize consumer with fewer elements than number of partitions so that some are empty.
+    int initialNumElements = 5;
+    UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
+        mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn())
+            .makeSource()
+            .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+            .get(0);
+
+    UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
+
+    reader.start();
+    for (int l = 1; l < initialNumElements; ++l) {
+      advanceOnce(reader);
+    }
+
+    // Checkpoint and restart, and confirm that the source continues correctly.
+    KafkaCheckpointMark mark = CoderUtils.clone(
+        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
+
+    // Create another source with MockConsumer with OffsetResetStrategy.LATEST. This ensures that
+    // the reader needs to explicitly seek to the first offset for partitions that were empty.
+
+    int numElements = 100; // all the 20 partitions will have elements
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    source = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopics(topics)
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            topics, 10, numElements, OffsetResetStrategy.LATEST))
+        .withKeyCoder(BigEndianIntegerCoder.of())
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements)
+        .withTimestampFn(new ValueAsTimestampFn())
+        .makeSource()
+        .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+        .get(0);
+
+    reader = source.createReader(null, mark);
+
+    reader.start();
+
+    // Verify in any order. As the partitions are unevenly read, the returned records are not in a
+    // simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin order.
+
+    List<Long> expected = new ArrayList<>();
+    List<Long> actual = new ArrayList<>();
+    for (long i = initialNumElements; i < numElements; i++) {
+      expected.add(i);
+      actual.add(reader.getCurrent().getKV().getValue());
+      if ((i + 1) < numElements) {
+        advanceOnce(reader);
+      }
+    }
+    assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
+  }
+
+  @Test
   public void testSink() throws Exception {
    // Simply read from kafka source and write to kafka sink. Then verify the records
     // are correctly published to mock kafka producer.
@@ -440,7 +538,7 @@ public class KafkaIOTest {
       completionThread.shutdown();
 
       verifyProducerRecords(topic, numElements, false);
-     }
+    }
   }
 
   @Test
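
As an aside, the MockConsumer workaround in mkMockConsumer() above boils down to the
following standalone pattern (a sketch using the same Kafka client test API as the test;
the topic name and record contents here are only illustrative):

    final TopicPartition tp = new TopicPartition("topic_a", 0);
    final List<ConsumerRecord<byte[], byte[]>> records = ImmutableList.of(
        new ConsumerRecord<byte[], byte[]>("topic_a", 0, 0L, new byte[4], new byte[8]));

    final MockConsumer<byte[], byte[]> consumer =
        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST);
    consumer.assign(Collections.singletonList(tp));
    consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
    consumer.updateEndOffsets(Collections.singletonMap(tp, (long) records.size()));

    // Self-rescheduling poll task: before every poll(), enqueue only the records at or
    // beyond the partition's current position, so seek() behaves the way a reader expects.
    consumer.schedulePollTask(new Runnable() {
      @Override
      public void run() {
        long curPos = consumer.position(tp);
        for (ConsumerRecord<byte[], byte[]> r : records) {
          if (r.offset() >= curPos) {
            consumer.addRecord(r);
          }
        }
        consumer.schedulePollTask(this);
      }
    });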
