>From Hussain Towaileb <hussai...@gmail.com>:

Hussain Towaileb has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263 )

Change subject: [NO ISSUE]: Track more requests + jobs counts
......................................................................

[NO ISSUE]: Track more requests + jobs counts

Change-Id: I8fa31a1e6bb6b1f1bcf90c59da646fc47546fc7c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
6 files changed, 144 insertions(+), 15 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, approved
  Jenkins: Verified; Verified




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index f84e45a..da21769 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -166,7 +166,7 @@
             return resultStatus;
         }

-        HttpResponseStatus getHttpStatus() {
+        public HttpResponseStatus getHttpStatus() {
             return httpResponseStatus;
         }

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 1110cd3..3cac3d8 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -529,6 +529,9 @@
                                 "Unexpected statement: " + kind);
                 }
             }
+        } catch (Exception ex) {
+            this.appCtx.getRequestTracker().incrementFailedRequests();
+            throw ex;
         } finally {
             // async queries are completed after their job completes
             if (ResultDelivery.ASYNC != resultDelivery) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
index 0019015..b2dc309 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java
@@ -81,4 +81,14 @@
      * @return the total number of requests since cluster start/restart
      */
     long getTotalNumberOfRequests();
+
+    /**
+     * increments total number of failed requests
+     */
+    void incrementFailedRequests();
+
+    /**
+     * @return the total number of failed requests
+     */
+    long getTotalNumberOfFailedRequests();
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
index c9425c6..c16f825 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java
@@ -39,11 +39,13 @@
     private final CircularFifoQueue<IClientRequest> completedRequests;
     private final ICcApplicationContext ccAppCtx;
     private final AtomicLong numRequests;
+    private final AtomicLong numOfFailedRequests;

     public RequestTracker(ICcApplicationContext ccAppCtx) {
         this.ccAppCtx = ccAppCtx;
         completedRequests = new 
CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize());
         numRequests = new AtomicLong(0);
+        numOfFailedRequests = new AtomicLong(0);
     }

     @Override
@@ -120,4 +122,14 @@
     public long getTotalNumberOfRequests() {
         return numRequests.get();
     }
