Repository: incubator-beam
Updated Branches:
  refs/heads/master 3c731707b -> 93d2e374c

KafkaIO reader should set consumedOffset even before reading the first record.

* Distinguish between an uninitialized consumed offset and the consumed offset
  of an empty partition.
* Add a test to verify that we handle empty partitions better.
* Fix how we were using MockConsumer, so that the checkpoint test actually does
  what it is supposed to.
* Avoid CPU spinning in case of a test failure.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08c2f1c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08c2f1c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08c2f1c3

Branch: refs/heads/master
Commit: 08c2f1c361eb17f47794e805df910c3dad6a9d43
Parents: 3c73170
Author: Raghu Angadi <rang...@google.com>
Authored: Fri Oct 7 17:18:25 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Thu Oct 13 17:21:52 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  |   8 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  60 +++----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 162 +++++++++++++++----
 3 files changed, 165 insertions(+), 65 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 664bb6f..4f9e96f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -56,19 +56,19 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Seri
    */
   public static class PartitionMark implements Serializable {
     private final TopicPartition topicPartition;
-    private final long offset;
+    private final long nextOffset;
 
     public PartitionMark(TopicPartition topicPartition, long offset) {
       this.topicPartition = topicPartition;
-      this.offset = offset;
+      this.nextOffset = offset;
     }
 
     public TopicPartition getTopicPartition() {
       return topicPartition;
     }
 
-    public long getOffset() {
-      return offset;
+    public long getNextOffset() {
+      return nextOffset;
     }
   }
 }
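The rename makes the off-by-one contract explicit: a PartitionMark now records the offset of the next record to be read, rather than the last record consumed. A minimal resume sketch against a plain Kafka Consumer (the ResumeSketch class and resume() helper are illustrative, not part of this patch):

    import java.util.Collections;
    import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public class ResumeSketch {
      // Hypothetical helper: position a consumer exactly where a checkpoint left off.
      static void resume(Consumer<byte[], byte[]> consumer, PartitionMark mark) {
        TopicPartition tp = mark.getTopicPartition();
        consumer.assign(Collections.singletonList(tp));
        // getNextOffset() is already "last consumed + 1", so no +1 adjustment is
        // needed here, unlike the old getOffset(), which returned the offset of
        // the last record consumed.
        consumer.seek(tp, mark.getNextOffset());
      }
    }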
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index e26f7c5..2030789 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -782,6 +782,8 @@ public class KafkaIO {
         Executors.newSingleThreadScheduledExecutor();
     private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
 
+    private static final long UNINITIALIZED_OFFSET = -1;
+
     /** watermark before any records have been read. */
     private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
 
@@ -792,7 +794,7 @@ public class KafkaIO {
     // maintains state of each assigned partition (buffered records, consumed offset, etc)
     private static class PartitionState {
       private final TopicPartition topicPartition;
-      private long consumedOffset;
+      private long nextOffset;
       private long latestOffset;
       private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
@@ -800,15 +802,15 @@ public class KafkaIO {
       private double avgRecordSize = 0;
       private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
 
-      PartitionState(TopicPartition partition, long offset) {
+      PartitionState(TopicPartition partition, long nextOffset) {
         this.topicPartition = partition;
-        this.consumedOffset = offset;
-        this.latestOffset = -1;
+        this.nextOffset = nextOffset;
+        this.latestOffset = UNINITIALIZED_OFFSET;
       }
 
       // update consumedOffset and avgRecordSize
       void recordConsumed(long offset, int size) {
-        consumedOffset = offset;
+        nextOffset = offset + 1;
 
         // this is always updated from single thread. probably not worth making it an AtomicDouble
         if (avgRecordSize <= 0) {
@@ -825,14 +827,10 @@ public class KafkaIO {
 
       synchronized long approxBacklogInBytes() {
         // Note that is an an estimate of uncompressed backlog.
-        // Messages on Kafka might be comressed.
-        if (latestOffset < 0 || consumedOffset < 0) {
+        if (latestOffset < 0 || nextOffset < 0) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        if (latestOffset <= consumedOffset || consumedOffset < 0) {
-          return 0;
-        }
-        return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize);
+        return Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));
       }
     }
 
@@ -846,7 +844,7 @@ public class KafkaIO {
         partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
             new Function<TopicPartition, PartitionState>() {
               public PartitionState apply(TopicPartition tp) {
-                return new PartitionState(tp, -1L);
+                return new PartitionState(tp, UNINITIALIZED_OFFSET);
               }
             }));
 
@@ -866,7 +864,7 @@ public class KafkaIO {
               "checkpointed partition %s and assigned partition %s don't match",
               ckptMark.getTopicPartition(), assigned);
 
-          partitionStates.get(i).consumedOffset = ckptMark.getOffset();
+          partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
         }
       }
     }
 
@@ -925,18 +923,22 @@ public class KafkaIO {
       consumer = source.consumerFactoryFn.apply(source.consumerConfig);
       consumer.assign(source.assignedPartitions);
 
-      // seek to consumedOffset + 1 if it is set
       for (PartitionState p : partitionStates) {
-        if (p.consumedOffset >= 0) {
-          LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
-          consumer.seek(p.topicPartition, p.consumedOffset + 1);
+        if (p.nextOffset != UNINITIALIZED_OFFSET) {
+          consumer.seek(p.topicPartition, p.nextOffset);
         } else {
-          LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
+          // nextOffset is uninitialized here, meaning we start reading from the latest record
+          // as of now ('latest' is the default, and is configurable). Remember the current
+          // position without waiting until the first record is read. This ensures the
+          // checkpoint is accurate even if the reader is closed before reading any records.
+          p.nextOffset = consumer.position(p.topicPartition);
         }
+
+        LOG.info("{}: reading from {} starting at offset {}", name, p.topicPartition, p.nextOffset);
       }
 
-      // start consumer read loop.
-      // Note that consumer is not thread safe, should not accessed out side consumerPollLoop()
+      // Start consumer read loop.
+      // Note that consumer is not thread safe; it should not be accessed outside consumerPollLoop().
       consumerPollThread.submit(
           new Runnable() {
             public void run() {
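The key behavioral change is the else branch above: instead of leaving the offset uninitialized until the first record arrives, the reader asks the consumer for its current position immediately. A small demonstration of why that matters, using kafka-clients' MockConsumer (standalone scaffolding, not code from this patch):

    import com.google.common.collect.ImmutableMap;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class PositionSketch {
      public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("test", 0);
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
        consumer.assign(Collections.singletonList(tp));
        // Pretend the partition already holds 10 records (offsets 0..9).
        consumer.updateBeginningOffsets(ImmutableMap.of(tp, 0L));
        consumer.updateEndOffsets(ImmutableMap.of(tp, 10L));

        // With the 'latest' reset strategy the initial position is the end of the
        // partition. Capturing it eagerly (offset 10 here) means a checkpoint taken
        // before any poll() still points at the right place, so records 10, 11, ...
        // published later are not skipped on resume.
        long nextOffset = consumer.position(tp);
        System.out.println("checkpointable next offset: " + nextOffset); // prints 10
      }
    }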
@@ -989,10 +991,10 @@ public class KafkaIO {
       }
 
       ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
-      long consumed = pState.consumedOffset;
+      long expected = pState.nextOffset;
       long offset = rawRecord.offset();
 
-      if (consumed >= 0 && offset <= consumed) { // -- (a)
+      if (offset < expected) { // -- (a)
         // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
         // should we check if the offset is way off from consumedOffset (say > 1M)?
         LOG.warn("{}: ignoring already consumed offset {} for {}",
@@ -1001,9 +1003,9 @@ public class KafkaIO {
       }
 
       // sanity check
-      if (consumed >= 0 && (offset - consumed) != 1) {
-        LOG.warn("{}: gap in offsets for {} after {}. {} records missing.",
-            this, pState.topicPartition, consumed, offset - consumed - 1);
+      if (offset != expected) {
+        LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
+            this, pState.topicPartition, expected, offset - expected);
       }
 
       if (curRecord == null) {
@@ -1059,11 +1061,11 @@ public class KafkaIO {
           p.setLatestOffset(offset);
         } catch (Exception e) {
           LOG.warn("{}: exception while fetching latest offsets. ignored.", this, e);
-          p.setLatestOffset(-1L); // reset
+          p.setLatestOffset(UNINITIALIZED_OFFSET); // reset
         }
 
-        LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, avg record size {})",
-            this, p.topicPartition, p.latestOffset, p.consumedOffset, p.avgRecordSize);
+        LOG.debug("{}: latest offset update for {} : {} (consumer offset {}, avg record size {})",
+            this, p.topicPartition, p.latestOffset, p.nextOffset, p.avgRecordSize);
       }
 
       LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
@@ -1086,7 +1088,7 @@ public class KafkaIO {
             Lists.transform(partitionStates,
                 new Function<PartitionState, PartitionMark>() {
                   public PartitionMark apply(PartitionState p) {
-                    return new PartitionMark(p.topicPartition, p.consumedOffset);
+                    return new PartitionMark(p.topicPartition, p.nextOffset);
                   }
                 }
             )));
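With nextOffset in hand, the simplified backlog estimate needs no -1 correction: the number of unread records is exactly latestOffset - nextOffset. A worked example with invented numbers:

    public class BacklogSketch {
      public static void main(String[] args) {
        long latestOffset = 1000;    // end offset reported by Kafka
        long nextOffset = 990;       // next record the reader will consume
        double avgRecordSize = 150;  // moving average of record sizes, in bytes

        // 10 records remain; at ~150 bytes each, the estimate is 1500 bytes.
        // Math.max guards against a momentarily stale latestOffset.
        long backlogBytes = Math.max(0, (long) ((latestOffset - nextOffset) * avgRecordSize));
        System.out.println("approx backlog: " + backlogBytes + " bytes"); // prints 1500
      }
    }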
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08c2f1c3/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 772efe1..67aa675 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
@@ -35,6 +36,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
@@ -69,6 +72,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -78,7 +82,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests of {@link KafkaSource}.
+ * Tests of {@link KafkaIO}.
  */
 @RunWith(JUnit4.class)
 public class KafkaIOTest {
@@ -96,7 +100,8 @@ public class KafkaIOTest {
   // Update mock consumer with records distributed among the given topics, each with given number
   // of partitions. Records are assigned in round-robin order among the partitions.
   private static MockConsumer<byte[], byte[]> mkMockConsumer(
-      List<String> topics, int partitionsPerTopic, int numElements) {
+      List<String> topics, int partitionsPerTopic, int numElements,
+      OffsetResetStrategy offsetResetStrategy) {
 
     final List<TopicPartition> partitions = new ArrayList<>();
     final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
@@ -105,8 +110,10 @@ public class KafkaIOTest {
     for (String topic : topics) {
       List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
       for (int i = 0; i < partitionsPerTopic; i++) {
-        partitions.add(new TopicPartition(topic, i));
+        TopicPartition tp = new TopicPartition(topic, i);
+        partitions.add(tp);
         partIds.add(new PartitionInfo(topic, i, null, null, null));
+        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
       }
       partitionMap.put(topic, partIds);
     }
@@ -118,30 +125,28 @@ public class KafkaIOTest {
       int pIdx = i % numPartitions;
       TopicPartition tp = partitions.get(pIdx);
 
-      if (!records.containsKey(tp)) {
-        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
-      }
       records.get(tp).add(
-          new ConsumerRecord<byte[], byte[]>(
+          new ConsumerRecord<>(
               tp.topic(),
               tp.partition(),
               offsets[pIdx]++,
-              ByteBuffer.wrap(new byte[8]).putInt(i).array(),  // key is 4 byte record id
+              ByteBuffer.wrap(new byte[4]).putInt(i).array(),  // key is 4 byte record id
              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id
     }
 
-    MockConsumer<byte[], byte[]> consumer =
-        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-          // override assign() to add records that belong to the assigned partitions.
-          public void assign(List<TopicPartition> assigned) {
+    // This is updated when the reader assigns partitions.
+    final AtomicReference<List<TopicPartition>> assignedPartitions =
+        new AtomicReference<>(Collections.<TopicPartition>emptyList());
+
+    final MockConsumer<byte[], byte[]> consumer =
+        new MockConsumer<byte[], byte[]>(offsetResetStrategy) {
+          // override assign() in order to set offset limits & to save assigned partitions.
+          public void assign(final List<TopicPartition> assigned) {
             super.assign(assigned);
+            assignedPartitions.set(ImmutableList.copyOf(assigned));
             for (TopicPartition tp : assigned) {
-              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
-                addRecord(r);
-              }
               updateBeginningOffsets(ImmutableMap.of(tp, 0L));
               updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
-              seek(tp, 0);
             }
           }
         };
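The rewritten factory also leans on MockConsumer.schedulePollTask(), which runs a task on the consumer's polling thread during the next poll(). A minimal standalone usage, separate from the test code (class name and values are illustrative):

    import com.google.common.collect.ImmutableMap;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class SchedulePollTaskSketch {
      public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("test", 0);
        final MockConsumer<byte[], byte[]> consumer =
            new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        consumer.assign(Collections.singletonList(tp));
        consumer.updateBeginningOffsets(ImmutableMap.of(tp, 0L));

        // The task runs inside the next poll(), i.e. on the only thread from which
        // the consumer may safely be mutated while a reader is using it.
        consumer.schedulePollTask(new Runnable() {
          @Override
          public void run() {
            consumer.addRecord(
                new ConsumerRecord<byte[], byte[]>("test", 0, 0L, null, new byte[8]));
          }
        });

        ConsumerRecords<byte[], byte[]> records = consumer.poll(10);
        System.out.println("records returned: " + records.count()); // prints 1
      }
    }

This is the same mechanism the record-enqueuer task in the next hunk uses to keep re-scheduling itself on every poll.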
@@ -150,6 +155,29 @@ public class KafkaIOTest {
       consumer.updatePartitions(topic, partitionMap.get(topic));
     }
 
+    // MockConsumer does not maintain any relationship between partition seek position and the
+    // records added. E.g., if we add 10 records to a partition and then seek to the end of the
+    // partition, MockConsumer is still going to return the 10 records in the next poll. It is
+    // our responsibility to make sure the currently enqueued records sync with partition offsets.
+    // The following task will be called inside each invocation of MockConsumer.poll().
+    // We enqueue only the records with offset >= the partition's current position.
+    Runnable recordEnqueueTask = new Runnable() {
+      @Override
+      public void run() {
+        // add all the records with offset >= current partition position.
+        for (TopicPartition tp : assignedPartitions.get()) {
+          long curPos = consumer.position(tp);
+          for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+            if (r.offset() >= curPos) {
+              consumer.addRecord(r);
+            }
+          }
+        }
+        consumer.schedulePollTask(this);
+      }
+    };
+
+    consumer.schedulePollTask(recordEnqueueTask);
 
     return consumer;
   }
 
@@ -158,15 +186,20 @@ public class KafkaIOTest {
     private final List<String> topics;
     private final int partitionsPerTopic;
     private final int numElements;
+    private final OffsetResetStrategy offsetResetStrategy;
 
-    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
+    public ConsumerFactoryFn(List<String> topics,
+                             int partitionsPerTopic,
+                             int numElements,
+                             OffsetResetStrategy offsetResetStrategy) {
       this.topics = topics;
       this.partitionsPerTopic = partitionsPerTopic;
       this.numElements = numElements;
+      this.offsetResetStrategy = offsetResetStrategy;
     }
 
     public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-      return mkMockConsumer(topics, partitionsPerTopic, numElements);
+      return mkMockConsumer(topics, partitionsPerTopic, numElements, offsetResetStrategy);
     }
   }
 
@@ -183,7 +216,8 @@ public class KafkaIOTest {
     KafkaIO.Read<Integer, Long> reader = KafkaIO.read()
         .withBootstrapServers("none")
         .withTopics(topics)
-        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
         .withKeyCoder(BigEndianIntegerCoder.of())
         .withValueCoder(BigEndianLongCoder.of())
         .withMaxNumRecords(numElements);
 
@@ -257,7 +291,8 @@ public class KafkaIOTest {
     KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
         .withBootstrapServers("none")
         .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
-        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
         .withValueCoder(BigEndianLongCoder.of())
         .withMaxNumRecords(numElements / 10);
 
@@ -357,7 +392,14 @@ public class KafkaIOTest {
   private static void advanceOnce(UnboundedReader<?> reader) throws IOException {
     while (!reader.advance()) {
       // very rarely will there be more than one attempts.
-      // in case of a bug we might end up looping forever, and test will fail with a timeout.
+      // In case of a bug we might end up looping forever, and the test will fail with a timeout.
+
+      // Avoid hard CPU spinning in case of a test failure.
+      try {
+        Thread.sleep(1);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
     }
   }
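The 1 ms sleep keeps a failing test from pegging a CPU core while it spins; the test-harness timeout is still what eventually fails the run. A bounded variant (a hypothetical alternative, not what this patch does) could fail fast with a clearer message:

    import java.io.IOException;
    import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;

    class AdvanceHelper {
      // Hypothetical variant of advanceOnce(): give up after a deadline instead of
      // spinning until the surrounding test times out.
      static void advanceOnce(UnboundedReader<?> reader, long timeoutMillis) throws IOException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!reader.advance()) {
          if (System.currentTimeMillis() > deadline) {
            throw new IOException("reader did not advance within " + timeoutMillis + " ms");
          }
          try {
            Thread.sleep(1); // avoid hard CPU spinning
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
        }
      }
    }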
@@ -373,14 +415,11 @@ public class KafkaIOTest {
         .get(0);
 
     UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
-    final int numToSkip = 3;
+    final int numToSkip = 20; // one from each partition.
 
     // advance numToSkip elements
-    if (!reader.start()) {
-      advanceOnce(reader);
-    }
-
-    for (long l = 0; l < numToSkip - 1; ++l) {
+    reader.start();
+    for (int l = 1; l < numToSkip; ++l) {
       advanceOnce(reader);
     }
 
@@ -396,10 +435,7 @@ public class KafkaIOTest {
 
     // Confirm that we get the next elements in sequence.
     // This also confirms that Reader interleaves records from each partitions by the reader.
-    if (!reader.start()) {
-      advanceOnce(reader);
-    }
-
+    reader.start();
     for (int i = numToSkip; i < numElements; i++) {
       assertEquals(i, (long) reader.getCurrent().getKV().getValue());
       assertEquals(i, reader.getCurrentTimestamp().getMillis());
@@ -410,6 +446,68 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnboundedSourceCheckpointMarkWithEmptyPartitions() throws Exception {
+    // Similar to testUnboundedSourceCheckpointMark(), but verifies that the source resumes
+    // properly from empty partitions, without missing messages added since the checkpoint.
+
+    // Initialize the consumer with fewer elements than the number of partitions so that some
+    // partitions are empty.
+    int initialNumElements = 5;
+    UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
+        mkKafkaReadTransform(initialNumElements, new ValueAsTimestampFn())
+            .makeSource()
+            .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+            .get(0);
+
+    UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
+
+    reader.start();
+    for (int l = 1; l < initialNumElements; ++l) {
+      advanceOnce(reader);
+    }
+
+    // Checkpoint and restart, and confirm that the source continues correctly.
+    KafkaCheckpointMark mark = CoderUtils.clone(
+        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
+
+    // Create another source with a MockConsumer using OffsetResetStrategy.LATEST. This ensures
+    // that the reader needs to explicitly seek to the first offset for partitions that were
+    // empty at checkpoint time.
+
+    int numElements = 100; // all the 20 partitions will have elements
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    source = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopics(topics)
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            topics, 10, numElements, OffsetResetStrategy.LATEST))
+        .withKeyCoder(BigEndianIntegerCoder.of())
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements)
+        .withTimestampFn(new ValueAsTimestampFn())
+        .makeSource()
+        .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+        .get(0);
+
+    reader = source.createReader(null, mark);
+
+    reader.start();
+
+    // Verify in any order. As the partitions are unevenly read, the returned records are not in
+    // a simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin order.
+
+    List<Long> expected = new ArrayList<>();
+    List<Long> actual = new ArrayList<>();
+    for (long i = initialNumElements; i < numElements; i++) {
+      expected.add(i);
+      actual.add(reader.getCurrent().getKV().getValue());
+      if ((i + 1) < numElements) {
+        advanceOnce(reader);
+      }
+    }
+    assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
+  }
+
+  @Test
   public void testSink() throws Exception {
     // Simply read from kafka source and write to kafka sink. Then verify the records
     // are correctly published to mock kafka producer.
@@ -440,7 +538,7 @@ public class KafkaIOTest {
 
       completionThread.shutdown();
 
       verifyProducerRecords(topic, numElements, false);
-  }
+    }
   }
 
   @Test