[jira] [Updated] (FLINK-11991) Set headers to use for CSV output
[ https://issues.apache.org/jira/browse/FLINK-11991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Nioche updated FLINK-11991:
----------------------------------
    Description: 
As discussed in [https://stackoverflow.com/questions/54530755/flink-write-tuples-with-csv-header-into-file/54536586?noredirect=1#comment97248717_54536586], it would be nice to be able to specify headers to print out at the beginning of a CSV output.

I've written a patch for this and will submit it as a PR.

  was:
As discussed in [https://stackoverflow.com/questions/54530755/flink-write-tuples-with-csv-header-into-file/54536586?noredirect=1#comment97248717_54536586|[http://stackoverflow.com],|http://stackoverflow.com]%2C/] it would be nice to be able to specify headers to print out at the beginning of a CSV output.

I've written a patch for this and will submit it as a PR.

> Set headers to use for CSV output
> ---------------------------------
>
>                 Key: FLINK-11991
>                 URL: https://issues.apache.org/jira/browse/FLINK-11991
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Julien Nioche
>            Priority: Minor
>             Fix For: 1.9.0
>
>
> As discussed in
> [https://stackoverflow.com/questions/54530755/flink-write-tuples-with-csv-header-into-file/54536586?noredirect=1#comment97248717_54536586],
> it would be nice to be able to specify headers to print out at the beginning
> of a CSV output.
> I've written a patch for this and will submit it as a PR.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11991) Set headers to use for CSV output
Julien Nioche created FLINK-11991:
-------------------------------------

             Summary: Set headers to use for CSV output
                 Key: FLINK-11991
                 URL: https://issues.apache.org/jira/browse/FLINK-11991
             Project: Flink
          Issue Type: Improvement
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
            Reporter: Julien Nioche
             Fix For: 1.9.0

As discussed in [https://stackoverflow.com/questions/54530755/flink-write-tuples-with-csv-header-into-file/54536586?noredirect=1#comment97248717_54536586|[http://stackoverflow.com],|http://stackoverflow.com]%2C/] it would be nice to be able to specify headers to print out at the beginning of a CSV output.

I've written a patch for this and will submit it as a PR.
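
The requested behaviour (a header line printed once, before any record) can be illustrated with a minimal sketch in plain Java. It deliberately does not use Flink's output format classes and is not the patch mentioned in the issue: the class `CsvHeaderSketch`, the method `render`, and the sample column names are hypothetical, and no quoting or escaping of fields is attempted.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Flink: renders an optional header line followed by CSV records. */
class CsvHeaderSketch {

    /**
     * Builds the CSV text. The header, if present, is emitted exactly once,
     * before the first record. Fields are joined as-is (no quoting or escaping).
     */
    static String render(List<String> headers, List<List<String>> records,
                         String fieldDelimiter, String recordDelimiter) {
        StringBuilder out = new StringBuilder();
        if (headers != null && !headers.isEmpty()) {
            out.append(String.join(fieldDelimiter, headers)).append(recordDelimiter);
        }
        for (List<String> record : records) {
            out.append(String.join(fieldDelimiter, record)).append(recordDelimiter);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Sample data is made up for illustration.
        String csv = render(
                Arrays.asList("url", "count"),
                Arrays.asList(
                        Arrays.asList("http://example.com/a", "3"),
                        Arrays.asList("http://example.com/b", "7")),
                ",", "\n");
        System.out.print(csv);
    }
}
```

Passing `null` or an empty list as `headers` leaves the output unchanged from the header-less case, which keeps the option backwards compatible. In a parallel job, where each writer instance typically produces its own file, the same logic would presumably run once per file.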
[jira] [Commented] (FLINK-11783) Deadlock during Join operation
[ https://issues.apache.org/jira/browse/FLINK-11783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782311#comment-16782311 ]

Julien Nioche commented on FLINK-11783:
---------------------------------------

Same issue with a different job on the same cluster; the problem occurs on the same node, this time during a reduce step.

"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:67)) (11/150)" #20948 prio=5 os_prio=0 tid=0x7faa7c027800 nid=0x57d0 in Object.wait() [0x7fa981df6000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:67)) (66/150)" #20947 prio=5 os_prio=0 tid=0x7faa7c026000 nid=0x57cf in Object.wait() [0x7fa9815ee000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:67)) (10/150)" #20943 prio=5 os_prio=0 tid=0x7faa7c024800 nid=0x57cb in Object.wait() [0x7fa980ae9000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:67)) (65/150)" #20942 prio=5 os_prio=0 tid=0x7faa7c023800 nid=0x57ca in Object.wait() [0x7fa9804e3000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:67)) (69/150)" #20941 prio=5 os_prio=0 tid=0x7faa7c022800 nid=0x57c9 in Object.wait() [0x7fa9807e6000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:67)) (67/150)" #20940 prio=5 os_prio=0 tid=0x7faa7c021800 nid=0x57c8 in Object.wait() [0x7fa9d2bf]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:67)) (19/150)" #20939 prio=5 os_prio=0 tid=0x7faa7c020800 nid=0x57c7 in Object.wait() [0x7fa980dec000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:67)) (20/150)" #20938 prio=5 os_prio=0 tid=0x7faa7c01f800 nid=0x57c6 in Object.wait() [0x7fa98c89b000]
   java.lang.Thread.State: WAITING (on object monitor)
--
"GroupReduce (GroupReduce at (*GlobalStatsJob*.java:66)) (72/150)" #12276 prio=5 os_prio=0 tid=0x7faa0c02f000 nid=0x33a4 waiting on condition [0x7fa981ff8000]
   java.lang.Thread.State: TIMED_WAITING (parking)

> Deadlock during Join operation
> ------------------------------
>
>                 Key: FLINK-11783
>                 URL: https://issues.apache.org/jira/browse/FLINK-11783
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.7.2
>            Reporter: Julien Nioche
>            Priority: Major
>         Attachments: flink_is_stuck.png
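
When a dump contains many task threads, tallying the `java.lang.Thread.State` lines gives a quick picture of how many threads are `WAITING` versus `TIMED_WAITING`. The following is a minimal sketch, assuming the jstack output has already been captured as a string; the class `ThreadStateTally` and the method `countStates` are hypothetical names, and the sample input is abbreviated from the dump above.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: counts thread states found in jstack-style text. */
class ThreadStateTally {

    // Matches lines such as "java.lang.Thread.State: TIMED_WAITING (parking)"
    // and captures only the state name.
    private static final Pattern STATE =
            Pattern.compile("java\\.lang\\.Thread\\.State: ([A-Z_]+)");

    /** Returns a map from state name to number of occurrences, sorted by state name. */
    static Map<String, Integer> countStates(String dump) {
        Map<String, Integer> counts = new TreeMap<>();
        Matcher m = STATE.matcher(dump);
        while (m.find()) {
            counts.merge(m.group(1), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Abbreviated sample: only the state lines and grep separators are kept.
        String dump = String.join("\n",
                "   java.lang.Thread.State: WAITING (on object monitor)",
                "--",
                "   java.lang.Thread.State: WAITING (on object monitor)",
                "--",
                "   java.lang.Thread.State: TIMED_WAITING (parking)");
        System.out.println(countStates(dump));
    }
}
```

A tally like this only summarises states; it does not by itself distinguish a deadlock from threads that are slowly waiting on I/O, which still requires reading the individual stacks.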
[jira] [Updated] (FLINK-11783) Deadlock during Join operation
[ https://issues.apache.org/jira/browse/FLINK-11783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Nioche updated FLINK-11783:
----------------------------------
    Attachment:     (was: flink_is_stuck.png)

> Deadlock during Join operation
> ------------------------------
>
>                 Key: FLINK-11783
>                 URL: https://issues.apache.org/jira/browse/FLINK-11783
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.7.2
>            Reporter: Julien Nioche
>            Priority: Major
[jira] [Updated] (FLINK-11783) Deadlock during Join operation
[ https://issues.apache.org/jira/browse/FLINK-11783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Nioche updated FLINK-11783:
----------------------------------
    Attachment: flink_is_stuck.png

> Deadlock during Join operation
> ------------------------------
>
>                 Key: FLINK-11783
>                 URL: https://issues.apache.org/jira/browse/FLINK-11783
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.7.2
>            Reporter: Julien Nioche
>            Priority: Major
>         Attachments: flink_is_stuck.png
[jira] [Updated] (FLINK-11783) Deadlock during Join operation
[ https://issues.apache.org/jira/browse/FLINK-11783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Nioche updated FLINK-11783:
----------------------------------
    Attachment: flink_is_stuck.png

> Deadlock during Join operation
> ------------------------------
>
>                 Key: FLINK-11783
>                 URL: https://issues.apache.org/jira/browse/FLINK-11783
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.7.2
>            Reporter: Julien Nioche
>            Priority: Major
[jira] [Updated] (FLINK-11783) Deadlock during Join operation
[ https://issues.apache.org/jira/browse/FLINK-11783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Nioche updated FLINK-11783:
----------------------------------
    Attachment:     (was: flink_is_stuck.png)

> Deadlock during Join operation
> ------------------------------
>
>                 Key: FLINK-11783
>                 URL: https://issues.apache.org/jira/browse/FLINK-11783
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.7.2
>            Reporter: Julien Nioche
>            Priority: Major
[jira] [Created] (FLINK-11783) Deadlock during Join operation
Julien Nioche created FLINK-11783:
-------------------------------------

             Summary: Deadlock during Join operation
                 Key: FLINK-11783
                 URL: https://issues.apache.org/jira/browse/FLINK-11783
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.7.2
            Reporter: Julien Nioche
         Attachments: flink_is_stuck.png

I am running a filtering job on a large dataset with Flink running in distributed mode. Most tasks in the Join operation have completed a while ago and only the tasks from a particular TaskManager are still running. These tasks make progress but extremely slowly.

When logging onto the machine running this TM I can see that all threads are TIMED_WAITING.

Could there be a synchronization problem?

See attachment for a screenshot of the Flink UI and the stack below.

*{{$ jstack 9183 | grep -A 15 "DataSetFilterJob"}}*
{{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at (DataSetFilterJob.java:67)) (66/150)" #155 prio=5 os_prio=0 tid=0x7faa5c01c000 nid=0x248c waiting on condition [0x7fa9d15d5000]}}
{{ java.lang.Thread.State: TIMED_WAITING (parking)}}
{{ at sun.misc.Unsafe.park(Native Method)}}
{{ - parking to wait for <0x0007bfa89578> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
{{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}}
{{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}}
{{ at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}}
{{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}}
{{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}}
{{ at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}}
{{ at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}}
{{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}}
{{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}}
{{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}}
{{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}}
{{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}}
{{--}}
{{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at (DataSetFilterJob.java:67)) (65/150)" #154 prio=5 os_prio=0 tid=0x7faa5c01b000 nid=0x248b waiting on condition [0x7fa9d14d4000]}}
{{ java.lang.Thread.State: TIMED_WAITING (parking)}}
{{ at sun.misc.Unsafe.park(Native Method)}}
{{ - parking to wait for <0x0007b8e0eb50> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
{{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}}
{{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)}}
{{ at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)}}
{{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:98)}}
{{ at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.getNextReturnedBlock(AsynchronousBlockReader.java:43)}}
{{ at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:228)}}
{{ at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:158)}}
{{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readByte(AbstractPagedInputView.java:271)}}
{{ at org.apache.flink.runtime.memory.AbstractPagedInputView.readUnsignedByte(AbstractPagedInputView.java:278)}}
{{ at org.apache.flink.types.StringValue.readString(StringValue.java:746)}}
{{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)}}
{{ at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:80)}}
{{--}}
{{"CHAIN Join (Join at with(JoinOperator.java:543)) -> Map (Map at (DataSetFilterJob.java:67)) (68/150)" #153 prio=5 os_prio=0 tid=0x7faa5c019800 nid=0x248a waiting on condition [0x7fa981df6000]}}
{{ java.lang.Thread.State: TIMED_WAITING (parking)}}
{{ at sun.misc.Unsafe.park(Native Method)}}
{{ - parking to wait for <0x000774903a00> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
{{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)}}
{{ at