Hi folks,

I'm running a Flink application that uses HybridSource, patched with the fixes 
for FLINK-27479 and FLINK-27529.

The application uses HybridSource and the Presto plugin to read from a few 
thousand S3 directories, and then switches to reading from Kafka.

Reading from S3 can cause intermittent errors that are usually fixed by 
retrying. The problem appears when Flink tries to recover from such a failure 
and restarts from a checkpoint:

java.lang.NullPointerException: Source for index=0 not available
     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
     at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)

Complete scenario:


  1.  CheckpointCoordinator - Completed checkpoint 14 for job 
00000000000000000000000000000000
  2.  HybridSource successfully finishes processing a few SourceFactories 
that read from S3
  3.  The next SourceFactory tries to read the contents of an S3 directory, 
which causes the error: Unable to execute HTTP request: Read timed out
  4.  CheckpointCoordinator - Restoring job 00000000000000000000000000000000 
from Checkpoint 14
  5.  HybridSourceSplitEnumerator - Restoring enumerator for sourceIndex=47
  6.  This restore fails because of a NullPointerException in 
HybridSourceSplitEnumerator.close
  7.  CheckpointCoordinator tries again - Restoring job 
00000000000000000000000000000000 from Checkpoint 14
  8.  This results in:

2022/08/02 22:26:52.469 ERROR o.a.f.r.s.c.SourceCoordinator - Uncaught exception in the SplitEnumerator for Source Source: hybrid-source while handling operator event SourceEventWrapper[SourceReaderFinishedEvent{sourceIndex=-1}] from subtask 10. Triggering job failover.
java.lang.NullPointerException: Source for index=0 not available
     at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
     at org.apache.flink.connector.base.source.hybridspp.SwitchedSources.sourceOf(SwitchedSources.java:36)
     at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.sendSwitchSourceEvent(HybridSourceSplitEnumerator.java:149)
     at org.apache.flink.connector.base.source.hybridspp.HybridSourceSplitEnumerator.handleSourceEvent(HybridSourceSplitEnumerator.java:223)
     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$handleEventFromOperator$1(SourceCoordinator.java:182)
     at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$8(SourceCoordinator.java:344)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:750)

  9.  This pattern then repeats forever: Flink tries to restore from the 
checkpoint, and the restore fails with NullPointerException: Source for index=0 
not available
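
To make the symptom concrete, here is a simplified, hypothetical model of what 
the log seems to show. It is not the Flink implementation, and the cause it 
suggests is only a guess that I have not verified. It assumes that after the 
restore only the current source (index 47) is registered, while a restarted 
reader reports sourceIndex=-1 and is therefore sent a switch to index 0. The 
class SwitchedSourcesModel and the name "s3-dir-47" are made up for 
illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the index -> source lookup; not Flink code.
public class SwitchedSourcesModel {
    private final Map<Integer, String> sources = new HashMap<>();

    void put(int sourceIndex, String source) {
        sources.put(sourceIndex, source);
    }

    // Mirrors the error message seen in the stack trace above.
    String sourceOf(int sourceIndex) {
        String source = sources.get(sourceIndex);
        if (source == null) {
            throw new NullPointerException(
                    "Source for index=" + sourceIndex + " not available");
        }
        return source;
    }

    // Assumption: the restored enumerator only knows the source it resumed at.
    static String simulate() {
        SwitchedSourcesModel restored = new SwitchedSourcesModel();
        restored.put(47, "s3-dir-47");

        // A restarted reader announces itself with sourceIndex=-1,
        // so the next source requested for it is index 0.
        int readerSourceIndex = -1;
        try {
            return restored.sourceOf(readerSourceIndex + 1);
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

If this model is close to what actually happens, the lookup for index 0 would 
fail on every restore attempt, which would match the endless loop above.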


Any idea what could be causing this? Could someone familiar with HybridSource 
take a look at the issue?

I have attached an extract of the JobManager log that contains the related 
information. I can send the complete log, but it is a few MB in size.

The problem is reproducible in my environment after a few hours of running.

I think we need a Jira ticket for this issue. Could someone please create one?

Attachment: bf-29-JM-err-analysis.log