[jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()

2017-02-21 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875892#comment-15875892
 ] 

Stephan Ewen commented on FLINK-4803:
-

This has actually been fixed with the introduction of the cancellation watchdog.
[~uce] implemented that.

> Job Cancel can hang forever waiting for OutputFormat.close()
> 
>
> Key: FLINK-4803
> URL: https://issues.apache.org/jira/browse/FLINK-4803
> Project: Flink
> Issue Type: Bug
> Components: Batch Connectors and Input/Output Formats
> Affects Versions: 1.1.1
> Reporter: Shannon Carey
>
> If the Flink job uses a badly-behaved OutputFormat (in this example, a 
> HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() 
> method blocks forever, it is impossible to cancel the Flink job even though 
> the blocked thread would respond to an interrupt. The stack traces below show 
> the state of the important threads when a job is canceled and the 
> OutputFormat is blocking forever inside of close().
> I suggest that `DataSinkTask.cancel()` method be updated to add a timeout on 
> `this.format.close()`. When the timeout is reached, the Task thread should be 
> interrupted.
> {code}
> "Canceler for DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6422 daemon prio=5 os_prio=0 tid=0x7fb7e42f nid=0x34f3 waiting for monitor entry [0x7fb7be079000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>     - waiting to lock <0x0006bae5f788> (a java.lang.Object)
>     at org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
>     at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
>     at java.lang.Thread.run(Thread.java:745)
> "DataSink (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on condition [0x7fb7bdf78000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x0006c5ab5e20> (a java.util.concurrent.SynchronousQueue$TransferStack)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>     at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
>     at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
>     at java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
>     at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
>     at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
>     at org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
>     at org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
>     at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
>     at org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
>     at org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
>     - locked <0x0006bae5f788> (a java.lang.Object)
>     at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()

2016-10-12 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569025#comment-15569025
 ] 

Shannon Carey commented on FLINK-4803:
--

Yes, that's right. cancel() blocks on close(), and therefore if close() 
misbehaves the thread is never interrupted and cancel() blocks forever.

In the issue description, I suggested your option #2. I think you'll want #1 no 
matter what. However, #2 allows for at least one message and/or exception to be 
logged that tells the user what went wrong (why their job is taking a long time 
to cancel, or why it did not cancel gracefully). I'm not sure what your 
DataSink-specific option would look like. Maybe it is similar to my workaround, 
where I wrapped my HadoopOutputFormat in a subclass that calls super.close() 
from a separate thread with a timeout? That workaround is ok, but I had to 
expend a fair amount of effort to figure out what the problem was, and also 
there was nothing I could do but restart Flink in order to get my job to 
terminate (not a desirable solution). You'll want Flink to function smoothly 
regardless of what data sink the user chooses.
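
For illustration, a workaround of this kind could look roughly like the sketch below. It is a minimal, hypothetical example and not the code from the original workaround: `TimedClose` and `closeWithTimeout` are made-up names rather than Flink or Hadoop APIs, and the `Runnable` stands in for the call to super.close().

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper, not part of Flink: runs a close action with a time limit. */
final class TimedClose {

    /**
     * Runs closeAction on a helper thread and waits at most timeoutMillis.
     * Returns true if it finished in time. Otherwise the helper thread is
     * interrupted and false is returned, so the caller can log why
     * cancellation was slow.
     */
    static boolean closeWithTimeout(Runnable closeAction, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "timed-close");
            t.setDaemon(true); // a stuck close() must not keep the JVM alive
            return t;
        });
        Future<?> result = executor.submit(closeAction);
        try {
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // interrupts the thread running closeAction
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException("close() failed", e.getCause());
        } catch (InterruptedException e) {
            result.cancel(true);
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A well-behaved close() returns quickly.
        System.out.println(closeWithTimeout(() -> { }, 1000));
        // A close() that blocks until interrupted stands in for the stuck writer.
        System.out.println(closeWithTimeout(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 200));
    }
}
```

In a subclass of the output format, close() would pass a lambda that calls super.close() and wraps its checked exception. Note that this only helps if the blocked call responds to an interrupt, as it does in the stack trace here.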






[jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()

2016-10-12 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15568076#comment-15568076
 ] 

Stephan Ewen commented on FLINK-4803:
-

The task thread is actually interrupted: immediately after the call to 
"cancel()", and then periodically if it does not exit.

The problem here seems to be that "cancel()" cannot proceed (after which the 
interruption would come) because in the HadoopOutputFormatBase, it is blocked 
on a lock held by the regular "close()" call.
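
The situation can be reproduced outside Flink with a small sketch. Everything in it is hypothetical (`BlockedCancelDemo` and its methods are illustrative names, and the `close()` here only mimics a close that takes a lock and then waits). It shows that the canceling thread sits in state BLOCKED on the monitor and that interrupting it does not help, while interrupting the thread that holds the lock lets both threads finish.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for the situation in the stack traces; not Flink code. */
final class BlockedCancelDemo {
    private final Object lock = new Object();
    private final CountDownLatch lockHeld = new CountDownLatch(1);

    /** Mimics a close() that takes a lock and then waits until interrupted. */
    void close() {
        synchronized (lock) {
            lockHeld.countDown();
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Interrupted: leave the synchronized block and release the lock.
            }
        }
    }

    /**
     * Returns the canceler's state after it was interrupted while the task
     * thread still held the lock, then interrupts the task so both threads exit.
     */
    static Thread.State cancelerStateWhileTaskHoldsLock() {
        BlockedCancelDemo format = new BlockedCancelDemo();
        Thread task = new Thread(format::close, "task");
        Thread canceler = new Thread(format::close, "canceler");
        task.setDaemon(true);
        canceler.setDaemon(true);
        try {
            task.start();
            format.lockHeld.await();      // the task thread now owns the lock
            canceler.start();
            while (canceler.getState() != Thread.State.BLOCKED) {
                Thread.sleep(10);         // wait until the canceler is stuck on the monitor
            }
            canceler.interrupt();         // has no effect on a thread waiting for a monitor
            Thread.sleep(100);
            Thread.State observed = canceler.getState();
            task.interrupt();             // the blocked call responds and the lock is released
            task.join(5000);
            canceler.join(5000);          // canceler gets the lock, sees its pending interrupt, exits
            if (task.isAlive() || canceler.isAlive()) {
                throw new IllegalStateException("threads did not exit");
            }
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelerStateWhileTaskHoldsLock());
    }
}
```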

I think there are three ways to address that:
  - Generic (1) - [FLINK-4715] - the TaskManagers should exit themselves (and 
rely on Yarn / Mesos / a container service to restart them). This is the hard 
fallback that should catch all problems with respect to cancellation.
  - Generic (2) - let the cancel() call and the interrupt() calls come from two 
different threads (or add a watchdog that calls "interrupt()" if "cancel()" 
blocks for too long).
  - DataSink specific - attempt to call close() on cancellation, but do not 
block on locks; throw an exception instead.
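
As a rough illustration of the watchdog variant of Generic (2), here is a minimal sketch. It is not Flink's implementation: `CancelWatchdog`, `cancelWithWatchdog` and `demo` are hypothetical names, and the timeout values are arbitrary. The cancel action runs on its own thread, and the task thread is interrupted if that action has not returned within the timeout (which must be greater than zero, since `join(0)` waits forever).

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch of a cancellation watchdog; not Flink's actual code. */
final class CancelWatchdog {

    /**
     * Runs cancelAction on its own thread. If it has not returned after
     * timeoutMillis, interrupts taskThread so that a task stuck in a blocking
     * call can exit. Returns true if the watchdog had to interrupt.
     */
    static boolean cancelWithWatchdog(Runnable cancelAction, Thread taskThread, long timeoutMillis) {
        Thread canceler = new Thread(cancelAction, "canceler");
        canceler.setDaemon(true);
        canceler.start();
        try {
            canceler.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (canceler.isAlive()) {
            taskThread.interrupt(); // cancel() is stuck: interrupt the task anyway
            return true;
        }
        return false;
    }

    /** Task holds a lock and waits; cancel needs the same lock. Returns true if the task exits. */
    static boolean demo() {
        final Object lock = new Object();
        final CountDownLatch lockHeld = new CountDownLatch(1);
        Thread task = new Thread(() -> {
            synchronized (lock) {
                lockHeld.countDown();
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    // Interrupted: leave the synchronized block, releasing the lock.
                }
            }
        }, "task");
        task.setDaemon(true);
        task.start();
        try {
            lockHeld.await();
            boolean interrupted = cancelWithWatchdog(() -> { synchronized (lock) { } }, task, 200);
            task.join(5000);
            return interrupted && !task.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

When the cancel action returns in time, the sketch does nothing further. The regular interrupt that follows "cancel()" is outside its scope.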

What is your take on each of these approaches?



