Repository: incubator-beam
Updated Branches:
  refs/heads/master 529bcdf56 -> 5d78420bf


Use a weakValues LoadingCache for serial TransformExecutorServices

This allows the garbage collector to clean up references to
TransformExecutorServices which are not currently in use.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f526374
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f526374
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f526374

Branch: refs/heads/master
Commit: 6f526374fbc743a0d22e37ad0f746f0a695785dd
Parents: ad58e26
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 10:56:10 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 8 14:20:20 2016 -0700

----------------------------------------------------------------------
 .../ExecutorServiceParallelExecutor.java        | 36 ++++++++++++++------
 1 file changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f526374/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index 4d45e8f..c770735 100644
--- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ 
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -31,6 +31,9 @@ import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 
 import org.joda.time.Instant;
@@ -69,7 +72,7 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
 
   private final InProcessEvaluationContext evaluationContext;
 
-  private final ConcurrentMap<StepAndKey, TransformExecutorService> 
currentEvaluations;
+  private final LoadingCache<StepAndKey, TransformExecutorService> 
executorServices;
   private final ConcurrentMap<TransformExecutor<?>, Boolean> 
scheduledExecutors;
 
   private final Queue<ExecutorUpdate> allUpdates;
@@ -107,8 +110,12 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
     this.transformEnforcements = transformEnforcements;
     this.evaluationContext = context;
 
-    currentEvaluations = new ConcurrentHashMap<>();
     scheduledExecutors = new ConcurrentHashMap<>();
+    // Weak values allow TransformExecutorServices that are no longer in use 
to be reclaimed.
+    // Executing TransformExecutors hold a strong reference to their 
TransformExecutorService,
+    // which stops that TransformExecutorService from being prematurely 
garbage collected.
+    executorServices =
+        
CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
 
     this.allUpdates = new ConcurrentLinkedQueue<>();
     this.visibleUpdates = new ArrayBlockingQueue<>(20);
@@ -118,6 +125,16 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
     defaultCompletionCallback = new DefaultCompletionCallback();
   }
 
+  private CacheLoader<StepAndKey, TransformExecutorService>
+      serialTransformExecutorServiceCacheLoader() {
+    return new CacheLoader<StepAndKey, TransformExecutorService>() {
+      @Override
+      public TransformExecutorService load(StepAndKey stepAndKey) throws 
Exception {
+        return TransformExecutorServices.serial(executorService, 
scheduledExecutors);
+      }
+    };
+  }
+
   @Override
   public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
     rootNodes = ImmutableList.copyOf(roots);
@@ -142,7 +159,12 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
     if (bundle != null && isKeyed(bundle.getPCollection())) {
       final StepAndKey stepAndKey =
           StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
-      transformExecutor = getSerialExecutorService(stepAndKey);
+      // This executor will remain reachable until it has executed all 
scheduled transforms.
+      // The TransformExecutors keep a strong reference to the Executor, and the 
ExecutorService keeps
+      // a reference to the scheduled TransformExecutor callable. Follow-up 
TransformExecutors
+      // (scheduled due to the completion of another TransformExecutor) are 
provided to the
+      // ExecutorService before the earlier TransformExecutor callable 
completes.
+      transformExecutor = executorServices.getUnchecked(stepAndKey);
     } else {
       transformExecutor = parallelExecutorService;
     }
@@ -174,14 +196,6 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
     }
   }
 
-  private TransformExecutorService getSerialExecutorService(StepAndKey 
stepAndKey) {
-    if (!currentEvaluations.containsKey(stepAndKey)) {
-      currentEvaluations.putIfAbsent(
-          stepAndKey, TransformExecutorServices.serial(executorService, 
scheduledExecutors));
-    }
-    return currentEvaluations.get(stepAndKey);
-  }
-
   @Override
   public void awaitCompletion() throws Throwable {
     VisibleExecutorUpdate update;

Reply via email to