[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-13 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=154119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154119
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 13/Oct/18 18:21
Start Date: 13/Oct/18 18:21
Worklog Time Spent: 10m 
  Work Description: tweise closed pull request #6638: [BEAM-5708] Cache 
environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 290f72399db..0ba980b27e5 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -1484,6 +1484,7 @@ artifactId=${project.name}
   def beamTestPipelineOptions = [
     "--runner=org.apache.beam.runners.reference.testing.TestPortableRunner",
     "--jobServerDriver=${config.jobServerDriver}",
+    "--environmentCacheMillis=1",
   ]
   if (config.jobServerConfig) {
     beamTestPipelineOptions.add("--jobServerConfig=${config.jobServerConfig}")
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
index 988a94826fb..bb2b9dcbe16 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -24,12 +24,17 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,9 +109,30 @@ private void scheduleRelease(JobInfo jobInfo) {
     WrappedContext wrapper = getCache().get(jobInfo.jobId());
     Preconditions.checkState(
         wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());
-    // Do not release this asynchronously, as the releasing could fail due to the classloader not being
-    // available anymore after the tasks have been removed from the execution engine.
-    release(wrapper);
+
+    PipelineOptions pipelineOptions =
+        PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
+    int environmentCacheTTLMillis =
+        pipelineOptions.as(PortablePipelineOptions.class).getEnvironmentCacheMillis();
+    if (environmentCacheTTLMillis > 0) {
+      if (this.getClass().getClassLoader() != ExecutionEnvironment.class.getClassLoader()) {
+        LOG.warn(
+            "{} is not loaded on parent Flink classloader. "
+                + "Falling back to synchronous environment release for job {}.",
+            this.getClass(),
+            jobInfo.jobId());
+        release(wrapper);
+      } else {
+        // Schedule task to clean the container later.
+        // Ensure that this class is loaded in the parent Flink classloader.
+        getExecutor()
+            .schedule(() -> release(wrapper), environmentCacheTTLMillis, TimeUnit.MILLISECONDS);
+      }
+    } else {
+      // Do not release this asynchronously, as the releasing could fail due to the classloader not
+      // being available anymore after the tasks have been removed from the execution engine.
+      release(wrapper);
+    }
   }
 
   private ConcurrentHashMap getCache() {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
index 5107c389bb1..a8dfa8e0665 100644
---

[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=154054&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-154054
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 12/Oct/18 23:07
Start Date: 12/Oct/18 23:07
Worklog Time Spent: 10m 
  Work Description: tweise commented on issue #6638: [BEAM-5708] Cache 
environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#issuecomment-429485973
 
 
   @angoenka did you test the fallback case?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 154054)
Time Spent: 1h 50m  (was: 1h 40m)

> Support caching of SDKHarness environments in flink
> ---
>
> Key: BEAM-5708
> URL: https://issues.apache.org/jira/browse/BEAM-5708
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Ankur Goenka
>Assignee: Ankur Goenka
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Cache and reuse environment to improve performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153978&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153978
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 12/Oct/18 18:29
Start Date: 12/Oct/18 18:29
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6638: 
[BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#discussion_r224877854
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 ##
 @@ -104,9 +108,21 @@ private void scheduleRelease(JobInfo jobInfo) {
      WrappedContext wrapper = getCache().get(jobInfo.jobId());
      Preconditions.checkState(
          wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());
 -    // Do not release this asynchronously, as the releasing could fail due to the classloader not being
 -    // available anymore after the tasks have been removed from the execution engine.
 -    release(wrapper);
 +
 +    PipelineOptions pipelineOptions =
 +        PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
 +    int environmentCacheTTLMillis =
 +        pipelineOptions.as(PortablePipelineOptions.class).getEnvironmentCacheMillis();
 +    if (environmentCacheTTLMillis > 0) {
 +      // Schedule task to clean the container later.
 +      // Ensure that this class is loaded in the parent Flink classloader.
 +      getExecutor()
 +          .schedule(() -> release(wrapper), environmentCacheTTLMillis, TimeUnit.MILLISECONDS);
 
 Review comment:
   @mxm The class is first loaded when we create the environment, so we can be 
assured that the class is loaded before release. 
   Also, we require classes to be loaded on the parent classloader for async 
container destruction. This inherently means that once the class is loaded in 
the parent class loader, it's not going to be unloaded in this scenario. With 
the additional check mentioned, we will enforce this requirement.




Issue Time Tracking
---

Worklog Id: (was: 153978)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153977&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153977
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 12/Oct/18 18:29
Start Date: 12/Oct/18 18:29
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6638: 
[BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#discussion_r224875048
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 ##
 
 Review comment:
   @tweise Makes sense. I will add that check. In addition to falling back to 
immediate release, I will also log a warning.
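
The behaviour described in this comment (defer the release when allowed, otherwise warn and release immediately) can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from the pull request: `DeferredReleaser`, the `loadedByParent` flag and the `Runnable` release callback are hypothetical names, and the warning goes to `System.err` instead of a real logger.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: defers a release by a TTL, or runs it immediately as a fallback. */
final class DeferredReleaser {
  private final ScheduledExecutorService executor;
  // Assumption: the caller has already compared classloaders and passes the result in.
  private final boolean loadedByParent;

  DeferredReleaser(ScheduledExecutorService executor, boolean loadedByParent) {
    this.executor = executor;
    this.loadedByParent = loadedByParent;
  }

  /** Returns true if the release was scheduled for later, false if it ran synchronously. */
  boolean scheduleRelease(Runnable release, long cacheTtlMillis) {
    if (cacheTtlMillis <= 0) {
      // Caching disabled: release right away.
      release.run();
      return false;
    }
    if (!loadedByParent) {
      // Deferred execution is unsafe here, so warn and fall back to a synchronous release.
      System.err.println("Not loaded on parent classloader; falling back to synchronous release.");
      release.run();
      return false;
    }
    executor.schedule(release, cacheTtlMillis, TimeUnit.MILLISECONDS);
    return true;
  }
}
```

Returning a boolean is a choice made for this sketch so that a caller (or a test) can tell which path was taken; the method under review returns nothing.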




Issue Time Tracking
---

Worklog Id: (was: 153977)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153897
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 12/Oct/18 14:27
Start Date: 12/Oct/18 14:27
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6638: 
[BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#discussion_r224803034
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 ##
 
 Review comment:
   Something like this (disclaimer: not tested):
   
   ```
   if (environmentCacheTTLMillis > 0 && this.getClass().getClassLoader() == ExecutionEnvironment.class.getClassLoader()) 
   ```
   For execution in the job server, the class loader will be the same (this 
applies to Jenkins). On a remote Flink cluster (by default), the user class 
loader will be different and will be removed.
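
The condition suggested above can be pulled out into a small self-contained Java helper to make the check easier to reason about. This is a sketch: `ReleasePolicy` and `canDeferRelease` are hypothetical names, and the two `Class` arguments stand in for the Beam factory class and Flink's `ExecutionEnvironment`.

```java
/** Hypothetical helper that isolates the "may we defer the release?" decision. */
final class ReleasePolicy {
  private ReleasePolicy() {}

  /**
   * Deferral is allowed only when a positive TTL is configured and both classes were
   * defined by the same classloader. Class.getClassLoader() may return null for classes
   * loaded by the bootstrap loader; reference equality treats two nulls as the same loader.
   */
  static boolean canDeferRelease(long cacheTtlMillis, Class<?> factoryClass, Class<?> runtimeClass) {
    return cacheTtlMillis > 0 && factoryClass.getClassLoader() == runtimeClass.getClassLoader();
  }
}
```

In this sketch a call such as `ReleasePolicy.canDeferRelease(ttl, getClass(), ExecutionEnvironment.class)` would replace the inline comparison.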
   




Issue Time Tracking
---

Worklog Id: (was: 153897)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-12 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153818&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153818
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 12/Oct/18 08:22
Start Date: 12/Oct/18 08:22
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6638: 
[BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#discussion_r224706600
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 ##
 
 Review comment:
   >But we want to defer the release as we want to cache.
   
   Yes, but we need to ensure classes are loaded. A dry run would have to work 
on a dummy environment to load all the necessary classes.
   
   >It isn't possible to defer this when the class is loaded by the user class 
loader. So we would probably need to compare the class loader that loaded the 
Flink runtime with that of the Beam class?
   
   Can you elaborate on that? The class may never be loaded because this code 
is executed for the first time when the class loader has already been closed.
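
One way to address the concern that a class might first be needed after its class loader is gone is to force loading ahead of time. The Java sketch below shows that idea with `Class.forName`; it is a simpler substitute for the "dry run on a dummy environment" mentioned above, not what the pull request ended up doing, and `ClassPreloader` and the class names passed to it are hypothetical.

```java
/** Hypothetical helper that eagerly loads and initializes classes by name. */
final class ClassPreloader {
  private ClassPreloader() {}

  /** Returns how many of the named classes could be loaded through the given loader. */
  static int preload(ClassLoader loader, String... classNames) {
    int loaded = 0;
    for (String name : classNames) {
      try {
        // initialize=true also runs static initializers, so nothing is left to load lazily.
        Class.forName(name, true, loader);
        loaded++;
      } catch (ClassNotFoundException e) {
        // Unknown class: skip it; the caller can compare the count with the expected number.
      }
    }
    return loaded;
  }
}
```

Preloading only covers classes that are listed explicitly, so it would not by itself guarantee that every class touched during release is available.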




Issue Time Tracking
---

Worklog Id: (was: 153818)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153765&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153765
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 12/Oct/18 05:14
Start Date: 12/Oct/18 05:14
Worklog Time Spent: 10m 
  Work Description: tweise commented on a change in pull request #6638: 
[BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#discussion_r224672532
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 ##
 
 Review comment:
   It isn't possible to defer this when the class is loaded by the user class 
loader. So we would probably need to compare the class loader that loaded the 
Flink runtime with that of the Beam class?




Issue Time Tracking
---

Worklog Id: (was: 153765)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153693
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 11/Oct/18 20:38
Start Date: 11/Oct/18 20:38
Worklog Time Spent: 10m 
  Work Description: angoenka commented on a change in pull request #6638: 
[BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#discussion_r224596080
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 ##
 
 Review comment:
   The class is loaded in the current class loader. However, I am not sure how 
to check whether the class is loaded in the Flink class loader without making 
things too complicated.
   
   > We could perform a release before we schedule the release.
   
   But we want to defer the release as we want to cache.




Issue Time Tracking
---

Worklog Id: (was: 153693)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153445&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153445
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 11/Oct/18 09:16
Start Date: 11/Oct/18 09:16
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6638: 
[BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#discussion_r224373342
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
 ##
 
 Review comment:
   How do we ensure that the class is loaded? We could perform a release before 
we schedule the release. 




Issue Time Tracking
---

Worklog Id: (was: 153445)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153446
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 11/Oct/18 09:16
Start Date: 11/Oct/18 09:16
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #6638: 
[BEAM-5708] Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#discussion_r224374141
 
 

 ##
 File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
 ##
 @@ -87,4 +87,10 @@
   String getSdkWorkerParallelism();
 
   void setSdkWorkerParallelism(@Nullable String parallelism);
+
+  @Description("Duration in milli seconds for environment cache within a job. 0 means no caching.")
 
 Review comment:
   milliseconds




Issue Time Tracking
---

Worklog Id: (was: 153446)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153281
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 10/Oct/18 20:51
Start Date: 10/Oct/18 20:51
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #6638: [BEAM-5708] Cache 
environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638#issuecomment-428727253
 
 
   R: @mxm @tweise 




Issue Time Tracking
---

Worklog Id: (was: 153281)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-5708) Support caching of SDKHarness environments in flink

2018-10-10 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5708?focusedWorklogId=153272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-153272
 ]

ASF GitHub Bot logged work on BEAM-5708:


Author: ASF GitHub Bot
Created on: 10/Oct/18 20:34
Start Date: 10/Oct/18 20:34
Worklog Time Spent: 10m 
  Work Description: angoenka opened a new pull request #6638: [BEAM-5708] 
Cache environment in portable flink runner
URL: https://github.com/apache/beam/pull/6638
 
 




Issue Time Tracking
---

Worklog Id: (was: 153272)
Time Spent: 10m
Remaining Estimate: 0h
