gaborgsomogyi commented on code in PR #382:
URL: https://github.com/apache/flink-kubernetes-operator/pull/382#discussion_r980187297


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -72,22 +81,91 @@ public boolean observe(
         }
 
         if (!clusterJobStatuses.isEmpty()) {
+            // There are jobs on the cluster, we filter the ones for this resource
             Optional<JobStatusMessage> targetJobStatusMessage =
                     filterTargetJob(jobStatus, clusterJobStatuses);
+
             if (targetJobStatusMessage.isEmpty()) {
-                jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                LOG.warn("No matching jobs found on the cluster");
+                ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+                // We could list the jobs but cannot find the one for this resource
+                if (resource instanceof FlinkDeployment) {
+                    // This should never happen for application clusters, there is something wrong
+                    setUnknownJobError((FlinkDeployment) resource);
+                } else {
+                    ifHaDisabledMarkSessionJobMissing((FlinkSessionJob) resource, deployedConfig);
+                }
                 return false;
             } else {
                 updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig);
             }
             ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
             return true;
         } else {
+            LOG.debug("No jobs found on the cluster");
+            // No jobs found on the cluster, it is possible that the jobmanager is still starting up
             ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+
+            if (resource instanceof FlinkSessionJob) {

Review Comment:
   This doesn't seem to be common logic; maybe it could be extracted from here.
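A minimal sketch of one way to do that extraction, assuming a template-method split: the shared "no jobs found on the cluster" flow stays in a base observer, and the session-job-specific reaction moves into an overridable hook, so the shared code needs no `instanceof` check. All names below (`JobStatusObserverSketch`, `onNoJobsFound`, the stand-in resource classes, the `"MISSING"` state string) are hypothetical, and treating HA as disabled when `high-availability` is absent or `NONE` is a simplifying assumption, not the operator's actual check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for the operator's resource types;
// the real FlinkDeployment / FlinkSessionJob classes carry much more state.
class Resource {
    String jobState = "RUNNING";
}

class SessionJobResource extends Resource {}

class DeploymentResource extends Resource {}

// Template method: the shared flow is fixed here, the resource-specific
// reaction lives in an overridable hook.
abstract class JobStatusObserverSketch<R extends Resource> {

    /** Shared handling of the "no jobs found on the cluster" branch. */
    final boolean onEmptyJobList(R resource, Map<String, String> conf) {
        // Common part: a running job that disappeared goes to RECONCILING.
        if ("RUNNING".equals(resource.jobState)) {
            resource.jobState = "RECONCILING";
        }
        // Resource-specific part, dispatched without instanceof.
        onNoJobsFound(resource, conf);
        return false;
    }

    /** Hook for resource-type-specific handling; no-op by default. */
    protected void onNoJobsFound(R resource, Map<String, String> conf) {}
}

class SessionJobObserverSketch extends JobStatusObserverSketch<SessionJobResource> {
    @Override
    protected void onNoJobsFound(SessionJobResource job, Map<String, String> conf) {
        // Assumption: a missing or "NONE" high-availability value means HA is disabled.
        String ha = conf.getOrDefault("high-availability", "NONE");
        if ("NONE".equalsIgnoreCase(ha)) {
            job.jobState = "MISSING"; // illustrative state name
        }
    }
}

class DeploymentObserverSketch extends JobStatusObserverSketch<DeploymentResource> {
    // The JobManager may still be starting up: keep the default no-op hook.
}

class ObserverHookDemo {
    public static void main(String[] args) {
        SessionJobResource job = new SessionJobResource();
        new SessionJobObserverSketch().onEmptyJobList(job, new HashMap<>());
        System.out.println(job.jobState); // prints MISSING (empty config, HA treated as disabled)
    }
}
```

With this shape, the session-job branch is no longer a special case inside the shared method; each observer only states what is different about its resource type.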



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -72,22 +81,91 @@ public boolean observe(
         }
 
         if (!clusterJobStatuses.isEmpty()) {
+            // There are jobs on the cluster, we filter the ones for this resource
             Optional<JobStatusMessage> targetJobStatusMessage =
                     filterTargetJob(jobStatus, clusterJobStatuses);
+
             if (targetJobStatusMessage.isEmpty()) {
-                jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                LOG.warn("No matching jobs found on the cluster");
+                ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+                // We could list the jobs but cannot find the one for this resource
+                if (resource instanceof FlinkDeployment) {
+                    // This should never happen for application clusters, there is something wrong
+                    setUnknownJobError((FlinkDeployment) resource);
+                } else {
+                    ifHaDisabledMarkSessionJobMissing((FlinkSessionJob) resource, deployedConfig);
+                }
                 return false;
             } else {
                 updateJobStatus(resource, targetJobStatusMessage.get(), deployedConfig);
             }
             ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
             return true;
         } else {
+            LOG.debug("No jobs found on the cluster");
+            // No jobs found on the cluster, it is possible that the jobmanager is still starting up
             ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+
+            if (resource instanceof FlinkSessionJob) {
+                ifHaDisabledMarkSessionJobMissing((FlinkSessionJob) resource, deployedConfig);
+            }
             return false;
         }
     }
 
+    /**
+     * When HA is disabled the session job will not recover on JM restarts. If the JM goes down /
+     * restarted the session job should be marked missing.
+     *
+     * @param sessionJob Flink session job.
+     * @param conf Flink config.
+     */
+    private void ifHaDisabledMarkSessionJobMissing(FlinkSessionJob sessionJob, Configuration conf) {

Review Comment:
   This is more a matter of taste, so we may or may not do it. I prefer `get`/`is` and `set` functions, with higher-level functionality built on top of them, like
   ```
   if (something) {
     react
   }
   ```
   That is more reusable if somebody later wants to implement, for example, `ifTheMoonIsGreenMarkSessionJobMissing` functionality.
   But again, this is a matter of taste, so we can leave it as is if you don't agree.
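To make the `get`/`is` + `set` suggestion concrete, here is a minimal sketch assuming a side-effect-free predicate (`isHaEnabled`) and a decision-free setter (`markSessionJobMissing`) that the caller composes. `SessionJobStatusStub`, `SessionJobChecks`, the `"MISSING"` state and the simplified `high-availability` lookup are all hypothetical stand-ins, not operator APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a session job's status; not the operator's real class.
class SessionJobStatusStub {
    private String state = "RECONCILING";

    String getState() { return state; }

    void setState(String state) { this.state = state; }
}

final class SessionJobChecks {
    private SessionJobChecks() {}

    /** Predicate only, no side effects. Simplified HA check (assumption). */
    static boolean isHaEnabled(Map<String, String> conf) {
        String mode = conf.getOrDefault("high-availability", "NONE");
        return !"NONE".equalsIgnoreCase(mode);
    }

    /** Setter only, no decision logic. "MISSING" is an illustrative state name. */
    static void markSessionJobMissing(SessionJobStatusStub status) {
        status.setState("MISSING");
    }
}

class GetIsSetDemo {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        SessionJobStatusStub status = new SessionJobStatusStub();

        // The caller composes the condition and the reaction...
        if (!SessionJobChecks.isHaEnabled(conf)) {
            SessionJobChecks.markSessionJobMissing(status);
        }
        // ...so another trigger (e.g. a hypothetical isTheMoonGreen()) can
        // reuse markSessionJobMissing unchanged.
        System.out.println(status.getState()); // prints MISSING
    }
}
```

The trade-off is one extra line at each call site in exchange for building blocks that can be tested and reused independently.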
   



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java:
##########
@@ -72,22 +81,91 @@ public boolean observe(
         }
 
         if (!clusterJobStatuses.isEmpty()) {
+            // There are jobs on the cluster, we filter the ones for this resource
             Optional<JobStatusMessage> targetJobStatusMessage =
                     filterTargetJob(jobStatus, clusterJobStatuses);
+
             if (targetJobStatusMessage.isEmpty()) {
-                jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                LOG.warn("No matching jobs found on the cluster");
+                ifRunningMoveToReconciling(jobStatus, previousJobStatus);
+                // We could list the jobs but cannot find the one for this resource
+                if (resource instanceof FlinkDeployment) {

Review Comment:
   This doesn't seem to be common logic; maybe it could be extracted from here.
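For the "no matching job" branch, a composition-based alternative is sketched here, assuming the per-resource reaction is injected as a strategy so the shared observer needs no `instanceof` check. `MissingJobHandler`, `SharedJobObserver`, `ObservedResource` and the error text are hypothetical, and matching jobs by a plain ID string is a simplification of `filterTargetJob`.

```java
import java.util.List;

// Hypothetical stand-in resource; field names are illustrative only.
class ObservedResource {
    final String jobId;
    String state = "RUNNING";
    String error;

    ObservedResource(String jobId) { this.jobId = jobId; }
}

/** Per-resource-type reaction, injected instead of branching on the resource class. */
interface MissingJobHandler {
    void onTargetJobMissing(ObservedResource resource);
}

class SharedJobObserver {
    private final MissingJobHandler handler;

    SharedJobObserver(MissingJobHandler handler) { this.handler = handler; }

    /** Common flow: look for this resource's job among the jobs listed on the cluster. */
    boolean observe(ObservedResource resource, List<String> clusterJobIds) {
        if (!clusterJobIds.contains(resource.jobId)) {
            // Common part of the "no matching job" branch.
            if ("RUNNING".equals(resource.state)) {
                resource.state = "RECONCILING";
            }
            handler.onTargetJobMissing(resource); // resource-specific part
            return false;
        }
        resource.state = "RUNNING"; // simplified stand-in for updateJobStatus
        return true;
    }
}

class MissingJobHandlerDemo {
    public static void main(String[] args) {
        // Application cluster: a missing job is unexpected, so record an error.
        SharedJobObserver deploymentObserver =
                new SharedJobObserver(r -> r.error = "Unrecognized job for application cluster");
        ObservedResource app = new ObservedResource("job-a");
        boolean found = deploymentObserver.observe(app, List.of("job-b"));
        // prints "false RECONCILING Unrecognized job for application cluster"
        System.out.println(found + " " + app.state + " " + app.error);
    }
}
```

Compared with a subclass hook, injecting the handler keeps a single observer class and lets tests pass in a trivial lambda.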



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
