[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-19 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r579274752



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        executionOptional.ifPresent(
+                execution ->
+                        execution
+                                .getFailureInfo()
+                                .ifPresent(
+                                        failureInfo -> {
+                                            taskFailureHistory.add(failureInfo);
+                                            log.debug(
+                                                    "Archive local failure causing attempt {} to fail: {}",
+                                                    execution.getAttemptId(),
+                                                    failureInfo.getExceptionAsString());
+                                        }));

Review comment:
   I added this remark to FLINK-21190





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-19 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r579259796



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        executionOptional.ifPresent(
+                execution ->
+                        execution
+                                .getFailureInfo()
+                                .ifPresent(
+                                        failureInfo -> {
+                                            taskFailureHistory.add(failureInfo);
+                                            log.debug(
+                                                    "Archive local failure causing attempt {} to fail: {}",
+                                                    execution.getAttemptId(),
+                                                    failureInfo.getExceptionAsString());
+                                        }));

Review comment:
   Yes, that would be possible if we would like to relax the constraint 
so that the exception history provides not the timestamp at which the 
`Execution` was marked as failed but an earlier one. But to be fair, we already 
do that for the fallback of a global failover in `archiveFromFailureHandlingResult`.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-19 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r579234673



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,41 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED));
+    }
+
+    protected void archiveGlobalFailure(Throwable failure, long timestamp) {
+        taskFailureHistory.add(new ErrorInfo(failure, timestamp));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        final Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        if (executionOptional.isPresent()) {

Review comment:
   I'm hesitant to change that: IMHO, it makes the code harder to read and 
more prone to misbehaving. The current version of the code is straightforward: 
use the `FAILED` timestamp provided by the corresponding `Execution` if we have 
the corresponding `ExecutionAttemptId`. Additionally, your change request might 
cause unwanted behavior if we decide to introduce global failures due to some 
local failure. I consider the fact that the global failure does not have a 
causing `Execution` an implementation detail which we don't have to expose 
here. @tillrohrmann Is that reasonable?
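
   To make the rule concrete, here is a minimal sketch of what I mean, not the actual Flink code: `FailureEntry` and `HistorySketch` are hypothetical stand-ins for `ErrorInfo` and the scheduler. The entry of the failed execution is used when one exists; otherwise the global failure is archived with the job-level timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for an archived failure; not a Flink class.
final class FailureEntry {
    final Throwable cause;
    final long timestamp;

    FailureEntry(Throwable cause, long timestamp) {
        this.cause = cause;
        this.timestamp = timestamp;
    }
}

// Hypothetical holder of the exception history.
final class HistorySketch {
    private final List<FailureEntry> history = new ArrayList<>();

    // Archives the failed execution's entry if there is one; otherwise falls
    // back to the global failure with the job-level FAILED timestamp.
    void archive(
            Optional<FailureEntry> failedExecution,
            Throwable globalCause,
            long jobFailedTimestamp) {
        history.add(
                failedExecution.orElseGet(
                        () -> new FailureEntry(globalCause, jobFailedTimestamp)));
    }

    List<FailureEntry> getHistory() {
        return history;
    }
}
```

   The caller never needs to know whether a causing `Execution` exists; that decision stays inside `archive`.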









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578786368



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        executionOptional.ifPresent(
+                execution ->
+                        execution
+                                .getFailureInfo()
+                                .ifPresent(
+                                        failureInfo -> {
+                                            taskFailureHistory.add(failureInfo);
+                                            log.debug(
+                                                    "Archive local failure causing attempt {} to fail: {}",
+                                                    execution.getAttemptId(),
+                                                    failureInfo.getExceptionAsString());
+                                        }));

Review comment:
   But looking into that again, I realize that there is, indeed, the 
`failureInfo` in `ExecutionGraph` that is set during 
[handleTaskFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L218)
 and 
[handleGlobalFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L236).
 This is used to forward the current root cause of the `ExecutionGraph` to the 
Web UI. I could have used that. But the drawback is that it's using 
`System.currentTimeMillis()`.
   
   Instead, I'm going to work on removing `ExecutionGraph.failureInfo` (and 
related methods) as part of FLINK-21190. It becomes redundant once we enable 
the exception history, since it's just the exception history's last entry.
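
   As a rough illustration of why the field becomes redundant (a sketch only; `RootCauseHistory` is a hypothetical name and not part of Flink), the current root cause can be derived from the history on demand:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; entries are plain strings to keep the example small.
final class RootCauseHistory {
    private final List<String> entries = new ArrayList<>();

    void add(String failureDescription) {
        entries.add(failureDescription);
    }

    // The most recent root cause is simply the last archived entry,
    // so no separate "current failure" field has to be maintained.
    Optional<String> latestRootCause() {
        return entries.isEmpty()
                ? Optional.empty()
                : Optional.of(entries.get(entries.size() - 1));
    }
}
```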









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578787023



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);

Review comment:
   You're right. I remember looking into that. Back then, I mistook the 
task as only focusing on "task failures". That was a misunderstanding. I added 
the test for global failover and the test.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578786368



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        executionOptional.ifPresent(
+                execution ->
+                        execution
+                                .getFailureInfo()
+                                .ifPresent(
+                                        failureInfo -> {
+                                            taskFailureHistory.add(failureInfo);
+                                            log.debug(
+                                                    "Archive local failure causing attempt {} to fail: {}",
+                                                    execution.getAttemptId(),
+                                                    failureInfo.getExceptionAsString());
+                                        }));

Review comment:
   But looking into that again, I realize that there is, indeed, the 
`failureInfo` in `ExecutionGraph` that is set during 
[handleTaskFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L218)
 and 
[handleGlobalFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L236).
 This is used to forward the current root cause of the `ExecutionGraph` to the 
Web UI. I could have used that. But the drawback is that it's using 
`System.currentTimeMillis()`.
   
   Instead, I'm going to work on removing `ExecutionGraph.failureInfo` (and 
related methods) as part of FLINK-21188. It becomes redundant once we enable 
the exception history, since it's just the exception history's last entry.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-18 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r578737299



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -635,6 +641,33 @@ public void cancel() {
         return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
     }
 
+    protected void archiveGlobalFailure(Throwable failure) {
+        taskFailureHistory.add(
+                new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)));
+        log.debug("Archive global failure.", failure);
+    }
+
+    protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+        Optional<Execution> executionOptional =
+                failureHandlingResult
+                        .getExecutionVertexIdOfFailedTask()
+                        .map(this::getExecutionVertex)
+                        .map(ExecutionVertex::getCurrentExecutionAttempt);
+
+        executionOptional.ifPresent(
+                execution ->
+                        execution
+                                .getFailureInfo()
+                                .ifPresent(
+                                        failureInfo -> {
+                                            taskFailureHistory.add(failureInfo);
+                                            log.debug(
+                                                    "Archive local failure causing attempt {} to fail: {}",
+                                                    execution.getAttemptId(),
+                                                    failureInfo.getExceptionAsString());
+                                        }));

Review comment:
   Because of the timestamp in case of a local failure: the `Execution` fails 
but does not necessarily trigger the `ExecutionGraph` to switch to `FAILED`. 
Hence, no timestamp is recorded for this kind of failure.
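
   A small sketch of the situation, under the assumption that both levels keep one timestamp per state and that an unvisited state reads as 0 (`SketchState` and `StateTimestamps` are hypothetical names, not Flink's classes):

```java
// Hypothetical sketch of per-state timestamp bookkeeping.
enum SketchState {
    RUNNING,
    FAILED
}

final class StateTimestamps {
    // One slot per state; a slot stays 0 until the state is entered.
    private final long[] timestamps = new long[SketchState.values().length];

    void transitionTo(SketchState state, long now) {
        timestamps[state.ordinal()] = now;
    }

    // Returns 0 if the state was never entered.
    long getTimestamp(SketchState state) {
        return timestamps[state.ordinal()];
    }
}
```

   With a local failure only the task-level instance transitions to `FAILED`, so asking the job-level instance for its `FAILED` timestamp yields nothing useful.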









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-05 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r569986337



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -522,6 +527,7 @@ protected ComponentMainThreadExecutor getMainThreadExecutor() {
     protected void failJob(Throwable cause) {
         incrementVersionsOfAllVertices();
         executionGraph.failJob(cause);
+        getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));

Review comment:
   Here, I don't fully understand: you're referring to the case where a 
failure happens, the user cancels the job while the failure handling is done, 
and the `failJob` method might be called while being in a 
`CANCELED`/`CANCELLING` state? For this case, I would still think that we should 
archive the exception because the user's intervention happened after the 
exception happened.

##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -870,6 +874,78 @@ public void 
allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
 assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
 }
 
+@Test
+public void testExceptionHistoryWithRestartableFailure() {
+final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+final JobID jobId = jobGraph.getJobID();
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+// initiate restartable failure
+final ExecutionAttemptID restartableAttemptId =
+
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final RuntimeException restartableException = new 
RuntimeException("restartable exception");
+Range updateStateTriggeringRestartTimeframe =
+initiateFailure(scheduler, jobId, restartableAttemptId, 
restartableException);
+
+taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+// initiate job failure
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final ExecutionAttemptID failingAttemptId =
+
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final RuntimeException failingException = new 
RuntimeException("failing exception");
+Range updateStateTriggeringJobFailureTimeframe =
+initiateFailure(scheduler, jobId, failingAttemptId, 
failingException);
+
+List actualExceptionHistory = 
scheduler.getExceptionHistory();
+assertThat(actualExceptionHistory.size(), is(2));
+
+// assert restarted attempt
+ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+assertThat(
+restartableFailure
+.getException()
+.deserializeError(ClassLoader.getSystemClassLoader()),
+is(restartableException));
+assertThat(
+restartableFailure.getTimestamp(),
+
greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+assertThat(
+restartableFailure.getTimestamp(),
+
lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+// assert job failure attempt
+ErrorInfo globalFailure = actualExceptionHistory.get(1);
+Throwable actualException =
+
globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+assertThat(actualException, 
org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+assertThat(actualException.getCause(), is(failingException));
+assertThat(
+globalFailure.getTimestamp(),
+
greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+assertThat(
+globalFailure.getTimestamp(),
+
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+}
+
+private static Range initiateFailure(
+DefaultScheduler scheduler,
+JobID jobId,
+ExecutionAttemptID executionAttemptID,
+Throwable exception) {
+long start = System.currentTimeMillis();

Review comment:
   Thanks for the clarification.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##
@@ -1022,18 +1021,38 @@ private boolean transitionState(ExecutionState 
currentState, ExecutionState newS
 return transitionState(currentState, newState, null);
 }
 
+/**
+ * Try to transition to FAILED state from a given state and sets the 
{@code 

[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-04 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r570520523



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##
@@ -1022,18 +1021,38 @@ private boolean transitionState(ExecutionState currentState, ExecutionState newS
         return transitionState(currentState, newState, null);
     }
 
+    /**
+     * Try to transition to FAILED state from a given state and sets the {@code failureCause}.
+     *
+     * @param currentState of the execution
+     * @param cause the {@link Throwable} causing the failure
+     * @return true if the transition was successful, otherwise false
+     * @throws NullPointerException if no {@code cause} is provided
+     */
+    private boolean transitionToFailedStateAndSetFailureCause(
+            ExecutionState currentState, Throwable cause) {
+        Preconditions.checkNotNull(cause, "No cause is given when transitioning to FAILED state.");
+        failureCause = cause;
+        return transitionState(currentState, ExecutionState.FAILED, cause);

Review comment:
   That's actually a good point. I will correct that.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-03 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r570012364



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -870,6 +874,78 @@ public void 
allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
 assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
 }
 
+@Test
+public void testExceptionHistoryWithRestartableFailure() {
+final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+final JobID jobId = jobGraph.getJobID();
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+// initiate restartable failure
+final ExecutionAttemptID restartableAttemptId =
+
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final RuntimeException restartableException = new 
RuntimeException("restartable exception");
+Range updateStateTriggeringRestartTimeframe =
+initiateFailure(scheduler, jobId, restartableAttemptId, 
restartableException);
+
+taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+// initiate job failure
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final ExecutionAttemptID failingAttemptId =
+
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final RuntimeException failingException = new 
RuntimeException("failing exception");
+Range updateStateTriggeringJobFailureTimeframe =
+initiateFailure(scheduler, jobId, failingAttemptId, 
failingException);
+
+List actualExceptionHistory = 
scheduler.getExceptionHistory();
+assertThat(actualExceptionHistory.size(), is(2));
+
+// assert restarted attempt
+ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+assertThat(
+restartableFailure
+.getException()
+.deserializeError(ClassLoader.getSystemClassLoader()),
+is(restartableException));
+assertThat(
+restartableFailure.getTimestamp(),
+
greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+assertThat(
+restartableFailure.getTimestamp(),
+
lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+// assert job failure attempt
+ErrorInfo globalFailure = actualExceptionHistory.get(1);
+Throwable actualException =
+
globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+assertThat(actualException, 
org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+assertThat(actualException.getCause(), is(failingException));
+assertThat(
+globalFailure.getTimestamp(),
+
greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+assertThat(
+globalFailure.getTimestamp(),
+
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+}
+
+private static Range initiateFailure(
+DefaultScheduler scheduler,
+JobID jobId,
+ExecutionAttemptID executionAttemptID,
+Throwable exception) {
+long start = System.currentTimeMillis();

Review comment:
   Thanks for the clarification.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-03 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r569986337



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -522,6 +527,7 @@ protected ComponentMainThreadExecutor getMainThreadExecutor() {
     protected void failJob(Throwable cause) {
         incrementVersionsOfAllVertices();
         executionGraph.failJob(cause);
+        getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause));

Review comment:
   Here, I don't fully understand: you're referring to the case where a 
failure happens, the user cancels the job while the failure handling is done, 
and the `failJob` method might be called while being in a 
`CANCELED`/`CANCELLING` state? For this case, I would still think that we should 
archive the exception because the user's intervention happened after the 
exception happened.
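
   To illustrate the behavior I have in mind, here is a simplified sketch with hypothetical names (`TerminationSketch` is not Flink code, and the terminal state is modeled as a plain string): a callback registered via `thenRun` fires once the termination future completes normally, whatever the terminal state is.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of archiving a failure once the job has terminated.
final class TerminationSketch {
    final List<Throwable> archived = new ArrayList<>();
    final CompletableFuture<String> terminationFuture = new CompletableFuture<>();

    // Registers the archiving step; it runs when the future completes
    // normally, independent of the terminal state it completes with.
    void failJob(Throwable cause) {
        terminationFuture.thenRun(() -> archived.add(cause));
    }
}
```

   In this sketch, completing the future with a canceled state still archives the exception that was reported earlier.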









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-03 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r569720896



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
##
@@ -371,9 +375,12 @@ private static void compareExecution(
         assertEquals(
                 runtimeExecution.getAssignedResourceLocation(),
                 archivedExecution.getAssignedResourceLocation());
-        assertEquals(
-                runtimeExecution.getFailureCauseAsString(),
-                archivedExecution.getFailureCauseAsString());
+        assertThat(
+                runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString),
+                is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
+        assertThat(
+                runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp),
+                is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString)));

Review comment:
   Oh man, good catch. `ArchivedExecutionGraphTest` triggered a global 
failure, which only set the `failureInfo` on the `ExecutionGraph` level. I 
switched to `SchedulerBase.updateTaskExecutionState`, which also sets the 
`failureInfo` on the `Execution` level.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-02-03 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r569551171



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##
@@ -317,13 +316,11 @@ public TaskManagerLocation getAssignedResourceLocation() {
                 : null;
     }
 
-    public Throwable getFailureCause() {
-        return failureCause;
-    }
-
     @Override
-    public String getFailureCauseAsString() {
-        return ExceptionUtils.stringifyException(getFailureCause());
+    public Optional<ErrorInfo> getFailureInfo() {
+        return failureCause == null
+                ? Optional.empty()
+                : Optional.of(new ErrorInfo(failureCause, getStateTimestamp(FAILED)));

Review comment:
   Yes, it does. I actually had it like that in a first version of the code 
but changed it for the sake of consistency after realizing that it's not 
Serializable (which was a problem in `ArchivedExecution`). I will revert it.
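
   For context, a sketch of the serialization constraint, assuming "it" above refers to `java.util.Optional`, which does not implement `Serializable`. `ArchivedAttemptSketch` is a hypothetical class, not `ArchivedExecution`: the field stays nullable and the `Optional` is only created in the getter.

```java
import java.io.Serializable;
import java.util.Optional;

// Hypothetical serializable holder; the failure is reduced to a message string.
final class ArchivedAttemptSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Nullable on purpose: an Optional field would break Java serialization.
    private final String failureMessage;

    ArchivedAttemptSketch(String failureMessage) {
        this.failureMessage = failureMessage;
    }

    // The Optional is created per call and never stored in the object.
    Optional<String> getFailureMessage() {
        return Optional.ofNullable(failureMessage);
    }
}
```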









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-01-31 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567422474



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -870,6 +874,78 @@ public void 
allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
 assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), 
hasSize(0));
 }
 
+@Test
+public void testExceptionHistoryWithRestartableFailure() {
+final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+final JobID jobId = jobGraph.getJobID();
+
+final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+
+// initiate restartable failure
+final ExecutionAttemptID restartableAttemptId =
+
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final RuntimeException restartableException = new 
RuntimeException("restartable exception");
+Range updateStateTriggeringRestartTimeframe =
+initiateFailure(scheduler, jobId, restartableAttemptId, 
restartableException);
+
+taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+// initiate job failure
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final ExecutionAttemptID failingAttemptId =
+
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final RuntimeException failingException = new 
RuntimeException("failing exception");
+Range updateStateTriggeringJobFailureTimeframe =
+initiateFailure(scheduler, jobId, failingAttemptId, 
failingException);
+
+List actualExceptionHistory = 
scheduler.getExceptionHistory();
+assertThat(actualExceptionHistory.size(), is(2));
+
+// assert restarted attempt
+ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+assertThat(
+restartableFailure
+.getException()
+.deserializeError(ClassLoader.getSystemClassLoader()),
+is(restartableException));
+assertThat(
+restartableFailure.getTimestamp(),
+
greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+assertThat(
+restartableFailure.getTimestamp(),
+
lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+// assert job failure attempt
+ErrorInfo globalFailure = actualExceptionHistory.get(1);
+Throwable actualException =
+
globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+assertThat(actualException, 
org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+assertThat(actualException.getCause(), is(failingException));
+assertThat(
+globalFailure.getTimestamp(),
+
greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint()));
+assertThat(
+globalFailure.getTimestamp(),
+
lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint()));
+}
+
+private static Range initiateFailure(
+DefaultScheduler scheduler,
+JobID jobId,
+ExecutionAttemptID executionAttemptID,
+Throwable exception) {
+long start = System.currentTimeMillis();

Review comment:
   Correct me if I'm wrong here: Another way to measure the time would be 
`System.nanoTime()` since it ensures monotonicity. But using this in the test 
would require `Execution` to use `System.nanoTime()` as well which would not 
work as `System.nanoTime` cannot be used to compare timestamps between 
different JVMs 
[[1]](https://www.javaadvent.com/2019/12/measuring-time-from-java-to-kernel-and-back.html)?!
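
   For reference, the timeframe check in the test boils down to the sketch below. `TimeframeSketch` is a hypothetical helper, not the test's `initiateFailure`, and the clock is injected here only so the sketch stays deterministic; the test itself reads `System.currentTimeMillis()` directly.

```java
import java.util.function.LongSupplier;

// Hypothetical helper that brackets an action between two clock reads.
final class TimeframeSketch {
    final long start;
    final long observed;
    final long end;

    private TimeframeSketch(long start, long observed, long end) {
        this.start = start;
        this.observed = observed;
        this.end = end;
    }

    // Reads the clock before and after the action that produces a timestamp.
    static TimeframeSketch measure(LongSupplier clock, LongSupplier action) {
        long start = clock.getAsLong();
        long observed = action.getAsLong();
        long end = clock.getAsLong();
        return new TimeframeSketch(start, observed, end);
    }

    // Holds only if the same clock source is used on both sides.
    boolean observedWithinTimeframe() {
        return start <= observed && observed <= end;
    }
}
```

   The comparison is only meaningful when the bracket and the observed timestamp come from the same kind of clock, which is why mixing `System.nanoTime()` in the test with `System.currentTimeMillis()` in `Execution` would not work.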









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-01-31 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567421044



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##
@@ -267,9 +267,16 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe
         delayExecutor.schedule(
                 () ->
                         FutureUtils.assertNoException(
-                                cancelFuture.thenRunAsync(
-                                        restartTasks(executionVertexVersions, globalRecovery),
-                                        getMainThreadExecutor())),
+                                cancelFuture
+                                        .thenRunAsync(
+                                                () ->
+                                                        archiveFromFailureHandlingResult(
+                                                                failureHandlingResult),
+                                                getMainThreadExecutor())
+                                        .thenRunAsync(
+                                                restartTasks(
+                                                        executionVertexVersions, globalRecovery),
+                                                getMainThreadExecutor())),

Review comment:
   That's a good question. I did it for convenience while experimenting and 
didn't revisit it afterwards. I've refactored it now.
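
For reference, the ordering that the chained `thenRunAsync` calls in the hunk above rely on can be sketched with plain `CompletableFuture`. This is a sketch under assumptions: `ArchiveThenRestartSketch` and the step names are hypothetical, and the executor passed in stands in for `getMainThreadExecutor()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

final class ArchiveThenRestartSketch {

    /** Records the order in which the two stages run once the cancel future completes. */
    static List<String> run(Executor mainThreadExecutor) {
        List<String> steps = new ArrayList<>();
        CompletableFuture<Void> cancelFuture = new CompletableFuture<>();

        // The second stage depends on the first one, so "restart" can only
        // run after "archive" has completed normally.
        CompletableFuture<Void> chain =
                cancelFuture
                        .thenRunAsync(() -> steps.add("archive"), mainThreadExecutor)
                        .thenRunAsync(() -> steps.add("restart"), mainThreadExecutor);

        // Nothing runs until cancellation has finished.
        cancelFuture.complete(null);
        chain.join();
        return steps;
    }
}
```

Calling `ArchiveThenRestartSketch.run(Runnable::run)` executes both stages on the calling thread, which is enough to observe the ordering.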









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-01-31 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567418341



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -641,6 +648,33 @@ public void cancel() {
 return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
 }
 
+protected void archiveGlobalFailure(Throwable failure) {
+Set executionAttemptIds =
+IterableUtils.toStream(executionGraph.getAllExecutionVertices())
+.map(ExecutionVertex::getCurrentExecutionAttempt)
+.map(Execution::getAttemptId)
+.collect(Collectors.toSet());
+
+globalTaskFailures.put(
+executionAttemptIds, new ErrorInfo(failure, System.currentTimeMillis()));
+
+log.debug("Archive global {}: {}", failure, failure.getMessage());
+}
+
+protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) {
+if (failureHandlingResult.getExecutionVertexIdOfFailedTask() == null) {

Review comment:
   I removed this check. The `if` was a relic of a previous version of the code.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-01-31 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567418118



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##
@@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
 assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
 }
 
+@Test
+public void testExceptionHistoryWithRestartableFailure() {
+final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+final JobID jobId = jobGraph.getJobID();
+
+final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+// initiate restartable failure
+final ExecutionAttemptID restartableAttemptId =
+Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final RuntimeException restartableException = new RuntimeException("restartable exception");
+Range updateStateTriggeringRestartTimeframe =
+initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+// initiate job failure
+testRestartBackoffTimeStrategy.setCanRestart(false);
+
+final ExecutionAttemptID failingAttemptId =
+Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+.getCurrentExecutionAttempt()
+.getAttemptId();
+final RuntimeException failingException = new RuntimeException("failing exception");
+Range updateStateTriggeringJobFailureTimeframe =
+initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+List actualExceptionHistory = scheduler.getExceptionHistory();
+assertThat(actualExceptionHistory.size(), is(2));
+
+// assert restarted attempt
+ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+assertThat(
+restartableFailure
+.getException()
+.deserializeError(ClassLoader.getSystemClassLoader()),
+is(restartableException));
+assertThat(
+restartableFailure.getTimestamp(),
+greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+assertThat(
+restartableFailure.getTimestamp(),
+lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+// assert job failure attempt
+ErrorInfo globalFailure = actualExceptionHistory.get(1);
+Throwable actualException =
+globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+assertThat(actualException.getCause(), is(failingException));

Review comment:
   Ah, thanks for the hint. I didn't know about `FlinkMatchers` yet.









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-01-31 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567417737



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##
@@ -164,6 +168,8 @@
 private final Map coordinatorMap;
 
 private final ComponentMainThreadExecutor mainThreadExecutor;
+private final Map localTaskFailures = new HashMap<>();
+private final Map, ErrorInfo> globalTaskFailures = new HashMap<>();

Review comment:
   You're right. That was a leftover from my initial work, which considered 
multiple exceptions per failure cause. I'm going to refactor it to use a 
`List` here to be more consistent.
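
A rough sketch of what a single list-based history could look like, assuming one entry per failure. `ExceptionHistorySketch` and `Entry` are hypothetical; `Entry` only stands in for Flink's `ErrorInfo`, and the actual refactoring in the PR may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ExceptionHistorySketch {

    /** Hypothetical stand-in for ErrorInfo: a cause and the time it was recorded. */
    static final class Entry {
        final Throwable cause;
        final long timestamp;

        Entry(Throwable cause, long timestamp) {
            this.cause = cause;
            this.timestamp = timestamp;
        }
    }

    // One list for local and global failures alike; insertion order is the
    // order in which the failures were archived.
    private final List<Entry> history = new ArrayList<>();

    void archive(Throwable cause, long timestamp) {
        history.add(new Entry(cause, timestamp));
    }

    /** Read-only view, so callers cannot modify the archived entries. */
    List<Entry> getExceptionHistory() {
        return Collections.unmodifiableList(history);
    }
}
```

With a list, the test can address entries by position, as `actualExceptionHistory.get(0)` and `get(1)` do in `testExceptionHistoryWithRestartableFailure`.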









[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes

2021-01-31 Thread GitBox


XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567417575



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
##
@@ -100,17 +116,22 @@ public long getRestartDelayMS() {
 }
 }
 
+/**
+ * Returns the {@link ExecutionVertexID} of the task causing this failure.
+ *
+ * @return The {@code ExecutionVertexID} or {@code null} if it's a global failure.
+ */
+public ExecutionVertexID getExecutionVertexIdOfFailedTask() {

Review comment:
   I switched to returning an `Optional` to make this more explicit.
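
A minimal sketch of that change, assuming a plain `String` in place of `ExecutionVertexID`. `FailureHandlingResultSketch` and its factory methods are hypothetical and only illustrate the `Optional`-returning getter.

```java
import java.util.Objects;
import java.util.Optional;

final class FailureHandlingResultSketch {

    // null marks a global failure; the field never leaks to callers as null.
    private final String failedVertexId;

    private FailureHandlingResultSketch(String failedVertexId) {
        this.failedVertexId = failedVertexId;
    }

    static FailureHandlingResultSketch local(String vertexId) {
        return new FailureHandlingResultSketch(Objects.requireNonNull(vertexId));
    }

    static FailureHandlingResultSketch global() {
        return new FailureHandlingResultSketch(null);
    }

    /** Empty for a global failure, so callers have to handle that case explicitly. */
    Optional<String> getExecutionVertexIdOfFailedTask() {
        return Optional.ofNullable(failedVertexId);
    }

    /** Example caller: no null check is needed. */
    String describe() {
        return getExecutionVertexIdOfFailedTask()
                .map(id -> "local failure in " + id)
                .orElse("global failure");
    }
}
```

The caller chains `map` on the result instead of comparing against `null`, which is also why the earlier `== null` check in `archiveFromFailureHandlingResult` is no longer needed.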




