[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-11 asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5184


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-10 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160692953
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -956,23 +958,29 @@ private void jobStatusChanged(
"The job is registered as 'FINISHED (successful), but this notification describes " +
"a failure, since the resulting accumulators could not be fetched.", e);
 
-   executor.execute(() ->jobCompletionActions.jobFailed(exception));
+   executor.execute(() -> jobCompletionActions.jobFailed(builder
+   .serializedThrowable(new SerializedThrowable(exception))
+   .build()));
}
break;
 
case CANCELED: {
final JobExecutionException exception = new JobExecutionException(
jobID, "Job was cancelled.", new Exception("The job was cancelled"));
 
-   executor.execute(() -> jobCompletionActions.jobFailed(exception));
+   executor.execute(() -> jobCompletionActions.jobFailed(builder
+   .serializedThrowable(new SerializedThrowable(exception))
+   .build()));
break;
}
 
case FAILED: {
final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
--- End diff --

True. Not sure what I was looking at.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-10 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160692288
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +360,31 @@ public void start() throws Exception {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture getJobExecutionResult(
+   final JobID jobId,
+   final Time timeout) {
+
+   final JobExecutionResult jobExecutionResult = jobExecutionResultCache.get(jobId);
+   if (jobExecutionResult == null) {
+   return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId));
+   } else {
+   return CompletableFuture.completedFuture(jobExecutionResult);
+   }
+   }
+
+   @Override
+   public CompletableFuture isJobExecutionResultPresent(
--- End diff --

True. Keep it like this.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-09 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160453714
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +360,31 @@ public void start() throws Exception {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture getJobExecutionResult(
+   final JobID jobId,
+   final Time timeout) {
+
+   final JobExecutionResult jobExecutionResult = jobExecutionResultCache.get(jobId);
+   if (jobExecutionResult == null) {
+   return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId));
+   } else {
+   return CompletableFuture.completedFuture(jobExecutionResult);
+   }
+   }
+
+   @Override
+   public CompletableFuture isJobExecutionResultPresent(
--- End diff --

I think a job can be "done", but a `JobExecutionResult` is rather "ready" or "present".


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-09 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160408755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}.
+ */
+public class JobExecutionResult {
--- End diff --

Will change it.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-09 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160408734
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}.
+ */
+public class JobExecutionResult {
+
+   private final JobID jobId;
+
+   private final Map accumulatorResults;
+
+   private final long netRuntime;
+
+   private final SerializedThrowable serializedThrowable;
--- End diff --

Will change it.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-09 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160407686
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -956,23 +958,29 @@ private void jobStatusChanged(
"The job is registered as 'FINISHED (successful), but this notification describes " +
"a failure, since the resulting accumulators could not be fetched.", e);
 
-   executor.execute(() ->jobCompletionActions.jobFailed(exception));
+   executor.execute(() -> jobCompletionActions.jobFailed(builder
+   .serializedThrowable(new SerializedThrowable(exception))
+   .build()));
}
break;
 
case CANCELED: {
final JobExecutionException exception = new JobExecutionException(
jobID, "Job was cancelled.", new Exception("The job was cancelled"));
 
-   executor.execute(() -> jobCompletionActions.jobFailed(exception));
+   executor.execute(() -> jobCompletionActions.jobFailed(builder
+   .serializedThrowable(new SerializedThrowable(exception))
+   .build()));
break;
}
 
case FAILED: {
final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
--- End diff --

I think `error` is always a `SerializedThrowable`.
```
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
    if (jobStatusListeners.size() > 0) {
        final long timestamp = System.currentTimeMillis();
        final Throwable serializedError = error == null ? null : new SerializedThrowable(error);

        for (JobStatusListener listener : jobStatusListeners) {
            try {
                listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
            } catch (Throwable t) {
                LOG.warn("Error while notifying JobStatusListener", t);
            }
        }
    }
}
```
Not sure if it makes sense to unpack, wrap, and serialize it again.
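A minimal sketch of the "wrap only once" idea, assuming a simplified stand-in for Flink's `SerializedThrowable`. The class `SerializedError` and the helper `toSerialized` are hypothetical and not Flink API: if the error handed to the listener is already the serialized wrapper, it is passed through instead of being unpacked, wrapped, and serialized a second time.

```java
public class WrapOnce {

    /** Hypothetical stand-in for Flink's SerializedThrowable; only the wrapper type matters here. */
    static final class SerializedError extends Exception {
        private static final long serialVersionUID = 1L;

        SerializedError(Throwable original) {
            // A real wrapper would also keep the original exception in serialized form.
            super(original.toString());
        }
    }

    /** Wraps the error only if it is not already a serialized wrapper. */
    static Throwable toSerialized(Throwable error) {
        if (error == null || error instanceof SerializedError) {
            return error;
        }
        return new SerializedError(error);
    }

    public static void main(String[] args) {
        Throwable once = toSerialized(new IllegalStateException("boom"));
        Throwable twice = toSerialized(once);
        System.out.println(once == twice); // true: the second call does not wrap again
        System.out.println(once.getMessage()); // java.lang.IllegalStateException: boom
    }
}
```

Whether the `FAILED` branch needs the unpacked cause at all is the open question in the thread; the `instanceof` check only shows how to avoid a second wrapping if the serialized form is what gets forwarded.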


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160114353
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link JobExecutionResult}s.
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .softValues()
--- End diff --

I assume that we use an LRU strategy if we hit the maximum size. But what happens if we purge a job result and then a client asks for it? We should then at least return something like "the job finished, but the execution result could not be stored".
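One way to answer a client after a result has been purged, sketched under the assumption that the Dispatcher can afford to remember the IDs of finished jobs (much smaller than full results) even after the result itself has been evicted. `ResultStore` and `Lookup` are hypothetical names, job IDs and results are plain strings, and LRU eviction is modelled with `LinkedHashMap` in access-order mode rather than the Guava cache from the diff.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: job IDs and results are plain strings here.
public class ResultStore {

    /** Outcome of a lookup, so a client can tell "evicted" apart from "never seen". */
    enum Lookup { FOUND, FINISHED_BUT_EVICTED, UNKNOWN }

    // IDs of all finished jobs are kept, even after their results are evicted.
    private final Set<String> finishedJobIds = new HashSet<>();
    private final Map<String, String> results;

    ResultStore(final int maxResults) {
        // accessOrder = true: the least recently used entry is evicted first.
        this.results = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxResults;
            }
        };
    }

    void put(String jobId, String result) {
        finishedJobIds.add(jobId);
        results.put(jobId, result);
    }

    Lookup lookup(String jobId) {
        if (results.containsKey(jobId)) {
            return Lookup.FOUND;
        }
        return finishedJobIds.contains(jobId) ? Lookup.FINISHED_BUT_EVICTED : Lookup.UNKNOWN;
    }

    public static void main(String[] args) {
        ResultStore store = new ResultStore(1);
        store.put("job-a", "ok");
        store.put("job-b", "ok"); // exceeds the limit and evicts job-a
        System.out.println(store.lookup("job-a")); // FINISHED_BUT_EVICTED
        System.out.println(store.lookup("job-b")); // FOUND
        System.out.println(store.lookup("job-c")); // UNKNOWN
    }
}
```

In this sketch the set of finished IDs still grows without bound; a real implementation would need its own retention policy for it.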


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160110253
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}.
+ */
+public class JobExecutionResult {
--- End diff --

Could be made `Serializable`, especially if the `JobMaster` should run on a different machine in the future.
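A minimal sketch of what a `Serializable` result could look like, assuming simplified field types: the class name `SerializableResult` is hypothetical, the job ID is a `String`, and accumulators are raw `byte[]` payloads instead of the `SerializedValue`s used in the PR. The round trip through Java serialization stands in for sending the object from a remote `JobMaster` to the `Dispatcher`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified result type; field types differ from the class under review.
public class SerializableResult implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String jobId;
    // Declared as HashMap so the field's type is known to be Serializable.
    private final HashMap<String, byte[]> accumulatorResults;
    private final long netRuntime;

    SerializableResult(String jobId, Map<String, byte[]> accumulatorResults, long netRuntime) {
        this.jobId = jobId;
        this.accumulatorResults = new HashMap<>(accumulatorResults);
        this.netRuntime = netRuntime;
    }

    String getJobId() {
        return jobId;
    }

    long getNetRuntime() {
        return netRuntime;
    }

    Map<String, byte[]> getAccumulatorResults() {
        return accumulatorResults;
    }

    /** Serializes and deserializes the object, roughly as a remote call would. */
    static SerializableResult roundTrip(SerializableResult result) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(result);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SerializableResult) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Map<String, byte[]> acc = new HashMap<>();
        acc.put("count", new byte[] {42});
        SerializableResult copy = roundTrip(new SerializableResult("job-1", acc, 1234L));
        System.out.println(copy.getJobId() + " " + copy.getNetRuntime()); // job-1 1234
    }
}
```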


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160108453
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) {
}
 
@Override
-   public void jobFinished(JobExecutionResult result) {
+   public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
decrementCheckAndCleanup();
}
 
@Override
-   public void jobFailed(Throwable cause) {
+   public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
--- End diff --

+1


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160107760
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -956,23 +958,29 @@ private void jobStatusChanged(
"The job is registered as 'FINISHED (successful), but this notification describes " +
"a failure, since the resulting accumulators could not be fetched.", e);
 
-   executor.execute(() ->jobCompletionActions.jobFailed(exception));
+   executor.execute(() -> jobCompletionActions.jobFailed(builder
+   .serializedThrowable(new SerializedThrowable(exception))
+   .build()));
}
break;
 
case CANCELED: {
final JobExecutionException exception = new JobExecutionException(
jobID, "Job was cancelled.", new Exception("The job was cancelled"));
 
-   executor.execute(() -> jobCompletionActions.jobFailed(exception));
+   executor.execute(() -> jobCompletionActions.jobFailed(builder
+   .serializedThrowable(new SerializedThrowable(exception))
+   .build()));
break;
}
 
case FAILED: {
final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
--- End diff --

I think `error` should never be a `SerializedThrowable`.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160105159
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobExecutionResult;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link JobExecutionResult}s.
+ */
+class JobExecutionResultCache {
+
+   private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+   private final Cache
+   jobExecutionResultCache =
+   CacheBuilder.newBuilder()
+   .softValues()
--- End diff --

After an offline talk with Gary, we came to the conclusion that it might be better to use a maximum size instead of soft values. This will make testing much easier. The maximum size should be configurable, though.
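A sketch of why a size bound is easier to test than soft values: soft references are cleared at the garbage collector's discretion under memory pressure, whereas a size bound evicts after a known number of inserts. The class `BoundedResultCache` is hypothetical, uses insertion-order (FIFO) eviction for simplicity, and takes its limit as a constructor argument that is assumed to come from the cluster configuration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical size-bounded cache; the limit would come from the configuration.
public class BoundedResultCache<K, V> {

    private final Map<K, V> entries;

    public BoundedResultCache(final int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
        }
        // Insertion order: the oldest stored result is evicted first.
        this.entries = new LinkedHashMap<K, V>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    public void put(K key, V value) {
        entries.put(key, value);
    }

    /** Returns the cached value, or {@code null} if it was never stored or has been evicted. */
    public V get(K key) {
        return entries.get(key);
    }

    public int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        BoundedResultCache<String, String> cache = new BoundedResultCache<>(2);
        cache.put("job-1", "r1");
        cache.put("job-2", "r2");
        cache.put("job-3", "r3"); // exceeds maxSize, evicts job-1
        System.out.println(cache.get("job-1") + " " + cache.size()); // null 2
    }
}
```

A test can assert eviction right after the third `put`, without having to provoke memory pressure.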


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r159603034
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -357,6 +360,31 @@ public void start() throws Exception {
return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
}
 
+   @Override
+   public CompletableFuture getJobExecutionResult(
+   final JobID jobId,
+   final Time timeout) {
+
+   final JobExecutionResult jobExecutionResult = jobExecutionResultCache.get(jobId);
+   if (jobExecutionResult == null) {
+   return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId));
+   } else {
+   return CompletableFuture.completedFuture(jobExecutionResult);
+   }
+   }
+
+   @Override
+   public CompletableFuture isJobExecutionResultPresent(
--- End diff --

Shall we maybe rename this method to `isJobExecutionResultDone`?


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160110770
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}.
+ */
+public class JobExecutionResult {
+
+   private final JobID jobId;
+
+   private final Map accumulatorResults;
+
+   private final long netRuntime;
+
+   private final SerializedThrowable serializedThrowable;
--- End diff --

Maybe add a comment that this field is only non-null if the job has failed.
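A sketch of how the contract could be documented on the field and hidden from callers behind an `Optional` getter. It assumes a plain `Throwable` in place of `SerializedThrowable`, and the class name `ResultWithCause` and its factory methods are hypothetical. The `@Nullable` annotation from `javax.annotation` is left out only so the snippet compiles with the JDK alone; in the PR it would sit on the field.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified stand-in for the result class under review.
public final class ResultWithCause {

    private final long netRuntime;

    /** Cause of the failure. Only non-null if the job has failed; {@code null} on success. */
    private final Throwable failureCause;

    private ResultWithCause(long netRuntime, Throwable failureCause) {
        if (netRuntime < 0) {
            throw new IllegalArgumentException("netRuntime must be greater than or equal to 0");
        }
        this.netRuntime = netRuntime;
        this.failureCause = failureCause;
    }

    static ResultWithCause success(long netRuntime) {
        return new ResultWithCause(netRuntime, null);
    }

    static ResultWithCause failure(long netRuntime, Throwable cause) {
        return new ResultWithCause(netRuntime, Objects.requireNonNull(cause, "cause"));
    }

    /** Returns {@code true} if the job finished without a failure cause. */
    boolean isSuccess() {
        return failureCause == null;
    }

    long getNetRuntime() {
        return netRuntime;
    }

    /** Returns the failure cause, which is present only if the job has failed. */
    Optional<Throwable> getFailureCause() {
        return Optional.ofNullable(failureCause);
    }

    public static void main(String[] args) {
        System.out.println(success(10L).isSuccess()); // true
        System.out.println(failure(10L, new RuntimeException("x")).getFailureCause().isPresent()); // true
    }
}
```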


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160108633
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() {
 
private volatile Throwable runnerException;
 
-   private volatile JobExecutionResult result;
+   private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result;

BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
this.jobId = jobId;
this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
}
 
@Override
-   public void jobFinished(JobExecutionResult jobResult) {
-   this.result = jobResult;
+   public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+   this.result = result;
jobMastersToWaitFor.countDown();
}
 
@Override
-   public void jobFailed(Throwable cause) {
-   jobException = cause;
+   public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+   checkArgument(result.getSerializedThrowable().isPresent());
+
+   jobException = result
--- End diff --

Then I guess we should remove it and store the `jobmaster.JobExecutionResult` instead.
  


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160105972
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}.
+ */
+public class JobExecutionResult {
+
+   private final JobID jobId;
+
+   private final Map accumulatorResults;
+
+   private final long netRuntime;
+
+   private final SerializedThrowable serializedThrowable;
--- End diff --

`@Nullable` missing.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160106268
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -936,17 +936,19 @@ private void jobStatusChanged(
 
final JobID jobID = executionGraph.getJobID();
--- End diff --

No longer the case with your changes.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-08 tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r160111827
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE
}
}
else if (result != null) {
-   return result;
+   try {
+   return new SerializedJobExecutionResult(
+   jobId,
+   result.getNetRuntime(),
+   result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

I think it is ok, because this is what happens on the client as well.

Code-wise, one could also write it as:
```
return new JobExecutionResult(
    jobId,
    result.getNetRuntime(),
    AccumulatorHelper.deserializeAccumulators(
        result.getAccumulatorResults(),
        ClassLoader.getSystemClassLoader()));
```
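The step discussed here, turning serialized accumulator values back into objects with a chosen class loader (the system class loader, as on the client), can be sketched with plain Java serialization. `ClassLoaderDeserializer`, `serialize`, and `deserializeAll` are hypothetical helpers for illustration; they are not Flink's `AccumulatorHelper` or `SerializedValue` API, and string keys with `byte[]` values are assumed in place of Flink's accumulator map type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helpers for illustration; not Flink's AccumulatorHelper or SerializedValue.
public class ClassLoaderDeserializer {

    /** ObjectInputStream that resolves classes through an explicitly given class loader. */
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default resolution, which also handles primitive type names.
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Deserializes every value of the map with the given class loader. */
    static Map<String, Object> deserializeAll(Map<String, byte[]> serialized, ClassLoader loader) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, byte[]> entry : serialized.entrySet()) {
            try (ObjectInputStream in =
                    new LoaderObjectInputStream(new ByteArrayInputStream(entry.getValue()), loader)) {
                result.put(entry.getKey(), in.readObject());
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("Could not deserialize " + entry.getKey(), e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, byte[]> serialized = new HashMap<>();
        serialized.put("records", serialize(42L));
        Map<String, Object> values = deserializeAll(serialized, ClassLoader.getSystemClassLoader());
        System.out.println(values.get("records")); // 42
    }
}
```

With the system class loader, only classes on the application class path can be restored; user-defined accumulator types from a job JAR would need the user-code class loader instead.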




---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2018-01-05 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r159826471
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -936,17 +936,19 @@ private void jobStatusChanged(
 
final JobID jobID = executionGraph.getJobID();
--- End diff --

Variables are declared too early.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-20 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157963387
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() {
 
private volatile Throwable runnerException;
 
-   private volatile JobExecutionResult result;
+   private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result;

BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
this.jobId = jobId;
this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor);
}
 
@Override
-   public void jobFinished(JobExecutionResult jobResult) {
-   this.result = jobResult;
+   public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+   this.result = result;
jobMastersToWaitFor.countDown();
}
 
@Override
-   public void jobFailed(Throwable cause) {
-   jobException = cause;
+   public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
+   checkArgument(result.getSerializedThrowable().isPresent());
+
+   jobException = result
--- End diff --

Actually it is not needed to store the exception separately because the 
JobExecutionResult already contains the exception.
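A sketch of a blocking sync that keeps only the result object, since it already carries the optional failure cause. `BlockingResultSync` and its nested `JobResult` are hypothetical simplifications of `BlockingJobSync` and `jobmaster.JobExecutionResult`; the latch waits for a single job master instead of `numJobMastersToWaitFor`, and a plain `RuntimeException` stands in for `JobExecutionException`.

```java
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

// Hypothetical simplification of BlockingJobSync that keeps a single result field.
public class BlockingResultSync {

    /** Hypothetical result type: the failure cause is present only if the job failed. */
    static final class JobResult {
        final long netRuntime;
        private final Throwable failureCause; // null on success

        JobResult(long netRuntime, Throwable failureCause) {
            this.netRuntime = netRuntime;
            this.failureCause = failureCause;
        }

        Optional<Throwable> getFailureCause() {
            return Optional.ofNullable(failureCause);
        }
    }

    private final CountDownLatch done = new CountDownLatch(1);

    // One field for both outcomes; no separate exception field is needed.
    private volatile JobResult result;

    void jobFinished(JobResult result) {
        this.result = result;
        done.countDown();
    }

    void jobFailed(JobResult result) {
        if (!result.getFailureCause().isPresent()) {
            throw new IllegalArgumentException("A failed job must carry a failure cause.");
        }
        this.result = result;
        done.countDown();
    }

    /** Blocks until a result arrives, then returns it or rethrows the stored failure. */
    JobResult getResult() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the job result.", e);
        }
        Optional<Throwable> cause = result.getFailureCause();
        if (cause.isPresent()) {
            throw new RuntimeException("Job failed.", cause.get());
        }
        return result;
    }

    public static void main(String[] args) {
        BlockingResultSync sync = new BlockingResultSync();
        new Thread(() -> sync.jobFinished(new JobResult(100L, null))).start();
        System.out.println(sync.getResult().netRuntime); // 100
    }
}
```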


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-20 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157962431
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE
}
}
else if (result != null) {
-   return result;
+   try {
+   return new SerializedJobExecutionResult(
+   jobId,
+   result.getNetRuntime(),
+   result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult)`, I have to deserialize it here again. I wonder whether this is sane.
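On whether serializing in `jobFailed` and deserializing again in `getResult` is sane: a wrapper of this kind typically keeps the original exception as bytes plus a readable string form, so the receiver can try to restore the real exception and still has a meaningful message if that fails (for example, because the exception class is not on its class path). The class `SerializedFailure` below is a hypothetical minimal version for illustration, not Flink's `SerializedThrowable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical minimal wrapper; not Flink's SerializedThrowable.
public class SerializedFailure {

    private final byte[] bytes;       // the original exception in Java-serialized form, or null
    private final String description; // readable fallback if deserialization is not possible

    SerializedFailure(Throwable error) {
        this.description = error.toString();
        this.bytes = toBytes(error);
    }

    private static byte[] toBytes(Throwable error) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(error);
        } catch (IOException e) {
            // E.g. a user exception with a non-serializable field: keep only the description.
            return null;
        }
        return bos.toByteArray();
    }

    /** Restores the original exception, or a stand-in carrying its description. */
    Throwable deserialize() {
        if (bytes != null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (Throwable) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                // Fall through to the string-based stand-in below.
            }
        }
        return new RuntimeException(description);
    }

    public static void main(String[] args) {
        Throwable restored = new SerializedFailure(new IllegalStateException("boom")).deserialize();
        System.out.println(restored); // java.lang.IllegalStateException: boom
    }
}
```

The cost of the extra round trip is mostly paid on the failure path, which is one argument that the approach is acceptable; the sketch does not settle where in Flink the deserialization should happen.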



---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157878862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) {
}
 
@Override
-   public void jobFinished(JobExecutionResult result) {
+   public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
decrementCheckAndCleanup();
}
 
@Override
-   public void jobFailed(Throwable cause) {
+   public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
--- End diff --

Maybe rename it to `JobResult` after all, to avoid the fully qualified name.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157877969
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}.
+ */
+public class JobExecutionResult {
+
+   private final JobID jobId;
+
+   private final Map accumulatorResults;
+
+   private final long netRuntime;
+
+   private final SerializedThrowable serializedThrowable;
+
+   private JobExecutionResult(
+   final JobID jobId,
+   final Map accumulatorResults,
+   final long netRuntime,
+   @Nullable final SerializedThrowable serializedThrowable) {
+
+   checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");
+
+   this.jobId = requireNonNull(jobId);
+   this.accumulatorResults = requireNonNull(accumulatorResults);
+   this.netRuntime = netRuntime;
+   this.serializedThrowable = serializedThrowable;
+   }
+
+   public boolean isSuccess() {
--- End diff --

Javadocs are missing.
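Since the review asks for Javadocs, here is a simplified sketch of what a documented version of the class could look like. It mirrors the fields in the diff (job ID, accumulator results, net runtime, optional failure cause) and adds a builder, since the `JobMaster` diff in this thread calls `builder.serializedThrowable(...).build()`. To stay self-contained, it uses `String` for the job ID, `Map<String, Object>` for accumulators, and a plain `Throwable` in place of Flink's `JobID`, `SerializedValue`, and `SerializedThrowable`. The class name `JobResultSketch` and the builder methods other than `serializedThrowable` and `build` are assumptions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
 * Simplified stand-in for the result a JobMaster reports to the Dispatcher.
 * Carries accumulator results and, if the job failed, the failure cause.
 */
public final class JobResultSketch {

    private final String jobId;
    private final Map<String, Object> accumulatorResults;
    private final long netRuntime;
    private final Throwable failureCause; // null if the job succeeded

    private JobResultSketch(String jobId, Map<String, Object> accumulatorResults,
                            long netRuntime, Throwable failureCause) {
        if (netRuntime < 0) {
            throw new IllegalArgumentException("netRuntime must be greater than or equal to 0");
        }
        this.jobId = requireNonNull(jobId);
        this.accumulatorResults = Collections.unmodifiableMap(new HashMap<>(requireNonNull(accumulatorResults)));
        this.netRuntime = netRuntime;
        this.failureCause = failureCause;
    }

    /** Returns {@code true} if the job finished without a failure cause. */
    public boolean isSuccess() { return failureCause == null; }

    /** Returns the ID of the job this result belongs to. */
    public String getJobId() { return jobId; }

    /** Returns the accumulator results; empty if none were collected. */
    public Map<String, Object> getAccumulatorResults() { return accumulatorResults; }

    /** Returns the net runtime of the job (assumed to be in milliseconds). */
    public long getNetRuntime() { return netRuntime; }

    /** Returns the failure cause, or {@link Optional#empty()} if the job succeeded. */
    public Optional<Throwable> getFailureCause() { return Optional.ofNullable(failureCause); }

    /** Builder so that callers only set the fields they know. */
    public static final class Builder {
        private String jobId;
        private Map<String, Object> accumulatorResults = Collections.emptyMap();
        private long netRuntime;
        private Throwable failureCause;

        public Builder jobId(String jobId) { this.jobId = jobId; return this; }
        public Builder accumulatorResults(Map<String, Object> results) { this.accumulatorResults = results; return this; }
        public Builder netRuntime(long netRuntime) { this.netRuntime = netRuntime; return this; }
        // Named after the builder call in the JobMaster diff; takes a plain Throwable here.
        public Builder serializedThrowable(Throwable cause) { this.failureCause = cause; return this; }

        public JobResultSketch build() {
            return new JobResultSketch(jobId, accumulatorResults, netRuntime, failureCause);
        }
    }
}
```

Exposing the failure cause as an `Optional` makes the "may be absent" contract visible in the signature, which the `@Nullable` field in the diff otherwise only documents internally.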


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157877406
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE
}
}
else if (result != null) {
-   return result;
+   try {
+   return new SerializedJobExecutionResult(
+   jobId,
+   result.getNetRuntime(),
+   result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult)`, I have to deserialize it again here. I wonder if this is sane?

CC: @tillrohrmann 
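The concern is the round trip: the failure cause is serialized once for `OnCompletionActions#jobFailed` and then deserialized again in `MiniClusterJobDispatcher` to rebuild a user-facing result. The sketch below shows that round trip with plain Java serialization, resolving classes against an explicit `ClassLoader` (the analogue of passing `ClassLoader.getSystemClassLoader()` above). It is not Flink's `SerializedThrowable`; `ThrowableRoundTrip` and its methods are hypothetical. The class loader matters because a user-defined exception type may only be loadable from the user-code class loader, so deserializing with the system class loader could fail for such exceptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

public final class ThrowableRoundTrip {

    /** Serializes a Throwable into bytes using standard Java serialization. */
    static byte[] serialize(Throwable t) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(t);
        }
        return bytes.toByteArray();
    }

    /** Object stream that resolves classes against a caller-supplied class loader. */
    private static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to default resolution (handles primitive types, for example).
                return super.resolveClass(desc);
            }
        }
    }

    /** Deserializes bytes back into a Throwable, loading classes with the given class loader. */
    static Throwable deserialize(byte[] bytes, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader)) {
            return (Throwable) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new IllegalStateException("job failed"));
        Throwable restored = deserialize(bytes, ClassLoader.getSystemClassLoader());
        System.out.println(restored.getClass().getSimpleName() + ": " + restored.getMessage());
        // prints "IllegalStateException: job failed"
    }
}
```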


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157876761
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -92,4 +94,42 @@
 * @return Future containing the collection of instance ids and the corresponding metric query service path
 */
CompletableFuture>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+   /**
+* Returns the {@link SerializedJobExecutionResult} for a job, or in case the job failed, the
--- End diff --

Javadoc needs to be updated.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157876793
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -92,4 +94,42 @@
 * @return Future containing the collection of instance ids and the corresponding metric query service path
 */
CompletableFuture>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+   /**
+* Returns the {@link SerializedJobExecutionResult} for a job, or in case the job failed, the
+* failure cause.
+*
+* @param jobId ID of the job that we are interested in.
+* @param timeout Timeout for the asynchronous operation.
+*
+* @see #isJobExecutionResultPresent(JobID, Time)
+*
+* @return {@link CompletableFuture} containing the {@link JobExecutionResult} or a
+* {@link Throwable} which represents the failure cause. If there is no result, the future will
+* be completed exceptionally with
+* {@link org.apache.flink.runtime.messages.JobExecutionResultNotFoundException}
+*/
+   default CompletableFuture getJobExecutionResult(
+   JobID jobId,
+   @RpcTimeout Time timeout) {
+   throw new UnsupportedOperationException();
+   }
+
+   /**
+* Tests if the {@link SerializedJobExecutionResult} is present.
--- End diff --

Javadoc needs to be updated.
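For reference when updating the Javadoc, here is a minimal sketch of the pattern this diff uses: gateway methods declared as `default` methods that throw `UnsupportedOperationException`, so that only gateways that actually cache results (such as the `Dispatcher`) need to override them. `ResultGateway`, `NonCachingGateway`, and `JobResult` are hypothetical simplified stand-ins; Flink's `@RpcTimeout Time` parameter is omitted, and the `CompletableFuture<Boolean>` return type of `isJobExecutionResultPresent` is an assumption, since the diff does not show it.

```java
import java.util.concurrent.CompletableFuture;

public final class GatewayDefaultsSketch {

    /** Hypothetical stand-in for the job result type. */
    static final class JobResult {
        final String jobId;
        JobResult(String jobId) { this.jobId = jobId; }
    }

    /** Simplified stand-in for RestfulGateway; the @RpcTimeout Time parameter is omitted. */
    interface ResultGateway {

        /**
         * Returns the cached result of a job, whether it succeeded or failed.
         *
         * @param jobId ID of the job that we are interested in
         * @return future holding the result; completed exceptionally if no result is cached
         */
        default CompletableFuture<JobResult> getJobExecutionResult(String jobId) {
            throw new UnsupportedOperationException();
        }

        /**
         * Tests whether a result for the given job is cached.
         *
         * @param jobId ID of the job that we are interested in
         * @return future holding {@code true} if a result is present
         */
        default CompletableFuture<Boolean> isJobExecutionResultPresent(String jobId) {
            throw new UnsupportedOperationException();
        }
    }

    /** A gateway that does not cache results and therefore keeps the defaults. */
    static final class NonCachingGateway implements ResultGateway {}

    public static void main(String[] args) {
        ResultGateway gateway = new NonCachingGateway();
        try {
            gateway.getJobExecutionResult("job-1");
        } catch (UnsupportedOperationException e) {
            System.out.println("not supported by this gateway");
        }
    }
}
```

Throwing synchronously mirrors the diff; an alternative would be to return an exceptionally completed future so that callers handle "unsupported" and "not found" through the same asynchronous path.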


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5184

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

## What is the purpose of the change

Cache the `JobExecutionResult` in the `Dispatcher`, and add methods to `RestfulGateway` that enable retrieval of results through HTTP (the HTTP part is not yet implemented). This is needed so that accumulator results can be transmitted to the client.

## Brief change log

  - *Introduce a new JobExecutionResult, used by the JobMaster to forward the information in the already existing JobExecutionResult.*
  - *Always cache a JobExecutionResult, even when the job fails. In that case, the serialized exception is stored additionally.*
  - *Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults.*
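The caching behavior described in the change log can be sketched as a map from job ID to result that is written on both the success and the failure path, with lookups completing exceptionally when no entry exists. This is a minimal sketch under simplifying assumptions: `ResultCache`, `JobResult`, and `ResultNotFoundException` are hypothetical stand-ins for the `Dispatcher`'s cache, the new `JobExecutionResult`, and `JobExecutionResultNotFoundException`; job IDs are plain strings, and nothing is ever evicted from the cache.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public final class ResultCacheSketch {

    /** Hypothetical result type; a non-null failure cause marks a failed job. */
    static final class JobResult {
        final String jobId;
        final Throwable failureCause;

        JobResult(String jobId, Throwable failureCause) {
            this.jobId = jobId;
            this.failureCause = failureCause;
        }

        boolean isSuccess() { return failureCause == null; }
    }

    /** Hypothetical stand-in for JobExecutionResultNotFoundException. */
    static final class ResultNotFoundException extends RuntimeException {
        ResultNotFoundException(String jobId) { super("No result cached for job " + jobId); }
    }

    static final class ResultCache {
        private final Map<String, JobResult> results = new ConcurrentHashMap<>();

        /** Called when a job finishes successfully. */
        void jobFinished(JobResult result) { results.put(result.jobId, result); }

        /** Called when a job fails; the result is cached too, including its failure cause. */
        void jobFailed(JobResult result) { results.put(result.jobId, result); }

        /** Returns the cached result, or a future completed exceptionally if none exists. */
        CompletableFuture<JobResult> getJobExecutionResult(String jobId) {
            JobResult result = results.get(jobId);
            if (result == null) {
                CompletableFuture<JobResult> notFound = new CompletableFuture<>();
                notFound.completeExceptionally(new ResultNotFoundException(jobId));
                return notFound;
            }
            return CompletableFuture.completedFuture(result);
        }

        /** Tests whether a result is cached for the given job. */
        CompletableFuture<Boolean> isJobExecutionResultPresent(String jobId) {
            return CompletableFuture.completedFuture(results.containsKey(jobId));
        }
    }

    public static void main(String[] args) {
        ResultCache cache = new ResultCache();
        cache.jobFinished(new JobResult("job-1", null));
        cache.jobFailed(new JobResult("job-2", new RuntimeException("boom")));
        System.out.println(cache.getJobExecutionResult("job-1").join().isSuccess()); // true
        System.out.println(cache.getJobExecutionResult("job-2").join().isSuccess()); // false
        System.out.println(cache.getJobExecutionResult("job-3").isCompletedExceptionally()); // true
    }
}
```

Caching on both paths means a client polling for a result can always distinguish "job failed" (result present, with a cause) from "job unknown or still running" (lookup fails).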


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests to verify that the Dispatcher caches the job results when the job finishes successfully or fails.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8234

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5184


commit d05c76e621106810c32bc17aa0576923ba6be401
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults




---