Repository: systemml
Updated Branches:
  refs/heads/master a0b0e80e9 -> 159522a1f


[SYSTEMML-2238] Reuse of local parfor fair scheduler pool names

This patch fixes a leak of fair scheduler pools in scripts with
repeatedly executed or simply many parfor loops by reusing scheduler
pool names. So far, we mistakenly assumed that disassociated pools are
automatically cleaned up by Spark. The set of names is initially sized
to the number of vcores and grown on demand; scheduler pools are only
allocated by parfor workers in spark execution mode, and only if the
parfor body contains large operations.
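
In essence, the fix replaces per-worker pool names (previously derived
from the worker ID, which grows across parfor invocations) with a
small synchronized name allocator: a boolean buffer marks which names
are in use, each worker takes the first free index on setup and
returns it on cleanup, and the buffer doubles when exhausted. A
minimal standalone sketch of this scheme (simplified from the patch
below; the class and method names are illustrative, not SystemML
APIs):

    import java.util.Arrays;

    // Sketch of the pool-name reuse scheme: a boolean buffer marks
    // names in use; freed slots are handed out again, so the set of
    // fair scheduler pool names stays bounded by peak concurrency.
    public class PoolNameAllocator {
        private static boolean[] _buff = new boolean[4]; //e.g., #vcores

        public static synchronized int alloc() {
            //find the first available name
            int pool = 0;
            while( pool < _buff.length && _buff[pool] )
                pool++;
            //grow the buffer on demand (2L avoids int overflow)
            if( pool == _buff.length )
                _buff = Arrays.copyOf(_buff,
                    (int)Math.min(2L*pool, Integer.MAX_VALUE));
            _buff[pool] = true; //mark name as in use
            return pool;
        }

        public static synchronized void free(int pool) {
            _buff[pool] = false; //name becomes available again
        }
    }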


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/93506b23
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/93506b23
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/93506b23

Branch: refs/heads/master
Commit: 93506b2317a520d3f816b5d71d25d10a73374bef
Parents: a0b0e80
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Sun Apr 8 12:49:37 2018 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Sun Apr 8 12:49:37 2018 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |  53 ++++++---
 .../controlprogram/parfor/LocalParWorker.java   | 114 +++++++++----------
 2 files changed, 89 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/93506b23/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 98c3eaa..325a359 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
@@ -92,10 +93,9 @@ public class SparkExecutionContext extends ExecutionContext
        private static final boolean LDEBUG = false; //local debug flag
 
        //internal configurations
-       private static boolean LAZY_SPARKCTX_CREATION = true;
-       private static boolean ASYNCHRONOUS_VAR_DESTROY = true;
-
-       public static boolean FAIR_SCHEDULER_MODE = true;
+       private static final boolean LAZY_SPARKCTX_CREATION = true;
+       private static final boolean ASYNCHRONOUS_VAR_DESTROY = true;
+       public static final boolean FAIR_SCHEDULER_MODE = true;
 
        //executor memory and relative fractions as obtained from the spark configuration
        private static SparkClusterConfig _sconf = null;
@@ -107,8 +107,12 @@ public class SparkExecutionContext extends ExecutionContext
        //10% of JVM max heap size for parallelized RDDs; if this is not sufficient,
        //matrices or frames are exported to HDFS and the RDDs are created from files.
        //TODO unify memory management for CP, par RDDs, and potentially broadcasts
-       private static MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1);
-
+       private static final MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1);
+       
+       //pool of reused fair scheduler pool names (unset bits indicate availability)
+       private static boolean[] _poolBuff = FAIR_SCHEDULER_MODE ?
+               new boolean[InfrastructureAnalyzer.getLocalParallelism()] : null;
+       
        static {
                // for internal debugging only
                if( LDEBUG ) {
@@ -117,11 +121,10 @@ public class SparkExecutionContext extends ExecutionContext
                }
        }
 
-       protected SparkExecutionContext(boolean allocateVars, Program prog)
-       {
+       protected SparkExecutionContext(boolean allocateVars, Program prog) {
                //protected constructor to force use of ExecutionContextFactory
                super( allocateVars, prog );
-
+               
                //spark context creation via internal initializer
                if( !LAZY_SPARKCTX_CREATION || DMLScript.rtplatform==RUNTIME_PLATFORM.SPARK ) {
                        initSparkContext();
@@ -134,8 +137,7 @@ public class SparkExecutionContext extends ExecutionContext
         *
         * @return java spark context
         */
-       public JavaSparkContext getSparkContext()
-       {
+       public JavaSparkContext getSparkContext() {
                //lazy spark context creation on demand (lazy instead of asynchronous
                //to avoid wait for uninitialized spark context on close)
                if( LAZY_SPARKCTX_CREATION ) {
@@ -1232,19 +1234,40 @@ public class SparkExecutionContext extends ExecutionContext
                        in.count(); //trigger caching to prevent contention
        }
 
-       public void setThreadLocalSchedulerPool(String poolName) {
+       public int setThreadLocalSchedulerPool() {
+               int pool = -1;
                if( FAIR_SCHEDULER_MODE ) {
+                       pool = allocSchedulerPoolName();
                        getSparkContext().sc().setLocalProperty(
-                                       "spark.scheduler.pool", poolName);
+                               "spark.scheduler.pool", "parforPool"+pool);
                }
+               return pool;
        }
 
-       public void cleanupThreadLocalSchedulerPool() {
+       public void cleanupThreadLocalSchedulerPool(int pool) {
                if( FAIR_SCHEDULER_MODE ) {
+                       freeSchedulerPoolName(pool);
                        getSparkContext().sc().setLocalProperty(
-                                       "spark.scheduler.pool", null);
+                               "spark.scheduler.pool", null);
                }
        }
+       
+       private static synchronized int allocSchedulerPoolName() {
+               int pool = ArrayUtils.indexOf(_poolBuff, false);
+               //grow pool on demand
+               if( pool < 0 ) {
+                       pool = _poolBuff.length;
+                       _poolBuff = Arrays.copyOf(_poolBuff,
+                               (int)Math.min(2L*pool, Integer.MAX_VALUE));
+               }
+               //mark pool name as in use
+               _poolBuff[pool] = true;
+               return pool;
+       }
+       
+       private static synchronized void freeSchedulerPoolName(int pool) {
+               _poolBuff[pool] = false;
+       }
 
        private boolean isRDDMarkedForCaching( int rddID ) {
                JavaSparkContext jsc = getSparkContext();
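
The new API returns the allocated pool index so that callers can free
it deterministically. A usage sketch (the helper below is
hypothetical, not part of the patch; it mirrors how LocalParWorker in
the second file drives the API):

    import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;

    public class SchedulerPoolUsage {
        // Hypothetical helper: allocate a reusable pool name on setup
        // and always free it in finally, so the name returns to the
        // buffer even if task execution fails.
        static void runWithSchedulerPool(SparkExecutionContext sec, Runnable taskLoop) {
            int pool = sec.setThreadLocalSchedulerPool(); //sets spark.scheduler.pool
            try {
                taskLoop.run(); //stand-in for the parfor task loop
            }
            finally {
                sec.cleanupThreadLocalSchedulerPool(pool); //frees the pool name
            }
        }
    }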

http://git-wip-us.apache.org/repos/asf/systemml/blob/93506b23/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java
index f77c22e..058026c 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/LocalParWorker.java
@@ -41,20 +41,15 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
  */
 public class LocalParWorker extends ParWorker implements Runnable
 {
-       protected LocalTaskQueue<Task> _taskQueue   = null;
-       
+       protected final LocalTaskQueue<Task> _taskQueue;
+       protected final CompilerConfig _cconf;
+       protected final boolean _stopped;
+       protected final int _max_retry;
        protected Collection<String> _fnNames = null;
        
-       protected CompilerConfig _cconf  = null;
-       protected boolean   _stopped     = false;
-       protected int           _max_retry   = -1;
-       
-       public LocalParWorker( long ID, LocalTaskQueue<Task> q, ParForBody body, CompilerConfig cconf, int max_retry, boolean monitor ) 
-       {
+       public LocalParWorker( long ID, LocalTaskQueue<Task> q, ParForBody body, CompilerConfig cconf, int max_retry, boolean monitor ) {
                super(ID, body, monitor);
-
                _taskQueue = q;
-               
                _cconf = cconf;
                _stopped   = false;
                _max_retry = max_retry;
@@ -76,10 +71,11 @@ public class LocalParWorker extends ParWorker implements Runnable
                
                //setup fair scheduler pool for worker thread, but avoid unnecessary
                //spark context creation (if data cached already created)
+               int pool = -1;
                if( OptimizerUtils.isSparkExecutionMode() 
                        && SparkExecutionContext.isSparkContextCreated() ) {
                        SparkExecutionContext sec = (SparkExecutionContext)_ec;
-                       sec.setThreadLocalSchedulerPool("parforPool"+_workerID);
+                       pool = sec.setThreadLocalSchedulerPool();
                }
 
                // Initialize this GPUContext to this thread
@@ -98,60 +94,54 @@ public class LocalParWorker extends ParWorker implements Runnable
                
                // continuous execution (execute tasks until (1) stopped or (2) no more tasks)
                Task lTask = null; 
-               
-               while( !_stopped ) 
-               {
-                       //dequeue the next task (abort on NO_MORE_TASKS or error)
-                       try
-                       {
-                               lTask = _taskQueue.dequeueTask();
-                               
-                               if( lTask == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks)
-                                       break; //normal end of parallel worker
-                       }
-                       catch(Exception ex)
-                       {
-                               // abort on taskqueue error
-                               LOG.warn("Error reading from task queue: "+ex.getMessage());
-                               LOG.warn("Stopping LocalParWorker.");
-                               break; //no exception thrown to prevent blocking on join
-                       }
-                       
-                       //execute the task sequentially (re-try on error)
-                       boolean success = false;
-                       int retrys = _max_retry;
-                       
-                       while( !success )
-                       {
-                               try 
-                               {
-                                       ///////
-                                       //core execution (see ParWorker)
-                                       executeTask( lTask );
-                                       success = true;
-                               } 
-                               catch (Exception ex) 
-                               {
-                                       LOG.error("Failed to execute "+lTask.toString()+", retry:"+retrys, ex);
+               try {
+                       while( !_stopped ) {
+                               //dequeue the next task (abort on NO_MORE_TASKS or error)
+                               try {
+                                       lTask = _taskQueue.dequeueTask();
                                        
-                                       if( retrys > 0 )
-                                               retrys--; //retry on task error
-                                       else
-                                       {
-                                               // abort on no remaining retrys
-                                               LOG.error("Error executing task: ",ex);
-                                               LOG.error("Stopping LocalParWorker.");
-                                               break; //no exception thrown to prevent blocking on join 
+                                       if( lTask == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks)
+                                               break; //normal end of parallel worker
+                               }
+                               catch(Exception ex) {
+                                       // abort on taskqueue error
+                                       LOG.warn("Error reading from task queue: "+ex.getMessage());
+                                       LOG.warn("Stopping LocalParWorker.");
+                                       break; //no exception thrown to prevent blocking on join
+                               }
+                               
+                               //execute the task sequentially (re-try on error)
+                               boolean success = false;
+                               int retrys = _max_retry;
+                               
+                               while( !success ) {
+                                       try {
+                                               ///////
+                                               //core execution (see ParWorker)
+                                               executeTask( lTask );
+                                               success = true;
+                                       } 
+                                       catch (Exception ex)  {
+                                               LOG.error("Failed to execute "+lTask.toString()+", retry:"+retrys, ex);
+                                               
+                                               if( retrys > 0 )
+                                                       retrys--; //retry on task error
+                                               else {
+                                                       // abort on no remaining retrys
+                                                       LOG.error("Error executing task: ",ex);
+                                                       LOG.error("Stopping LocalParWorker.");
+                                                       break; //no exception thrown to prevent blocking on join 
+                                               }
                                        }
                                }
                        }
-               }       
-
-               //setup fair scheduler pool for worker thread
-               if( OptimizerUtils.isSparkExecutionMode() 
-                       && SparkExecutionContext.isSparkContextCreated() ) {
-                       SparkExecutionContext sec = (SparkExecutionContext)_ec;
-                       sec.cleanupThreadLocalSchedulerPool();
+               }
+               finally {
+                       //cleanup fair scheduler pool for worker thread
+                       if( OptimizerUtils.isSparkExecutionMode() ) {
+                               SparkExecutionContext sec = (SparkExecutionContext)_ec;
+                               sec.cleanupThreadLocalSchedulerPool(pool);
+                       }
                }
                
                if( _monitor ) {
@@ -161,5 +151,3 @@ public class LocalParWorker extends ParWorker implements Runnable
                }
        }
 }
-
-       
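
For context, Spark selects a fair scheduler pool per thread through a
thread-local property, which is why the pool name must be set and
cleared on the worker thread itself. A minimal standalone illustration
of that mechanism (a generic Spark example, not SystemML code; the
pool name is arbitrary):

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class FairPoolExample {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setMaster("local[2]")
                .setAppName("FairPoolExample")
                .set("spark.scheduler.mode", "FAIR");
            try( JavaSparkContext jsc = new JavaSparkContext(conf) ) {
                //jobs submitted from this thread run in pool parforPool0
                jsc.sc().setLocalProperty("spark.scheduler.pool", "parforPool0");
                long cnt = jsc.parallelize(Arrays.asList(1, 2, 3)).count();
                System.out.println("count: " + cnt);
                //disassociate the thread from the pool again
                jsc.sc().setLocalProperty("spark.scheduler.pool", null);
            }
        }
    }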
