[jira] [Updated] (FLINK-32754) Using SplitEnumeratorContext.metricGroup() in restoreEnumerator causes NPE

2023-08-04 Thread Yu Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Chen updated FLINK-32754:

Description: 
We registered some metrics in the `enumerator` of a FLIP-27 source via 
`SplitEnumeratorContext.metricGroup()`, but found that the JM logs an NPE 
when the task is restored, indicating that 
`SplitEnumeratorContext.metricGroup()` returns null.
{*}Meanwhile, the task does not fail over, and checkpoints cannot be created 
even after the task reaches the running state{*}.

We found that the implementation class behind the context is 
`LazyInitializedCoordinatorContext`, whose `metricGroup()` is only initialized 
once `lazyInitialize()` has been called. Reviewing the code shows that 
`lazyInitialize()` has not yet been called when 
`SourceCoordinator.resetToCheckpoint()` runs, so the NPE is thrown.
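
The ordering problem described above can be sketched in isolation. This is only a simplified model, not Flink code: `LazyContext`, `MetricGroup`, and `restoreEnumerator` here are hypothetical stand-ins, and the sketch assumes nothing more than a field that stays null until `lazyInitialize()` runs.

```java
import java.util.Objects;

public class LazyContextSketch {

    /** Hypothetical stand-in for a metric group; not a Flink class. */
    static final class MetricGroup {
        int registered = 0;

        void counter(String name) {
            Objects.requireNonNull(name);
            registered++;
        }
    }

    /** Simplified model of a context whose fields are only set by lazyInitialize(). */
    static final class LazyContext {
        private MetricGroup metricGroup; // null until lazyInitialize() runs

        void lazyInitialize() {
            metricGroup = new MetricGroup();
        }

        MetricGroup metricGroup() {
            return metricGroup;
        }
    }

    /** Models a restore path that registers a metric right away. */
    static void restoreEnumerator(LazyContext context) {
        // Throws NullPointerException if the context is not initialized yet.
        context.metricGroup().counter("pendingSplits");
    }

    /** Returns true if restoring against the given context throws an NPE. */
    static boolean restoreThrowsNpe(LazyContext context) {
        try {
            restoreEnumerator(context);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        LazyContext uninitialized = new LazyContext();
        System.out.println("NPE before lazyInitialize(): "
                + restoreThrowsNpe(uninitialized));

        LazyContext initialized = new LazyContext();
        initialized.lazyInitialize();
        System.out.println("NPE after lazyInitialize(): "
                + restoreThrowsNpe(initialized));
    }
}
```

In this model the restore path only works when initialization happens first, which is the ordering the description reports as violated during `resetToCheckpoint()`.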

*Q: Why does this bug prevent the task from creating checkpoints?*
`SourceCoordinator.resetToCheckpoint()` throws an NPE, which leaves the 
member variable `enumerator` in `SourceCoordinator` null. Unfortunately, 
all checkpoint-related calls in `SourceCoordinator` go through 
`runInEventLoop()`, and `runInEventLoop()` returns immediately if the 
enumerator is null.
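
A minimal sketch of why such a guard stalls checkpoints, assuming a coordinator that completes a checkpoint future only from inside the guarded event loop. The `Coordinator` class and its `checkpoint` method are hypothetical simplifications and do not reproduce the real `SourceCoordinator` signatures.

```java
import java.util.concurrent.CompletableFuture;

public class EventLoopGuardSketch {

    /** Hypothetical coordinator; only the null guard is modeled. */
    static final class Coordinator {
        Object enumerator; // stays null when the restore path failed

        /** Runs the action only if an enumerator exists; otherwise returns silently. */
        void runInEventLoop(Runnable action) {
            if (enumerator == null) {
                return;
            }
            action.run();
        }

        /** The returned future is completed only from inside the guarded loop. */
        CompletableFuture<String> checkpoint(long checkpointId) {
            CompletableFuture<String> result = new CompletableFuture<>();
            runInEventLoop(() -> result.complete("state-" + checkpointId));
            return result;
        }
    }

    public static void main(String[] args) {
        Coordinator broken = new Coordinator(); // enumerator never set
        System.out.println("done with null enumerator: "
                + broken.checkpoint(1L).isDone());

        Coordinator healthy = new Coordinator();
        healthy.enumerator = new Object();
        System.out.println("done with enumerator set: "
                + healthy.checkpoint(1L).isDone());
    }
}
```

With a null enumerator the future is neither completed nor failed, so in this model a caller waiting on it would see the checkpoint hang rather than report an error.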

*Q: Why doesn't this bug trigger a task failover?*
In `RecreateOnResetOperatorCoordinator.resetAndStart()`, if 
`internalCoordinator.resetToCheckpoint` throws an exception, the exception is 
caught and `cleanAndFailJob` is called to try to fail the job.
However, `globalFailureHandler` is also initialized in `lazyInitialize()`, 
so `globalFailureHandler.handleGlobalFailure(e)` itself throws an NPE, which 
`schedulerExecutor.execute` ignores.
Thus the task appears not to fail over.
!image-2023-08-04-18-28-05-897.png|width=963,height=443!
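
The swallowed failure can be modeled as follows. This is a sketch under stated assumptions: `SwallowingExecutor`, `Context`, and `failJob` are hypothetical names, and the executor is assumed to drop exceptions thrown by its tasks rather than propagate them to the caller, which is the behavior the description attributes to `schedulerExecutor.execute`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class SwallowedFailureSketch {

    /** Hypothetical handler interface, modeled on the name used in the description. */
    interface GlobalFailureHandler {
        void handleGlobalFailure(Throwable cause);
    }

    /** Hypothetical executor that records task exceptions instead of rethrowing them. */
    static final class SwallowingExecutor implements Executor {
        final List<Throwable> dropped = new ArrayList<>();

        @Override
        public void execute(Runnable task) {
            try {
                task.run();
            } catch (Throwable t) {
                dropped.add(t); // the caller never sees this
            }
        }
    }

    /** Simplified context whose failure handler is only set by lazyInitialize(). */
    static final class Context {
        GlobalFailureHandler globalFailureHandler; // null until lazyInitialize()
        final SwallowingExecutor schedulerExecutor = new SwallowingExecutor();
        boolean jobFailed = false;

        void lazyInitialize() {
            globalFailureHandler = cause -> jobFailed = true;
        }

        /** Tries to fail the job; dereferences the handler inside the executor task. */
        void failJob(Throwable cause) {
            schedulerExecutor.execute(
                    () -> globalFailureHandler.handleGlobalFailure(cause));
        }
    }

    public static void main(String[] args) {
        Context context = new Context(); // lazyInitialize() deliberately not called
        context.failJob(new RuntimeException("restore failed"));
        System.out.println("job failed: " + context.jobFailed);
        System.out.println("exceptions dropped by executor: "
                + context.schedulerExecutor.dropped.size());
    }
}
```

In this model the attempt to fail the job raises a second NPE that is dropped inside the executor, so the original failure never reaches the handler.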



> Using SplitEnumeratorContext.metricGroup() in restoreEnumerator causes NPE
> --
>
> Key: FLINK-32754
> URL: https://issues.apache.org/jira/browse/FLINK-32754
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.17.0, 1.17.1
> Reporter: Yu Chen
> Priority: Major
> Attachments: image-2023-08-04-18-28-05-897.png
