[jira] [Commented] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
[ https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411798#comment-17411798 ] Youjun Yuan commented on FLINK-23874:
-

Thanks for the response. Though I still don't understand why the checkpoint id in ZK wasn't updated for days. The job was originally resumed from chk 258 on 14th Aug; then we tried to stop the job on 18th Aug, which caused the JM to restart. So the content in ZK should have been updated to ~chk 686, but on 19th Aug (when we realized the issue) I checked the content in ZK, and it was still chk 258.
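As a side note (an illustration, not part of the original report): one way to verify what the ZooKeeper HA store actually contains is to list the checkpoint znodes directly. Below is a minimal Java sketch using Apache Curator; the connection string and the /flink/<app_id>/checkpoints path are placeholders for your setup, and in Flink's ZK HA layout each child znode under that path should correspond to one retained completed checkpoint.

{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class InspectCheckpointStore {
    public static void main(String[] args) throws Exception {
        // placeholders: your ZK quorum and the HA path of the application
        String zkQuorum = "zk-host:2181";
        String checkpointsPath = "/flink/<app_id>/checkpoints";

        CuratorFramework client = CuratorFrameworkFactory.newClient(
                zkQuorum, new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            // each child znode should name one retained completed checkpoint,
            // so a store stuck at checkpoint 258 would show up here directly
            for (String child : client.getChildren().forPath(checkpointsPath)) {
                System.out.println(child);
            }
        } finally {
            client.close();
        }
    }
}
{code}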
[jira] [Comment Edited] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
[ https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411591#comment-17411591 ] Youjun Yuan edited comment on FLINK-23874 at 9/8/21, 12:04 AM:
---

The first attempt was started by resuming from savepoint 258. But the second attempt was NOT started by me; I was actually canceling the job, but due to Flink bug [FLINK-20992|https://issues.apache.org/jira/browse/FLINK-20992], it hit the following error at the end of shutting down the job, so YARN restarted it:

{code:java}
RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3460b405 rejected from
{code}

This log can be found at the end of the JM log of attempt 1. If you look at the log timestamps, you can see that attempt 2 started immediately after attempt 1, just 2 seconds later.
[jira] [Commented] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
[ https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411591#comment-17411591 ] Youjun Yuan commented on FLINK-23874:
-

The first attempt was started by resuming from savepoint 258. But the second attempt was NOT started by me; I was actually canceling the job, but due to a bug in Flink, it hit the following error at the end of shutting down the job, so YARN restarted it:

{code:java}
RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3460b405 rejected from
{code}

This log can be found at the end of the JM log of attempt 1. If you look at the log timestamps, you can see that attempt 2 started immediately after attempt 1, just 2 seconds later.
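For context (a standalone illustration, not Flink code): this is the standard java.util.concurrent behavior when a task is handed to an executor that has already begun shutting down, which is why the error shows up right at the end of the cancellation. A minimal reproduction:

{code:java}
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedAfterShutdown {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.shutdown(); // shutdown has started, as during job cancellation

        try {
            // any task scheduled from now on is rejected with the same
            // RejectedExecutionException seen at the end of the JM log
            executor.schedule(() -> { }, 1, TimeUnit.SECONDS);
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e);
        }
    }
}
{code}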
[jira] [Issue Comment Deleted] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
[ https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youjun Yuan updated FLINK-23874:
Comment: was deleted

(was: One thing I don't understand, why the last checkpoint id stored in ZK wasn't updated for long time?)
[jira] [Comment Edited] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
[ https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411096#comment-17411096 ] Youjun Yuan edited comment on FLINK-23874 at 9/7/21, 9:58 AM:
--

One thing I don't understand: why wasn't the last checkpoint id stored in ZK updated for such a long time?
[jira] [Commented] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
[ https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411096#comment-17411096 ] Youjun Yuan commented on FLINK-23874:
-

One thing I don't understand: why wasn't the last checkpoint id stored in ZK updated for such a long time?
[jira] [Commented] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
[ https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411095#comment-17411095 ] Youjun Yuan commented on FLINK-23874:
-

I have attached the logs of the two JMs (the JM failed and restarted), and the log of one of the TMs. [~trohrmann] you are right, the JM restarted when I tried to stop it (I believe this is another bug).
[jira] [Updated] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
[ https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youjun Yuan updated FLINK-23874:
Attachment: container_e04_1628083845581_0254_02_01_jm.log
            container_e04_1628083845581_0254_01_50_tm.log
            container_e04_1628083845581_0254_01_01_jm.log
[jira] [Created] (FLINK-24187) Could not commit s3 file after JM restart during state initialization
Youjun Yuan created FLINK-24187:
---
Summary: Could not commit s3 file after JM restart during state initialization
Key: FLINK-24187
URL: https://issues.apache.org/jira/browse/FLINK-24187
Project: Flink
Issue Type: Bug
Components: FileSystems
Affects Versions: 1.12.1
Reporter: Youjun Yuan

We have a SQL job which consumes from Kafka and writes to a Hive table, with data stored in S3. One day the ZooKeeper leader failed over, causing the Flink job to restart. However, the job got stuck during state restore, with the following error:

{code:java}
java.io.IOException: Could not commit file from s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/.part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371.inprogress.400506e4-23ea-428c-b8eb-9ff196eeca64 to s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:104) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.commitAfterRecovery(HadoopRenameFileCommitter.java:83) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedPendingFile.commitAfterRecovery(HadoopPathBasedPartFileWriter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:120) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:55) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: java.util.concurrent.CancellationException
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:171) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.jav
{code}
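The commit that fails here is a plain rename of the in-progress part file to its final name, performed while restoring the sink's state. For orientation, a sketch of this kind of recovery-time commit is shown below (assumed semantics on the Hadoop FileSystem API, not the actual HadoopRenameFileCommitter source); the point is that the commit is meant to be idempotent, so a CancellationException bubbling out of the EMRFS multipart copy leaves the restore stuck rather than being treated as already committed:

{code:java}
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameCommitSketch {
    // sketch of an idempotent commit-after-recovery on a Hadoop FileSystem
    static void commitAfterRecovery(FileSystem fs, Path inProgress, Path target) throws IOException {
        if (fs.exists(inProgress)) {
            // on S3 this "rename" is a server-side copy plus delete, which is
            // where the CancellationException in the report originates
            if (!fs.rename(inProgress, target)) {
                throw new IOException("Could not commit file from " + inProgress + " to " + target);
            }
        }
        // if the in-progress file is already gone, assume an earlier attempt
        // committed it and let the restore proceed
    }
}
{code}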
[jira] [Created] (FLINK-23874) JM did not store latest checkpoint id into ZooKeeper, silently
Youjun Yuan created FLINK-23874:
---
Summary: JM did not store latest checkpoint id into ZooKeeper, silently
Key: FLINK-23874
URL: https://issues.apache.org/jira/browse/FLINK-23874
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.12.1
Reporter: Youjun Yuan

The Job Manager did not update the latest successful checkpoint id in ZooKeeper (with a ZK HA setup), at path /flink/\{app_id}/checkpoints/, so when the JM restarted, the job resumed from a very old position.

We had a job which was resumed from savepoint 258; after running for a few days, the latest successful checkpoint was about chk 686. When something triggered the JM to restart, it restored state from savepoint 258 instead of chk 686.

We checked ZooKeeper, and indeed the stored checkpoint was still 258, which means the JM hadn't stored the checkpoint id in ZooKeeper for days, without any error message.

Below are the relevant logs around the restart:

{quote}
{{2021-08-18 11:09:16,505 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 686 for job (228296 bytes in 827 ms).}}
{quote}

{quote}
2021-08-18 11:10:13,066 INFO org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation [] - Finished restoring from state handle: IncrementalRemoteKeyedStateHandle\{backendIdentifier=c11d290c-617b-4ea5-b7ed-4853272f32a3, keyGroupRange=KeyGroupRange{startKeyGroup=47, endKeyGroup=48}, checkpointId=258, sharedState={}, privateState=\{OPTIONS-16=ByteStreamStateHandle{handleName='s3://dp-flink/prd/checkpoints/3beb4dd5-9008-4fd0-9910-f564759b466a/1628912020683//shared/20f35556-5c60-4fca-908c-d05d641c2614', dataBytes=15818}, MANIFEST-06=ByteStreamStateHandle\{handleName='s3://dp-flink/prd/checkpoints/3beb4dd5-9008-4fd0-9910-f564759b466a/1628912020683//shared/3c8e1c2f-616d-4f18-8b07-4a818e3ca110', dataBytes=336}, CURRENT=ByteStreamStateHandle\{handleName='s3://dp-flink/prd/checkpoints/3beb4dd5-9008-4fd0-9910-f564759b466a/1628912020683//shared/1dc5f341-8a73-4e69-96fb-4b026653da6d', dataBytes=16}}, metaStateHandle=ByteStreamStateHandle\{handleName='s3://dp-flink/prd/checkpoints/3beb4dd5-9008-4fd0-9910-f564759b466a/1628912020683//chk-258/0ef57eb3-0f38-45f5-8f3d-3e7b87f5fd15', dataBytes=1704}, registered=false} without rescaling.
{quote}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer
[ https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138349#comment-17138349 ] Youjun Yuan commented on FLINK-17170:
-

Any update on this? I hit exactly the same issue here, with 1.10.1. BTW, calling the savepoint REST API can trigger a savepoint and cancel the job as well, something like this:

curl -d '\{"target-directory": "s3://dp-flink/savepoints/","cancel-job":true}' :jobmanagerTracking-URL[/jobs/:jobid/savepoints|http://ip-10-0-102-40.ap-northeast-1.compute.internal:41581/jobs/6b1f193a0a2e98a6eabf6fe6d876c3bc/savepoints]

> Cannot stop streaming job with savepoint which uses kinesis consumer
>
> Key: FLINK-17170
> URL: https://issues.apache.org/jira/browse/FLINK-17170
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Connectors / Kinesis
> Affects Versions: 1.10.0
> Reporter: Vasii Cosmin Radu
> Priority: Major
> Attachments: Screenshot 2020-04-15 at 18.16.26.png, threaddump_tm1
>
> I am encountering a very strange situation where I can't stop a streaming job with a savepoint.
> The job reads from Kinesis and sinks to S3, a very simple job: no mapping function, no watermarks, just source->sink. The source uses flink-kinesis-consumer, the sink uses StreamingFileSink.
> Everything works fine, except stopping the job with savepoints.
> The behaviour happens only when multiple task managers are involved, with sub-tasks of the job spread across multiple task manager instances. When a single task manager has all the sub-tasks, this issue never occurred.
> Using the latest Flink 1.10.0 version, deployment done in HA mode (2 job managers), in EC2, savepoints and checkpoints written to S3.
> When trying to stop, the savepoint is created correctly and appears on S3, but not all sub-tasks are stopped. Some of them finished, but some just remain hanging. Sometimes, on the same task manager, part of the sub-tasks are finished, part aren't.
> The logs don't show any errors. For the sub-tasks that succeed, the standard messages appear, with "Source: <> switched from RUNNING to FINISHED".
> For the sub-tasks that hang, the last message is "org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 0 ..." and that's it.
>
> I tried using the cli (flink stop )
> Timeout Message:
> {code:java}
> root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25
> Suspending job "cf43cecd9339e8f02a12333e52966a25" with a savepoint.
> The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "cf43cecd9339e8f02a12333e52966a25".
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>     at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
>     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>     ... 9 more
> {code}
>
> Using the monitoring API, when querying based on the savepoint id I keep getting the response that the status is still "IN_PROGRESS", indefinitely.
>
> When performing a cancel instead of stop, it works. But cancel is deprecated, so I am a bit concerned that this might also fail; maybe I was just lucky.
>
> I attached a screenshot with what the UI is showing when this happens
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
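The REST call from the comment above can also be issued programmatically. A minimal Java sketch follows (host, port, and job id are placeholders; the endpoint and JSON body are the ones shown in the curl command):

{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TriggerSavepointAndCancel {
    public static void main(String[] args) throws Exception {
        // placeholders: JobManager REST address and the job id
        URL url = new URL("http://jobmanager-host:8081/jobs/<jobid>/savepoints");
        String body = "{\"target-directory\": \"s3://dp-flink/savepoints/\", \"cancel-job\": true}";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // the response carries a trigger id whose status can then be polled;
        // this is the "monitoring api" loop described in the report above,
        // which in the buggy case stays IN_PROGRESS indefinitely
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
{code}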
[jira] [Created] (FLINK-10381) concurrent job submission gets ProgramAbortException
Youjun Yuan created FLINK-10381:
---
Summary: concurrent job submission gets ProgramAbortException
Key: FLINK-10381
URL: https://issues.apache.org/jira/browse/FLINK-10381
Project: Flink
Issue Type: Bug
Components: JobManager
Affects Versions: 1.6.0, 1.5.1, 1.4.0
Environment: Flink 1.4.0, standalone cluster.
Reporter: Youjun Yuan
Fix For: 1.7.0
Attachments: image-2018-09-20-22-40-31-846.png

If multiple jobs are submitted concurrently, some of them are likely to fail and return the following exception:

{code:java}
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler$$Lambda$47/713642705.get(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
    ... 10 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
    at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
    ... 9 more
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:72)
    ...
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
{code}

h2. Possible Cause:

In OptimizerPlanEnvironment.getOptimizedPlan(), setAsContext() sets a static variable named contextEnvironmentFactory in ExecutionEnvironment, which eventually causes ExecutionEnvironment.getExecutionEnvironment() to return the current OptimizerPlanEnvironment instance, so the optimizerPlan is captured and saved into an instance variable of that OptimizerPlanEnvironment.

However, if multiple jobs are submitted at the same time, the static variable contextEnvironmentFactory in ExecutionEnvironment will be set again by a following job, forcing ExecutionEnvironment.getExecutionEnvironment() to return another, new instance of OptimizerPlanEnvironment. Therefore the first instance of OptimizerPlanEnvironment will not capture the optimizerPlan, and throws a ProgramInvocationException. The relevant spot is copied below for your convenience:

{code:java}
setAsContext();
try {
    prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
    throw e;
}
catch (Throwable t) {
    // the invocation gets aborted with the preview plan
    if (optimizerPlan != null) {
        return optimizerPlan;
    } else {
        throw new ProgramInvocationException("The program caused an error: ", t);
    }
}
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
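To make the interleaving concrete, here is a small self-contained sketch (hypothetical names, not Flink code) of the failure mode: each submission installs itself into a shared static slot, runs the user program, then reads the slot back; a concurrent submission that overwrites the slot in between leaves the first submission without its plan.

{code:java}
public class StaticContextRace {
    // stands in for the static contextEnvironmentFactory in ExecutionEnvironment
    private static volatile String currentContext;

    static String submit(String job) throws InterruptedException {
        String myContext = job + "-context";
        currentContext = myContext;       // setAsContext()
        Thread.sleep(10);                 // the user program's main() runs here
        // getExecutionEnvironment() resolves against the static slot, which a
        // concurrent submission may have overwritten by now
        return myContext.equals(currentContext) ? myContext : null;
    }

    public static void main(String[] args) throws Exception {
        Runnable task = () -> {
            try {
                String got = submit(Thread.currentThread().getName());
                // null plays the role of the missing optimizerPlan that leads
                // to "ProgramInvocationException: The program caused an error:"
                System.out.println(Thread.currentThread().getName() + " -> " + got);
            } catch (InterruptedException ignored) { }
        };
        Thread a = new Thread(task, "jobA");
        Thread b = new Thread(task, "jobB");
        a.start(); b.start();
        a.join(); b.join();
    }
}
{code}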
[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer causes connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youjun Yuan updated FLINK-9630:
---
Description:

When the Kafka topic got deleted, during the task-starting process Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).

*This issue can bring down the whole Flink cluster*, because, in a default setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly schedule the job to any TaskManager that has a free slot, and each attempt will cause the TaskManager to leak a TCP connection. Eventually almost every TaskManager will run out of file handles, so no TaskManager can make a snapshot or accept a new job, effectively stopping the whole cluster.

The leak happens when StreamTask.invoke() calls openAllOperators(), then FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); when kafkaConsumer.partitionsFor(topic) in KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a *TopicAuthorizationException,* no one catches it. Though StreamTask.open catches the Exception and invokes the dispose() method of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), this does not close the kafkaConsumer in partitionDiscoverer, and does not even invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:

{code:java}
public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {
        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}
{code}
[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer causes connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Youjun Yuan updated FLINK-9630:
---
Description: (revised; the current text is quoted in the update above)

The earlier revision of the description additionally ended with: I tried to fix it by catching *TopicAuthorizationException* in Kafka09PartitionDiscoverer.getAllPartitionsForTopics() and closing the kafkaConsumer, which has been verified to work. So I'd like to take this issue.
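A minimal sketch of the fix described above (an assumed shape of the discoverer method, not the actual Flink patch): catch the TopicAuthorizationException inside getAllPartitionsForTopics() and close the KafkaConsumer before propagating, so the broker connection is released even though cancel() never reaches the discoverer.

{code:java}
// inside Kafka09PartitionDiscoverer; kafkaConsumer is the discoverer's
// KafkaConsumer field, PartitionInfo is the Kafka client's partition record,
// and KafkaTopicPartition is Flink's partition descriptor
protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) {
    List<KafkaTopicPartition> partitions = new ArrayList<>();
    try {
        for (String topic : topics) {
            for (PartitionInfo info : kafkaConsumer.partitionsFor(topic)) {
                partitions.add(new KafkaTopicPartition(info.topic(), info.partition()));
            }
        }
    } catch (TopicAuthorizationException e) {
        // release the TCP connection to the broker before failing the task,
        // instead of leaking it on every restart attempt
        kafkaConsumer.close();
        throw e;
    }
    return partitions;
}
{code}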
[jira] [Created] (FLINK-9630) Kafka09PartitionDiscoverer causes connection leak on TopicAuthorizationException
Youjun Yuan created FLINK-9630:
---
Summary: Kafka09PartitionDiscoverer causes connection leak on TopicAuthorizationException
Key: FLINK-9630
URL: https://issues.apache.org/jira/browse/FLINK-9630
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.4.2, 1.5.0
Environment: Linux 2.6, java 8, Kafka broker 0.10.x
Reporter: Youjun Yuan
Fix For: 1.5.1

(The original description is the same as the updated description quoted above.)

-- This message was sent by Atlassian JIRA (v7.6.3#76005)