Repository: flink
Updated Branches:
  refs/heads/master e6fbda906 -> 345b2529a


[FLINK-4628] [core] Provide user class loader during input split assignment

In analogy to the configure() method, this also sets a context class
loader during input split assignment.

This closes #2505


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/345b2529
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/345b2529
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/345b2529

Branch: refs/heads/master
Commit: 345b2529a8acdd59d67e89ea930ec69ad69a55d3
Parents: 3b8fe95
Author: Maximilian Michels <m...@apache.org>
Authored: Fri Sep 16 12:21:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Sep 22 14:42:12 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionJobVertex.java   | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/345b2529/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 1ac9522..ead0852 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -165,10 +165,17 @@ public class ExecutionJobVertex {
                        InputSplitSource<InputSplit> splitSource = 
(InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
                        
                        if (splitSource != null) {
-                               inputSplits = 
splitSource.createInputSplits(numTaskVertices);
-                               
-                               if (inputSplits != null) {
-                                       splitAssigner = 
splitSource.getInputSplitAssigner(inputSplits);
+                               Thread currentThread = Thread.currentThread();
+                               ClassLoader oldContextClassLoader = 
currentThread.getContextClassLoader();
+                               
currentThread.setContextClassLoader(graph.getUserClassLoader());
+                               try {
+                                       inputSplits = 
splitSource.createInputSplits(numTaskVertices);
+
+                                       if (inputSplits != null) {
+                                               splitAssigner = 
splitSource.getInputSplitAssigner(inputSplits);
+                                       }
+                               } finally {
+                                       
currentThread.setContextClassLoader(oldContextClassLoader);
                                }
                        }
                        else {

Reply via email to