Repository: incubator-beam Updated Branches: refs/heads/master 3f16f2660 -> 3a8b9b521
[BEAM-1052] Add InputDStream id to MicrobatchSource hashcode. Done to avoid collisions between splits of different sources. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1a4ac0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1a4ac0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1a4ac0f Branch: refs/heads/master Commit: a1a4ac0fc0376aa4c43a4357f3acc930e2b53c94 Parents: 3f16f26 Author: Aviem Zur <aviem...@gmail.com> Authored: Tue Nov 29 09:51:12 2016 +0200 Committer: Sela <ans...@paypal.com> Committed: Tue Nov 29 11:49:31 2016 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/io/MicrobatchSource.java | 20 ++++++++++++++------ .../beam/runners/spark/io/SourceDStream.java | 3 ++- .../spark/stateful/StateSpecFunctions.java | 2 +- 3 files changed, 17 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index 4a174aa..5656375 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -54,6 +54,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo private final Duration maxReadTime; private final int numInitialSplits; private final long maxNumRecords; + private final int sourceId; // each split of the underlying UnboundedSource is associated with a (consistent) id // to match it's corresponding CheckpointMark state. @@ -63,12 +64,14 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo Duration maxReadTime, int numInitialSplits, long maxNumRecords, - int splitId) { + int splitId, + int sourceId) { this.source = source; this.maxReadTime = maxReadTime; this.numInitialSplits = numInitialSplits; this.maxNumRecords = maxNumRecords; this.splitId = splitId; + this.sourceId = sourceId; } /** @@ -98,7 +101,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo for (int i = 0; i < numSplits; i++) { // splits must be stable, and cannot change during consecutive executions // for example: Kafka should not add partitions if more then one topic is read. - result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i)); + result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId)); } return result; } @@ -137,8 +140,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo return source.getCheckpointMarkCoder(); } - public int getSplitId() { - return splitId; + public String getId() { + return sourceId + "_" + splitId; } @Override @@ -150,13 +153,18 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo return false; } MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o; - + if (sourceId != that.sourceId) { + return false; + } return splitId == that.splitId; + } @Override public int hashCode() { - return splitId; + int result = sourceId; + result = 31 * result + splitId; + return result; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index 4e47757..84b247b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -83,7 +83,8 @@ public class SourceDStream<T, CheckpointMarkT extends UnboundedSource.Checkpoint @Override public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) { MicrobatchSource<T, CheckpointMarkT> microbatchSource = new MicrobatchSource<>( - unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1); + unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1, + id()); RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd = new SourceRDD.Unbounded<>( ssc().sc(), runtimeContext, microbatchSource); return scala.Option.apply(rdd); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index 48849c2..053f4ac 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -144,7 +144,7 @@ public class StateSpecFunctions { // close and checkpoint reader. reader.close(); - LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getSplitId(), + LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); // if the Source does not supply a CheckpointMark skip updating the state.