From Ali Alsuliman <[email protected]>: Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18110 )
Change subject: [ASTERIXDB-3343][API] Include job details in active/completed requests ...................................................................... [ASTERIXDB-3343][API] Include job details in active/completed requests - user model changes: no - storage format changes: no - interface changes: yes Details: Include job status, create time, start time, queue time, and end time. Change-Id: Id1bdc935be8bf84674aa6e35ee6c19f2ee1f7971 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18148 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Ali Alsuliman <[email protected]> (cherry picked from commit b767d093235ee01b2b39d98f64592f0ffd822cf4) Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18110 Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Jenkins <[email protected]> --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java 9 files changed, 127 insertions(+), 32 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Ali Alsuliman: Looks good to me, but someone else must approve Jenkins: Verified; 
Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java index db633e6..25fcbf5 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -20,7 +20,6 @@ import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.api.IRequestReference; -import org.apache.asterix.common.api.RequestReference; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.om.base.ADateTime; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -101,8 +100,8 @@ json.put("elapsedTime", getElapsedTimeInSecs()); json.put("node", requestReference.getNode()); json.put("state", state.getLabel()); - json.put("userAgent", ((RequestReference) requestReference).getUserAgent()); - json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr()); + json.put("userAgent", requestReference.getUserAgent()); + json.put("remoteAddr", requestReference.getRemoteAddr()); json.put("cancellable", cancellable); return json; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java index 4f19366..28549f3 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java @@ -18,11 +18,17 @@ */ package org.apache.asterix.translator; +import java.util.concurrent.TimeUnit; + import org.apache.asterix.common.api.ICommonRequestParameters; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.om.base.AMutableDateTime; import 
org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.resource.IClusterCapacity; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.job.JobRun; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -31,14 +37,17 @@ protected final long creationTime = System.nanoTime(); protected final Thread executor; protected final String statement; + protected final ClusterControllerService ccService; protected final String clientContextId; protected volatile JobId jobId; + private String plan; // can be null - public ClientRequest(ICommonRequestParameters requestParameters) { + public ClientRequest(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) { super(requestParameters.getRequestReference()); this.clientContextId = requestParameters.getClientContextId(); this.statement = requestParameters.getStatement(); this.executor = Thread.currentThread(); + this.ccService = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); } @Override @@ -46,6 +55,10 @@ return clientContextId; } + public void setPlan(String plan) { + this.plan = plan; + } + public synchronized void setJobId(JobId jobId) { this.jobId = jobId; setRunning(); @@ -78,9 +91,53 @@ @Override public ObjectNode asJson() { ObjectNode json = super.asJson(); - json.put("jobId", jobId != null ? 
jobId.toString() : null); + putJobDetails(json); json.put("statement", statement); json.put("clientContextID", clientContextId); + if (plan != null) { + json.put("plan", plan); + } return json; } + + private void putJobDetails(ObjectNode json) { + if (jobId == null) { + json.putNull("jobId"); + } else { + try { + json.put("jobId", jobId.toString()); + JobRun jobRun = ccService.getJobManager().get(jobId); + if (jobRun != null) { + json.put("jobStatus", String.valueOf(jobRun.getStatus())); + putJobRequiredResources(json, jobRun); + putTimes(json, jobRun); + } + } catch (Throwable th) { + // ignore + } + } + } + + private static void putTimes(ObjectNode json, JobRun jobRun) { + AMutableDateTime dateTime = new AMutableDateTime(0); + putTime(json, jobRun.getCreateTime(), "jobCreateTime", dateTime); + putTime(json, jobRun.getStartTime(), "jobStartTime", dateTime); + putTime(json, jobRun.getEndTime(), "jobEndTime", dateTime); + json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(jobRun.getQueueWaitTimeInMillis())); + } + + private static void putJobRequiredResources(ObjectNode json, JobRun jobRun) { + IClusterCapacity jobCapacity = jobRun.getJobSpecification().getRequiredClusterCapacity(); + if (jobCapacity != null) { + json.put("jobRequiredCPUs", jobCapacity.getAggregatedCores()); + json.put("jobRequiredMemory", jobCapacity.getAggregatedMemoryByteSize()); + } + } + + private static void putTime(ObjectNode json, long time, String label, AMutableDateTime dateTime) { + if (time > 0) { + dateTime.setValue(time); + json.put(label, dateTime.toSimpleString()); + } + } } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java index 84bee6a..6893d5c 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/Receptionist.java 
@@ -27,6 +27,7 @@ import org.apache.asterix.common.api.IRequestReference; import org.apache.asterix.common.api.ISchedulableClientRequest; import org.apache.asterix.common.api.RequestReference; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.http.HttpHeaders; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.http.api.IServletRequest; @@ -46,8 +47,9 @@ } @Override - public IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException { - return new ClientRequest(requestParameters); + public IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) + throws HyracksDataException { + return new ClientRequest(requestParameters, appCtx); } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java index 7ac1431..fa1baa7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.translator.ResultMetadata; @@ -97,7 +98,7 @@ } } } - metadata.setQueueWaitTimeInNanos(run.getJobProfile().getQueueWaitTimeInNanos()); + metadata.setQueueWaitTimeInNanos(TimeUnit.MILLISECONDS.toNanos(run.getQueueWaitTimeInMillis())); } metadata.setProcessedObjects(processedObjects); metadata.setBufferCacheHitRatio(pagesRead > 0 ? 
(pagesRead - nonPagedReads) / (double) pagesRead : Double.NaN); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 0db8dff..1110cd3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -3682,8 +3682,8 @@ public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery, - ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, - Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception { + ResultMetadata outMetadata, Stats stats, IRequestParameters reqParams, Map<String, IAObject> stmtParams, + IStatementRewriter stmtRewriter) throws Exception { InsertStatement stmtInsertUpsert = (InsertStatement) stmt; String datasetName = stmtInsertUpsert.getDatasetName(); metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getDataverseName(), datasetName, @@ -3720,10 +3720,11 @@ throw e; } }; - + IRequestTracker requestTracker = appCtx.getRequestTracker(); + ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid()); if (stmtInsertUpsert.getReturnExpression() != null) { deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - requestParameters, false); + reqParams, false, clientRequest); } else { locker.lock(); try { @@ -4726,13 +4727,13 @@ } }; deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - requestParameters, true); + requestParameters, true, clientRequest); } private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler 
compiler, MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, - ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable) - throws Exception { + ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable, + ClientRequest clientRequest) throws Exception { final ResultSetId resultSetId = metadataProvider.getResultSetId(); switch (resultDelivery) { case ASYNC: @@ -4748,7 +4749,7 @@ case IMMEDIATE: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId); - updateJobStats(id, stats, metadataProvider.getResultSetId()); + updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest); responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader, metadataProvider.findOutputRecordType(), stats, sessionOutput)); responsePrinter.printResults(); @@ -4756,7 +4757,7 @@ break; case DEFERRED: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { - updateJobStats(id, stats, metadataProvider.getResultSetId()); + updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest); responsePrinter.addResultPrinter( new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId))); responsePrinter.printResults(); @@ -4771,7 +4772,8 @@ } } - private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException { + private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId, ClientRequest clientRequest) + throws HyracksDataException { final ClusterControllerService controllerService = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); org.apache.asterix.translator.ResultMetadata resultMetadata = @@ -4784,6 +4786,7 @@ stats.setJobProfile(resultMetadata.getJobProfile()); 
apiFramework.generateOptimizedLogicalPlanWithProfile(resultMetadata.getJobProfile()); } + clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan()); stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount()); WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector); } @@ -5234,8 +5237,8 @@ } protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException { - final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters); - appCtx.getRequestTracker().track(clientRequest); + final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters, appCtx); + this.appCtx.getRequestTracker().track(clientRequest); } protected void validateStatements(IRequestParameters requestParameters) diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java index b38c366..0e063ca 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java @@ -35,8 +35,10 @@ import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.runtime.utils.RequestTracker; import org.apache.asterix.translator.ClientRequest; +import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.junit.Test; @@ -66,13 +68,18 @@ // Tests the case that query is not in the map. 
IServletRequest mockRequest = mockRequest("1"); IServletResponse mockResponse = mock(IServletResponse.class); + ICCServiceContext mockCCServiceCtx = mock(ICCServiceContext.class); + ClusterControllerService mockCCService = mock(ClusterControllerService.class); + Mockito.when(appCtx.getServiceContext()).thenReturn(mockCCServiceCtx); + Mockito.when(appCtx.getServiceContext().getControllerService()).thenReturn(mockCCService); + Mockito.when(mockCCServiceCtx.getControllerService()).thenReturn(mockCCService); cancellationServlet.handle(mockRequest, mockResponse); verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND); final RequestReference requestReference = RequestReference.of("1", "node1", System.currentTimeMillis()); RequestParameters requestParameters = new RequestParameters(requestReference, "select 1", null, null, null, null, null, "1", null, null, null, true); - ClientRequest request = new ClientRequest(requestParameters); + ClientRequest request = new ClientRequest(requestParameters, appCtx); request.setJobId(new JobId(1)); request.markCancellable(); tracker.track(request); @@ -89,7 +96,7 @@ final RequestReference requestReference2 = RequestReference.of("2", "node1", System.currentTimeMillis()); requestParameters = new RequestParameters(requestReference2, "select 1", null, null, null, null, null, "2", null, null, null, true); - ClientRequest request2 = new ClientRequest(requestParameters); + ClientRequest request2 = new ClientRequest(requestParameters, appCtx); request2.setJobId(new JobId(2)); request2.markCancellable(); tracker.track(request2); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java index 95ed22e..f0b4944 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.common.api; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.http.api.IServletRequest; @@ -35,10 +36,12 @@ * Generates a {@link IClientRequest} based on the requests parameters * * @param requestParameters + * @param appCtx * @return the client request * @throws HyracksDataException */ - IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException; + IClientRequest requestReceived(ICommonRequestParameters requestParameters, ICcApplicationContext appCtx) + throws HyracksDataException; /** * Ensures a client's request can be executed before its job is started diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index d865b4f..da4d12d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -82,13 +82,13 @@ private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap; - private long createTime; + private final long createTime; - private long startTime; + private volatile long startTime; private String startTimeZoneId; - private long endTime; + private volatile long endTime; private JobStatus status; @@ -98,7 +98,7 @@ private List<Exception> pendingExceptions; - private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations; + private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations; private JobRun(DeploymentId deploymentId, JobId 
jobId, Set<JobFlag> jobFlags, JobSpecification spec, ActivityClusterGraph acg) { @@ -222,6 +222,10 @@ this.profile.setEndTime(endTime); } + public long getQueueWaitTimeInMillis() { + return startTime > 0 ? startTime - createTime : System.currentTimeMillis() - createTime; + } + public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) { operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java index 40bc1ba..2031808 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataflow.TaskId; @@ -92,10 +91,6 @@ this.endTime = endTime; } - public long getQueueWaitTimeInNanos() { - return TimeUnit.MILLISECONDS.toNanos(startTime - createTime); - } - @Override public ObjectNode toJSON() { ObjectMapper om = new ObjectMapper(); -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18110 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: trinity Gerrit-Change-Id: Id1bdc935be8bf84674aa6e35ee6c19f2ee1f7971 Gerrit-Change-Number: 18110 Gerrit-PatchSet: 3 Gerrit-Owner: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> 
Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-MessageType: merged
