Repository: incubator-beam
Updated Branches:
  refs/heads/master 3f16f2660 -> 3a8b9b521


[BEAM-1052] Add InputDStream id to MicrobatchSource hashcode.

Done to avoid collisions between splits of different sources.
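
The motivation: hashCode() previously returned the splitId alone, so split N of one InputDStream was indistinguishable from split N of another source in hash-based collections. Below is a minimal, illustrative sketch of the identity pattern this patch adopts; the SplitKey class is hypothetical and only the field names and the 31-based combination mirror the diff that follows.

    // Illustrative only: identity built from the owning source's id plus the split index,
    // mirroring the equals()/hashCode() change made to MicrobatchSource in this commit.
    final class SplitKey {
      private final int sourceId; // id of the owning InputDStream
      private final int splitId;  // index of the split within that source

      SplitKey(int sourceId, int splitId) {
        this.sourceId = sourceId;
        this.splitId = splitId;
      }

      @Override
      public boolean equals(Object o) {
        if (this == o) {
          return true;
        }
        if (!(o instanceof SplitKey)) {
          return false;
        }
        SplitKey that = (SplitKey) o;
        return sourceId == that.sourceId && splitId == that.splitId;
      }

      @Override
      public int hashCode() {
        // same combination the patch uses: 31 * sourceId + splitId
        return 31 * sourceId + splitId;
      }
    }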


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1a4ac0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1a4ac0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1a4ac0f

Branch: refs/heads/master
Commit: a1a4ac0fc0376aa4c43a4357f3acc930e2b53c94
Parents: 3f16f26
Author: Aviem Zur <aviem...@gmail.com>
Authored: Tue Nov 29 09:51:12 2016 +0200
Committer: Sela <ans...@paypal.com>
Committed: Tue Nov 29 11:49:31 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java | 20 ++++++++++++++------
 .../beam/runners/spark/io/SourceDStream.java    |  3 ++-
 .../spark/stateful/StateSpecFunctions.java      |  2 +-
 3 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 4a174aa..5656375 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -54,6 +54,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
   private final Duration maxReadTime;
   private final int numInitialSplits;
   private final long maxNumRecords;
+  private final int sourceId;
 
   // each split of the underlying UnboundedSource is associated with a (consistent) id
   // to match its corresponding CheckpointMark state.
@@ -63,12 +64,14 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
                    Duration maxReadTime,
                    int numInitialSplits,
                    long maxNumRecords,
-                   int splitId) {
+                   int splitId,
+                   int sourceId) {
     this.source = source;
     this.maxReadTime = maxReadTime;
     this.numInitialSplits = numInitialSplits;
     this.maxNumRecords = maxNumRecords;
     this.splitId = splitId;
+    this.sourceId = sourceId;
   }
 
   /**
@@ -98,7 +101,7 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     for (int i = 0; i < numSplits; i++) {
       // splits must be stable, and cannot change during consecutive executions
       // for example: Kafka should not add partitions if more than one topic is read.
-      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i));
+      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId));
     }
     return result;
   }
@@ -137,8 +140,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     return source.getCheckpointMarkCoder();
   }
 
-  public int getSplitId() {
-    return splitId;
+  public String getId() {
+    return sourceId + "_" + splitId;
   }
 
   @Override
@@ -150,13 +153,18 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
       return false;
     }
     MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
-
+    if (sourceId != that.sourceId) {
+      return false;
+    }
     return splitId == that.splitId;
+
   }
 
   @Override
   public int hashCode() {
-    return splitId;
+    int result = sourceId;
+    result = 31 * result + splitId;
+    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 4e47757..84b247b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -83,7 +83,8 @@ public class SourceDStream<T, CheckpointMarkT extends UnboundedSource.Checkpoint
   @Override
   public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
     MicrobatchSource<T, CheckpointMarkT> microbatchSource = new MicrobatchSource<>(
-        unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1);
+        unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1,
+        id());
     RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd = new SourceRDD.Unbounded<>(
         ssc().sc(), runtimeContext, microbatchSource);
     return scala.Option.apply(rdd);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1a4ac0f/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index 48849c2..053f4ac 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -144,7 +144,7 @@ public class StateSpecFunctions {
 
           // close and checkpoint reader.
           reader.close();
-          LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getSplitId(),
+          LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getId(),
               stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
 
           // if the Source does not supply a CheckpointMark skip updating the state.

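To see what the stronger identity buys, here is a hypothetical usage of the SplitKey sketch above (not part of the patch): with splitId-only equality and hashing the two entries below would collapse into a single map entry, whereas the combined (sourceId, splitId) identity keeps the state of different sources separate.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical demonstration of the collision this commit removes.
    public class SplitKeyDemo {
      public static void main(String[] args) {
        Map<SplitKey, String> state = new HashMap<>();
        state.put(new SplitKey(0, 1), "checkpoint of source 0, split 1");
        state.put(new SplitKey(1, 1), "checkpoint of source 1, split 1");
        // With splitId-only identity these keys would be equal and the map would hold one entry.
        System.out.println(state.size()); // prints 2
      }
    }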