From Hussain Towaileb <hussai...@gmail.com>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263 )
Change subject: [NO ISSUE]: Track more requests + jobs counts ...................................................................... [NO ISSUE]: Track more requests + jobs counts Change-Id: I8fa31a1e6bb6b1f1bcf90c59da646fc47546fc7c Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java 6 files changed, 144 insertions(+), 15 deletions(-) Approvals: Ali Alsuliman: Looks good to me, approved Jenkins: Verified; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index f84e45a..da21769 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -166,7 +166,7 @@ return resultStatus; } - HttpResponseStatus getHttpStatus() { + public HttpResponseStatus getHttpStatus() { return httpResponseStatus; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 1110cd3..3cac3d8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -529,6 +529,9 @@ "Unexpected statement: " + kind); } } + } catch (Exception ex) { + this.appCtx.getRequestTracker().incrementFailedRequests(); + throw ex; } finally { // async queries are completed after their job completes if (ResultDelivery.ASYNC != resultDelivery) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java index 0019015..b2dc309 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java @@ -81,4 +81,14 @@ * @return the total number of requests since cluster start/restart */ long getTotalNumberOfRequests(); + + /** + * increments total number of failed requests + */ + void incrementFailedRequests(); + + /** + * @return the total number of failed requests + */ + long getTotalNumberOfFailedRequests(); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java index c9425c6..c16f825 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -39,11 +39,13 @@ private final CircularFifoQueue<IClientRequest> completedRequests; private final ICcApplicationContext ccAppCtx; private final AtomicLong numRequests; + private final AtomicLong numOfFailedRequests; public 
RequestTracker(ICcApplicationContext ccAppCtx) { this.ccAppCtx = ccAppCtx; completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize()); numRequests = new AtomicLong(0); + numOfFailedRequests = new AtomicLong(0); } @Override @@ -120,4 +122,14 @@ public long getTotalNumberOfRequests() { return numRequests.get(); } + + @Override + public void incrementFailedRequests() { + numOfFailedRequests.incrementAndGet(); + } + + @Override + public long getTotalNumberOfFailedRequests() { + return numOfFailedRequests.get(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java index b2795d4..f65b261 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java @@ -130,4 +130,24 @@ * @return the maximum number of jobs to queue before rejecting new jobs */ int getJobQueueCapacity(); + + /** + * @return total successful jobs + */ + long getSuccessfulJobs(); + + /** + * @return total failed jobs + */ + long getTotalFailedJobs(); + + /** + * @return total cancelled jobs + */ + long getTotalCancelledJobs(); + + /** + * @return total rejected jobs + */ + long getTotalRejectedJobs(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 4882f4a..703294c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -30,10 +30,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.exceptions.IError; import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; @@ -66,11 +68,19 @@ private final Map<JobId, JobRun> runMapArchive; private final Map<JobId, List<Exception>> runMapHistory; private final IJobCapacityController jobCapacityController; + private final AtomicLong successfulJobs; + private final AtomicLong totalFailedJobs; + private final AtomicLong totalCancelledJobs; + private final AtomicLong totalRejectedJobs; private IJobQueue jobQueue; public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) { this.ccs = ccs; this.jobCapacityController = jobCapacityController; + this.successfulJobs = new AtomicLong(); + this.totalFailedJobs = new AtomicLong(); + this.totalCancelledJobs = new AtomicLong(); + this.totalRejectedJobs = new AtomicLong(); try { Constructor<?> jobQueueConstructor = this.getClass().getClassLoader().loadClass(ccConfig.getJobQueueClass()) .getConstructor(IJobManager.class, IJobCapacityController.class); @@ -84,7 +94,7 @@ jobQueue = new FIFOJobQueue(this, jobCapacityController); } activeRunMap = new HashMap<>(); - runMapArchive = new LinkedHashMap<JobId, JobRun>() { + runMapArchive = new LinkedHashMap<>() { private static final long serialVersionUID = -1406441385508773629L; @Override @@ -92,7 +102,7 @@ return size() > ccConfig.getJobHistorySize(); } }; - runMapHistory = new LinkedHashMap<JobId, List<Exception>>() { + 
runMapHistory = new LinkedHashMap<>() { private static final long serialVersionUID = 7572062687032652986L; /** history size + 1 is for the case when history size = 0 */ private final int allowedSize = 100 * (ccConfig.getJobHistorySize() + 1); @@ -108,18 +118,24 @@ public void add(JobRun jobRun) throws HyracksException { checkJob(jobRun); JobSpecification job = jobRun.getJobSpecification(); - IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job); - CCServiceContext serviceCtx = ccs.getContext(); - serviceCtx.notifyJobCreation(jobRun.getJobId(), job); - switch (status) { - case QUEUE: - queueJob(jobRun); - break; - case EXECUTE: - executeJob(jobRun); - break; - default: - throw new IllegalStateException("unknown submission status: " + status); + IJobCapacityController.JobSubmissionStatus status; + try { + status = jobCapacityController.allocate(job); + CCServiceContext serviceCtx = ccs.getContext(); + serviceCtx.notifyJobCreation(jobRun.getJobId(), job); + switch (status) { + case QUEUE: + queueJob(jobRun); + break; + case EXECUTE: + executeJob(jobRun); + break; + default: + throw new IllegalStateException("unknown submission status: " + status); + } + } catch (HyracksDataException ex) { + handleException(ex); + throw ex; } } @@ -132,11 +148,13 @@ // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job. // Therefore, we do not remove the job out of activeRunMap here. jobRun.getExecutor().cancelJob(callback); + incrementCancelledJobs(); return; } // Removes a pending job. JobRun jobRun = jobQueue.remove(jobId); if (jobRun != null) { + incrementCancelledJobs(); List<Exception> exceptions = Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); // Since the job has not been executed, we only need to update its status and lifecyle here. 
@@ -219,6 +237,12 @@ @Override public void finalComplete(JobRun run) throws HyracksException { checkJob(run); + if (run.getPendingStatus() == JobStatus.FAILURE) { + incrementFailedJobs(); + } else if (run.getPendingStatus() == JobStatus.TERMINATED) { + incrementSuccessfulJobs(); + } + JobId jobId = run.getJobId(); Throwable caughtException = null; CCServiceContext serviceCtx = ccs.getContext(); @@ -301,6 +325,26 @@ return ccs.getCCConfig().getJobQueueCapacity(); } + @Override + public long getSuccessfulJobs() { + return successfulJobs.get(); + } + + @Override + public long getTotalFailedJobs() { + return totalFailedJobs.get(); + } + + @Override + public long getTotalCancelledJobs() { + return totalCancelledJobs.get(); + } + + @Override + public long getTotalRejectedJobs() { + return totalRejectedJobs.get(); + } + private void pickJobsToRun() throws HyracksException { List<JobRun> selectedRuns = jobQueue.pull(); for (JobRun run : selectedRuns) { @@ -356,4 +400,31 @@ final JobSpecification job = jobRun.getJobSpecification(); jobCapacityController.release(job); } + + private void handleException(HyracksException ex) { + if (ex.getError().isPresent()) { + IError error = ex.getError().get(); + switch ((ErrorCode) error) { + case JOB_QUEUE_FULL: + case JOB_REQUIREMENTS_EXCEED_CAPACITY: + incrementRejectedJobs(); + } + } + } + + private void incrementSuccessfulJobs() { + successfulJobs.incrementAndGet(); + } + + private void incrementFailedJobs() { + totalFailedJobs.incrementAndGet(); + } + + private void incrementCancelledJobs() { + totalCancelledJobs.incrementAndGet(); + } + + private void incrementRejectedJobs() { + totalRejectedJobs.incrementAndGet(); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18263 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: trinity Gerrit-Change-Id: I8fa31a1e6bb6b1f1bcf90c59da646fc47546fc7c Gerrit-Change-Number: 18263 
Gerrit-PatchSet: 4 Gerrit-Owner: Hussain Towaileb <hussai...@gmail.com> Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Hussain Towaileb <hussai...@gmail.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-MessageType: merged