Repository: flink
Updated Branches:
  refs/heads/master e6fbda906 -> 345b2529a
[FLINK-4628] [core] Provide user class loader during input split assignment

In analogy to the configure() method, this also sets a context class
loader during input split assignment.

This closes #2505

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/345b2529
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/345b2529
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/345b2529

Branch: refs/heads/master
Commit: 345b2529a8acdd59d67e89ea930ec69ad69a55d3
Parents: 3b8fe95
Author: Maximilian Michels <m...@apache.org>
Authored: Fri Sep 16 12:21:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 22 14:42:12 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionJobVertex.java | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/345b2529/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 1ac9522..ead0852 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -165,10 +165,17 @@ public class ExecutionJobVertex {
 			InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
 			if (splitSource != null) {
-				inputSplits = splitSource.createInputSplits(numTaskVertices);
-
-				if (inputSplits != null) {
-					splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
+				Thread currentThread = Thread.currentThread();
+				ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
+				currentThread.setContextClassLoader(graph.getUserClassLoader());
+				try {
+					inputSplits = splitSource.createInputSplits(numTaskVertices);
+
+					if (inputSplits != null) {
+						splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
+					}
+				} finally {
+					currentThread.setContextClassLoader(oldContextClassLoader);
 				}
 			} else {