+
+    @Override
+    public void incrementFailedRequests() {
+        numOfFailedRequests.incrementAndGet();
+    }
+
+    @Override
+    public long getTotalNumberOfFailedRequests() {
+        return numOfFailedRequests.get();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index b2795d4..f65b261 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -130,4 +130,24 @@
      * @return the maximum number of jobs to queue before rejecting new jobs
      */
     int getJobQueueCapacity();
+
+    /**
+     * @return total successful jobs
+     */
+    long getSuccessfulJobs();
+
+    /**
+     * @return total failed jobs
+     */
+    long getTotalFailedJobs();
+
+    /**
+     * @return total cancelled jobs
+     */
+    long getTotalCancelledJobs();
+
+    /**
+     * @return total rejected jobs
+     */
+    long getTotalRejectedJobs();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4a..703294c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -30,10 +30,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;

 import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.exceptions.IError;
 import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
@@ -66,11 +68,19 @@
     private final Map<JobId, JobRun> runMapArchive;
     private final Map<JobId, List<Exception>> runMapHistory;
     private final IJobCapacityController jobCapacityController;
+    private final AtomicLong successfulJobs;
+    private final AtomicLong totalFailedJobs;
+    private final AtomicLong totalCancelledJobs;
+    private final AtomicLong totalRejectedJobs;
     private IJobQueue jobQueue;

     public JobManager(CCConfig ccConfig, ClusterControllerService ccs, 
IJobCapacityController jobCapacityController) {
         this.ccs = ccs;
         this.jobCapacityController = jobCapacityController;
+        this.successfulJobs = new AtomicLong();
+        this.totalFailedJobs = new AtomicLong();
+        this.totalCancelledJobs = new AtomicLong();
+        this.totalRejectedJobs = new AtomicLong();
         try {
             Constructor<?> jobQueueConstructor = 
this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass())
                     .getConstructor(IJobManager.class, 
IJobCapacityController.class);
@@ -84,7 +94,7 @@
             jobQueue = new FIFOJobQueue(this, jobCapacityController);
         }
         activeRunMap = new HashMap<>();
-        runMapArchive = new LinkedHashMap<JobId, JobRun>() {
+        runMapArchive = new LinkedHashMap<>() {
             private static final long serialVersionUID = -1406441385508773629L;

             @Override
@@ -92,7 +102,7 @@
                 return size() > ccConfig.getJobHistorySize();
             }
         };
-        runMapHistory = new LinkedHashMap<JobId, List<Exception>>() {
+        runMapHistory = new LinkedHashMap<>() {
             private static final long serialVersionUID = 7572062687032652986L;
             /** history size + 1 is for the case when history size = 0 */
             private final int allowedSize = 100 * 
(ccConfig.getJobHistorySize() + 1);
@@ -108,18 +118,24 @@
     public void add(JobRun jobRun) throws HyracksException {
         checkJob(jobRun);
         JobSpecification job = jobRun.getJobSpecification();
-        IJobCapacityController.JobSubmissionStatus status = 
jobCapacityController.allocate(job);
-        CCServiceContext serviceCtx = ccs.getContext();
-        serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
-        switch (status) {
-            case QUEUE:
-                queueJob(jobRun);
-                break;
-            case EXECUTE:
-                executeJob(jobRun);
-                break;
-            default:
-                throw new IllegalStateException("unknown submission status: " 
+ status);
+        IJobCapacityController.JobSubmissionStatus status;
+        try {
+            status = jobCapacityController.allocate(job);
+            CCServiceContext serviceCtx = ccs.getContext();
+            serviceCtx.notifyJobCreation(jobRun.getJobId(), job);
+            switch (status) {
+                case QUEUE:
+                    queueJob(jobRun);
+                    break;
+                case EXECUTE:
+                    executeJob(jobRun);
+                    break;
+                default:
+                    throw new IllegalStateException("unknown submission 
status: " + status);
+            }
+        } catch (HyracksDataException ex) {
+            handleException(ex);
+            throw ex;
         }
     }

@@ -132,11 +148,13 @@
             // trigger JobCleanupWork and JobCleanupNotificationWork which 
will update the lifecyle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
             jobRun.getExecutor().cancelJob(callback);
+            incrementCancelledJobs();
             return;
         }
         // Removes a pending job.
         JobRun jobRun = jobQueue.remove(jobId);
         if (jobRun != null) {
+            incrementCancelledJobs();
             List<Exception> exceptions =
                     
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, 
jobId));
             // Since the job has not been executed, we only need to update its 
status and lifecyle here.
@@ -219,6 +237,12 @@
     @Override
     public void finalComplete(JobRun run) throws HyracksException {
         checkJob(run);
+        if (run.getPendingStatus() == JobStatus.FAILURE) {
+            incrementFailedJobs();
+        } else if (run.getPendingStatus() == JobStatus.TERMINATED) {
+            incrementSuccessfulJobs();
+        }
+
         JobId jobId = run.getJobId();
         Throwable caughtException = null;
         CCServiceContext serviceCtx = ccs.getContext();
@@ -301,6 +325,26 @@
         return ccs.getCCConfig().getJobQueueCapacity();
     }

+    @Override
+    public long getSuccessfulJobs() {
+        return successfulJobs.get();
+    }
+
+    @Override
+    public long getTotalFailedJobs() {
+        return totalFailedJobs.get();
+    }
+
+    @Override
+    public long getTotalCancelledJobs() {
+        return totalCancelledJobs.get();
+    }
+
+    @Override
+    public long getTotalRejectedJobs() {
+        return totalRejectedJobs.get();
+    }
+
     private void pickJobsToRun() throws HyracksException {
         List<JobRun> selectedRuns = jobQueue.pull();
         for (JobRun run : selectedRuns) {
@@ -356,4 +400,31 @@
         final JobSpecification job = jobRun.getJobSpecification();
         jobCapacityController.release(job);
     }
+
+    private void handleException(HyracksException ex) {
+        if (ex.getError().isPresent()) {
+            IError error = ex.getError().get();
+            switch ((ErrorCode) error) {
+                case JOB_QUEUE_FULL:
+                case JOB_REQUIREMENTS_EXCEED_CAPACITY:
+                    incrementRejectedJobs();
+            }
+        }
+    }
+
+    private void incrementSuccessfulJobs() {
+        successfulJobs.incrementAndGet();
+    }
+
+    private void incrementFailedJobs() {
+        totalFailedJobs.incrementAndGet();
+    }
+
+    private void incrementCancelledJobs() {
+        totalCancelledJobs.incrementAndGet();
+    }
+
+    private void incrementRejectedJobs() {
+        totalRejectedJobs.incrementAndGet();
+    }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: trinity
Gerrit-Change-Id: I8fa31a1e6bb6b1f1bcf90c59da646fc47546fc7c
Gerrit-Change-Number: 18263
Gerrit-PatchSet: 4
Gerrit-Owner: Hussain Towaileb <hussai...@gmail.com>
Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <hussai...@gmail.com>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-MessageType: merged

Reply via email to