[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5184 ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160692953 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -956,23 +958,29 @@ private void jobStatusChanged( "The job is registered as 'FINISHED (successful), but this notification describes " + "a failure, since the resulting accumulators could not be fetched.", e); - executor.execute(() ->jobCompletionActions.jobFailed(exception)); + executor.execute(() -> jobCompletionActions.jobFailed(builder + .serializedThrowable(new SerializedThrowable(exception)) + .build())); } break; case CANCELED: { final JobExecutionException exception = new JobExecutionException( jobID, "Job was cancelled.", new Exception("The job was cancelled")); - executor.execute(() -> jobCompletionActions.jobFailed(exception)); + executor.execute(() -> jobCompletionActions.jobFailed(builder + .serializedThrowable(new SerializedThrowable(exception)) + .build())); break; } case FAILED: { final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); --- End diff -- True. Not sure what I was looking at. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160692288 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +360,31 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture getJobExecutionResult( + final JobID jobId, + final Time timeout) { + + final JobExecutionResult jobExecutionResult = jobExecutionResultCache.get(jobId); + if (jobExecutionResult == null) { + return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId)); + } else { + return CompletableFuture.completedFuture(jobExecutionResult); + } + } + + @Override + public CompletableFuture isJobExecutionResultPresent( --- End diff -- True. Keep it like this. ---
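The lookup pattern in the diff above can be sketched in isolation with only the JDK. This is a minimal sketch, not the Dispatcher's code: `ResultLookup`, `ResultNotFoundException`, and the `String` job IDs and results are hypothetical stand-ins, and `CompletableFuture#completeExceptionally` takes the place of Flink's `FutureUtils.completedExceptionally`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for JobExecutionResultNotFoundException. */
class ResultNotFoundException extends Exception {
    ResultNotFoundException(String jobId) {
        super("No execution result found for job " + jobId);
    }
}

/** Hypothetical stand-in for the result lookup discussed in the review. */
class ResultLookup {
    private final Map<String, String> results = new ConcurrentHashMap<>();

    void put(String jobId, String result) {
        results.put(jobId, result);
    }

    /** Completes normally if a result is cached, exceptionally otherwise. */
    CompletableFuture<String> getResult(String jobId) {
        String result = results.get(jobId);
        if (result == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new ResultNotFoundException(jobId));
            return failed;
        }
        return CompletableFuture.completedFuture(result);
    }

    /** Presence check, so callers can poll without handling exceptions. */
    CompletableFuture<Boolean> isResultPresent(String jobId) {
        return CompletableFuture.completedFuture(results.containsKey(jobId));
    }
}
```

Keeping a separate presence check alongside the exceptional completion lets a polling client treat "not there yet" as a normal answer rather than an error.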
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160453714 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +360,31 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture<JobExecutionResult> getJobExecutionResult( + final JobID jobId, + final Time timeout) { + + final JobExecutionResult jobExecutionResult = jobExecutionResultCache.get(jobId); + if (jobExecutionResult == null) { + return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId)); + } else { + return CompletableFuture.completedFuture(jobExecutionResult); + } + } + + @Override + public CompletableFuture isJobExecutionResultPresent( --- End diff -- I think a job can be "done", whereas a JobExecutionResult is "ready" or "present". ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160408755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional + * {@link SerializedThrowable} when the job failed. + * + * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}. + */ +public class JobExecutionResult { --- End diff -- Will change it. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160408734 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional + * {@link SerializedThrowable} when the job failed. + * + * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}. 
+ */ +public class JobExecutionResult { + + private final JobID jobId; + + private final Map accumulatorResults; + + private final long netRuntime; + + private final SerializedThrowable serializedThrowable; --- End diff -- Will change it. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160407686 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -956,23 +958,29 @@ private void jobStatusChanged( "The job is registered as 'FINISHED (successful), but this notification describes " + "a failure, since the resulting accumulators could not be fetched.", e); - executor.execute(() ->jobCompletionActions.jobFailed(exception)); + executor.execute(() -> jobCompletionActions.jobFailed(builder + .serializedThrowable(new SerializedThrowable(exception)) + .build())); } break; case CANCELED: { final JobExecutionException exception = new JobExecutionException( jobID, "Job was cancelled.", new Exception("The job was cancelled")); - executor.execute(() -> jobCompletionActions.jobFailed(exception)); + executor.execute(() -> jobCompletionActions.jobFailed(builder + .serializedThrowable(new SerializedThrowable(exception)) + .build())); break; } case FAILED: { final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); --- End diff -- I think `error` is always a `SerializedThrowable`.
```
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
    if (jobStatusListeners.size() > 0) {
        final long timestamp = System.currentTimeMillis();
        final Throwable serializedError = error == null ? null : new SerializedThrowable(error);

        for (JobStatusListener listener : jobStatusListeners) {
            try {
                listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
            } catch (Throwable t) {
                LOG.warn("Error while notifying JobStatusListener", t);
            }
        }
    }
}
```
Not sure if it makes sense to unpack, wrap, and serialize it again. ---
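One way to avoid the unpack, wrap, and serialize round trip questioned above is to wrap an error only when it is not already wrapped. The following is a minimal sketch of that idea under stated assumptions: `WrappedError` and `Errors.wrapOnce` are hypothetical names used for illustration and do not reflect how `SerializedThrowable` itself behaves.

```java
/** Hypothetical stand-in for a serialized exception wrapper. */
class WrappedError extends Exception {
    private final String originalClassName;

    WrappedError(Throwable original) {
        super(original.getMessage());
        // Keep only the class name, so no user-code class is needed to read it.
        this.originalClassName = original.getClass().getName();
    }

    String getOriginalClassName() {
        return originalClassName;
    }
}

/** Hypothetical helper that never wraps an already wrapped error again. */
class Errors {
    static WrappedError wrapOnce(Throwable error) {
        if (error instanceof WrappedError) {
            return (WrappedError) error; // already wrapped: pass it through
        }
        return new WrappedError(error);
    }
}
```

With a helper like this, a listener that already receives a wrapped error can forward it unchanged instead of unpacking it with a class loader first.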
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160114353 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link JobExecutionResult}s. + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache<JobID, JobExecutionResult> + jobExecutionResultCache = + CacheBuilder.newBuilder() + .softValues() --- End diff -- I assume that we use an LRU strategy if we hit the maximum size. But what happens if we purge a job result and a client then asks for it?
In that case we should at least return something like "the job finished, but the execution result couldn't be stored". ---
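The case raised above (a result was purged and a client asks for it afterwards) can be sketched as a size-bounded cache that remembers which job IDs it has evicted. This is only an illustration under stated assumptions: `EvictionAwareCache` and `LookupStatus` are hypothetical names, job IDs and results are plain `String`s, and a JDK `LinkedHashMap` in access order stands in for the Guava cache used in the pull request.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Outcome of a lookup; hypothetical names, not taken from the pull request. */
enum LookupStatus { PRESENT, EVICTED, UNKNOWN }

/** Size-bounded result cache that remembers which job IDs it has purged. Not thread-safe. */
class EvictionAwareCache {
    // Note: this set grows without bound in the sketch; real code would cap it too.
    private final Set<String> evictedJobIds = new HashSet<>();
    private final Map<String, String> results;

    EvictionAwareCache(final int maxSize) {
        // accessOrder = true yields least-recently-used eviction order.
        this.results = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                if (size() > maxSize) {
                    evictedJobIds.add(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    void put(String jobId, String result) {
        evictedJobIds.remove(jobId);
        results.put(jobId, result);
    }

    /** Distinguishes "result purged" from "job never seen". */
    LookupStatus status(String jobId) {
        if (results.containsKey(jobId)) {
            return LookupStatus.PRESENT;
        }
        return evictedJobIds.contains(jobId) ? LookupStatus.EVICTED : LookupStatus.UNKNOWN;
    }
}
```

A caller could map `EVICTED` to a message such as "the job finished, but its result is no longer available" and `UNKNOWN` to a not-found error. A fixed maximum size also makes eviction deterministic, which is easier to test than soft references.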
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160110253 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional + * {@link SerializedThrowable} when the job failed. + * + * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}. + */ +public class JobExecutionResult { --- End diff -- Could be made `Serializable` especially if the `JobMaster` should run in the future on a different machine. ---
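What making the result class `Serializable` buys can be shown with a plain Java serialization round trip, which is roughly what happens when an object crosses a process boundary. This is a minimal sketch with hypothetical names (`SerializableJobResult`, `SerializationRoundTrip`); it does not reproduce the fields of the class under review.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical, simplified result that can be shipped between JVMs. */
class SerializableJobResult implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String jobId;
    private final long netRuntime;

    SerializableJobResult(String jobId, long netRuntime) {
        this.jobId = jobId;
        this.netRuntime = netRuntime;
    }

    String getJobId() {
        return jobId;
    }

    long getNetRuntime() {
        return netRuntime;
    }
}

/** Copies an object by serializing and deserializing it in memory. */
class SerializationRoundTrip {
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

Every field of such a class has to be serializable as well, which is one reason to carry a failure as an already serialized wrapper rather than as a raw user-code exception.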
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160108453 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) { } @Override - public void jobFinished(JobExecutionResult result) { + public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { decrementCheckAndCleanup(); } @Override - public void jobFailed(Throwable cause) { + public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { --- End diff -- +1 ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160107760 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -956,23 +958,29 @@ private void jobStatusChanged( "The job is registered as 'FINISHED (successful), but this notification describes " + "a failure, since the resulting accumulators could not be fetched.", e); - executor.execute(() ->jobCompletionActions.jobFailed(exception)); + executor.execute(() -> jobCompletionActions.jobFailed(builder + .serializedThrowable(new SerializedThrowable(exception)) + .build())); } break; case CANCELED: { final JobExecutionException exception = new JobExecutionException( jobID, "Job was cancelled.", new Exception("The job was cancelled")); - executor.execute(() -> jobCompletionActions.jobFailed(exception)); + executor.execute(() -> jobCompletionActions.jobFailed(builder + .serializedThrowable(new SerializedThrowable(exception)) + .build())); break; } case FAILED: { final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); --- End diff -- I think `error` should never be a `SerializedThrowable`. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160105159 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobExecutionResult; + +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Caches {@link JobExecutionResult}s. + */ +class JobExecutionResultCache { + + private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300; + + private final Cache+ jobExecutionResultCache = + CacheBuilder.newBuilder() + .softValues() --- End diff -- After an offline talk with Gary, we came to the conclusion that it might be better to use a maximum size instead of soft values. This will make testability much easier. 
The maximum size should be configurable, though. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r159603034 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -357,6 +360,31 @@ public void start() throws Exception { return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort()); } + @Override + public CompletableFuture<JobExecutionResult> getJobExecutionResult( + final JobID jobId, + final Time timeout) { + + final JobExecutionResult jobExecutionResult = jobExecutionResultCache.get(jobId); + if (jobExecutionResult == null) { + return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId)); + } else { + return CompletableFuture.completedFuture(jobExecutionResult); + } + } + + @Override + public CompletableFuture isJobExecutionResultPresent( --- End diff -- Shall we maybe rename this method to `isJobExecutionResultDone`? ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160110770 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional + * {@link SerializedThrowable} when the job failed. + * + * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}. 
+ */ +public class JobExecutionResult { + + private final JobID jobId; + + private final Map accumulatorResults; + + private final long netRuntime; + + private final SerializedThrowable serializedThrowable; --- End diff -- Maybe add a comment that this field is only non-null if the job has failed. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160108633 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() { private volatile Throwable runnerException; - private volatile JobExecutionResult result; + private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result; BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) { this.jobId = jobId; this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor); } @Override - public void jobFinished(JobExecutionResult jobResult) { - this.result = jobResult; + public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { + this.result = result; jobMastersToWaitFor.countDown(); } @Override - public void jobFailed(Throwable cause) { - jobException = cause; + public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { + checkArgument(result.getSerializedThrowable().isPresent()); + + jobException = result --- End diff -- Then I guess we should remove it and store the `jobmaster.JobExecutionResult` instead. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160105972 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional + * {@link SerializedThrowable} when the job failed. + * + * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}. 
+ */ +public class JobExecutionResult { + + private final JobID jobId; + + private final Map accumulatorResults; + + private final long netRuntime; + + private final SerializedThrowable serializedThrowable; --- End diff -- `@Nullable` missing. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160106268 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -936,17 +936,19 @@ private void jobStatusChanged( final JobID jobID = executionGraph.getJobID(); --- End diff -- No longer the case with your changes. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r160111827 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE } } else if (result != null) { - return result; + try { + return new SerializedJobExecutionResult( + jobId, + result.getNetRuntime(), + result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader()); --- End diff -- I think it is ok, because this is what happens on the client as well. Code-wise, one could also write it as:
```
return new JobExecutionResult(
    jobId,
    result.getNetRuntime(),
    AccumulatorHelper.deserializeAccumulators(result.getAccumulatorResults(), ClassLoader.getSystemClassLoader()));
```
---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r159826471 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -936,17 +936,19 @@ private void jobStatusChanged( final JobID jobID = executionGraph.getJobID(); --- End diff -- Variables declared too early ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157963387 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -405,22 +407,27 @@ private void decrementCheckAndCleanup() { private volatile Throwable runnerException; - private volatile JobExecutionResult result; + private volatile org.apache.flink.runtime.jobmaster.JobExecutionResult result; BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) { this.jobId = jobId; this.jobMastersToWaitFor = new CountDownLatch(numJobMastersToWaitFor); } @Override - public void jobFinished(JobExecutionResult jobResult) { - this.result = jobResult; + public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { + this.result = result; jobMastersToWaitFor.countDown(); } @Override - public void jobFailed(Throwable cause) { - jobException = cause; + public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { + checkArgument(result.getSerializedThrowable().isPresent()); + + jobException = result --- End diff -- Actually, there is no need to store the exception separately, because the JobExecutionResult already contains it. ---
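The simplification suggested above (keep a single result field and read the failure from it) could look roughly like the following. It is a sketch under stated assumptions: `SimpleJobResult` and `SingleResultSync` are hypothetical stand-ins for the result class and `BlockingJobSync`, and a plain `Throwable` replaces the serialized one.

```java
import java.util.Optional;
import java.util.concurrent.CountDownLatch;

/** Hypothetical result: a failure cause is present only if the job failed. */
class SimpleJobResult {
    private final Throwable failureCause; // null on success

    SimpleJobResult(Throwable failureCause) {
        this.failureCause = failureCause;
    }

    Optional<Throwable> getFailureCause() {
        return Optional.ofNullable(failureCause);
    }
}

/** Waits for one job and stores only the result, not a separate exception. */
class SingleResultSync {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile SimpleJobResult result;

    void jobFinished(SimpleJobResult result) {
        this.result = result;
        done.countDown();
    }

    void jobFailed(SimpleJobResult result) {
        if (!result.getFailureCause().isPresent()) {
            throw new IllegalArgumentException("A failed job must carry a failure cause");
        }
        this.result = result;
        done.countDown();
    }

    /** Blocks until a callback arrived; callers inspect the failure cause themselves. */
    SimpleJobResult awaitResult() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the job", e);
        }
        return result;
    }
}
```

With one field there is a single source of truth, so the success and failure paths cannot disagree about what happened.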
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157962431 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE } } else if (result != null) { - return result; + try { + return new SerializedJobExecutionResult( + jobId, + result.getNetRuntime(), + result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader()); --- End diff -- Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it here again. I wonder if this is sane? ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157878862 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) { } @Override - public void jobFinished(JobExecutionResult result) { + public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { decrementCheckAndCleanup(); } @Override - public void jobFailed(Throwable cause) { + public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { --- End diff -- Maybe rename to `JobResult` after all to avoid fqn. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157877969 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional + * {@link SerializedThrowable} when the job failed. + * + * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}. 
+ */ +public class JobExecutionResult { + + private final JobID jobId; + + private final Map accumulatorResults; + + private final long netRuntime; + + private final SerializedThrowable serializedThrowable; + + private JobExecutionResult( + final JobID jobId, + final Map accumulatorResults, + final long netRuntime, + @Nullable final SerializedThrowable serializedThrowable) { + + checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0"); + + this.jobId = requireNonNull(jobId); + this.accumulatorResults = requireNonNull(accumulatorResults); + this.netRuntime = netRuntime; + this.serializedThrowable = serializedThrowable; + } + + public boolean isSuccess() { --- End diff -- Javadocs are missing. ---
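The review points collected for this class (document the nullable field, expose it as an `Optional`, add Javadocs, validate in the constructor) can be combined in a small standalone sketch. Everything here is an assumption made for illustration: `JobResultSketch`, its builder, the `String` job ID, and the plain `Throwable` are hypothetical and simplified relative to the class in the diff.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical, simplified job result with an optional failure cause. */
final class JobResultSketch {
    private final String jobId;
    private final long netRuntime;

    /** Only non-null if the job failed. */
    private final Throwable failureCause;

    private JobResultSketch(String jobId, long netRuntime, Throwable failureCause) {
        if (netRuntime < 0) {
            throw new IllegalArgumentException("netRuntime must be greater than or equal to 0");
        }
        this.jobId = Objects.requireNonNull(jobId);
        this.netRuntime = netRuntime;
        this.failureCause = failureCause;
    }

    /** Returns {@code true} if the job finished without a failure cause. */
    boolean isSuccess() {
        return failureCause == null;
    }

    /** Returns the failure cause, or an empty {@link Optional} if the job succeeded. */
    Optional<Throwable> getFailureCause() {
        return Optional.ofNullable(failureCause);
    }

    String getJobId() {
        return jobId;
    }

    long getNetRuntime() {
        return netRuntime;
    }

    /** Builder, so that callers set a failure cause only on the failure path. */
    static final class Builder {
        private String jobId;
        private long netRuntime;
        private Throwable failureCause;

        Builder jobId(String jobId) {
            this.jobId = jobId;
            return this;
        }

        Builder netRuntime(long netRuntime) {
            this.netRuntime = netRuntime;
            return this;
        }

        Builder failureCause(Throwable failureCause) {
            this.failureCause = failureCause;
            return this;
        }

        JobResultSketch build() {
            return new JobResultSketch(jobId, netRuntime, failureCause);
        }
    }
}
```

Exposing the nullable field only through an `Optional` getter keeps the null check in one place.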
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157877406 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE } } else if (result != null) { - return result; + try { + return new SerializedJobExecutionResult( + jobId, + result.getNetRuntime(), + result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader()); --- End diff -- Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it here again. I wonder if this is sane? CC: @tillrohrmann ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157876761 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java --- @@ -92,4 +94,42 @@ * @return Future containing the collection of instance ids and the corresponding metric query service path */ CompletableFuture>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); + + /** +* Returns the {@link SerializedJobExecutionResult} for a job, or in case the job failed, the --- End diff -- Javadoc needs to be updated. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157876793 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java --- @@ -92,4 +94,42 @@ * @return Future containing the collection of instance ids and the corresponding metric query service path */ CompletableFuture>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); + + /** +* Returns the {@link SerializedJobExecutionResult} for a job, or in case the job failed, the +* failure cause. +* +* @param jobId ID of the job that we are interested in. +* @param timeout Timeout for the asynchronous operation. +* +* @see #isJobExecutionResultPresent(JobID, Time) +* +* @return {@link CompletableFuture} containing the {@link JobExecutionResult} or a +* {@link Throwable} which represents the failure cause. If there is no result, the future will +* be completed exceptionally with +* {@link org.apache.flink.runtime.messages.JobExecutionResultNotFoundException} +*/ + default CompletableFuture getJobExecutionResult( + JobID jobId, + @RpcTimeout Time timeout) { + throw new UnsupportedOperationException(); + } + + /** +* Tests if the {@link SerializedJobExecutionResult} is present. --- End diff -- Javadoc needs to be updated. ---
[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5184

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

## What is the purpose of the change

Cache `JobExecutionResult` in `Dispatcher`, and add methods to `RestfulGateway` to enable retrieval of results through HTTP (not yet implemented). This will be needed so that accumulator results can be transmitted to the client.

## Brief change log

- *Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult.*
- *Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally.*
- *Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults.*

## Verifying this change

This change added tests and can be verified as follows:

- *Added unit tests to verify that the Dispatcher caches the job results when the job finishes successfully or by failure.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

CC: @tillrohrmann

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8234

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5184.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5184

commit d05c76e621106810c32bc17aa0576923ba6be401
Author: gyao
Date: 2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults

---
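The caching behaviour described in the change log can be sketched as follows. This is a minimal illustration using only the JDK, not the Dispatcher code itself: `ResultCacheSketch` and `ResultNotFoundException` are hypothetical names, job IDs and results are plain strings, and the cache is an unbounded map (whether and how the real cache evicts entries is not covered here).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a result cache with future-based lookups. */
final class ResultCacheSketch {

    /** Hypothetical exception signalling that no result is cached for a job. */
    static final class ResultNotFoundException extends Exception {
        private static final long serialVersionUID = 1L;

        ResultNotFoundException(String jobId) {
            super("No cached result for job " + jobId);
        }
    }

    // Assumption: results are cached for successful and failed jobs alike.
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    /** Stores the result of a finished job. */
    void put(String jobId, String result) {
        cache.put(jobId, result);
    }

    /** Completes normally with the cached result, or exceptionally if there is none. */
    CompletableFuture<String> getJobExecutionResult(String jobId) {
        String result = cache.get(jobId);
        if (result == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new ResultNotFoundException(jobId));
            return failed;
        }
        return CompletableFuture.completedFuture(result);
    }

    /** Tests whether a result is cached for the given job. */
    CompletableFuture<Boolean> isJobExecutionResultPresent(String jobId) {
        return CompletableFuture.completedFuture(cache.containsKey(jobId));
    }
}
```

A caller that wants to avoid the exceptional path can check `isJobExecutionResultPresent` first and only then request the result.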