[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17341570#comment-17341570 ]

Arvid Heise commented on FLINK-17170:
-------------------------------------

Merged into master as 442dc76bc76b9a7628202b33b21126ef3d48a90c..49d9092b31d0a95d727b790ee04a4ed8d72924e3.

> Cannot stop streaming job with savepoint which uses kinesis consumer
> --------------------------------------------------------------------
>
>                 Key: FLINK-17170
>                 URL: https://issues.apache.org/jira/browse/FLINK-17170
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kinesis
>    Affects Versions: 1.10.0, 1.11.3, 1.12.2
>            Reporter: Vasii Cosmin Radu
>            Assignee: Arvid Heise
>            Priority: Critical
>              Labels: pull-request-available, usability
>         Attachments: Screenshot 2020-04-15 at 18.16.26.png, threaddump_tm1
>
>
> I am encountering a very strange situation where I can't stop a streaming
> job with a savepoint.
> The job reads from Kinesis and sinks to S3. It is a very simple job: no
> mapping function, no watermarks, just source->sink.
> The source uses flink-kinesis-consumer, the sink uses StreamingFileSink.
> Everything works fine, except stopping the job with savepoints.
> The behaviour happens only when multiple task managers are involved, with
> sub-tasks of the job spread across multiple task manager instances. When a
> single task manager has all the sub-tasks, this issue never occurred.
> I am using the latest Flink version, 1.10.0, deployed in HA mode (2 job
> managers) on EC2, with savepoints and checkpoints written to S3.
> When trying to stop, the savepoint is created correctly and appears on S3,
> but not all sub-tasks are stopped. Some of them finish, but some just
> remain hung. Sometimes, on the same task manager, some of the sub-tasks
> finish and some don't.
> The logs don't show any errors. For the sub-tasks that succeed, the standard
> messages appear, with "Source: <> switched from RUNNING to FINISHED".
> For the hung sub-tasks the last message is
> "org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher -
> Shutting down the shard consumer threads of subtask 0 ..." and that's it.
>
> I tried using the cli (flink stop )
> Timeout Message:
> {code:java}
> root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25
> root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25
> Suspending job "cf43cecd9339e8f02a12333e52966a25" with a savepoint.
>
> The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "cf43cecd9339e8f02a12333e52966a25".
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>     at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>     ... 9 more
> {code}
>
> Using the monitoring API, querying by the savepoint id keeps reporting that
> the status id is still "IN_PROGRESS".
>
> When performing a cancel instead of stop, it works. But cancel is deprecated,
> so I am a bit concerned that it might also fail and that I was just lucky.
>
> I attached a screenshot of what the UI shows when this happens.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
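The endless "IN_PROGRESS" polling described in the report can at least be bounded on the client side. The following is a minimal sketch, not something taken from the report: `SavepointStatusPoller`, `parseStatusId`, and `pollStatus` are hypothetical helper names, and the response shape (`"status": {"id": ...}`) is an assumption based on the reporter's wording. The HTTP call itself is hidden behind a `Supplier<String>` so that no endpoint path or client API is assumed here.

```java
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SavepointStatusPoller {
    // Assumed response shape: {"status": {"id": "IN_PROGRESS"}, ...}
    private static final Pattern STATUS_ID =
            Pattern.compile("\"status\"\\s*:\\s*\\{\\s*\"id\"\\s*:\\s*\"([A-Z_]+)\"");

    /** Extracts the status id from a response body, or "UNKNOWN" if it is absent. */
    public static String parseStatusId(String body) {
        Matcher m = STATUS_ID.matcher(body);
        return m.find() ? m.group(1) : "UNKNOWN";
    }

    /**
     * Polls at most maxAttempts times and returns the last status seen.
     * Any status other than IN_PROGRESS (including UNKNOWN) ends the loop early.
     */
    public static String pollStatus(Supplier<String> fetchBody, int maxAttempts, long pauseMillis)
            throws InterruptedException {
        String status = "UNKNOWN";
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            status = parseStatusId(fetchBody.get());
            if (!"IN_PROGRESS".equals(status)) {
                return status;
            }
            Thread.sleep(pauseMillis);
        }
        return status;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates the situation from the report: the status never leaves IN_PROGRESS.
        Supplier<String> stuck = () -> "{\"status\":{\"id\":\"IN_PROGRESS\"}}";
        System.out.println(pollStatus(stuck, 3, 10)); // prints IN_PROGRESS after 3 attempts
    }
}
```

A caller that still sees "IN_PROGRESS" after the last attempt can then treat the stop as stuck and collect a thread dump, instead of waiting indefinitely.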
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340890#comment-17340890 ]

Arvid Heise commented on FLINK-17170:
-------------------------------------

Merged into 1.13 as a102549f08759e177074dad286fef2f56176d005..282f9a3d5505a5aa58d7d9cca466939610d41ed3.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17340239#comment-17340239 ]

Arvid Heise commented on FLINK-17170:
-------------------------------------

Merged into 1.12 as 1de3d6345a4132dda4b31f66275fb732afe3ef30.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337299#comment-17337299 ]

Piotr Nowojski commented on FLINK-17170:
----------------------------------------

I also think sources shouldn't be doing blocking waits on such conditions inside {{cancel()}}. Basically what [~dannycranmer] suggested makes sense.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337132#comment-17337132 ]

Arvid Heise commented on FLINK-17170:
-------------------------------------

Hi [~dannycranmer], why do we need to have this {{fetcher.awaitTermination()}} in cancel/close in the first place? Wouldn't it suffice to just rely on the await at the end of {{FlinkKinesisConsumer#run}}? As far as I can see, the {{cancel}} is shutting down the fetcher, so it should return in a graceful manner in {{run}}.

Going further, {{runFetcher}} is already invoking {{awaitTermination}} on its own, so we could clean this up even further.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17335794#comment-17335794 ]

Danny Cranmer commented on FLINK-17170:
---------------------------------------

I have taken a read through the issue/code and agree with the diagnosed deadlock. I agree that removing {{fetcher.awaitTermination()}} from {{FlinkKinesisConsumer::cancel}} will fix the deadlock. However this would potentially result in the job transitioning to finished with open resources. The expectation would be that the resources will terminate, but this could still result in temporary leak/dangling objects on the TM.

I believe we can fix this by moving the {{fetcher.awaitTermination()}} to the [close()|https://github.com/apache/flink/blob/9ed5f1b4b5e1c8a9f9421b91ac00960b63916ebd/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L373] method. I have taken a quick dive and it looks like this would be called outside of the checkpoint lock. However I am not 100% sure this would be ok for all Source lifecycle possibilities.

Does anyone have any reason to believe this is not a good idea?
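One way to picture the proposal in this comment is the sketch below. It is a simplified model rather than the connector's code: `SketchSource`, its single worker thread, and the `checkpointLock` field are hypothetical names, and it assumes the worker has to take the lock before it can exit. In this model `cancel()` only signals shutdown, so it is safe to call while the lock is held, and the bounded wait happens in `close()` after the lock has been released.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SketchSource implements AutoCloseable {
    // Hypothetical stand-in for the task's checkpoint lock.
    private final Object checkpointLock;
    private final ExecutorService fetcherThreads = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    public SketchSource(Object checkpointLock) {
        this.checkpointLock = checkpointLock;
        fetcherThreads.submit(() -> {
            while (running) {
                synchronized (this.checkpointLock) {
                    // A real consumer would emit a record and update state here.
                }
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /** Non-blocking: only signals shutdown, so it may be called with the lock held. */
    public void cancel() {
        running = false;
        fetcherThreads.shutdown();
    }

    /** Blocking wait for the worker; intended to run without the lock held. */
    @Override
    public void close() throws InterruptedException {
        cancel();
        if (!fetcherThreads.awaitTermination(5, TimeUnit.SECONDS)) {
            // Last resort in this sketch: interrupt the worker if it did not stop in time.
            fetcherThreads.shutdownNow();
        }
    }

    public boolean isTerminated() {
        return fetcherThreads.isTerminated();
    }

    public static void main(String[] args) throws Exception {
        Object lock = new Object();
        SketchSource source = new SketchSource(lock);
        synchronized (lock) {
            source.cancel(); // returns immediately even though the lock is held
        }
        source.close();      // waits for the worker after the lock is released
        System.out.println("terminated: " + source.isTerminated());
    }
}
```

Whether `close()` is always invoked outside the lock in every source lifecycle is exactly the open question raised in the comment; the sketch simply assumes that it is.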
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17329045#comment-17329045 ]

Flink Jira Bot commented on FLINK-17170:
----------------------------------------

This critical issue is unassigned and itself and all of its Sub-Tasks have not been updated for 7 days. So, it has been labeled "stale-critical". If this ticket is indeed critical, please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281492#comment-17281492 ]

Kezhu Wang commented on FLINK-17170:

{quote}
In the stop-with-savepoint path, {{FlinkKinesisConsumer.cancel}} is called with {{checkpointLock}} held.
{quote}
This has been the case since 1.10, so this issue should exist in 1.10 and all later versions.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281491#comment-17281491 ]

Kezhu Wang commented on FLINK-17170:

I think the problem is in {{FlinkKinesisConsumer.cancel}}: it should not wait for the fetcher to finish, it should only signal it to stop. [~qinjunjerry] is correct about the deadlock. In the stop-with-savepoint path, {{FlinkKinesisConsumer.cancel}} is called with {{checkpointLock}} held.
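The "signal only" idea from the comment above can be sketched in isolation. This is a minimal illustration, not the connector's code and not necessarily how the merged fix works: the class SignalOnlyCancelSketch and its methods are hypothetical, and only the names checkpointLock, running and shardConsumersExecutor are borrowed from the discussion. cancel() sets a flag and interrupts the workers without waiting, so it is safe to call while the lock is held; the waiting happens afterwards, outside the lock.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a fetcher; not Flink code.
public class SignalOnlyCancelSketch {
    private final Object checkpointLock = new Object();
    private final ExecutorService shardConsumersExecutor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    /** Starts one worker that repeatedly needs the lock, like a shard consumer emitting records. */
    public void start() {
        shardConsumersExecutor.submit(() -> {
            while (running) {
                synchronized (checkpointLock) {
                    // emit a record and update state under the lock
                }
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /** Only signals: never blocks, so it may be called with checkpointLock held. */
    public void cancel() {
        running = false;
        shardConsumersExecutor.shutdownNow();
    }

    /** Waits for the workers; must be called without holding checkpointLock. */
    public boolean awaitTermination(long timeoutMillis) throws InterruptedException {
        return shardConsumersExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Cancels under the lock, then waits after releasing it. */
    public static boolean demo() throws InterruptedException {
        SignalOnlyCancelSketch fetcher = new SignalOnlyCancelSketch();
        fetcher.start();
        synchronized (fetcher.checkpointLock) {
            fetcher.cancel(); // returns immediately
        }
        return fetcher.awaitTermination(2000);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated: " + demo());
    }
}
```

Because the wait happens only after the lock is released, the worker can take the lock one last time, observe the flag or the interrupt, and exit.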
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281338#comment-17281338 ]

Matthias commented on FLINK-17170:

We're investigating a bug with stop-with-savepoint where the savepoint is successfully created but the job does not finish due to some failure after savepoint creation. See FLINK-21030 for further details. FLINK-17170 might be related.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237081#comment-17237081 ]

Yun Tang commented on FLINK-17170:

I think my previous suggestion, "Check whether this fetcher is running within emitRecordAndUpdateState first before trying to access the checkpointLock", might not fundamentally solve the problem if {{running}} has been set to false while {{checkpointLock}} is already held by the main thread. Is the endless await-termination really a must for the Kinesis fetcher, [~tzulitai]?
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17235700#comment-17235700 ]

Jun Qin commented on FLINK-17170:

The source thread holds the checkpoint lock and waits here forever for the shardConsumers threads to finish, while the shardConsumers can only finish once they get the checkpoint lock:
{code:java}
// From: KinesisDataFetcher.java
public void awaitTermination() throws InterruptedException {
    while (!shardConsumersExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
        // Keep waiting.
    }
}
{code}
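The lock cycle described above can be reproduced outside Flink in a few lines. This is a minimal sketch under stated assumptions: the class CheckpointLockDeadlockSketch and the method awaitWorkers are hypothetical, the worker is a stand-in for a shard consumer, and a finite timeout replaces the endless loop so that the example returns instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical reproduction of the wait cycle; not Flink code.
public class CheckpointLockDeadlockSketch {

    /**
     * Shuts down a one-thread pool whose only task needs checkpointLock to finish.
     * Returns true if the pool terminated within the timeout.
     * holdLock = true mimics waiting for the workers while holding the lock.
     */
    public static boolean awaitWorkers(boolean holdLock, long timeoutMillis)
            throws InterruptedException {
        final Object checkpointLock = new Object();
        final ExecutorService shardConsumersExecutor = Executors.newSingleThreadExecutor();
        final Runnable shardConsumer = () -> {
            synchronized (checkpointLock) {
                // emit the last record under the lock, then exit
            }
        };

        if (holdLock) {
            synchronized (checkpointLock) {
                shardConsumersExecutor.submit(shardConsumer);
                shardConsumersExecutor.shutdown();
                // The worker is blocked on checkpointLock, which this thread holds.
                return shardConsumersExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        }
        shardConsumersExecutor.submit(shardConsumer);
        shardConsumersExecutor.shutdown();
        return shardConsumersExecutor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("lock held while waiting: " + awaitWorkers(true, 200));
        System.out.println("lock not held while waiting: " + awaitWorkers(false, 2000));
    }
}
```

With the lock held, the wait times out because the worker can never enter its synchronized block; with an unbounded retry loop, as in the quoted awaitTermination(), the same situation never resolves.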
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17233564#comment-17233564 ]

Jun Qin commented on FLINK-17170:

I have also seen the exact same behavior at one customer with just one TM and parallelism=1. By 'exact', I mean:
* the source thread is holding the checkpoint lock and is waiting for the shardConsumers to finish, but the shardConsumers cannot finish because they are waiting to acquire the checkpoint lock
* the code line numbers in the stack trace are the same as in the one attached to this issue.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17138349#comment-17138349 ]

Youjun Yuan commented on FLINK-17170:

Any update on this? I hit exactly the same issue with 1.10.1.

BTW, calling the savepoint REST API can trigger a savepoint and cancel the job as well. Something like this:

curl -d '{"target-directory": "s3://dp-flink/savepoints/","cancel-job":true}' :jobmanagerTracking-URL[/jobs/:jobid/savepoints|http://ip-10-0-102-40.ap-northeast-1.compute.internal:41581/jobs/6b1f193a0a2e98a6eabf6fe6d876c3bc/savepoints]
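For anyone scripting the workaround above from Java rather than curl, the same request could be built roughly as follows. This is only a sketch: the class SavepointRequestSketch, its helper methods and the base URL http://localhost:8081 are assumptions for illustration, the JSON is assembled by hand without escaping, and java.net.http requires Java 11 or later. The request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; builds the same POST as the curl command, without sending it.
public class SavepointRequestSketch {

    /** JSON body with the two fields used in the curl example (no escaping of the inputs). */
    public static String savepointBody(String targetDirectory, boolean cancelJob) {
        return "{\"target-directory\": \"" + targetDirectory + "\", \"cancel-job\": " + cancelJob + "}";
    }

    /** POST to <restBaseUrl>/jobs/<jobId>/savepoints; restBaseUrl is an assumed JobManager REST address. */
    public static HttpRequest savepointRequest(
            String restBaseUrl, String jobId, String targetDirectory, boolean cancelJob) {
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(savepointBody(targetDirectory, cancelJob)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = savepointRequest(
                "http://localhost:8081",
                "6b1f193a0a2e98a6eabf6fe6d876c3bc",
                "s3://dp-flink/savepoints/",
                true);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending it would be a matter of passing the request to an HttpClient; the response handling is left out here because the thread does not show what the endpoint returned.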
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085626#comment-17085626 ]

Yun Tang commented on FLINK-17170:

[~cvasii] Cancelling a job does not need to trigger a savepoint before shutdown. In other words, cancel does not trigger "notifyCheckpointComplete" on the task side once the checkpoint is completed on the checkpoint coordinator side. When "notifyCheckpointComplete" is called, it grabs the lock, which is also grabbed when awaiting termination of the Kinesis fetcher.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17085127#comment-17085127 ]

Vasii Cosmin Radu commented on FLINK-17170:
-------------------------------------------

How come it works when I perform a cancel, instead of stop action?
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084872#comment-17084872 ]

Yun Tang commented on FLINK-17170:
----------------------------------

I think there are three possible solutions:
# Use the new source operator API introduced in FLIP-27, which uses the mailbox and so avoids the checkpoint lock when emitting records in the source thread. However, I think the new source operator API is not fully ready yet.
# Avoid the endless awaitTermination in {{shutdownFetcher}}. I am not sure whether this would meet the Kinesis connector's requirements, cc [~tzulitai]
# In {{emitRecordAndUpdateState}}, check whether the fetcher is still running before trying to acquire the {{checkpointLock}}. If it is not running, just return instead of acquiring the {{checkpointLock}}. I think this could resolve the problem.
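The third option can be sketched roughly as follows. This is only an illustration of the "check the running flag before taking the lock" idea, not the connector's actual code: the class {{GuardedEmitter}}, its {{running}} flag, and the in-memory {{emitted}} list are hypothetical stand-ins, and only the names {{emitRecordAndUpdateState}}, {{shutdownFetcher}}, and {{checkpointLock}} are borrowed from the discussion above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a fetcher; not the real KinesisDataFetcher.
class GuardedEmitter {
    private final Object checkpointLock = new Object();
    // volatile so that emitting threads see the shutdown without holding the lock
    private volatile boolean running = true;
    private final List<String> emitted = new ArrayList<>();

    /** Returns true if the record was emitted, false if the fetcher was already shut down. */
    boolean emitRecordAndUpdateState(String record) {
        // Check first, so a stopped fetcher never tries to enter the monitor.
        if (!running) {
            return false;
        }
        synchronized (checkpointLock) {
            // Re-check: shutdown may have happened while we waited for the lock.
            if (!running) {
                return false;
            }
            emitted.add(record);
            return true;
        }
    }

    /** Marks the fetcher as stopped; deliberately does not take the checkpoint lock. */
    void shutdownFetcher() {
        running = false;
    }

    int emittedCount() {
        synchronized (checkpointLock) {
            return emitted.size();
        }
    }
}
```

Note that a guard like this only helps threads that have not yet started waiting for the lock. A thread already blocked on the monitor stays blocked until the lock is released, so this check would presumably need to be combined with a bounded wait as in the second option.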
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084770#comment-17084770 ]

Yun Tang commented on FLINK-17170:
----------------------------------

I could reproduce this with a few lines of code. There is a deadlock here: one thread is waiting to acquire the lock, while the other thread, which holds the lock, is waiting for the first thread to terminate. I still need some time to investigate whether there are more possible deadlocks.
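The pattern described above can be reproduced in isolation along these lines. This is a minimal sketch under stated assumptions, not the reproduction used in the ticket: {{LockWaitDemo}}, {{run}}, and the local variable names are made up for illustration, and the wait is bounded by a timeout so the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the names only mirror the roles seen in the ticket.
class LockWaitDemo {

    /**
     * Returns {terminatedWhileLockHeld, terminatedAfterRelease}.
     * The caller holds the lock while waiting for a worker that needs the same lock.
     */
    static boolean[] run(long timeoutMillis) {
        final Object checkpointLock = new Object();
        final ExecutorService shardConsumers = Executors.newSingleThreadExecutor();
        final CountDownLatch workerStarted = new CountDownLatch(1);
        try {
            boolean terminatedWhileLockHeld;
            synchronized (checkpointLock) {
                // The worker is submitted while we already own the lock,
                // so it is guaranteed to block on the monitor.
                shardConsumers.submit(() -> {
                    workerStarted.countDown();
                    synchronized (checkpointLock) {
                        // a record would be emitted here
                    }
                });
                workerStarted.await();
                // Interrupting does not help: monitor entry is not interruptible.
                shardConsumers.shutdownNow();
                terminatedWhileLockHeld =
                        shardConsumers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            // Once the lock is released the worker can finish.
            boolean terminatedAfterRelease =
                    shardConsumers.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {terminatedWhileLockHeld, terminatedAfterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With an unbounded or endlessly repeated wait in place of the timeout, the thread holding the lock would never leave the synchronized block, which matches the hang described above.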
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084591#comment-17084591 ]

Vasii Cosmin Radu commented on FLINK-17170:
-------------------------------------------

Thanks guys for looking into this, [~klion26] [~yunta]

I've attached a full thread dump from one of the task managers, in a file called "threaddump_tm1". I've seen that the shard consumers are waiting for the checkpoint lock, but the checkpoint lock is already taken by the thread which performs the stop.

Some relevant parts of the thread dump:
{code:java}
shardConsumers-Source: source -> Sink: sink (12/12)-thread-0
priority:5 - threadId:0x7f7c7c298000 - nativeId:0x774c - nativeId (decimal):30540 - state:BLOCKED
stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:774)
    - waiting to lock <0x0007abe026c8> (a java.lang.Object)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:92)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:273)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:288)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:285)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:760)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
but the lock 0x0007abe026c8 is already taken here:
{code:java}
Source: source -> Sink: sink (12/12)
priority:5 - threadId:0x7f7c2c0cf000 - nativeId:0x76fe - nativeId (decimal):30462 - state:TIMED_WAITING
stackTrace:
java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x0007ac041158> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:637)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:365)
    at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:147)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:136)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.finishTask(SourceStreamTask.java:147)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$838/1354138988.run(Unknown Source)
    at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
    at org.apache.flink.util.function.FunctionUtils$$Lambda$839/1349808364.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    - locked <0x0007abe026c8> (a java.lang.Object)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
{code}
So, FlinkKinesisConsumer#close()
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084492#comment-17084492 ]

Congxian Qiu(klion26) commented on FLINK-17170:
-----------------------------------------------

Hi [~cvasii], from the description it seems the savepoint completed successfully and the "unfinished" task was blocked by something. Currently, the lifetime of the task logic is "trigger savepoint" -> "savepoint complete" -> "savepoint complete" -> "finish task".

From the previous comments you gave, it seems the stack was waiting for some lock. Could you please check what it is waiting for? Or could you please share the whole jstack output for the "unfinished" task?
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084274#comment-17084274 ]

Yun Tang commented on FLINK-17170:
----------------------------------

If the savepoint eventually completes, I suspect the task gets stuck when it is notified that the checkpoint is complete and tries to [finish|https://github.com/apache/flink/blob/9ed5f1b4b5e1c8a9f9421b91ac00960b63916ebd/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L930]. Could you share the information of the source stream task, or the whole jstack output of the hung JVM? BTW, debug-level logs could also help us learn more.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084258#comment-17084258 ]

Vasii Cosmin Radu commented on FLINK-17170:
-------------------------------------------

It's quite hard to debug, because I have multiple threads with the same name blocked on the same lock. A sub-task consumes multiple shards, so a fetcher is created per sub-task, but the method KinesisDataFetcher#createShardConsumersThreadPool just assigns the same identifier to the threads (any ThreadFactory implementation would be better there, by the way; a thing to improve). There is also one blocked thread on its own, not competing with others for any lock.
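As a rough illustration of the thread-naming remark above, a ThreadFactory that numbers its threads could look like the sketch below. The class name {{NamedThreadFactory}}, the prefix format, and the daemon setting are assumptions made for this example; this is not what {{createShardConsumersThreadPool}} actually does.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory: gives each thread a distinct, numbered name.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(0);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // e.g. "shardConsumers-subtask-3-thread-0", "...-thread-1", ...
        Thread thread = new Thread(task, prefix + "-thread-" + counter.getAndIncrement());
        // Assumption for this sketch: worker threads should not keep the JVM alive.
        thread.setDaemon(true);
        return thread;
    }
}
```

Such a factory could be passed to {{Executors.newCachedThreadPool(ThreadFactory)}} or a similar standard factory method, so that each thread in a dump is distinguishable by name.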
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084238#comment-17084238 ]

Vasii Cosmin Radu commented on FLINK-17170:
---
And regarding the savepoint, yes it finished successfully on all sub-tasks.
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084235#comment-17084235 ]

Vasii Cosmin Radu commented on FLINK-17170:
---
I was doing that. And the shard consumer threads are blocked. Looks like it's the checkpointLock they are all waiting on. Here is a sample of one stack:
{code:java}
shardConsumers-Source: source -> Sink: sink-s3a:// (12/12)-thread-0
priority:5 - threadId:0x7f565c854000 - nativeId:0x49e0 - nativeId (decimal):18912 - state:BLOCKED
stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:774)
    - waiting to lock <0x0006a4d18620> (a java.lang.Object)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:92)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:273)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:288)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:285)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:760)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
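The stack above shows shard consumer threads BLOCKED on an object monitor, and the issue description says the last log line is "Shutting down the shard consumer threads". One pattern that would produce exactly this picture is sketched below: a thread that holds the lock shuts the consumer pool down and waits for it, while the consumers are still trying to enter the same lock. This is only an illustration. That the Flink task thread actually holds the checkpoint lock at that point is an assumption, and `CheckpointLockHangSketch`, `CHECKPOINT_LOCK` and `run` are hypothetical names, not Flink code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration, not Flink code. */
final class CheckpointLockHangSketch {

    // Stand-in for the lock that the shard consumers wait on in the stack trace.
    private static final Object CHECKPOINT_LOCK = new Object();

    /**
     * Returns {terminatedWhileLockHeld, terminatedAfterRelease}.
     * Threads blocked on monitor entry do not react to interruption, so
     * shutdownNow() cannot stop them while the caller still owns the lock.
     */
    static boolean[] run(long waitMillis) throws InterruptedException {
        ExecutorService shardConsumers = Executors.newFixedThreadPool(2);
        boolean terminatedWhileLockHeld;
        synchronized (CHECKPOINT_LOCK) {
            for (int i = 0; i < 2; i++) {
                shardConsumers.execute(() -> {
                    synchronized (CHECKPOINT_LOCK) {
                        // A real consumer would emit a record and update its state here.
                    }
                });
            }
            // Interrupts the workers, but they stay BLOCKED on CHECKPOINT_LOCK.
            shardConsumers.shutdownNow();
            terminatedWhileLockHeld =
                    shardConsumers.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
        }
        // Once the lock is released, the consumers can enter, finish and exit.
        boolean terminatedAfterRelease = shardConsumers.awaitTermination(5, TimeUnit.SECONDS);
        return new boolean[] {terminatedWhileLockHeld, terminatedAfterRelease};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = run(200);
        System.out.println("terminated while lock held: " + result[0]);
        System.out.println("terminated after release:   " + result[1]);
    }
}
```

In the sketch the wait is bounded, so the pool merely fails to terminate in time. A shutdown that waits without a timeout while still owning the lock would hang, which would match sub-tasks that never reach FINISHED.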
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084230#comment-17084230 ]

Yun Tang commented on FLINK-17170:
--
[~cvasii] Has the savepoint completed according to the web UI? Did the sub-task checkpoints on the hung tasks finish? (You can find this in the checkpoint details in the web UI.) The quickest way to find the root cause is to use jstack to capture what the task thread is doing while it hangs.
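For reference, the information that jstack prints for a blocked thread can also be read from inside a JVM through the standard `ThreadMXBean`. The sketch below is a minimal, self-contained demonstration and not a replacement for attaching jstack to a running task manager. `BlockedThreadReport` and the demo thread name are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: lists BLOCKED threads of the current JVM with the lock and its owner. */
final class BlockedThreadReport {

    /** One entry per BLOCKED thread: thread name, lock it waits for, thread holding that lock. */
    static List<String> blockedThreads() {
        List<String> report = new ArrayList<>();
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
        for (ThreadInfo info : infos) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                report.add(info.getThreadName()
                        + " waits for " + info.getLockName()
                        + " held by " + info.getLockOwnerName());
            }
        }
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        Thread consumer = new Thread(() -> {
            synchronized (lock) {
                // Nothing to do; the thread only needs to contend for the lock.
            }
        }, "shardConsumer-demo");
        synchronized (lock) {
            consumer.start();
            // Wait until the demo thread is really parked on the monitor.
            while (consumer.getState() != Thread.State.BLOCKED) {
                Thread.sleep(10);
            }
            blockedThreads().forEach(System.out::println);
        }
        consumer.join();
    }
}
```

The lock owner is the interesting part when diagnosing a hang like this one, because it names the thread that has to release the lock before the blocked threads can proceed.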