Hi, I developed a single Flink job that read a huge amount of files and after some simple preprocessing, sink them into the database. I use the built-in JDBCOutputFormat for inserting records into the database. The problem is when I cancel the job using either the WebUI or the command line, the job did not cancel completely and finally, the taskmanager process crashes! Here are the taskmanager logs (generated continuously for some seconds):
2020-02-15 01:17:17,208 WARN > org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator > - The reader is stuck in method: > java.lang.Object.wait(Native Method) > org.postgresql.jdbc.PgStatement.killTimerTask(PgStatement.java:999) > org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:856) > > org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546) > > org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216) > > org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:210) > > org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:41) > > org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:86) > > org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) > > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) > > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) > > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > > org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325) > 2020-02-15 01:17:17,224 INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting > shut down. > 2020-02-15 01:17:17,225 INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting > shut down. I'm using the Flink: 1.7.2, java: Java(TM) SE Runtime Environment (build 1.8.0_91-b14) Any help will be appreciated. All the best, Soheil