[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184719993
  
Closing this...




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/1633




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53019468
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1079,6 +1079,9 @@ class JobManager(
           executionGraph.registerExecutionListener(gateway)
           executionGraph.registerJobStatusListener(gateway)
         }
+
+        // All good. Submission succeeded!
+        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Hmm, but now we have the problem that the user might see a `JobSubmitSuccess` 
without the job being stored in the `SubmittedJobGraphStore`, right? This means 
that if the `JobManager` dies before the job is persisted, it will never be 
recovered. I think this violates the `JobSubmitSuccess` contract.
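
A minimal sketch of the ordering this contract implies, using hypothetical 
stand-ins (`JobGraphStore`, `ackToClient`) rather than Flink's actual 
`SubmittedJobGraphStore` and actor messages:

    // Hypothetical sketch: acknowledge the submission only after the job graph
    // has been durably stored, so that a JobManager failure after the ACK can
    // still recover the job.
    trait JobGraphStore {
      def putJobGraph(jobId: String): Unit // blocking; may throw
    }

    def submitHighlyAvailable(
        jobId: String,
        store: JobGraphStore,
        ackToClient: String => Unit): Unit = {
      store.putJobGraph(jobId)                 // 1. persist first
      ackToClient(s"JobSubmitSuccess($jobId)") // 2. only now is the ACK truthful
    }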




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53019879
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1079,6 +1079,9 @@ class JobManager(
           executionGraph.registerExecutionListener(gateway)
           executionGraph.registerJobStatusListener(gateway)
         }
+
+        // All good. Submission succeeded!
+        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Oh boy... not my day today. Thanks for catching that. This was not expected.




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184694713
  
Travis has passed. Did you have another look at this @tillrohrmann?




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r53000913
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1073,57 +1073,73 @@ class JobManager(
       // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
       // because it is a blocking operation
       future {
-        try {
-          if (isRecovery) {
-            executionGraph.restoreLatestCheckpointedState()
-          }
-          else {
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
-              val savepointPath = snapshotSettings.getSavepointPath()
+        val restoreStateSuccess =
+          try {
+            if (isRecovery) {
+              executionGraph.restoreLatestCheckpointedState()
--- End diff --

Regarding the `JobSubmitSuccess`: we had it as a follow-up to allow more 
fine-grained integration with the client and left it as a duplicate submit 
message for the time being (instead of something like `JobRecovered`).

The other behaviour is back to its previous state now. I hear you that it 
makes sense to integrate the state restore behaviour with the execution graph 
restart.




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52999704
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1079,6 +1079,9 @@ class JobManager(
           executionGraph.registerExecutionListener(gateway)
           executionGraph.registerJobStatusListener(gateway)
         }
+
+        // All good. Submission succeeded!
+        jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
--- End diff --

Moved this one up to have correct ACKing behaviour.




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52998947
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1073,57 +1073,73 @@ class JobManager(
       // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
       // because it is a blocking operation
      future {
-        try {
-          if (isRecovery) {
-            executionGraph.restoreLatestCheckpointedState()
-          }
-          else {
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
-              val savepointPath = snapshotSettings.getSavepointPath()
+        val restoreStateSuccess =
+          try {
+            if (isRecovery) {
+              executionGraph.restoreLatestCheckpointedState()
--- End diff --

Had an offline discussion with Stephan. He agrees with you that failing the 
job in this case is too harsh. I'll undo that change by ACK'ing the submission 
earlier.




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52998567
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1073,57 +1073,73 @@ class JobManager(
       // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
       // because it is a blocking operation
       future {
-        try {
-          if (isRecovery) {
-            executionGraph.restoreLatestCheckpointedState()
-          }
-          else {
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
-              val savepointPath = snapshotSettings.getSavepointPath()
+        val restoreStateSuccess =
+          try {
+            if (isRecovery) {
+              executionGraph.restoreLatestCheckpointedState()
--- End diff --

The current behaviour for a failure during job recovery is to simply fail the 
`ExecutionGraph`, triggering a restart. A successful job recovery sends a 
`JobSubmitSuccess` to the client. I'm not sure whether this is actually 
correct, since the client already received a `JobSubmitSuccess` from the 
`JobManager` when it initially submitted the job. But I think the duplicate 
will simply be ignored.

Thus, suppressing the restart behaviour in the case of a job recovery would 
actually change the semantics.

If it is possible to recover from failures while recovering a job or restoring 
a savepoint, then it would make sense not to fail the job outright without 
restarting. Maybe one should distinguish the cases based on the exception that 
actually occurs.
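
A sketch of what such an exception-based distinction could look like 
(hypothetical, not code from this PR; only `SuppressRestartsException` is a 
name introduced by this change):

    import java.io.IOException

    // From this PR: wrapping a cause in SuppressRestartsException keeps the
    // restart strategy from kicking in.
    class SuppressRestartsException(cause: Throwable) extends RuntimeException(cause)

    // Hypothetical dispatch: transient infrastructure problems go through the
    // restart strategy, everything else fails the job without restarts.
    def handleRestoreFailure(
        t: Throwable,
        failWithRestart: Throwable => Unit,
        failWithoutRestart: Throwable => Unit): Unit = t match {
      case io: IOException => failWithRestart(io) // e.g. HDFS/ZooKeeper hiccup
      case other => failWithoutRestart(new SuppressRestartsException(other))
    }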




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52997375
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1073,57 +1073,73 @@ class JobManager(
       // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
       // because it is a blocking operation
       future {
-        try {
-          if (isRecovery) {
-            executionGraph.restoreLatestCheckpointedState()
-          }
-          else {
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
-              val savepointPath = snapshotSettings.getSavepointPath()
+        val restoreStateSuccess =
+          try {
+            if (isRecovery) {
+              executionGraph.restoreLatestCheckpointedState()
--- End diff --

But then I would not keep the behaviour as it is right now. Instead, we should 
consider the job submitted before trying to recover any checkpoint state, and 
keep the restart behaviour. What do you think?
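
In sketch form, this alternative ordering would acknowledge first and leave 
restore failures to the restart machinery (hypothetical stand-ins again, 
mirroring the persist-then-ACK sketch above):

    // Hypothetical sketch: treat the job as submitted before attempting any
    // checkpoint recovery; a restore failure then goes through the normal
    // ExecutionGraph failure/restart path instead of failing the submission.
    def submitThenRestore(
        jobId: String,
        ackToClient: String => Unit,
        restoreState: () => Unit,
        failGraph: Throwable => Unit): Unit = {
      ackToClient(s"JobSubmitSuccess($jobId)") // submission is considered done here
      try restoreState()
      catch {
        case t: Throwable => failGraph(t) // restart strategy decides what happens next
      }
    }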




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52992451
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1073,57 +1073,73 @@ class JobManager(
       // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
       // because it is a blocking operation
       future {
-        try {
-          if (isRecovery) {
-            executionGraph.restoreLatestCheckpointedState()
-          }
-          else {
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
-              val savepointPath = snapshotSettings.getSavepointPath()
+        val restoreStateSuccess =
+          try {
+            if (isRecovery) {
+              executionGraph.restoreLatestCheckpointedState()
--- End diff --

I'm not so sure about that, to be honest. What if 
`restoreLatestCheckpointedState` fails because of some HDFS/ZooKeeper problem? 
Then you would like to try restarting the job, wouldn't you? The client should 
then be notified once all restart attempts have been exhausted.




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184599724
  
Thanks for the review, Till! I've [opened an 
issue](https://issues.apache.org/jira/browse/FLINK-3411) for the failure 
behaviour during checkpoint state recovery.

I'll merge this today.




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52986635
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1073,57 +1073,73 @@ class JobManager(
       // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
       // because it is a blocking operation
       future {
-        try {
-          if (isRecovery) {
-            executionGraph.restoreLatestCheckpointedState()
-          }
-          else {
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
-              val savepointPath = snapshotSettings.getSavepointPath()
+        val restoreStateSuccess =
+          try {
+            if (isRecovery) {
+              executionGraph.restoreLatestCheckpointedState()
--- End diff --

Yes, it had the same issue as the savepoint restore: if restoring the 
checkpoint failed, the job submission was never ACK'd, but the execution was 
restarted, etc. I'm not sure that this is the behaviour we want in the long 
term, but I think failing the submission is the clearer behaviour right now.

Another issue I've noticed: when HA checkpoint recovery fails and the job 
cannot be scheduled for execution, the job will eventually be removed from 
ZooKeeper and hence never be recovered again. Let me open an issue for this...




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-15 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184313039
  
Changes look good to me, @uce. I had only one inline question concerning a 
semantic change.

Apart from that, +1 for merging.




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-15 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1633#discussion_r52924511
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
@@ -1073,57 +1073,73 @@ class JobManager(
       // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
       // because it is a blocking operation
       future {
-        try {
-          if (isRecovery) {
-            executionGraph.restoreLatestCheckpointedState()
-          }
-          else {
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
-              val savepointPath = snapshotSettings.getSavepointPath()
+        val restoreStateSuccess =
+          try {
+            if (isRecovery) {
+              executionGraph.restoreLatestCheckpointedState()
--- End diff --

Is it intended that failures in `restoreLatestCheckpointedState` are now 
non-recoverable as well? This seems to differ from the former implementation.




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-15 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1633#issuecomment-184250973
  
@tillrohrmann, could you have a look at this change?




[GitHub] flink pull request: [FLINK-3396] [runtime] Fail job submission aft...

2016-02-12 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/1633

[FLINK-3396] [runtime] Fail job submission after state restore failure

If state restore fails during job graph submission, the submitting client 
never gets notified about the submission failure. For detached submissions, 
this results in a timeout rather than the actual failure cause; the actual 
failure can only be seen in the logs.

This PR changes the behaviour to fail the submission.

The failure cause is wrapped in a `SuppressRestartsException` in order to 
prevent the restart strategy from kicking in. We call 
`ExecutionGraph#fail(Throwable)` so that proper clean-up happens.
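
As a rough sketch of that failure path (simplified stand-ins for the execution 
graph and messaging types; only `SuppressRestartsException` and 
`ExecutionGraph#fail(Throwable)` are named by the PR itself):

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    class SuppressRestartsException(cause: Throwable) extends RuntimeException(cause)

    trait ExecutionGraphLike {
      def restoreLatestCheckpointedState(): Unit
      def fail(t: Throwable): Unit // performs the proper clean-up
    }

    // Hypothetical simplification of the submission path: on restore failure,
    // wrap the cause so the restart strategy stays out of the way, fail the
    // graph for clean-up, and notify the client instead of letting it time out.
    def submit(
        jobId: String,
        graph: ExecutionGraphLike,
        notifyClient: String => Unit): Future[Unit] = Future {
      try {
        graph.restoreLatestCheckpointedState()
        notifyClient(s"JobSubmitSuccess($jobId)")
      } catch {
        case t: Throwable =>
          graph.fail(new SuppressRestartsException(t))
          notifyClient(s"JobSubmitFailure($jobId): ${t.getMessage}")
      }
    }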

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 3396-state_restore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1633


commit 982857bc0f5248a694d908141df0330de2befa3c
Author: Ufuk Celebi 
Date:   2016-02-12T20:59:58Z

[FLINK-3396] [runtime] Fail job submission after state restore failure

Problem: If state restore fails during job graph submission, the submitting
client never gets notified about the submission failure. For detached
submissions, this results in a timeout rather than the actual failure cause.
The actual failure can only be seen in the logs.

Solution: Fail the submission if state restore fails.

commit 9d2f4d88e99c3e50d8c001c8aa2cdb2848c80478
Author: Ufuk Celebi 
Date:   2016-02-12T21:09:29Z

[hotfix] Rename UnrecoverableException to SuppressRestartsException



