Repository: incubator-beam
Updated Branches:
  refs/heads/master 7a5b7ad80 -> e9f1b579a


Clone DoFns before constructing a DoFnRunner in the InProcessRunner

This ensures that each thread gets its own copy of the DoFn, so
multiple threads do not interact through a shared DoFn instance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6c74ff5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6c74ff5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6c74ff5

Branch: refs/heads/master
Commit: b6c74ff5c18dcb7c82c3b7717c9c76926a1bbfc4
Parents: 6414425
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 15 10:23:15 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Fri Apr 15 16:11:32 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java       | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6c74ff5/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index 7365527..a2f080c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.DoFnRunner;
 import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.CounterSet;
@@ -67,7 +68,7 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
     DoFnRunner<InputT, OutputT> runner =
         DoFnRunners.createDefault(
             evaluationContext.getPipelineOptions(),
-            fn,
+            SerializableUtils.clone(fn),
             evaluationContext.createSideInputReader(sideInputs),
             BundleOutputManager.create(outputBundles),
             mainOutputTag,

Reply via email to