[ https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ufuk Celebi closed FLINK-4803.
------------------------------
    Resolution: Duplicate

> Job Cancel can hang forever waiting for OutputFormat.close()
> ------------------------------------------------------------
>
> Key: FLINK-4803
> URL: https://issues.apache.org/jira/browse/FLINK-4803
> Project: Flink
> Issue Type: Bug
> Components: Batch Connectors and Input/Output Formats
> Affects Versions: 1.1.1
> Reporter: Shannon Carey
>
> If the Flink job uses a badly-behaved OutputFormat (in this example, a
> HadoopOutputFormat containing a CqlBulkOutputFormat) whose close()
> method blocks forever, it is impossible to cancel the Flink job, even though
> the blocked thread would respond to an interrupt. The stack traces below show
> the state of the important threads when a job is canceled and the
> OutputFormat is blocking forever inside close().
>
> I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on
> `this.format.close()`. When the timeout is reached, the Task thread should be
> interrupted.
> {code}
> "Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6422 daemon prio=5 os_prio=0 tid=0x00007fb7e42f0000 nid=0x34f3 waiting for monitor entry [0x00007fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - waiting to lock <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>         at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>         at java.lang.Thread.run(Thread.java:745)
>
> "DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x00007fb7e79a4800 nid=0x2ad8 waiting on condition [0x00007fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00000006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>         at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>         at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>         at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>         at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>         at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>         at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>         - locked <0x00000006bae5f788> (a java.lang.Object)
>         at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
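The reporter's suggestion (put a timeout on `close()` inside `cancel()`, then interrupt the task thread) can be sketched roughly as follows. This is a minimal, hypothetical Java illustration, not Flink's actual `DataSinkTask` code or the eventual resolution of the duplicate issue: `BlockingFormat`, `cancelWithTimeout`, `demo` and the 500 ms timeout are all invented for the example. `BlockingFormat.close()` mimics the second trace by holding a lock while retrying a timed `SynchronousQueue.offer()` that no consumer ever accepts. The first trace shows the canceler's own `close()` call BLOCKED on that same monitor, so the sketch assumes the timed wait has to happen outside that call: it runs `close()` on a helper thread and waits on the resulting `Future` with a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch, NOT Flink's DataSinkTask: a cancel() that bounds its wait for close(). */
public class CloseTimeoutSketch {

    /** Hypothetical stand-in for a badly-behaved OutputFormat. */
    static final class BlockingFormat {
        private final Object lock = new Object();                                // plays the role of the monitor in the traces
        private final SynchronousQueue<Object> queue = new SynchronousQueue<>(); // nobody ever takes from it
        private final CountDownLatch entered = new CountDownLatch(1);            // signals that close() holds the lock
        private boolean closeStarted = false;                                    // guarded by lock

        void close() throws InterruptedException {
            synchronized (lock) {
                if (closeStarted) {
                    return; // assumption of the sketch: a second close() is a no-op
                }
                closeStarted = true;
                entered.countDown();
                // Like the parked Cassandra writer: timed offers to a queue with no consumer,
                // retried forever. Only an interrupt (InterruptedException) gets the thread out.
                while (!queue.offer(new Object(), 100, TimeUnit.MILLISECONDS)) {
                    // no consumer arrived within 100 ms; try again
                }
            }
        }
    }

    /**
     * Hypothetical cancel(): run close() on a helper thread, wait at most timeoutMillis,
     * and interrupt the task thread on timeout. Returns true only if close() finished in time.
     */
    static boolean cancelWithTimeout(BlockingFormat format, Thread taskThread, long timeoutMillis)
            throws InterruptedException {
        ExecutorService closer = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "closer");
            t.setDaemon(true); // a stuck close() must not keep the JVM alive
            return t;
        });
        try {
            Future<?> closing = closer.submit(() -> {
                format.close(); // BLOCKED on the monitor while the task thread holds it
                return null;
            });
            closing.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            taskThread.interrupt(); // the task thread's timed offer() throws InterruptedException
            return false;
        } catch (ExecutionException e) {
            return false; // close() itself failed
        } finally {
            closer.shutdownNow();
        }
    }

    /** Reproduces the hang from the issue; returns true if the task thread was freed by the interrupt. */
    static boolean demo() {
        BlockingFormat format = new BlockingFormat();
        Thread task = new Thread(() -> {
            try {
                format.close(); // the task thread gets stuck here while holding the lock
            } catch (InterruptedException e) {
                // interrupted by cancelWithTimeout: abandon close() and let the thread end
            }
        }, "task");
        task.setDaemon(true);
        task.start();
        try {
            format.entered.await(); // wait until the task thread holds the lock
            boolean closedInTime = cancelWithTimeout(format, task, 500);
            task.join(2000);
            return !closedInTime && !task.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("task thread released: " + demo());
    }
}
```

Running `main()` plays the scenario through once and prints whether the task thread exited after the interrupt. The sketch interrupts the task thread rather than the helper because a thread BLOCKED on a monitor ignores interrupts, while a thread parked in a timed `offer()` reacts to one by throwing `InterruptedException`. Treating a second `close()` as a no-op is an assumption made so the helper does not re-enter the same retry loop; a real format may behave differently once its writer has been interrupted.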