gaborgsomogyi commented on code in PR #382: URL: https://github.com/apache/flink-kubernetes-operator/pull/382#discussion_r980187297
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java: ########## @@ -72,22 +81,91 @@ public boolean observe( } if (!clusterJobStatuses.isEmpty()) { + // There are jobs on the cluster, we filter the ones for this resource Optional<JobStatusMessage> targetJobStatusMessage = filterTargetJob(jobStatus, clusterJobStatuses); + if (targetJobStatusMessage.isEmpty()) { - jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + LOG.warn("No matching jobs found on the cluster"); + ifRunningMoveToReconciling(jobStatus, previousJobStatus); + // We could list the jobs but cannot find the one for this resource + if (resource instanceof FlinkDeployment) { + // This should never happen for application clusters, there is something wrong + setUnknownJobError((FlinkDeployment) resource); + } else { + ifHaDisabledMarkSessionJobMissing((FlinkSessionJob) resource, deployedConfig); + } return false; } else { updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig); } ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus()); return true; } else { + LOG.debug("No jobs found on the cluster"); + // No jobs found on the cluster, it is possible that the jobmanager is still starting up ifRunningMoveToReconciling(jobStatus, previousJobStatus); + + if (resource instanceof FlinkSessionJob) { Review Comment: This doesn't seem to be common logic; maybe it can be extracted from here. 
########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java: ########## @@ -72,22 +81,91 @@ public boolean observe( } if (!clusterJobStatuses.isEmpty()) { + // There are jobs on the cluster, we filter the ones for this resource Optional<JobStatusMessage> targetJobStatusMessage = filterTargetJob(jobStatus, clusterJobStatuses); + if (targetJobStatusMessage.isEmpty()) { - jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + LOG.warn("No matching jobs found on the cluster"); + ifRunningMoveToReconciling(jobStatus, previousJobStatus); + // We could list the jobs but cannot find the one for this resource + if (resource instanceof FlinkDeployment) { + // This should never happen for application clusters, there is something wrong + setUnknownJobError((FlinkDeployment) resource); + } else { + ifHaDisabledMarkSessionJobMissing((FlinkSessionJob) resource, deployedConfig); + } return false; } else { updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig); } ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus()); return true; } else { + LOG.debug("No jobs found on the cluster"); + // No jobs found on the cluster, it is possible that the jobmanager is still starting up ifRunningMoveToReconciling(jobStatus, previousJobStatus); + + if (resource instanceof FlinkSessionJob) { + ifHaDisabledMarkSessionJobMissing((FlinkSessionJob) resource, deployedConfig); + } return false; } } + /** + * When HA is disabled the session job will not recover on JM restarts. If the JM goes down / + * restarted the session job should be marked missing. + * + * @param sessionJob Flink session job. + * @param conf Flink config. + */ + private void ifHaDisabledMarkSessionJobMissing(FlinkSessionJob sessionJob, Configuration conf) { Review Comment: This is more a matter of taste, so we may or may not do it. 
I prefer `get/is` and `set` functions, with higher-level functionality built on top of them, like ``` if (something) { react } ``` That is more reusable if somebody wants to implement `ifTheMoonIsGreenMarkSessionJobMissing` functionality. But again, this is a question of taste, so we can leave it as is if you don't agree. ########## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java: ########## @@ -72,22 +81,91 @@ public boolean observe( } if (!clusterJobStatuses.isEmpty()) { + // There are jobs on the cluster, we filter the ones for this resource Optional<JobStatusMessage> targetJobStatusMessage = filterTargetJob(jobStatus, clusterJobStatuses); + if (targetJobStatusMessage.isEmpty()) { - jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name()); + LOG.warn("No matching jobs found on the cluster"); + ifRunningMoveToReconciling(jobStatus, previousJobStatus); + // We could list the jobs but cannot find the one for this resource + if (resource instanceof FlinkDeployment) { Review Comment: This doesn't seem to be common logic; maybe it can be extracted from here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org