Repository: incubator-beam Updated Branches: refs/heads/master 7a5b7ad80 -> e9f1b579a
Clone DoFns before constructing a DoFnRunner in the InProcessRunner

This ensures that each thread gets an individual copy of a DoFn, so multiple threads do not interact.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6c74ff5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6c74ff5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6c74ff5

Branch: refs/heads/master
Commit: b6c74ff5c18dcb7c82c3b7717c9c76926a1bbfc4
Parents: 6414425
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 15 10:23:15 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Fri Apr 15 16:11:32 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6c74ff5/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index 7365527..a2f080c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.DoFnRunner;
 import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.common.CounterSet;
@@ -67,7 +68,7 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
     DoFnRunner<InputT, OutputT> runner =
         DoFnRunners.createDefault(
             evaluationContext.getPipelineOptions(),
-            fn,
+            SerializableUtils.clone(fn),
             evaluationContext.createSideInputReader(sideInputs),
             BundleOutputManager.create(outputBundles),
             mainOutputTag,