Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-2108][API][RT] Add Processed Objects Metric ......................................................................
[ASTERIXDB-2108][API][RT] Add Processed Objects Metric - user model changes: no - storage format changes: no - interface changes: yes Introduced IOperatorStats and IStatsCollector APIs to collect runtime stats. Details: - Introduce OperatorStats API to report operators runtime stats. - Introduce StatsCollector API to report task runtime stats. - Implement OperatorStats for IndexSearchOperatorNodePushable (tuple counter only). - Add "processedObjects" metric to QueryService API. - Add Stats to ExecuteStatementResponseMessage to pass stats from CC to NCQueryService. - Add metrics test cases. Change-Id: Ie4afe6a676ef0b8a31d36d7dafc13a4023ebf177 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2032 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java A asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm A asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm A asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java 38 files changed, 987 insertions(+), 84 deletions(-) Approvals: Anon. E. 
Moose #1000171: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java index 57c4809..d76c421 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java @@ -68,11 +68,13 @@ public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() { return resultSets; } + } - public static class Stats { + class Stats implements Serializable { private long count; private long size; + private long processedObjects; public long getCount() { return count; @@ -90,6 +92,13 @@ this.size = size; } + public long getProcessedObjects() { + return processedObjects; + } + + public void setProcessedObjects(long processedObjects) { + this.processedObjects = processedObjects; + } } /** diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index f4c3949..da06dd1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -118,6 +118,7 @@ IStatementExecutor.ResultMetadata resultMetadata = responseMsg.getMetadata(); if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE && !resultMetadata.getResultSets().isEmpty()) { + stats.setProcessedObjects(responseMsg.getStats().getProcessedObjects()); for (Triple<JobId, ResultSetId, ARecordType> rsmd : resultMetadata.getResultSets()) { ResultReader resultReader = new ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle()); 
ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, rsmd.getRight()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index c4f58f6..6a92a26 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -147,7 +147,8 @@ EXECUTION_TIME("executionTime"), RESULT_COUNT("resultCount"), RESULT_SIZE("resultSize"), - ERROR_COUNT("errorCount"); + ERROR_COUNT("errorCount"), + PROCESSED_OBJECTS_COUNT("processedObjects"); private final String str; @@ -271,7 +272,7 @@ } private static void printMetrics(PrintWriter pw, long elapsedTime, long executionTime, long resultCount, - long resultSize, long errorCount) { + long resultSize, long processedObjects, long errorCount) { boolean hasErrors = errorCount != 0; pw.print("\t\""); pw.print(ResultFields.METRICS.str()); @@ -283,7 +284,10 @@ pw.print("\t"); ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true); pw.print("\t"); - ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, hasErrors); + ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, true); + pw.print("\t"); + ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(), processedObjects, hasErrors); + pw.print("\t"); if (hasErrors) { pw.print("\t"); ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), errorCount, false); @@ -421,7 +425,7 @@ } } printMetrics(resultWriter, System.nanoTime() - elapsedStart, execStartEnd[1] - execStartEnd[0], - stats.getCount(), stats.getSize(), errorCount); + stats.getCount(), stats.getSize(), stats.getProcessedObjects(), errorCount); resultWriter.print("}\n"); resultWriter.flush(); String result = stringWriter.toString(); diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index 27cdb66..ed683dc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -119,13 +119,14 @@ MetadataManager.INSTANCE.init(); IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, compilationProvider, storageComponentProvider); + final IStatementExecutor.Stats stats = new IStatementExecutor.Stats(); final IRequestParameters requestParameters = - new RequestParameters(null, delivery, new IStatementExecutor.Stats(), outMetadata, clientContextID, - optionalParameters); + new RequestParameters(null, delivery, stats, outMetadata, clientContextID, optionalParameters); translator.compileAndExecute(ccApp.getHcc(), statementExecutorContext, requestParameters); outPrinter.close(); responseMsg.setResult(outWriter.toString()); responseMsg.setMetadata(outMetadata); + responseMsg.setStats(stats); } catch (AlgebricksException | HyracksException | TokenMgrError | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { // we trust that "our" exceptions are serializable and have a comprehensible error message diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java index 54f0a4e..7475be4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java @@ -35,6 +35,8 @@ private IStatementExecutor.ResultMetadata metadata; + private 
IStatementExecutor.Stats stats; + private Throwable error; public ExecuteStatementResponseMessage(long requestMessageId) { @@ -74,6 +76,14 @@ this.metadata = metadata; } + public IStatementExecutor.Stats getStats() { + return stats; + } + + public void setStats(IStatementExecutor.Stats stats) { + this.stats = stats; + } + @Override public String toString() { return String.format("%s(id=%s): %d characters", getClass().getSimpleName(), requestMessageId, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 2abc18f..454b501 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -26,6 +26,7 @@ import java.io.InputStreamReader; import java.rmi.RemoteException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.EnumSet; @@ -195,7 +196,14 @@ import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.cc.job.IJobManager; +import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.job.profiling.om.JobProfile; +import org.apache.hyracks.control.common.job.profiling.om.JobletProfile; +import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; /* @@ -2367,12 +2375,14 @@ case IMMEDIATE: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { final ResultReader resultReader = new 
ResultReader(hdc, id, resultSetId); + updateJobStats(id, stats); ResultUtil.printResults(appCtx, resultReader, sessionOutput, stats, metadataProvider.findOutputRecordType()); }, clientContextId, ctx); break; case DEFERRED: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { + updateJobStats(id, stats); ResultUtil.printResultHandle(sessionOutput, new ResultHandle(id, resultSetId)); if (outMetadata != null) { outMetadata.getResultSets() @@ -2385,6 +2395,25 @@ } } + private void updateJobStats(JobId jobId, Stats stats) { + final IJobManager jobManager = + ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager(); + final JobRun run = jobManager.get(jobId); + if (run == null || run.getStatus() != JobStatus.TERMINATED) { + return; + } + final JobProfile jobProfile = run.getJobProfile(); + final Collection<JobletProfile> jobletProfiles = jobProfile.getJobletProfiles().values(); + long processedObjects = 0; + for (JobletProfile jp : jobletProfiles) { + final Collection<TaskProfile> jobletTasksProfile = jp.getTaskProfiles().values(); + for (TaskProfile tp : jobletTasksProfile) { + processedObjects += tp.getStatsCollector().getAggregatedStats().getTupleCounter().get(); + } + } + stats.setProcessedObjects(processedObjects); + } + private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, String clientContextId, IStatementExecutorContext ctx, ResultSetId resultSetId, MutableBoolean printed) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java index 8c0f8e1..01b280d 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java @@ -20,7 +20,11 @@ import 
java.io.InputStream; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.EnumSet; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import java.util.logging.Logger; import org.apache.asterix.common.exceptions.AsterixException; @@ -40,47 +44,88 @@ */ public class ResultExtractor { + private enum ResultField { + RESULTS("results"), + REQUEST_ID("requestID"), + METRICS("metrics"), + CLIENT_CONTEXT_ID("clientContextID"), + SIGNATURE("signature"), + STATUS("status"), + TYPE("type"), + ERRORS("errors"); + + private static final Map<String, ResultField> fields = new HashMap<>(); + + static { + for (ResultField field : ResultField.values()) { + fields.put(field.getFieldName(), field); + } + } + + private String fieldName; + + ResultField(String fieldName) { + this.fieldName = fieldName; + } + + public String getFieldName() { + return fieldName; + } + + public static ResultField ofFieldName(String fieldName) { + return fields.get(fieldName); + } + } + private static final Logger LOGGER = Logger.getLogger(ResultExtractor.class.getName()); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); public static InputStream extract(InputStream resultStream) throws Exception { - ObjectMapper om = new ObjectMapper(); - String resultStr = IOUtils.toString(resultStream, Charset.defaultCharset()); - PrettyPrinter singleLine = new SingleLinePrettyPrinter(); - ObjectNode result = om.readValue(resultStr, ObjectNode.class); + return extract(resultStream, EnumSet.of(ResultField.RESULTS, ResultField.ERRORS)); + } + + public static InputStream extractMetrics(InputStream resultStream) throws Exception { + return extract(resultStream, EnumSet.of(ResultField.METRICS, ResultField.ERRORS)); + } + + public static String extractHandle(InputStream resultStream) throws Exception { + String result = IOUtils.toString(resultStream, StandardCharsets.UTF_8); + ObjectNode resultJson = OBJECT_MAPPER.readValue(result, 
ObjectNode.class); + final JsonNode handle = resultJson.get("handle"); + if (handle != null) { + return handle.asText(); + } else { + JsonNode errors = resultJson.get("errors"); + if (errors != null) { + JsonNode msg = errors.get(0).get("msg"); + throw new AsterixException(msg.asText()); + } + } + return null; + } + + private static InputStream extract(InputStream resultStream, EnumSet<ResultField> resultFields) throws Exception { + final String resultStr = IOUtils.toString(resultStream, Charset.defaultCharset()); + final PrettyPrinter singleLine = new SingleLinePrettyPrinter(); + final ObjectNode result = OBJECT_MAPPER.readValue(resultStr, ObjectNode.class); LOGGER.fine("+++++++\n" + result + "\n+++++++\n"); - String type = ""; - String status = ""; - StringBuilder resultBuilder = new StringBuilder(); - String field = ""; - String fieldPrefix = ""; - for (Iterator<String> sIter = result.fieldNames(); sIter.hasNext();) { + final StringBuilder resultBuilder = new StringBuilder(); + String field; + String fieldPrefix; + for (Iterator<String> sIter = result.fieldNames(); sIter.hasNext(); ) { field = sIter.next(); fieldPrefix = field.split("-")[0]; - switch (fieldPrefix) { - case "requestID": - break; - case "clientContextID": - break; - case "signature": - break; - case "status": - status = om.writeValueAsString(result.get(field)); - break; - case "type": - type = om.writeValueAsString(result.get(field)); - break; - case "metrics": - LOGGER.fine(om.writeValueAsString(result.get(field))); - break; - case "errors": - JsonNode errors = result.get(field).get(0).get("msg"); - if (!result.get("metrics").has("errorCount")) { - throw new AsterixException("Request reported error but not an errorCount"); - }; - throw new AsterixException(errors.asText()); - case "results": + final ResultField extractedResultField = ResultField.ofFieldName(fieldPrefix); + if (extractedResultField == null) { + throw new AsterixException("Unanticipated field \"" + field + "\""); + } + if 
(!resultFields.contains(extractedResultField)) { + continue; + } + switch (extractedResultField) { + case RESULTS: if (result.get(field).size() <= 1) { if (result.get(field).size() == 0) { resultBuilder.append(""); @@ -94,44 +139,41 @@ resultBuilder.append(omm.writer(singleLine).writeValueAsString(result.get(field))); } } else { - resultBuilder.append(om.writeValueAsString(result.get(field))); + resultBuilder.append(OBJECT_MAPPER.writeValueAsString(result.get(field))); } } else { JsonNode[] fields = Iterators.toArray(result.get(field).elements(), JsonNode.class); if (fields.length > 1) { for (JsonNode f : fields) { if (f.isObject()) { - resultBuilder.append(om.writeValueAsString(f)); + + resultBuilder.append(OBJECT_MAPPER.writeValueAsString(f)); } else { resultBuilder.append(f.asText()); } } } + } break; + case ERRORS: + final JsonNode errors = result.get(field).get(0).get("msg"); + if (!result.get(ResultField.METRICS.getFieldName()).has("errorCount")) { + throw new AsterixException("Request reported error but not an errorCount"); + } + throw new AsterixException(errors.asText()); + case REQUEST_ID: + case METRICS: + case CLIENT_CONTEXT_ID: + case SIGNATURE: + case STATUS: + case TYPE: + resultBuilder.append(OBJECT_MAPPER.writeValueAsString(result.get(field))); + break; default: - throw new AsterixException("Unanticipated field \"" + field + "\""); + throw new IllegalStateException("Unexpected result field: " + extractedResultField); } } - - return IOUtils.toInputStream(resultBuilder.toString()); + return IOUtils.toInputStream(resultBuilder.toString(), StandardCharsets.UTF_8); } - - public static String extractHandle(InputStream resultStream) throws Exception { - final Charset utf8 = Charset.forName("UTF-8"); - ObjectMapper om = new ObjectMapper(); - String result = IOUtils.toString(resultStream, utf8); - ObjectNode resultJson = om.readValue(result, ObjectNode.class); - final JsonNode handle = resultJson.get("handle"); - if (handle != null) { - return 
handle.asText(); - } else { - JsonNode errors = resultJson.get("errors"); - if (errors != null) { - JsonNode msg = errors.get(0).get("msg"); - throw new AsterixException(msg.asText()); - } - } - return null; - } -} +} \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index b9b7bda..43b61b6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -67,9 +67,9 @@ import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat; import org.apache.asterix.testframework.context.TestFileContext; import org.apache.asterix.testframework.xml.ComparisonEnum; -import org.apache.asterix.testframework.xml.TestGroup; import org.apache.asterix.testframework.xml.TestCase.CompilationUnit; import org.apache.asterix.testframework.xml.TestCase.CompilationUnit.Parameter; +import org.apache.asterix.testframework.xml.TestGroup; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.ByteArrayOutputStream; @@ -124,6 +124,7 @@ public static final String DELIVERY_ASYNC = "async"; public static final String DELIVERY_DEFERRED = "deferred"; public static final String DELIVERY_IMMEDIATE = "immediate"; + private static final String METRICS_QUERY_TYPE = "metrics"; private static Method managixExecuteMethod = null; private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>(); @@ -888,6 +889,7 @@ case "query": case "async": case "deferred": + case "metrics": // isDmlRecoveryTest: insert Crash and Recovery if (isDmlRecoveryTest) { executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery" @@ -1203,7 +1205,9 @@ final URI uri = getEndpoint(Servlets.QUERY_SERVICE); if 
(DELIVERY_IMMEDIATE.equals(delivery)) { resultStream = executeQueryService(statement, fmt, uri, params, true, null, true); - resultStream = ResultExtractor.extract(resultStream); + resultStream = METRICS_QUERY_TYPE.equals(reqType) ? + ResultExtractor.extractMetrics(resultStream) : + ResultExtractor.extract(resultStream); } else { String handleVar = getHandleVariable(statement); resultStream = executeQueryService(statement, fmt, uri, upsertParam(params, "mode", delivery), true); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java new file mode 100644 index 0000000..c7fae46 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.asterix.test.runtime; + +import java.util.Collection; + +import org.apache.asterix.test.common.TestExecutor; +import org.apache.asterix.testframework.context.TestCaseContext; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * Runs the runtime tests defined in metrics.xml. + */ +@RunWith(Parameterized.class) +public class MetricsExecutionTest { + protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml"; + + @BeforeClass + public static void setUp() throws Exception { + LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor()); + } + + @AfterClass + public static void tearDown() throws Exception { + LangExecutionUtil.tearDown(); + } + + @Parameters(name = "MetricsExecutionTest {index}: {0}") + public static Collection<Object[]> tests() throws Exception { + return LangExecutionUtil.buildTestsInXml("metrics.xml"); + } + + protected TestCaseContext tcCtx; + + public MetricsExecutionTest(TestCaseContext tcCtx) { + this.tcCtx = tcCtx; + } + + @Test + public void test() throws Exception { + LangExecutionUtil.test(tcCtx); + } +} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml new file mode 100644 index 0000000..f65ede3 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! 
"License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. + !--> +<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" + QueryFileExtension=".sqlpp"> + <test-group name="metrics"> + <test-case FilePath="metrics"> + <compilation-unit name="full-scan"> + <output-dir compare="Text">full-scan</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="metrics"> + <compilation-unit name="primary-index"> + <output-dir compare="Text">primary-index</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="metrics"> + <compilation-unit name="secondary-index"> + <output-dir compare="Text">secondary-index</output-dir> + </compilation-unit> + </test-case> + </test-group> +</test-suite> \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp new file mode 100644 index 0000000..441bee4 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Processed objects metrics on full scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +drop dataverse test if exists; +create dataverse test; + +use test; + +create type test.AddressType as +{ + number : bigint, + street : string, + city : string +}; + +create type test.CustomerType as + closed { + cid : bigint, + name : string, + age : bigint?, + address : AddressType?, + lastorder : { + oid : bigint, + total : float + } +}; + +create dataset Customers(CustomerType) primary key cid; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp new file mode 100644 index 0000000..2c5d5d9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Processed objects metrics on full scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +use test; + +load dataset Customers using localfs + ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`), + (`format`=`adm`)); + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp new file mode 100644 index 0000000..3671133 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on full scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +use test; + +select count(*) from Customers +where name = "Marvella Loud"; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp new file mode 100644 index 0000000..4cccd9f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on full scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +drop dataverse test; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp new file mode 100644 index 0000000..5fb58b4 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on primary index scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +drop dataverse test if exists; +create dataverse test; + +use test; + +create type test.AddressType as +{ + number : bigint, + street : string, + city : string +}; + +create type test.CustomerType as + closed { + cid : bigint, + name : string, + age : bigint?, + address : AddressType?, + lastorder : { + oid : bigint, + total : float + } +}; + +create dataset Customers(CustomerType) primary key cid; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp new file mode 100644 index 0000000..046c40b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on primary index scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +use test; + +load dataset Customers using localfs + ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`), + (`format`=`adm`)); + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp new file mode 100644 index 0000000..969949e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on primary index scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +use test; + +select count(*) from Customers +where cid = 996; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp new file mode 100644 index 0000000..1dbe56e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on primary index scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +drop dataverse test; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp new file mode 100644 index 0000000..74f8a19 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on secondary index scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +drop dataverse test if exists; +create dataverse test; + +use test; + +create type test.AddressType as +{ + number : bigint, + street : string, + city : string +}; + +create type test.CustomerType as + closed { + cid : bigint, + name : string, + age : bigint?, + address : AddressType?, + lastorder : { + oid : bigint, + total : float + } +}; + +create dataset Customers(CustomerType) primary key cid; +create index customer_name_idx on Customers(name); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp new file mode 100644 index 0000000..40d87f7 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on secondary index scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +use test; + +load dataset Customers using localfs + ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`), + (`format`=`adm`)); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp new file mode 100644 index 0000000..1c5126f --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on secondary index scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +use test; + +select count(*) from Customers +where name = "Marvella Loud"; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp new file mode 100644 index 0000000..1de3b3b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * Description : Processed objects metrics on secondary index scan + * Expected Res : Success + * Date : 28 Sep 2017 + */ + +drop dataverse test; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm new file mode 100644 index 0000000..aca314d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm @@ -0,0 +1 @@ +.*"processedObjects":10.* \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm new file mode 100644 index 0000000..91a2dfd --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm @@ -0,0 +1 @@ +.*"processedObjects":1.* \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm new file mode 100644 index 0000000..8578d41 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm @@ -0,0 +1 @@ +.*"processedObjects":2.* \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java index df693b2..10bb336 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java +++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java @@ -28,6 +28,7 @@ import org.apache.hyracks.api.io.IWorkspaceFileFactory; import org.apache.hyracks.api.job.IOperatorEnvironment; import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.profiling.IStatsCollector; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatableRegistry; @@ -52,4 +53,6 @@ Object getSharedObject(); Set<JobFlag> getJobFlags(); + + IStatsCollector getStatsCollector(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java new file mode 100644 index 0000000..35ae8b6 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.api.job.profiling; + +import java.io.Serializable; + +import org.apache.hyracks.api.io.IWritable; +import org.apache.hyracks.api.job.profiling.counters.ICounter; + +public interface IOperatorStats extends IWritable, Serializable { + + /** + * @return The name of the operator; {@link IStatsCollector} looks up + * operator stats by this name + */ + String getName(); + + /** + * @return A counter used to track the number of tuples + * accessed by an operator + */ + ICounter getTupleCounter(); + + /** + * @return A counter used to track the execution time + * of an operator (the time unit is not specified by this + * interface; TODO confirm with the implementations) + */ + ICounter getTimeCounter(); +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java new file mode 100644 index 0000000..1e73146 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.api.job.profiling; + +import java.io.Serializable; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.IWritable; + +public interface IStatsCollector extends IWritable, Serializable { + + /** + * Adds {@link IOperatorStats} to the stats collection + * + * @param operatorStats + * @throws HyracksDataException when an operator with the same name was already added. + */ + void add(IOperatorStats operatorStats) throws HyracksDataException; + + /** + * @param operatorName + * @return {@link IOperatorStats} for the operator with name <code>operatorName</code> + * if one exists or else null. + */ + IOperatorStats getOperatorStats(String operatorName); + + /** + * @return A special {@link IOperatorStats} that has the aggregated stats + * from all operators in the collection. + */ + IOperatorStats getAggregatedStats(); +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java index 987b86b..e43a2e1 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/counters/ICounter.java @@ -18,13 +18,15 @@ */ package org.apache.hyracks.api.job.profiling.counters; -public interface ICounter { +import java.io.Serializable; + +public interface ICounter extends Serializable { /** * Get the fully-qualified name of the counter. * * @return Name of the counter. */ - public String getName(); + String getName(); /** * Update the value of the counter to be current + delta. @@ -33,7 +35,7 @@ * - Amount to change the counter value by. * @return the new value after update. */ - public long update(long delta); + long update(long delta); /** * Set the value of the counter. 
@@ -42,12 +44,12 @@ * - New value of the counter. * @return Old value of the counter. */ - public long set(long value); + long set(long value); /** * Get the value of the counter. * * @return the value of the counter. */ - public long get(); + long get(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java new file mode 100644 index 0000000..718ac3d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.common.job.profiling; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hyracks.api.job.profiling.IOperatorStats; +import org.apache.hyracks.api.job.profiling.counters.ICounter; +import org.apache.hyracks.control.common.job.profiling.counters.Counter; + +public class OperatorStats implements IOperatorStats { + + public final String operatorName; + public final ICounter tupleCounter; + public final ICounter timeCounter; + + public OperatorStats(String operatorName) { + if (operatorName == null || operatorName.isEmpty()) { + throw new IllegalArgumentException("operatorName must not be null or empty"); + } + this.operatorName = operatorName; + tupleCounter = new Counter("tupleCounter"); + timeCounter = new Counter("timeCounter"); + } + + public static IOperatorStats create(DataInput input) throws IOException { + /* writeFields emits the operator name first; it is consumed here because readFields reads only the two counter values */ String name = input.readUTF(); + OperatorStats operatorStats = new OperatorStats(name); + operatorStats.readFields(input); + return operatorStats; + } + + @Override + public String getName() { + return operatorName; + } + + @Override + public ICounter getTupleCounter() { + return tupleCounter; + } + + @Override + public ICounter getTimeCounter() { + return timeCounter; + } + + @Override + public void writeFields(DataOutput output) throws IOException { + /* layout: name (UTF), tuple counter value (long), time counter value (long) */ output.writeUTF(operatorName); + output.writeLong(tupleCounter.get()); + output.writeLong(timeCounter.get()); + } + + @Override + public void readFields(DataInput input) throws IOException { + /* reads the tuple and time counter values in the order writeFields emits them; the name is not read here (see create) */ tupleCounter.set(input.readLong()); + timeCounter.set(input.readLong()); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java new file mode 100644 index 
0000000..90cdc72 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.common.job.profiling; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.profiling.IOperatorStats; +import org.apache.hyracks.api.job.profiling.IStatsCollector; + +public class StatsCollector implements IStatsCollector { + + private final Map<String, IOperatorStats> operatorStatsMap = new HashMap<>(); + + @Override + public void add(IOperatorStats operatorStats) throws HyracksDataException { + if (operatorStatsMap.containsKey(operatorStats.getName())) { + throw new IllegalArgumentException("Operator with the same name already exists"); + } + operatorStatsMap.put(operatorStats.getName(), operatorStats); + } + + @Override + public IOperatorStats getOperatorStats(String operatorName) { + return operatorStatsMap.get(operatorName); + } + + public static StatsCollector create(DataInput input) throws IOException { + StatsCollector statsCollector = new StatsCollector(); + statsCollector.readFields(input); + return statsCollector; + } + + /** + * Builds a fresh {@link IOperatorStats} named "aggregated" whose tuple and + * time counters hold the sums of the corresponding counters of every + * operator currently in this collector. + * + * @return the aggregated stats; both counters are zero when no operator was added + */ + @Override + public IOperatorStats getAggregatedStats() { + IOperatorStats aggregatedStats = new OperatorStats("aggregated"); + for (IOperatorStats stats : operatorStatsMap.values()) { + /* each metric is summed from its own counter: tuples from tuples, time from time */ + aggregatedStats.getTupleCounter().update(stats.getTupleCounter().get()); + aggregatedStats.getTimeCounter().update(stats.getTimeCounter().get()); + } + return aggregatedStats; + } + + @Override + public void writeFields(DataOutput output) throws IOException { + output.writeInt(operatorStatsMap.size()); + for (IOperatorStats operatorStats : operatorStatsMap.values()) { + operatorStats.writeFields(output); + } + } + + @Override + public void readFields(DataInput input) throws IOException { + int operatorCount = input.readInt(); + for (int i = 0; i < operatorCount; i++) { + IOperatorStats opStats = OperatorStats.create(input); +
operatorStatsMap.put(opStats.getName(), opStats); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java index 680d2f9..3b54887 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java @@ -25,12 +25,15 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.hyracks.api.dataflow.TaskAttemptId; +import org.apache.hyracks.api.job.profiling.IStatsCollector; +import org.apache.hyracks.api.partitions.PartitionId; +import org.apache.hyracks.control.common.job.profiling.StatsCollector; +import org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.hyracks.api.dataflow.TaskAttemptId; -import org.apache.hyracks.api.partitions.PartitionId; -import org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler; public class TaskProfile extends AbstractProfile { private static final long serialVersionUID = 1L; @@ -38,6 +41,8 @@ private TaskAttemptId taskAttemptId; private Map<PartitionId, PartitionProfile> partitionSendProfile; + + private IStatsCollector statsCollector; public static TaskProfile create(DataInput dis) throws IOException { TaskProfile taskProfile = new TaskProfile(); @@ -49,9 +54,11 @@ } - public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile) { + public 
TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId, PartitionProfile> partitionSendProfile, + IStatsCollector statsCollector) { this.taskAttemptId = taskAttemptId; - this.partitionSendProfile = new HashMap<PartitionId, PartitionProfile>(partitionSendProfile); + this.partitionSendProfile = new HashMap<>(partitionSendProfile); + this.statsCollector = statsCollector; } public TaskAttemptId getTaskId() { @@ -104,17 +111,22 @@ return json; } + public IStatsCollector getStatsCollector() { + return statsCollector; + } + @Override public void readFields(DataInput input) throws IOException { super.readFields(input); taskAttemptId = TaskAttemptId.create(input); int size = input.readInt(); - partitionSendProfile = new HashMap<PartitionId, PartitionProfile>(); + partitionSendProfile = new HashMap<>(); for (int i = 0; i < size; i++) { PartitionId key = PartitionId.create(input); PartitionProfile value = PartitionProfile.create(input); partitionSendProfile.put(key, value); } + statsCollector = StatsCollector.create(input); } @Override @@ -126,5 +138,6 @@ entry.getKey().writeFields(output); entry.getValue().writeFields(output); } + statsCollector.writeFields(output); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java index d7b4be0..6dc9619 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java @@ -53,6 +53,7 @@ import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.job.PartitionRequest; import org.apache.hyracks.control.common.job.PartitionState; +import org.apache.hyracks.control.common.job.profiling.StatsCollector; import 
org.apache.hyracks.control.common.job.profiling.counters.Counter; import org.apache.hyracks.control.common.job.profiling.om.JobletProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; @@ -190,7 +191,7 @@ } for (Task task : taskMap.values()) { TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), - new Hashtable<>(task.getPartitionSendProfile())); + new Hashtable<>(task.getPartitionSendProfile()), new StatsCollector()); task.dumpProfile(taskProfile); jProfile.getTaskProfiles().put(task.getTaskAttemptId(), taskProfile); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java index bff2794..fcd4bde 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java @@ -53,12 +53,14 @@ import org.apache.hyracks.api.io.IWorkspaceFileFactory; import org.apache.hyracks.api.job.IOperatorEnvironment; import org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.profiling.IStatsCollector; import org.apache.hyracks.api.job.profiling.counters.ICounter; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.api.resources.IDeallocatable; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.common.job.PartitionState; +import org.apache.hyracks.control.common.job.profiling.StatsCollector; import org.apache.hyracks.control.common.job.profiling.counters.Counter; import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; @@ -107,6 +109,8 @@ 
private final Set<JobFlag> jobFlags; + private final IStatsCollector statsCollector; + public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId, String displayName, ExecutorService executor, NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) { @@ -124,6 +128,7 @@ exceptions = new CopyOnWriteArrayList<>(); // Multiple threads could add exceptions to this list. this.ncs = ncs; this.inputChannelsFromConnectors = inputChannelsFromConnectors; + statsCollector = new StatsCollector(); } public void setTaskRuntime(IPartitionCollector[] collectors, IOperatorNodePushable operator) { @@ -453,4 +458,9 @@ public Set<JobFlag> getJobFlags() { return jobFlags; } + + @Override + public IStatsCollector getStatsCollector() { + return statsCollector; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java index 7728d16..675926e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java @@ -38,8 +38,8 @@ @Override public void run() { - TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile()); - task.dumpProfile(taskProfile); + TaskProfile taskProfile = + new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(), task.getStatsCollector()); try { ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(), task.getTaskAttemptId(), ncs.getId(), taskProfile); @@ -53,4 +53,4 @@ public String toString() { return getClass().getSimpleName() + ":" + task.getTaskAttemptId(); } -} +} \ No newline at end of 
file diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml index 526caa9..2b3f3cb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml @@ -81,6 +81,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-control-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java index 07d07c3..1e3d5f0 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java @@ -30,6 +30,8 @@ import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.profiling.IOperatorStats; +import org.apache.hyracks.control.common.job.profiling.OperatorStats; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; @@ -79,6 +81,7 @@ protected ArrayTupleBuilder nonFilterTupleBuild; protected final ISearchOperationCallbackFactory searchCallbackFactory; protected boolean failed = 
false; + private final IOperatorStats stats; public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc, int partition, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, IIndexDataflowHelperFactory indexHelperFactory, @@ -104,6 +107,10 @@ if (maxFilterFieldIndexes != null && maxFilterFieldIndexes.length > 0) { maxFilterKey = new PermutingFrameTupleReference(); maxFilterKey.setFieldPermutation(maxFilterFieldIndexes); + } + stats = new OperatorStats(getDisplayName()); + if (ctx.getStatsCollector() != null) { + ctx.getStatsCollector().add(stats); } } @@ -154,9 +161,9 @@ } protected void writeSearchResults(int tupleIndex) throws Exception { - boolean matched = false; + long matchingTupleCount = 0; while (cursor.hasNext()) { - matched = true; + matchingTupleCount++; tb.reset(); cursor.next(); if (retainInput) { @@ -174,8 +181,9 @@ } FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); } + stats.getTupleCounter().update(matchingTupleCount); - if (!matched && retainInput && retainMissing) { + if (matchingTupleCount == 0 && retainInput && retainMissing) { FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex, nonMatchTupleBuild.getFieldEndOffsets(), nonMatchTupleBuild.getByteArray(), 0, nonMatchTupleBuild.getSize()); diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java index b100300..3d13cf9 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java @@ -36,8 +36,10 @@ import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import 
org.apache.hyracks.api.job.JobFlag; +import org.apache.hyracks.api.job.profiling.IStatsCollector; import org.apache.hyracks.api.job.profiling.counters.ICounterContext; import org.apache.hyracks.api.resources.IDeallocatable; +import org.apache.hyracks.control.common.job.profiling.StatsCollector; import org.apache.hyracks.control.nc.io.WorkspaceFileFactory; public class TestTaskContext implements IHyracksTaskContext { @@ -46,6 +48,7 @@ private WorkspaceFileFactory fileFactory; private Map<Object, IStateObject> stateObjectMap = new HashMap<>(); private Object sharedObject; + private final IStatsCollector statsCollector = new StatsCollector(); public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) { this.jobletContext = jobletContext; @@ -163,4 +166,9 @@ public Set<JobFlag> getJobFlags() { return EnumSet.noneOf(JobFlag.class); } + + @Override + public IStatsCollector getStatsCollector() { + return statsCollector; + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2032 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie4afe6a676ef0b8a31d36d7dafc13a4023ebf177 Gerrit-PatchSet: 11 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
