[jira] [Commented] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-09-08 Thread Youjun Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411798#comment-17411798
 ] 

Youjun Yuan commented on FLINK-23874:
-

Thanks for the response.

Though I still don't understand why the chk id in ZK wasn't updated for days.

The job was originally resumed from chk 258 on 14th Aug, and then we tried to stop 
the job on 18th Aug, which caused the JM to restart. So the content in ZK should 
have been updated to ~chk 686, but on 19th Aug (when we realized the issue) I 
checked the content in ZK, and it was still chk 258.
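To double-check what the HA store actually contains, something like the following can 
list the checkpoint entries directly. This is only a hedged sketch using the plain 
ZooKeeper Java client; the connect string, the application id, and the 
/flink/<app_id>/checkpoints path are assumptions taken from this report, not verified values.
{code:java}
// Hedged sketch: list the checkpoint znodes the JM stored under the ZK HA path.
// The connect string and path below are placeholders based on this report's setup.
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

public class ListCheckpointZNodes {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zk-host:2181", 30_000, event -> { });
        String path = "/flink/application_1628083845581_0254/checkpoints";
        // Children at this path (as given in the report) are the completed checkpoints
        // tracked by the HA store; if the newest entry is still chk 258, the JM never
        // wrote 259..686 here.
        List<String> children = zk.getChildren(path, false);
        children.forEach(System.out::println);
        zk.close();
    }
}
{code}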



[jira] [Comment Edited] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-09-07 Thread Youjun Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411591#comment-17411591
 ] 

Youjun Yuan edited comment on FLINK-23874 at 9/8/21, 12:04 AM:
---

The first attempt was started by resuming from savepoint 258. But the second 
attempt was NOT started by me; I was actually canceling the job, but due to Flink 
bug [FLINK-20992|https://issues.apache.org/jira/browse/FLINK-20992], it hit the 
following error at the end of shutting down the job, so YARN restarted it:
{code:java}
RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3460b405 
rejected from 
{code}
This log can be found at the end of the JM log of attempt 1.

If you pay attention to the log timestamps, you can see that attempt 2 started 
immediately after attempt 1, just 2 seconds later.
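
For reference, the "rejected from" message is what java.util.concurrent prints when a 
task is scheduled on an executor that has already been shut down. A minimal, generic 
sketch (plain JDK code, not Flink's) that reproduces the same exception:
{code:java}
// Generic reproduction of the RejectedExecutionException above; not Flink code.
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedAfterShutdown {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.shutdown();
        // Throws java.util.concurrent.RejectedExecutionException:
        // Task ...ScheduledFutureTask@... rejected from ...ScheduledThreadPoolExecutor...
        executor.schedule(() -> System.out.println("too late"), 1, TimeUnit.SECONDS);
    }
}
{code}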

 

 

 


was (Author: ubyyj):
The first attempt was started by resuming from savepoint 258. But the second 
attempt was NOT started by me; I was actually canceling the job, but due to a 
Flink bug, it hit the following error at the end of shutting down the job, so 
YARN restarted it:
{code:java}
RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3460b405 
rejected from 
{code}
This log can be found at the end of the JM log of attempt 1.

If you pay attention to the log timestamps, you can see that attempt 2 started 
immediately after attempt 1, just 2 seconds later.

 

 

 



[jira] [Commented] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-09-07 Thread Youjun Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411591#comment-17411591
 ] 

Youjun Yuan commented on FLINK-23874:
-

The first attempt was started by resuming from savepoint 258. But the second 
attempt was NOT started by me; I was actually canceling the job, but due to a 
Flink bug, it hit the following error at the end of shutting down the job, so 
YARN restarted it:
{code:java}
RejectedExecutionException: Task 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@3460b405 
rejected from 
{code}
This log can be found at the end of the JM log of attempt 1.

If you pay attention to the log timestamps, you can see that attempt 2 started 
immediately after attempt 1, just 2 seconds later.

 

 

 



[jira] [Issue Comment Deleted] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-09-07 Thread Youjun Yuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youjun Yuan updated FLINK-23874:

Comment: was deleted

(was: One thing I don't understand: why wasn't the last checkpoint id stored in ZK 
updated for such a long time?)



[jira] [Comment Edited] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-09-07 Thread Youjun Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411096#comment-17411096
 ] 

Youjun Yuan edited comment on FLINK-23874 at 9/7/21, 9:58 AM:
--

One thing I don't understand: why wasn't the last checkpoint id stored in ZK 
updated for such a long time?


was (Author: ubyyj):
One thing I don't understand, why the last checkpoint id store in ZK wasn't 
updated for long time?



[jira] [Commented] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-09-07 Thread Youjun Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411096#comment-17411096
 ] 

Youjun Yuan commented on FLINK-23874:
-

One thing I don't understand: why wasn't the last checkpoint id stored in ZK 
updated for such a long time?



[jira] [Commented] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-09-07 Thread Youjun Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411095#comment-17411095
 ] 

Youjun Yuan commented on FLINK-23874:
-

I have attached the logs of the two JMs (the JM failed and restarted), and the log 
of one of the TMs.

[~trohrmann] you are right, the JM restarted when I tried to stop it (I believe 
this is another bug).



[jira] [Updated] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-09-07 Thread Youjun Yuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youjun Yuan updated FLINK-23874:

Attachment: container_e04_1628083845581_0254_02_01_jm.log
container_e04_1628083845581_0254_01_50_tm.log
container_e04_1628083845581_0254_01_01_jm.log



[jira] [Created] (FLINK-24187) Could not commit s3 file after JM restart during state initialization

2021-09-07 Thread Youjun Yuan (Jira)
Youjun Yuan created FLINK-24187:
---

 Summary: Could not commit s3 file after JM restart during state 
initialization
 Key: FLINK-24187
 URL: https://issues.apache.org/jira/browse/FLINK-24187
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Affects Versions: 1.12.1
Reporter: Youjun Yuan


We have a SQL job which consumes from Kafka and writes to a Hive table, with data 
stored in S3.

One day the ZooKeeper leader failed over, which caused the Flink job to restart. 
However, the job got stuck during state restore, with the following error:
{code:java}
java.io.IOException: Could not commit file from 
s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/.part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371.inprogress.400506e4-23ea-428c-b8eb-9ff196eeca64
 to 
s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371
 at 
org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:104)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.commitAfterRecovery(HadoopRenameFileCommitter.java:83)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedPendingFile.commitAfterRecovery(HadoopPathBasedPartFileWriter.java:101)
 ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:127)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.(StreamingFileSinkHelper.java:75)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:120)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.table.filesystem.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:55)
 ~[flink-table-blink_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
 ~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) 
~[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) 
[flink-dist_2.11-1.12.1.jar:1.12.1] at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) 
[flink-dist_2.11-1.12.1.jar:1.12.1] at java.lang.Thread.run(Thread.java:748) 
[?:1.8.0_242] Caused by: java.io.IOException: 
java.util.concurrent.CancellationException at 
com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:171)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122)
 ~[emrfs-hadoop-assembly-2.39.0.jar:?] at 
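// Note: the stack trace above is cut off in the archive.
// Purely illustrative sketch of an idempotent rename-based "commit after recovery"
// against the Hadoop FileSystem API; this is NOT the actual Flink
// HadoopRenameFileCommitter, and the paths are hypothetical placeholders.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class RenameCommitSketch {
    static void commitAfterRecovery(Path inProgress, Path committed) throws IOException {
        FileSystem fs = committed.getFileSystem(new Configuration());
        if (fs.exists(committed)) {
            return; // already committed by an earlier attempt, nothing to do
        }
        if (!fs.exists(inProgress) || !fs.rename(inProgress, committed)) {
            throw new IOException("Could not commit file from " + inProgress + " to " + committed);
        }
    }
}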

[jira] [Created] (FLINK-23874) JM did not store latest checkpiont id into Zookeeper, silently

2021-08-19 Thread Youjun Yuan (Jira)
Youjun Yuan created FLINK-23874:
---

 Summary: JM did not store latest checkpiont id into Zookeeper, 
silently
 Key: FLINK-23874
 URL: https://issues.apache.org/jira/browse/FLINK-23874
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.12.1
Reporter: Youjun Yuan


The job manager did not update the latest successful checkpoint id in ZooKeeper 
(with a ZK HA setup) at path /flink/\{app_id}/checkpoints/; when the JM restarted, 
the job resumed from a very old position.

We had a job which was resumed from savepoint 258; after running for a few days, 
the latest successful checkpoint was about chk 686. When something triggered the 
JM to restart, it restored state to savepoint 258 instead of chk 686.

We checked ZooKeeper, and indeed the stored checkpoint was still 258, which means 
the JM had not stored a checkpoint id into ZooKeeper for a few days, without any 
error message.

Below are the relevant logs around the restart:
{quote}
{{2021-08-18 11:09:16,505 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed 
checkpoint 686 for job  (228296 bytes in 827 
ms).}}
{quote}
 
{quote}2021-08-18 11:10:13,066 INFO 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
 [] - Finished restoring from state handle: 
IncrementalRemoteKeyedStateHandle\{backendIdentifier=c11d290c-617b-4ea5-b7ed-4853272f32a3,
 keyGroupRange=KeyGroupRange{startKeyGroup=47, endKeyGroup=48}, 
checkpointId=258, sharedState={}, 
privateState=\{OPTIONS-16=ByteStreamStateHandle{handleName='s3://dp-flink/prd/checkpoints/3beb4dd5-9008-4fd0-9910-f564759b466a/1628912020683//shared/20f35556-5c60-4fca-908c-d05d641c2614',
 dataBytes=15818}, 
MANIFEST-06=ByteStreamStateHandle\{handleName='s3://dp-flink/prd/checkpoints/3beb4dd5-9008-4fd0-9910-f564759b466a/1628912020683//shared/3c8e1c2f-616d-4f18-8b07-4a818e3ca110',
 dataBytes=336}, 
CURRENT=ByteStreamStateHandle\{handleName='s3://dp-flink/prd/checkpoints/3beb4dd5-9008-4fd0-9910-f564759b466a/1628912020683//shared/1dc5f341-8a73-4e69-96fb-4b026653da6d',
 dataBytes=16}}, 
metaStateHandle=ByteStreamStateHandle\{handleName='s3://dp-flink/prd/checkpoints/3beb4dd5-9008-4fd0-9910-f564759b466a/1628912020683//chk-258/0ef57eb3-0f38-45f5-8f3d-3e7b87f5fd15',
 dataBytes=1704}, registered=false} without rescaling.
{quote}
 





[jira] [Commented] (FLINK-17170) Cannot stop streaming job with savepoint which uses kinesis consumer

2020-06-17 Thread Youjun Yuan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17138349#comment-17138349
 ] 

Youjun Yuan commented on FLINK-17170:
-

Any update on this?

I hit exactly the same issue here, with 1.10.1.

BTW, calling the savepoint REST API can trigger a savepoint and cancel the job as 
well.

Something like this:

curl -d '{"target-directory": "s3://dp-flink/savepoints/", "cancel-job": true}' http://ip-10-0-102-40.ap-northeast-1.compute.internal:41581/jobs/6b1f193a0a2e98a6eabf6fe6d876c3bc/savepoints

> Cannot stop streaming job with savepoint which uses kinesis consumer
> 
>
> Key: FLINK-17170
> URL: https://issues.apache.org/jira/browse/FLINK-17170
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, Connectors / Kinesis
>Affects Versions: 1.10.0
>Reporter: Vasii Cosmin Radu
>Priority: Major
> Attachments: Screenshot 2020-04-15 at 18.16.26.png, threaddump_tm1
>
>
> I am encountering a very strange situation where I can't stop with savepoint 
> a streaming job.
> The job reads from kinesis and sinks to S3, very simple job, no mapping 
> function, no watermarks, just source->sink. 
> Source is using flink-kinesis-consumer, sink is using StreamingFileSink. 
> Everything works fine, except stopping the job with savepoints.
> The behaviour happens only when multiple task managers are involved, having 
> sub-tasks of the job spread across multiple task manager instances. When a 
> single task manager has all the sub-tasks, this issue never occurred.
> Using latest Flink 1.10.0 version, deployment done in HA mode (2 job 
> managers), in EC2, savepoints and checkpoints written on S3.
> When trying to stop, the savepoint is created correctly and appears on S3, 
> but not all sub-tasks are stopped. Some of them finished, but some just 
> remain hanged. Sometimes, on the same task manager part of the sub-tasks are 
> finished, part aren't.
> The logs don't show any errors. For the ones that succeed, the standard 
> messages appear, with "Source: <> switched from RUNNING to FINISHED".
> For the sub-tasks hanged the last message is 
> "org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - 
> Shutting down the shard consumer threads of subtask 0 ..." and that's it.
>  
> I tried using the cli (flink stop )
> Timeout Message:
> {code:java}
> root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25
> root@ec2-XX-XX-XX-XX:/opt/flink/current/bin# ./flink stop cf43cecd9339e8f02a12333e52966a25
> Suspending job "cf43cecd9339e8f02a12333e52966a25" with a savepoint.
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job "cf43cecd9339e8f02a12333e52966a25". at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462) 
> at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>  at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454) at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907) 
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) 
> at java.security.AccessController.doPrivileged(Native Method) at 
> javax.security.auth.Subject.doAs(Subject.java:422) at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)Caused 
> by: java.util.concurrent.TimeoutException at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784) 
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at 
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460) 
> ... 9 more{code}
>  
> Using the monitoring API, when querying by the savepoint id I keep getting, 
> indefinitely, the response that the status is still "IN_PROGRESS".
>  
> When performing a cancel instead of stop, it works. But cancel is deprecated, 
> so I am a bit concerned that this might fail also, maybe I was just lucky.
>  
> I attached a screenshot with what the UI is showing when this happens
>  





[jira] [Created] (FLINK-10381) concurrent submit job get ProgramAbortException

2018-09-20 Thread Youjun Yuan (JIRA)
Youjun Yuan created FLINK-10381:
---

 Summary: concurrent submit job get ProgramAbortException
 Key: FLINK-10381
 URL: https://issues.apache.org/jira/browse/FLINK-10381
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.6.0, 1.5.1, 1.4.0
 Environment: Flink 1.4.0, standardalone cluster.
Reporter: Youjun Yuan
 Fix For: 1.7.0
 Attachments: image-2018-09-20-22-40-31-846.png

If multiple jobs are submitted concurrently, some of them are likely to fail and 
return the following exception:

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler$$Lambda$47/713642705.get(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
    ... 10 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
    at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
    ... 9 more
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:72)
    ...
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83

 
h2. Possible Cause:

In OptimizerPlanEnvironment.getOptimizedPlan(), setAsContext() sets a static 
variable named contextEnvironmentFactory in ExecutionEnvironment, which eventually 
causes ExecutionEnvironment.getExecutionEnvironment() to return the current 
OptimizerPlanEnvironment instance, capture the optimizer plan, and save it to an 
instance variable in OptimizerPlanEnvironment.

However, if multiple jobs are submitted at the same time, the static variable 
contextEnvironmentFactory in ExecutionEnvironment will be set again by a following 
job, forcing ExecutionEnvironment.getExecutionEnvironment() to return another, new 
instance of OptimizerPlanEnvironment. Therefore, the first instance of 
OptimizerPlanEnvironment will not capture the optimizer plan and throws 
ProgramInvocationException. The relevant spot is copied below for your convenience:

setAsContext();
try {
    prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
    throw e;
}
catch (Throwable t) {
    // the invocation gets aborted with the preview plan
    if (optimizerPlan != null) {
        return optimizerPlan;
    } else {
        throw new ProgramInvocationException("The program caused an error: ", t);
    }
}
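
To make the described race concrete, here is a minimal, hypothetical sketch (generic 
Java, not Flink's actual classes) of two submissions overwriting a shared static 
context factory, so the first submission ends up reading the environment installed 
by the second:
{code:java}
// Hypothetical illustration of the race described above; not Flink code.
// A shared static "context factory" (standing in for contextEnvironmentFactory)
// is overwritten by a second concurrent submission before the first one reads it.
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class ContextRaceSketch {
    static final AtomicReference<Supplier<String>> CONTEXT_FACTORY = new AtomicReference<>();

    static String submitJob(String name) throws InterruptedException {
        CONTEXT_FACTORY.set(() -> "environment-of-" + name); // like setAsContext()
        Thread.sleep(10);                                    // another submission can overwrite the factory here
        return CONTEXT_FACTORY.get().get();                  // like getExecutionEnvironment()
    }

    public static void main(String[] args) throws Exception {
        Runnable jobA = () -> { try { System.out.println("job A sees " + submitJob("A")); } catch (InterruptedException ignored) { } };
        Runnable jobB = () -> { try { System.out.println("job B sees " + submitJob("B")); } catch (InterruptedException ignored) { } };
        Thread a = new Thread(jobA);
        Thread b = new Thread(jobB);
        a.start(); b.start();
        a.join(); b.join();
        // Typical output: both jobs "see" the same environment (e.g. environment-of-B),
        // mirroring how the first OptimizerPlanEnvironment never captures its own plan.
    }
}
{code}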





[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-06-21 Thread Youjun Yuan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youjun Yuan updated FLINK-9630:
---
Description: 
When the Kafka topic gets deleted during the task starting process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, 
resulting in a TCP connection leak (to the Kafka broker).

*This issue can bring down the whole Flink cluster*, because in a default setup 
(fixedDelay with INT.MAX restart attempts) the job manager will randomly schedule 
the job to any TaskManager that has a free slot, and each attempt will cause that 
TaskManager to leak a TCP connection. Eventually almost every TaskManager will run 
out of file handles, so no TaskManager can make a snapshot or accept a new job. 
This effectively stops the whole cluster.

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); when 
kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
*TopicAuthorizationException*, no one catches it.

StreamTask.open catches the Exception and invokes the dispose() method of each 
operator, which eventually invokes FlinkKafkaConsumerBase.cancel(); however, that 
does not close the kafkaConsumer in partitionDiscoverer, and does not even invoke 
partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}

 

 

  was:
when the Kafka topic got deleted, during task starting process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it get no chance to close the  kafkaConsumer, 
hence resulting TCP connection leak (to Kafka broker).

 

*this issue can bring down the whole Flink cluster*, because, in a default 
setup (fixedDelay with INT.MAX restart attempt), job manager will randomly 
schedule the job to any TaskManager that as free slot, and each attemp will 
cause the TaskManager to leak a TCP connection, eventually almost every 
TaskManager will run out of file handle, hence no taskmanger could make 
snaptshot, or accept new job. Effectly stops the whole cluster.

 

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
when kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hit a 
*TopicAuthorizationException,* no one catches this.

Though StreamTask.open catches Exception and invoks the dispose() method of 
each operator, which eventaully invoke FlinkKakfaConsumerBase.cancel(), however 
it does not close the kakfaConsumer in partitionDiscoverer, not event invoke 
the partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

 

below the code of FlinkKakfaConsumerBase.cancel() for your convenience

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}

 

 



[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-06-20 Thread Youjun Yuan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youjun Yuan updated FLINK-9630:
---
Description: 
when the Kafka topic got deleted, during task starting process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it get no chance to close the  kafkaConsumer, 
hence resulting TCP connection leak (to Kafka broker).

 

*this issue can bring down the whole Flink cluster*, because, in a default 
setup (fixedDelay with INT.MAX restart attempt), job manager will randomly 
schedule the job to any TaskManager that as free slot, and each attemp will 
cause the TaskManager to leak a TCP connection, eventually almost every 
TaskManager will run out of file handle, hence no taskmanger could make 
snaptshot, or accept new job. Effectly stops the whole cluster.

 

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
when kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hit a 
*TopicAuthorizationException,* no one catches this.

Though StreamTask.open catches Exception and invoks the dispose() method of 
each operator, which eventaully invoke FlinkKakfaConsumerBase.cancel(), however 
it does not close the kakfaConsumer in partitionDiscoverer, not event invoke 
the partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

 

below the code of FlinkKakfaConsumerBase.cancel() for your convenience

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}

 

 

  was:
when the Kafka topic got deleted, during task starting process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it get no chance to close the  kafkaConsumer, 
hence resulting TCP connection leak (to Kafka broker).

 

*this issue can bring down the whole Flink cluster*, because, in a default 
setup (fixedDelay with INT.MAX restart attempt), job manager will randomly 
schedule the job to any TaskManager that as free slot, and each attemp will 
cause the TaskManager to leak a TCP connection, eventually almost every 
TaskManager will run out of file handle, hence no taskmanger could make 
snaptshot, or accept new job. Effectly stops the whole cluster.

 

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
when kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hit a 
*TopicAuthorizationException,* no one catches this.

Though StreamTask.open catches Exception and invoks the dispose() method of 
each operator, which eventaully invoke FlinkKakfaConsumerBase.cancel(), however 
it does not close the kakfaConsumer in partitionDiscoverer, not event invoke 
the partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

 

below the code of FlinkKakfaConsumerBase.cancel() for your convenience

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to 
concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself 
up after it escapes
            partitionDiscoverer.wakeup();
        }

    // the discovery loop may currently be sleeping in-between
    // consecutive discoveries; interrupt to shutdown faster
    discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
         kafkaFetcher.cancel();
    }
}


I tried to fix it by catching *TopicAuthorizationException* in 
Kafka09PartitionDiscoverer.getAllPartitionsForTopics(), and close the 
kafkaConsumer. Which has been verified working.

So I'd like to take this issue.


[jira] [Created] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-06-20 Thread Youjun Yuan (JIRA)
Youjun Yuan created FLINK-9630:
--

 Summary: Kafka09PartitionDiscoverer cause connection leak on 
TopicAuthorizationException
 Key: FLINK-9630
 URL: https://issues.apache.org/jira/browse/FLINK-9630
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2, 1.5.0
 Environment: Linux 2.6, java 8, Kafka broker 0.10.x
Reporter: Youjun Yuan
 Fix For: 1.5.1


When the Kafka topic gets deleted during the task starting process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, 
resulting in a TCP connection leak (to the Kafka broker).

*This issue can bring down the whole Flink cluster*, because in a default setup 
(fixedDelay with INT.MAX restart attempts) the job manager will randomly schedule 
the job to any TaskManager that has a free slot, and each attempt will cause that 
TaskManager to leak a TCP connection. Eventually almost every TaskManager will run 
out of file handles, so no TaskManager can make a snapshot or accept a new job. 
This effectively stops the whole cluster.

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); when 
kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
*TopicAuthorizationException*, no one catches it.

StreamTask.open catches the Exception and invokes the dispose() method of each 
operator, which eventually invokes FlinkKafkaConsumerBase.cancel(); however, that 
does not close the kafkaConsumer in partitionDiscoverer, and does not even invoke 
partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to 
concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself 
up after it escapes
            partitionDiscoverer.wakeup();
        }

    // the discovery loop may currently be sleeping in-between
    // consecutive discoveries; interrupt to shutdown faster
    discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
         kafkaFetcher.cancel();
    }
}
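
For illustration only, a hedged sketch of the kind of fix mentioned earlier in this 
thread (catch the TopicAuthorizationException around partitionsFor() and close the 
consumer so its broker connection is released); this is generic Kafka-client code, 
not the actual Flink patch:
{code:java}
// Illustrative sketch; not the actual Flink fix for this ticket.
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionDiscoverySketch {
    static List<PartitionInfo> discover(Properties kafkaProperties, String topic) {
        KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<>(kafkaProperties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        try {
            return consumer.partitionsFor(topic);
        } catch (TopicAuthorizationException e) {
            // Without this close() the consumer, and its TCP connection to the broker, would leak.
            consumer.close();
            throw e;
        }
    }
}
{code}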


