[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r579274752 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -635,6 +641,33 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } +protected void archiveGlobalFailure(Throwable failure) { +taskFailureHistory.add( +new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED))); +log.debug("Archive global failure.", failure); +} + +protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) { +Optional executionOptional = +failureHandlingResult +.getExecutionVertexIdOfFailedTask() +.map(this::getExecutionVertex) +.map(ExecutionVertex::getCurrentExecutionAttempt); + +executionOptional.ifPresent( +execution -> +execution +.getFailureInfo() +.ifPresent( +failureInfo -> { + taskFailureHistory.add(failureInfo); +log.debug( +"Archive local failure causing attempt {} to fail: {}", +execution.getAttemptId(), + failureInfo.getExceptionAsString()); +})); Review comment: I added this remark to FLINK-21190 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r579259796 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -635,6 +641,33 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } +protected void archiveGlobalFailure(Throwable failure) { +taskFailureHistory.add( +new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED))); +log.debug("Archive global failure.", failure); +} + +protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) { +Optional executionOptional = +failureHandlingResult +.getExecutionVertexIdOfFailedTask() +.map(this::getExecutionVertex) +.map(ExecutionVertex::getCurrentExecutionAttempt); + +executionOptional.ifPresent( +execution -> +execution +.getFailureInfo() +.ifPresent( +failureInfo -> { + taskFailureHistory.add(failureInfo); +log.debug( +"Archive local failure causing attempt {} to fail: {}", +execution.getAttemptId(), + failureInfo.getExceptionAsString()); +})); Review comment: Yes, that would be possible if we are willing to relax the constraint, so that the exception history provides an earlier timestamp rather than the one at which the `Execution` was marked as failed. To be fair, we already do that for the fallback of a global failover in `archiveFromFailureHandlingResult`.
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r579234673 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -635,6 +641,41 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } +protected void archiveGlobalFailure(Throwable failure) { +archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)); +} + +protected void archiveGlobalFailure(Throwable failure, long timestamp) { +taskFailureHistory.add(new ErrorInfo(failure, timestamp)); +log.debug("Archive global failure.", failure); +} + +protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) { +final Optional executionOptional = +failureHandlingResult +.getExecutionVertexIdOfFailedTask() +.map(this::getExecutionVertex) +.map(ExecutionVertex::getCurrentExecutionAttempt); + +if (executionOptional.isPresent()) { Review comment: I'm hesitant to change that: IMHO, it makes the code harder to read and more prone to misbehaving. The current version of the code is straightforward: use the `FAILED` timestamp provided by the corresponding `Execution` if we have the corresponding `ExecutionAttemptId`. Additionally, your change request might cause unwanted behavior if we decide to introduce global failures due to some local failure. I consider the fact that the global failure does not have a causing `Execution` an implementation detail which we don't have to expose here. @tillrohrmann Is that reasonable?
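The rule stated in the comment above (take the `FAILED` timestamp from the failing `Execution` when one is known, otherwise fall back to the job-level timestamp) can be sketched in isolation. This is only a minimal sketch under stated assumptions, not the code from the pull request: `FailureRecord`, `FailedAttempt`, and `archive` are hypothetical stand-ins for `ErrorInfo`, `Execution`, and the archiving methods in `SchedulerBase`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class ExceptionHistorySketch {

    /** Hypothetical stand-in for ErrorInfo: a failure cause plus the time it was recorded. */
    static final class FailureRecord {
        final Throwable cause;
        final long timestamp;

        FailureRecord(Throwable cause, long timestamp) {
            this.cause = cause;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical stand-in for a failed Execution that knows its own FAILED timestamp. */
    static final class FailedAttempt {
        final Throwable cause;
        final long failedTimestamp;

        FailedAttempt(Throwable cause, long failedTimestamp) {
            this.cause = cause;
            this.failedTimestamp = failedTimestamp;
        }
    }

    private final List<FailureRecord> history = new ArrayList<>();

    /**
     * Archives a failure. If a failing attempt is known, its cause and timestamp are used;
     * otherwise the global cause and the job-level timestamp serve as the fallback.
     */
    void archive(Optional<FailedAttempt> attempt, Throwable globalCause, long jobFailedTimestamp) {
        FailureRecord record =
                attempt.map(a -> new FailureRecord(a.cause, a.failedTimestamp))
                        .orElseGet(() -> new FailureRecord(globalCause, jobFailedTimestamp));
        history.add(record);
    }

    List<FailureRecord> getHistory() {
        return history;
    }

    public static void main(String[] args) {
        ExceptionHistorySketch sketch = new ExceptionHistorySketch();
        RuntimeException local = new RuntimeException("local failure");
        RuntimeException global = new RuntimeException("global failure");

        // Local failure: the attempt's own timestamp is archived.
        sketch.archive(Optional.of(new FailedAttempt(local, 100L)), global, 999L);
        // Global failure without a causing attempt: the job-level timestamp is archived.
        sketch.archive(Optional.empty(), global, 999L);

        for (FailureRecord record : sketch.getHistory()) {
            System.out.println(record.cause.getMessage() + " @ " + record.timestamp);
        }
    }
}
```

Keeping the fallback inside a single `archive` call keeps the "no causing `Execution`" case out of the callers, which is the implementation detail the comment argues should not be exposed.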
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r578786368 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -635,6 +641,33 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } +protected void archiveGlobalFailure(Throwable failure) { +taskFailureHistory.add( +new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED))); +log.debug("Archive global failure.", failure); +} + +protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) { +Optional executionOptional = +failureHandlingResult +.getExecutionVertexIdOfFailedTask() +.map(this::getExecutionVertex) +.map(ExecutionVertex::getCurrentExecutionAttempt); + +executionOptional.ifPresent( +execution -> +execution +.getFailureInfo() +.ifPresent( +failureInfo -> { + taskFailureHistory.add(failureInfo); +log.debug( +"Archive local failure causing attempt {} to fail: {}", +execution.getAttemptId(), + failureInfo.getExceptionAsString()); +})); Review comment: But looking into that again, I realize that there is, indeed, the `failureInfo` in `ExecutionGraph` that is set during [handleTaskFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L218) and [handleGlobalFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L236). This is used to forward the current root cause of the `ExecutionGraph` to the Web UI. I could have used that. But the drawback is that it's using `System.currentMillis`. Instead, I'm gonna work on removing the `ExecutionGraph.failureInfo` (and related methods) as part of FLINK-21190. 
It becomes redundant after we enable the exception history since it's just the exception history's last entry.
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r578787023 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -635,6 +641,33 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } +protected void archiveGlobalFailure(Throwable failure) { +taskFailureHistory.add( +new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED))); +log.debug("Archive global failure.", failure); +} + +protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) { +Optional executionOptional = +failureHandlingResult +.getExecutionVertexIdOfFailedTask() +.map(this::getExecutionVertex) +.map(ExecutionVertex::getCurrentExecutionAttempt); Review comment: You're right. I remember looking into that. Back then, I mistook the task as only focusing on "task failures". That was a misunderstanding. I added the test for global failover and the test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r578786368 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -635,6 +641,33 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } +protected void archiveGlobalFailure(Throwable failure) { +taskFailureHistory.add( +new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED))); +log.debug("Archive global failure.", failure); +} + +protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) { +Optional executionOptional = +failureHandlingResult +.getExecutionVertexIdOfFailedTask() +.map(this::getExecutionVertex) +.map(ExecutionVertex::getCurrentExecutionAttempt); + +executionOptional.ifPresent( +execution -> +execution +.getFailureInfo() +.ifPresent( +failureInfo -> { + taskFailureHistory.add(failureInfo); +log.debug( +"Archive local failure causing attempt {} to fail: {}", +execution.getAttemptId(), + failureInfo.getExceptionAsString()); +})); Review comment: But looking into that again, I realize that there is, indeed, the `failureInfo` in `ExecutionGraph` that is set during [handleTaskFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L218) and [handleGlobalFailure](https://github.com/XComp/flink/blob/8e732bfb2bddc38ec7422f482dcda4be3d296408/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java#L236). This is used to forward the current root cause of the `ExecutionGraph` to the Web UI. I could have used that. But the drawback is that it's using `System.currentMillis`. Instead, I'm gonna work on removing the `ExecutionGraph.failureInfo` (and related methods) as part of FLINK-21188. 
It becomes redundant after we enable the exception history since it's just the exception history's last entry.
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r578737299 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -635,6 +641,33 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } +protected void archiveGlobalFailure(Throwable failure) { +taskFailureHistory.add( +new ErrorInfo(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED))); +log.debug("Archive global failure.", failure); +} + +protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) { +Optional executionOptional = +failureHandlingResult +.getExecutionVertexIdOfFailedTask() +.map(this::getExecutionVertex) +.map(ExecutionVertex::getCurrentExecutionAttempt); + +executionOptional.ifPresent( +execution -> +execution +.getFailureInfo() +.ifPresent( +failureInfo -> { + taskFailureHistory.add(failureInfo); +log.debug( +"Archive local failure causing attempt {} to fail: {}", +execution.getAttemptId(), + failureInfo.getExceptionAsString()); +})); Review comment: Because of the timestamp in the case of a local failure: the `Execution` fails but does not necessarily cause the `ExecutionGraph` to switch to `FAILED`. Hence, no timestamp is recorded for this kind of failure.
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r569986337 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -522,6 +527,7 @@ protected ComponentMainThreadExecutor getMainThreadExecutor() { protected void failJob(Throwable cause) { incrementVersionsOfAllVertices(); executionGraph.failJob(cause); +getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause)); Review comment: Here, I don't fully understand: are you referring to the case where a failure happens, the user cancels the job while the failure handling is under way, and the `failJob` method might be called while in a `CANCELED`/`CANCELLING` state? For this case, I would still think that we should archive the exception because the user's intervention happened after the exception occurred. ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); } +@Test +public void testExceptionHistoryWithRestartableFailure() { +final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); +final JobID jobId = jobGraph.getJobID(); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +// initiate restartable failure +final ExecutionAttemptID restartableAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); +final RuntimeException restartableException = new RuntimeException("restartable exception"); +Range updateStateTriggeringRestartTimeframe = +initiateFailure(scheduler, jobId, restartableAttemptId, restartableException); + +taskRestartExecutor.triggerNonPeriodicScheduledTask(); + +// initiate job failure +testRestartBackoffTimeStrategy.setCanRestart(false); + +final 
ExecutionAttemptID failingAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); +final RuntimeException failingException = new RuntimeException("failing exception"); +Range updateStateTriggeringJobFailureTimeframe = +initiateFailure(scheduler, jobId, failingAttemptId, failingException); + +List actualExceptionHistory = scheduler.getExceptionHistory(); +assertThat(actualExceptionHistory.size(), is(2)); + +// assert restarted attempt +ErrorInfo restartableFailure = actualExceptionHistory.get(0); +assertThat( +restartableFailure +.getException() +.deserializeError(ClassLoader.getSystemClassLoader()), +is(restartableException)); +assertThat( +restartableFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint())); +assertThat( +restartableFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint())); + +// assert job failure attempt +ErrorInfo globalFailure = actualExceptionHistory.get(1); +Throwable actualException = + globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader()); +assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class)); +assertThat(actualException.getCause(), is(failingException)); +assertThat( +globalFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint())); +assertThat( +globalFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint())); +} + +private static Range initiateFailure( +DefaultScheduler scheduler, +JobID jobId, +ExecutionAttemptID executionAttemptID, +Throwable exception) { +long start = System.currentTimeMillis(); Review comment: Thanks for clarification. 
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1022,18 +1021,38 @@ private boolean transitionState(ExecutionState currentState, ExecutionState newS return transitionState(currentState, newState, null); } +/** + * Try to transition to FAILED state from a given state and sets the {@code
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r570520523 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ## @@ -1022,18 +1021,38 @@ private boolean transitionState(ExecutionState currentState, ExecutionState newS return transitionState(currentState, newState, null); } +/** + * Try to transition to FAILED state from a given state and sets the {@code failureCause}. + * + * @param currentState of the execution + * @param cause the {@link Throwable} causing the failure + * @return true if the transition was successful, otherwise false + * @throws NullPointerException if no {@code cause} is provided + */ +private boolean transitionToFailedStateAndSetFailureCause( +ExecutionState currentState, Throwable cause) { +Preconditions.checkNotNull(cause, "No cause is given when transitioning to FAILED state."); +failureCause = cause; +return transitionState(currentState, ExecutionState.FAILED, cause); Review comment: That's actually a good point. I will correct that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r570012364 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); } +@Test +public void testExceptionHistoryWithRestartableFailure() { +final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); +final JobID jobId = jobGraph.getJobID(); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +// initiate restartable failure +final ExecutionAttemptID restartableAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); +final RuntimeException restartableException = new RuntimeException("restartable exception"); +Range updateStateTriggeringRestartTimeframe = +initiateFailure(scheduler, jobId, restartableAttemptId, restartableException); + +taskRestartExecutor.triggerNonPeriodicScheduledTask(); + +// initiate job failure +testRestartBackoffTimeStrategy.setCanRestart(false); + +final ExecutionAttemptID failingAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); +final RuntimeException failingException = new RuntimeException("failing exception"); +Range updateStateTriggeringJobFailureTimeframe = +initiateFailure(scheduler, jobId, failingAttemptId, failingException); + +List actualExceptionHistory = scheduler.getExceptionHistory(); +assertThat(actualExceptionHistory.size(), is(2)); + +// assert restarted attempt +ErrorInfo restartableFailure = actualExceptionHistory.get(0); +assertThat( +restartableFailure +.getException() +.deserializeError(ClassLoader.getSystemClassLoader()), +is(restartableException)); +assertThat( 
+restartableFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint())); +assertThat( +restartableFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint())); + +// assert job failure attempt +ErrorInfo globalFailure = actualExceptionHistory.get(1); +Throwable actualException = + globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader()); +assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class)); +assertThat(actualException.getCause(), is(failingException)); +assertThat( +globalFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint())); +assertThat( +globalFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint())); +} + +private static Range initiateFailure( +DefaultScheduler scheduler, +JobID jobId, +ExecutionAttemptID executionAttemptID, +Throwable exception) { +long start = System.currentTimeMillis(); Review comment: Thanks for clarification. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r569986337 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -522,6 +527,7 @@ protected ComponentMainThreadExecutor getMainThreadExecutor() { protected void failJob(Throwable cause) { incrementVersionsOfAllVertices(); executionGraph.failJob(cause); +getTerminationFuture().thenRun(() -> archiveGlobalFailure(cause)); Review comment: Here, I don't fully understand: are you referring to the case where a failure happens, the user cancels the job while the failure handling is under way, and the `failJob` method might be called while in a `CANCELED`/`CANCELLING` state? For this case, I would still think that we should archive the exception because the user's intervention happened after the exception occurred.
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r569720896 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ## @@ -371,9 +375,12 @@ private static void compareExecution( assertEquals( runtimeExecution.getAssignedResourceLocation(), archivedExecution.getAssignedResourceLocation()); -assertEquals( -runtimeExecution.getFailureCauseAsString(), -archivedExecution.getFailureCauseAsString()); +assertThat( + runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString), + is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString))); +assertThat( +runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp), + is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString))); Review comment: Oh man, good catch. `ArchivedExecutionGraphTest` triggered a global failure which only set the `failureInfo` on the `ExecutionGraph` level. I switched to `SchedulerBase.updateTaskExecutionState`, which also sets the `failureInfo` on the `Execution` level.
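The second assertion in the diff above compares a mapped timestamp with a mapped exception string. Such a comparison can only hold when both `Optional`s are empty, which is presumably what happened while the test set the `failureInfo` only on the `ExecutionGraph` level. The following sketch illustrates that effect; `Info`, `sameFailure`, and `sameNonEmptyFailure` are hypothetical names chosen for illustration and are not part of Flink.

```java
import java.util.Optional;

public class VacuousOptionalAssertion {

    /** Hypothetical stand-in for ErrorInfo with only the two fields the test compares. */
    static final class Info {
        final String exceptionAsString;
        final long timestamp;

        Info(String exceptionAsString, long timestamp) {
            this.exceptionAsString = exceptionAsString;
            this.timestamp = timestamp;
        }
    }

    /** Compares like with like; still trivially true when both Optionals are empty. */
    static boolean sameFailure(Optional<Info> runtime, Optional<Info> archived) {
        return runtime.map(i -> i.exceptionAsString).equals(archived.map(i -> i.exceptionAsString))
                && runtime.map(i -> i.timestamp).equals(archived.map(i -> i.timestamp));
    }

    /** Additionally requires both sides to be present, so the check cannot pass vacuously. */
    static boolean sameNonEmptyFailure(Optional<Info> runtime, Optional<Info> archived) {
        return runtime.isPresent() && archived.isPresent() && sameFailure(runtime, archived);
    }

    public static void main(String[] args) {
        Optional<Info> none = Optional.empty();
        Optional<Info> some = Optional.of(new Info("java.lang.RuntimeException: boom", 42L));

        // Two empty Optionals are equal, whatever was mapped over them.
        System.out.println(sameFailure(none, none));
        // Requiring presence exposes a test setup that never produced a failure.
        System.out.println(sameNonEmptyFailure(none, none));
        // A mixed-type comparison (timestamp vs. string) fails as soon as values are present.
        System.out.println(
                some.map(i -> i.timestamp).equals(some.map(i -> i.exceptionAsString)));
    }
}
```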
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r569551171 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -317,13 +316,11 @@ public TaskManagerLocation getAssignedResourceLocation() { : null; } -public Throwable getFailureCause() { -return failureCause; -} - @Override -public String getFailureCauseAsString() { -return ExceptionUtils.stringifyException(getFailureCause()); +public Optional getFailureInfo() { +return failureCause == null +? Optional.empty() +: Optional.of(new ErrorInfo(failureCause, getStateTimestamp(FAILED))); Review comment: Yes, it does. I actually had it like that in a first version of the code but changed it for the sake of consistency after realizing that it's not Serializable (which was a problem in `ArchivedExecution`). I will revert it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r567422474 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ## @@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0)); } +@Test +public void testExceptionHistoryWithRestartableFailure() { +final JobGraph jobGraph = singleNonParallelJobVertexJobGraph(); +final JobID jobId = jobGraph.getJobID(); + +final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + +// initiate restartable failure +final ExecutionAttemptID restartableAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); +final RuntimeException restartableException = new RuntimeException("restartable exception"); +Range updateStateTriggeringRestartTimeframe = +initiateFailure(scheduler, jobId, restartableAttemptId, restartableException); + +taskRestartExecutor.triggerNonPeriodicScheduledTask(); + +// initiate job failure +testRestartBackoffTimeStrategy.setCanRestart(false); + +final ExecutionAttemptID failingAttemptId = + Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()) +.getCurrentExecutionAttempt() +.getAttemptId(); +final RuntimeException failingException = new RuntimeException("failing exception"); +Range updateStateTriggeringJobFailureTimeframe = +initiateFailure(scheduler, jobId, failingAttemptId, failingException); + +List actualExceptionHistory = scheduler.getExceptionHistory(); +assertThat(actualExceptionHistory.size(), is(2)); + +// assert restarted attempt +ErrorInfo restartableFailure = actualExceptionHistory.get(0); +assertThat( +restartableFailure +.getException() +.deserializeError(ClassLoader.getSystemClassLoader()), +is(restartableException)); +assertThat( 
+restartableFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint())); +assertThat( +restartableFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint())); + +// assert job failure attempt +ErrorInfo globalFailure = actualExceptionHistory.get(1); +Throwable actualException = + globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader()); +assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class)); +assertThat(actualException.getCause(), is(failingException)); +assertThat( +globalFailure.getTimestamp(), + greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint())); +assertThat( +globalFailure.getTimestamp(), + lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint())); +} + +private static Range initiateFailure( +DefaultScheduler scheduler, +JobID jobId, +ExecutionAttemptID executionAttemptID, +Throwable exception) { +long start = System.currentTimeMillis(); Review comment: Correct me if I'm wrong here: Another way to measure the time would be `System.nanoTime()` since it ensures monotonicity. But using this in the test would require `Execution` to use `System.nanoTime()` as well which would not work as `System.nanoTime` cannot be used to compare timestamps between different JVMs [[1]](https://www.javaadvent.com/2019/12/measuring-time-from-java-to-kernel-and-back.html)?! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
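The test in the diff brackets the failure with wall-clock readings and then checks that the archived timestamp falls inside that closed range. A minimal sketch of the idea follows, assuming a hypothetical `Timeframe` class in place of the `Range` type used in the test. `System.currentTimeMillis()` is used because the archived timestamps are wall-clock values; `System.nanoTime()` is monotonic but only meaningful for measuring elapsed time within a single JVM.

```java
public class FailureTimeframeSketch {

    /** Hypothetical closed interval of wall-clock timestamps, standing in for Range. */
    static final class Timeframe {
        final long lowerEndpoint;
        final long upperEndpoint;

        Timeframe(long lowerEndpoint, long upperEndpoint) {
            this.lowerEndpoint = lowerEndpoint;
            this.upperEndpoint = upperEndpoint;
        }

        boolean contains(long timestamp) {
            return lowerEndpoint <= timestamp && timestamp <= upperEndpoint;
        }
    }

    /** Runs the action and returns the wall-clock interval during which it executed. */
    static Timeframe capture(Runnable action) {
        long start = System.currentTimeMillis();
        action.run();
        long end = System.currentTimeMillis();
        return new Timeframe(start, end);
    }

    public static void main(String[] args) {
        long[] recorded = new long[1];
        // The action records its own wall-clock timestamp, as a failing Execution would.
        Timeframe timeframe = capture(() -> recorded[0] = System.currentTimeMillis());
        System.out.println(timeframe.contains(recorded[0]));
    }
}
```

One caveat of this approach: the wall clock is not guaranteed to be monotonic, so a clock adjustment between the two readings could in principle make such a check fail.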
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r567421044 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java ## @@ -267,9 +267,16 @@ private void restartTasksWithDelay(final FailureHandlingResult failureHandlingRe delayExecutor.schedule( () -> FutureUtils.assertNoException( -cancelFuture.thenRunAsync( -restartTasks(executionVertexVersions, globalRecovery), -getMainThreadExecutor())), +cancelFuture +.thenRunAsync( +() -> + archiveFromFailureHandlingResult( + failureHandlingResult), +getMainThreadExecutor()) +.thenRunAsync( +restartTasks( + executionVertexVersions, globalRecovery), +getMainThreadExecutor())), Review comment: That's a good question. I did it when experimenting for convenience and didn't reiterate over it again. I refactored it now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
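The diff above chains the archiving step between the cancellation future and the restart, both scheduled on the main-thread executor. The comment notes that this was refactored afterwards, so the following is only a sketch of the chaining shown in the diff, not of the final code. `run`, the step names, and the direct executor used in `main` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class ArchiveThenRestartSketch {

    /**
     * Chains "archive" and "restart" after the given future. Each stage starts only after the
     * previous one has completed, so archiving always precedes the restart.
     */
    static List<String> run(CompletableFuture<Void> cancelFuture, Executor mainThreadExecutor) {
        List<String> steps = new ArrayList<>();
        cancelFuture
                .thenRunAsync(() -> steps.add("archive"), mainThreadExecutor)
                .thenRunAsync(() -> steps.add("restart"), mainThreadExecutor);
        return steps;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
        // A direct executor runs each stage on the completing thread; it stands in for the
        // scheduler's main-thread executor here.
        List<String> steps = run(cancelFuture, Runnable::run);

        System.out.println(steps); // nothing has run before cancellation completes
        cancelFuture.complete(null);
        System.out.println(steps);
    }
}
```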
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798: URL: https://github.com/apache/flink/pull/14798#discussion_r567418341 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -641,6 +648,33 @@ public void cancel() { return executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn()); } +protected void archiveGlobalFailure(Throwable failure) { +Set executionAttemptIds = + IterableUtils.toStream(executionGraph.getAllExecutionVertices()) +.map(ExecutionVertex::getCurrentExecutionAttempt) +.map(Execution::getAttemptId) +.collect(Collectors.toSet()); + +globalTaskFailures.put( +executionAttemptIds, new ErrorInfo(failure, System.currentTimeMillis())); + +log.debug("Archive global {}: {}", failure, failure.getMessage()); +} + +protected void archiveFromFailureHandlingResult(FailureHandlingResult failureHandlingResult) { +if (failureHandlingResult.getExecutionVertexIdOfFailedTask() == null) { Review comment: I removed this check. The `if` was a relic from a previous version of the code.
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567418118

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java ##
@@ -870,6 +874,78 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
         assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(0));
     }
+    @Test
+    public void testExceptionHistoryWithRestartableFailure() {
+        final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
+
+        // initiate restartable failure
+        final ExecutionAttemptID restartableAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException restartableException = new RuntimeException("restartable exception");
+        Range updateStateTriggeringRestartTimeframe =
+                initiateFailure(scheduler, jobId, restartableAttemptId, restartableException);
+
+        taskRestartExecutor.triggerNonPeriodicScheduledTask();
+
+        // initiate job failure
+        testRestartBackoffTimeStrategy.setCanRestart(false);
+
+        final ExecutionAttemptID failingAttemptId =
+                Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final RuntimeException failingException = new RuntimeException("failing exception");
+        Range updateStateTriggeringJobFailureTimeframe =
+                initiateFailure(scheduler, jobId, failingAttemptId, failingException);
+
+        List actualExceptionHistory = scheduler.getExceptionHistory();
+        assertThat(actualExceptionHistory.size(), is(2));
+
+        // assert restarted attempt
+        ErrorInfo restartableFailure = actualExceptionHistory.get(0);
+        assertThat(
+                restartableFailure
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader()),
+                is(restartableException));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint()));
+        assertThat(
+                restartableFailure.getTimestamp(),
+                lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint()));
+
+        // assert job failure attempt
+        ErrorInfo globalFailure = actualExceptionHistory.get(1);
+        Throwable actualException =
+                globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader());
+        assertThat(actualException, org.hamcrest.core.IsInstanceOf.instanceOf(JobException.class));
+        assertThat(actualException.getCause(), is(failingException));

Review comment:
Ah, thanks for the hint. I didn't know about `FlinkMatchers` yet.
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567417737

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ##
@@ -164,6 +168,8 @@
     private final Map coordinatorMap;
     private final ComponentMainThreadExecutor mainThreadExecutor;
+    private final Map localTaskFailures = new HashMap<>();
+    private final Map<Set<ExecutionAttemptID>, ErrorInfo> globalTaskFailures = new HashMap<>();

Review comment:
You're right. That was a leftover from my initial work, which considered multiple exceptions per failure cause. I'm going to refactor it to use a `List` here to be more consistent.
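As an illustration of the `List`-based approach mentioned in the comment above, here is a minimal sketch of a failure history kept as a single ordered list instead of two maps. `ExceptionHistorySketch` and its nested `Entry` class are hypothetical stand-ins (the latter for Flink's `ErrorInfo`); the actual refactoring in the pull request may look different.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not part of the pull request.
final class ExceptionHistorySketch {

    /** Hypothetical stand-in for ErrorInfo: a failure cause plus the time it was recorded. */
    static final class Entry {
        final Throwable cause;
        final long timestamp;

        Entry(Throwable cause, long timestamp) {
            this.cause = cause;
            this.timestamp = timestamp;
        }
    }

    // One list for local and global failures alike; insertion order is the history order.
    private final List<Entry> history = new ArrayList<>();

    void archive(Throwable cause, long timestamp) {
        history.add(new Entry(cause, timestamp));
    }

    /** Returns a read-only view so callers cannot modify the history. */
    List<Entry> getHistory() {
        return Collections.unmodifiableList(history);
    }
}
```

A single list keeps entries in the order the failures were archived, which matches how the test in this pull request reads the history by index.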
[GitHub] [flink] XComp commented on a change in pull request #14798: [FLINK-21187] Provide exception history for root causes
XComp commented on a change in pull request #14798:
URL: https://github.com/apache/flink/pull/14798#discussion_r567417575

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java ##
@@ -100,17 +116,22 @@ public long getRestartDelayMS() {
         }
     }
+    /**
+     * Returns the {@link ExecutionVertexID} of the task causing this failure.
+     *
+     * @return The {@code ExecutionVertexID} or {@code null} if it's a global failure.
+     */
+    public ExecutionVertexID getExecutionVertexIdOfFailedTask() {

Review comment:
I switched to returning an `Optional` to make this more explicit.
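A minimal sketch of the change described above, assuming a simplified result type: the getter wraps the possibly-absent ID in an `Optional` instead of returning `null` for a global failure. `FailureResultSketch`, its factory methods, and the use of `String` in place of `ExecutionVertexID` are hypothetical and only illustrate the pattern.

```java
import java.util.Optional;

// Hypothetical sketch, not part of the pull request.
final class FailureResultSketch {

    // Stand-in for ExecutionVertexID; null is kept internally to mean "global failure".
    private final String failedVertexId;

    private FailureResultSketch(String failedVertexId) {
        this.failedVertexId = failedVertexId;
    }

    static FailureResultSketch local(String vertexId) {
        return new FailureResultSketch(vertexId);
    }

    static FailureResultSketch global() {
        return new FailureResultSketch(null);
    }

    /** Returns the failed task's ID, or an empty Optional for a global failure. */
    Optional<String> getFailedVertexId() {
        return Optional.ofNullable(failedVertexId);
    }

    /** Callers handle both cases without an explicit null check. */
    static String describe(FailureResultSketch result) {
        return result.getFailedVertexId()
                .map(id -> "local failure in " + id)
                .orElse("global failure");
    }

    public static void main(String[] args) {
        System.out.println(describe(local("vertex-1"))); // prints: local failure in vertex-1
        System.out.println(describe(global())); // prints: global failure
    }
}
```

With this signature the "may be absent" case is visible in the return type, so a caller cannot forget the check the way it can with a `null`-returning getter.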