[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2020-01-10 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-15152:
---
Labels: checkpoint pull-request-available scheduler  (was: checkpoint scheduler)

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.9.1
> Reporter: Feng Jiajie
> Assignee: Congxian Qiu(klion26)
> Priority: Critical
> Labels: checkpoint, pull-request-available, scheduler
> Fix For: 1.10.0
>
>
> I have a streaming job configured with periodic checkpointing, but after it had
> been running for a week I found that no checkpoint files had been written.
> h2. Reproduce the problem:
> 1. Submit the job to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
> 2. Immediately afterwards, before all tasks have switched to RUNNING (within a
> few seconds), send a "stop with savepoint" command through the Flink CLI (in my
> case this is done by a job control script):
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> The following entry appears in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Socket Stream -> Map (1/1) of job f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
> {code}
> After that, the job's task (on the TaskManager) *continues to run normally without* periodic checkpoints.
> h2. The cause of the problem:
> 1. The "stop with savepoint" command first calls stopCheckpointScheduler
> (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then
> triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
> 2. However, because not all tasks have switched to RUNNING yet,
> triggerSynchronousSavepoint fails at
> org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509:
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. As a result, "stop with savepoint" fails after
> checkpointCoordinator.stopCheckpointScheduler() has already been called, and
> the job is not terminated. The job keeps running, but periodic checkpoints are
> never triggered again.
>  
> Sample code to reproduce the problem:
> {code:java}
> public class StreamingJob {
>   private static StateBackend makeRocksdbBackend() throws IOException {
>     RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
>     rocksdbBackend.enableTtlCompactionFilter();
>     rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
>     return rocksdbBackend;
>   }
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // 10 sec
>     env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
>     env.setStateBackend(makeRocksdbBackend());
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>     checkpointConfig.enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     checkpointConfig.setFailOnCheckpointingErrors(true);
>     DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
>     text.map(new MapFunction<String, Tuple2<Long, Long>>() {
>       @Override
>       public Tuple2<Long, Long> map(String s) {
>         String[] s1 = s.split(" ");
>         return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
>       }
>     }).keyBy(0).flatMap(new CountWindowAverage()).print();
>     env.execute("Flink Streaming Java API Skeleton");
>   }
>   public static class CountWindowAverage extends
>       RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>     private transient ValueState<Tuple2<Long, Long>> sum;
>     @Override
>     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>       Tuple2<Long, Long> currentSum = sum.value();
>       currentSum.f0 += 1;
>       currentSum.f1 += input.f1;
>  

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2020-01-08 Thread Yu Li (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Li updated FLINK-15152:
--
Fix Version/s: 1.10.0

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.9.1
> Reporter: Feng Jiajie
> Assignee: Congxian Qiu(klion26)
> Priority: Critical
> Labels: checkpoint, scheduler
> Fix For: 1.10.0
>
>

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-19 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Priority: Critical  (was: Major)

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.9.1
> Reporter: Feng Jiajie
> Priority: Critical
> Labels: checkpoint, scheduler
>

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-11 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Labels: checkpoint scheduler  (was: checkpoint)

> Job running without periodic checkpoint for stop failed at the beginning
> 
>
> Key: FLINK-15152
> URL: https://issues.apache.org/jira/browse/FLINK-15152
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.9.1
> Reporter: Feng Jiajie
> Priority: Major
> Labels: checkpoint, scheduler
>

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Description: 
I have a streaming job configured with periodic checkpointing, but after it had
been running for a week I found that no checkpoint files had been written.
h2. Reproduce the problem:

1. Submit the job to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
2. Immediately afterwards, before all tasks have switched to RUNNING (within a
few seconds), send a "stop with savepoint" command through the Flink CLI (in my
case this is done by a job control script):
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
The following entry appears in jobmanager.log:
{code:java}
2019-12-09 17:56:56,512 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Socket Stream -> Map (1/1) of job f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
{code}
After that, the job's task (on the TaskManager) *continues to run normally without* periodic checkpoints.
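
One way a job control script could avoid hitting this window is to hold back the stop command until every task reports RUNNING. The sketch below shows only that decision. `StopGuard` and `safeToStop` are hypothetical names, not part of Flink, and how the script obtains the task states is assumed and left out.

```java
// Hypothetical helper for a job control script; not part of Flink.
final class StopGuard {

    /**
     * Returns true only if there is at least one task and every task state is "RUNNING".
     * The state strings follow the names seen in the log entry above (SCHEDULED, RUNNING).
     */
    static boolean safeToStop(java.util.List<String> taskStates) {
        return !taskStates.isEmpty() && taskStates.stream().allMatch("RUNNING"::equals);
    }

    public static void main(String[] args) {
        // A task is still SCHEDULED: the script should wait instead of sending "stop".
        System.out.println(safeToStop(java.util.Arrays.asList("SCHEDULED")));
        // All tasks are RUNNING: the stop command can be sent.
        System.out.println(safeToStop(java.util.Arrays.asList("RUNNING", "RUNNING")));
    }
}
```

A check like this only narrows the race from the script's side; it does not address the underlying cause.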
h2. The cause of the problem:

1. The "stop with savepoint" command first calls stopCheckpointScheduler
(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then
triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. However, because not all tasks have switched to RUNNING yet,
triggerSynchronousSavepoint fails at
org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509:
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. As a result, "stop with savepoint" fails after
checkpointCoordinator.stopCheckpointScheduler() has already been called, and
the job is not terminated. The job keeps running, but periodic checkpoints are
never triggered again.
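
The sequence in steps 1 to 3 can be reproduced in a small stand-alone model. This is only a sketch: `ToyCoordinator` and both `stopWithSavepoint...` methods are invented for illustration and are not Flink classes, and restarting the scheduler after a failed trigger is an assumed remedy, not necessarily the fix adopted for this issue.

```java
// Toy model only: none of these types are Flink's. They mirror the call order described above.
final class StopWithSavepointModel {

    enum TaskState { SCHEDULED, RUNNING }

    /** Stand-in for the checkpoint coordinator; tracks only whether periodic checkpoints are scheduled. */
    static final class ToyCoordinator {
        private boolean periodicSchedulerRunning = true;
        private final TaskState taskState;

        ToyCoordinator(TaskState taskState) {
            this.taskState = taskState;
        }

        void stopCheckpointScheduler() { periodicSchedulerRunning = false; }

        void startCheckpointScheduler() { periodicSchedulerRunning = true; }

        boolean isPeriodicSchedulerRunning() { return periodicSchedulerRunning; }

        /** Fails when the task is not RUNNING, like the abort described in step 2. */
        void triggerSynchronousSavepoint() {
            if (taskState != TaskState.RUNNING) {
                throw new IllegalStateException("task is " + taskState + ", not RUNNING");
            }
        }
    }

    /** Call order from the report: stop the scheduler, then trigger; a failure leaves it stopped. */
    static boolean stopWithSavepointAsReported(ToyCoordinator c) {
        c.stopCheckpointScheduler();
        try {
            c.triggerSynchronousSavepoint();
            return true;
        } catch (IllegalStateException e) {
            return false; // the job keeps running and the scheduler stays stopped
        }
    }

    /** Assumed remedy: restart the scheduler when the savepoint could not be triggered. */
    static boolean stopWithSavepointWithRestart(ToyCoordinator c) {
        c.stopCheckpointScheduler();
        try {
            c.triggerSynchronousSavepoint();
            return true;
        } catch (IllegalStateException e) {
            c.startCheckpointScheduler();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyCoordinator reported = new ToyCoordinator(TaskState.SCHEDULED);
        stopWithSavepointAsReported(reported);
        System.out.println("as reported, scheduler running: " + reported.isPeriodicSchedulerRunning());

        ToyCoordinator remedied = new ToyCoordinator(TaskState.SCHEDULED);
        stopWithSavepointWithRestart(remedied);
        System.out.println("with restart, scheduler running: " + remedied.isPeriodicSchedulerRunning());
    }
}
```

In this model the reported call order leaves the scheduler stopped after the failed trigger, while the variant that restarts it leaves periodic checkpointing active.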

 

Sample code to reproduce the problem:
{code:java}
public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
    RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
    rocksdbBackend.enableTtlCompactionFilter();
    rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 10 sec
    env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
    env.setStateBackend(makeRocksdbBackend());
    env.setRestartStrategy(RestartStrategies.noRestart());

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.setFailOnCheckpointingErrors(true);

    DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
    text.map(new MapFunction<String, Tuple2<Long, Long>>() {
      @Override
      public Tuple2<Long, Long> map(String s) {
        String[] s1 = s.split(" ");
        return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
      }
    }).keyBy(0).flatMap(new CountWindowAverage()).print();

    env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage extends
      RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
      Tuple2<Long, Long> currentSum = sum.value();
      currentSum.f0 += 1;
      currentSum.f1 += input.f1;
      sum.update(currentSum);
      out.collect(new Tuple2<>(input.f0, currentSum.f1));
    }

    @Override
    public void open(Configuration config) {
      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
          new ValueStateDescriptor<>(
              "average", // the state name
              TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
              }), // type information
              Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
      sum = getRuntimeContext().getState(descriptor);
    }
  }
}
{code}


[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Description: 
I have a streaming job configured with periodic checkpointing, but after it had
been running for a week I found that no checkpoint files had been written.
h2. Reproduce the problem:

1. Submit the job to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
2. Immediately afterwards, before all tasks have switched to RUNNING (within a
few seconds), send a "stop with savepoint" command through the Flink CLI (in my
case this is done by a job control script):
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
The following entry appears in jobmanager.log:
{code:java}
2019-12-09 17:56:56,512 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Socket Stream -> Map (1/1) of job f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
{code}
After that, the job's task *continues to run normally without* periodic checkpoints.
h2. The cause of the problem:

1. The "stop with savepoint" command first calls stopCheckpointScheduler
(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then
triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. However, because not all tasks have switched to RUNNING yet, the checkpoint
fails at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509:
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. As a result, "stop with savepoint" fails after
checkpointCoordinator.stopCheckpointScheduler() has already been called, and
the job is not terminated.

 


[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Description: 
I have a streaming job configured with periodic checkpointing, but after it had
been running for a week I found that no checkpoint files had been written.
h2. Reproduce the problem:

1. Submit the job to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
2. Immediately afterwards, before all tasks have switched to RUNNING (within a
few seconds), send a "stop with savepoint" command through the Flink CLI (in my
case this is done by a job control script):
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
After that, the job's task continues to run normally, but no checkpoints are taken.
h2. The cause of the problem:

1. The "stop with savepoint" command first calls stopCheckpointScheduler
(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then
triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. However, because not all tasks have switched to RUNNING yet, the checkpoint
fails at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509:
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. As a result, "stop with savepoint" fails after
checkpointCoordinator.stopCheckpointScheduler() has already been called, and
the job is not terminated.

 

sample code:
{code:java}
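import java.io.IOException;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
    RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
    rocksdbBackend.enableTtlCompactionFilter();
    rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // checkpoint every 10 seconds
    env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
    env.setStateBackend(makeRocksdbBackend());
    env.setRestartStrategy(RestartStrategies.noRestart());

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.setFailOnCheckpointingErrors(true);

    DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
    text.map(new MapFunction<String, Tuple2<Long, Long>>() {
      @Override
      public Tuple2<Long, Long> map(String s) {
        String[] s1 = s.split(" ");
        return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
      }
    }).keyBy(0).flatMap(new CountWindowAverage()).print();

    env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage extends
      RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out)
        throws Exception {
      Tuple2<Long, Long> currentSum = sum.value();
      currentSum.f0 += 1;
      currentSum.f1 += input.f1;
      sum.update(currentSum);
      out.collect(new Tuple2<>(input.f0, currentSum.f1));
    }

    @Override
    public void open(Configuration config) {
      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
          new ValueStateDescriptor<>(
              "average", // the state name
              TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
              }), // type information
              Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
      sum = getRuntimeContext().getState(descriptor);
    }
  }
}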
{code}

  was:
I have a streaming job configured with periodic checkpointing, but after it had 
been running for one week I found that there were no checkpoint files.
h2. Reproduce the problem:

1. The job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
2. Immediately afterwards, before all tasks had switched to RUNNING (within a 
few seconds), I (actually a job control script) sent a stop with savepoint 
command through the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
After that, the job's tasks continue to run normally, but no checkpoints are taken.
h2. The cause of the problem:

1. The "stop with savepoint" command calls the code 

[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

2019-12-09 Thread Feng Jiajie (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feng Jiajie updated FLINK-15152:

Description: 
I have a streaming job configured with periodic checkpointing, but after it had 
been running for one week I found that there were no checkpoint files.
h2. Reproduce the problem:

1. The job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
2. Immediately afterwards, before all tasks had switched to RUNNING (within a 
few seconds), I (actually a job control script) sent a stop with savepoint 
command through the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
After that, the job's tasks continue to run normally, but no checkpoints are taken.
h2. The cause of the problem:

1. The "stop with savepoint" command calls stopCheckpointScheduler 
(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then 
triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. However, because not all tasks had switched to RUNNING yet, the checkpoint 
failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509:
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
  tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
  job,
  ExecutionState.RUNNING,
  ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. As a result, "stop with savepoint" failed after 
"checkpointCoordinator.stopCheckpointScheduler()" had already been called, but 
the job was not terminated, so it keeps running with the checkpoint scheduler 
stopped.

Sample code:
{code:java}
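import java.io.IOException;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
    RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
    rocksdbBackend.enableTtlCompactionFilter();
    rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // checkpoint every 10 seconds
    env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
    env.setStateBackend(makeRocksdbBackend());
    env.setRestartStrategy(RestartStrategies.noRestart());

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.setFailOnCheckpointingErrors(true);

    DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
    text.map(new MapFunction<String, Tuple2<Long, Long>>() {
      @Override
      public Tuple2<Long, Long> map(String s) {
        String[] s1 = s.split(" ");
        return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
      }
    }).keyBy(0).flatMap(new CountWindowAverage()).print();

    env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage extends
      RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out)
        throws Exception {
      Tuple2<Long, Long> currentSum = sum.value();
      currentSum.f0 += 1;
      currentSum.f1 += input.f1;
      sum.update(currentSum);
      out.collect(new Tuple2<>(input.f0, currentSum.f1));
    }

    @Override
    public void open(Configuration config) {
      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
          new ValueStateDescriptor<>(
              "average", // the state name
              TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
              }), // type information
              Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
      sum = getRuntimeContext().getState(descriptor);
    }
  }
}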
{code}

  was:
I have a streaming job configured with periodic checkpointing, but after it had 
been running for one week I found that there were no checkpoint files.
h2. Reproduce the problem:
 # The job was submitted to YARN:

{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}

 # Immediately afterwards, before all tasks had switched to RUNNING (within a 
few seconds), I (actually a job control script) sent a stop with savepoint 
command through the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}

After that, the job's tasks continue to run normally, but no checkpoints are taken.
h2. The cause of the problem:
 # The "stop with savepoint" command calls the code