Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2032
Change subject: [WIP][ASTERIXDB-2108][API][RT] Add Processed Objects Metric
......................................................................
[WIP][ASTERIXDB-2108][API][RT] Add Processed Objects Metric
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Introduce OperatorStats API to report operators runtime stats.
- Introduce StatsCollector API to report task runtime stats.
- Add OperatorStats for IndexSearchOperatorNodePushable (tuple count only).
- Add "processedObjects" metric to QueryService API.
- Pass "processedObjects" metric from CC to NCQueryService in ResultMetadata.
- Add metrics test cases.
Change-Id: Ie4afe6a676ef0b8a31d36d7dafc13a4023ebf177
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
A
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
A asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
A
asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
A
asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
A
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
A
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
34 files changed, 892 insertions(+), 18 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/32/2032/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 57c4809..9f71be4 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -62,17 +62,27 @@
class ResultMetadata implements Serializable {
private static final long serialVersionUID = 1L;
+ private long processedObjects;
private final List<Triple<JobId, ResultSetId, ARecordType>> resultSets
= new ArrayList<>();
public List<Triple<JobId, ResultSetId, ARecordType>> getResultSets() {
return resultSets;
}
+
+ public long getProcessedObjects() {
+ return processedObjects;
+ }
+
+ public void setProcessedObjects(long processedObjects) {
+ this.processedObjects = processedObjects;
+ }
}
- public static class Stats {
+ class Stats {
private long count;
private long size;
+ private long processedObjects;
public long getCount() {
return count;
@@ -90,6 +100,13 @@
this.size = size;
}
+ public long getProcessedObjects() {
+ return processedObjects;
+ }
+
+ public void setProcessedObjects(long processedObjects) {
+ this.processedObjects = processedObjects;
+ }
}
/**
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index f4c3949..b592ef2 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -120,6 +120,7 @@
if (delivery == IStatementExecutor.ResultDelivery.IMMEDIATE &&
!resultMetadata.getResultSets().isEmpty()) {
for (Triple<JobId, ResultSetId, ARecordType> rsmd :
resultMetadata.getResultSets()) {
ResultReader resultReader = new
ResultReader(getHyracksDataset(), rsmd.getLeft(), rsmd.getMiddle());
+
stats.setProcessedObjects(resultMetadata.getProcessedObjects());
ResultUtil.printResults(appCtx, resultReader, sessionOutput,
stats, rsmd.getRight());
}
} else {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index ad34d96..4e1d724 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -147,7 +147,8 @@
EXECUTION_TIME("executionTime"),
RESULT_COUNT("resultCount"),
RESULT_SIZE("resultSize"),
- ERROR_COUNT("errorCount");
+ ERROR_COUNT("errorCount"),
+ PROCESSED_OBJECTS_COUNT("processedObjects");
private final String str;
@@ -271,7 +272,7 @@
}
private static void printMetrics(PrintWriter pw, long elapsedTime, long
executionTime, long resultCount,
- long resultSize, long errorCount) {
+ long resultSize, long processedObjects, long errorCount) {
boolean hasErrors = errorCount != 0;
pw.print("\t\"");
pw.print(ResultFields.METRICS.str());
@@ -283,7 +284,10 @@
pw.print("\t");
ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount,
true);
pw.print("\t");
- ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize,
hasErrors);
+ ResultUtil.printField(pw, Metrics.RESULT_SIZE.str(), resultSize, true);
+ pw.print("\t");
+ ResultUtil.printField(pw, Metrics.PROCESSED_OBJECTS_COUNT.str(),
processedObjects, hasErrors);
+ pw.print("\t");
if (hasErrors) {
pw.print("\t");
ResultUtil.printField(pw, Metrics.ERROR_COUNT.str(), errorCount,
false);
@@ -421,7 +425,7 @@
}
}
printMetrics(resultWriter, System.nanoTime() - elapsedStart,
execStartEnd[1] - execStartEnd[0],
- stats.getCount(), stats.getSize(), errorCount);
+ stats.getCount(), stats.getSize(),
stats.getProcessedObjects(), errorCount);
resultWriter.print("}\n");
resultWriter.flush();
String result = stringWriter.toString();
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2abc18f..768fe23 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -26,6 +26,7 @@
import java.io.InputStreamReader;
import java.rmi.RemoteException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
@@ -195,7 +196,14 @@
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
+import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
+import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
/*
@@ -2367,12 +2375,14 @@
case IMMEDIATE:
createAndRunJob(hcc, jobFlags, null, compiler, locker,
resultDelivery, id -> {
final ResultReader resultReader = new ResultReader(hdc,
id, resultSetId);
+ updateJobStats(id, outMetadata, stats);
ResultUtil.printResults(appCtx, resultReader,
sessionOutput, stats,
metadataProvider.findOutputRecordType());
}, clientContextId, ctx);
break;
case DEFERRED:
createAndRunJob(hcc, jobFlags, null, compiler, locker,
resultDelivery, id -> {
+ updateJobStats(id, outMetadata, stats);
ResultUtil.printResultHandle(sessionOutput, new
ResultHandle(id, resultSetId));
if (outMetadata != null) {
outMetadata.getResultSets()
@@ -2385,6 +2395,28 @@
}
}
+ private void updateJobStats(JobId jobId, ResultMetadata outMetadata, Stats
stats) {
+ final IJobManager jobManager =
+ ((ClusterControllerService)
appCtx.getServiceContext().getControllerService()).getJobManager();
+ final JobRun run = jobManager.get(jobId);
+ if (run.getStatus() != JobStatus.TERMINATED) {
+ return;
+ }
+ final JobProfile jobProfile = run.getJobProfile();
+ final Collection<JobletProfile> jobletProfiles =
jobProfile.getJobletProfiles().values();
+ long processedObjects = 0;
+ for (JobletProfile jp : jobletProfiles) {
+ final Collection<TaskProfile> jobletTasksProfile =
jp.getTaskProfiles().values();
+ for (TaskProfile tp : jobletTasksProfile) {
+ processedObjects +=
tp.getStatsCollector().getAggregatedStats().getTupleCounter().get();
+ }
+ }
+ stats.setProcessedObjects(processedObjects);
+ if (outMetadata != null) {
+ outMetadata.setProcessedObjects(processedObjects);
+ }
+ }
+
private void asyncCreateAndRunJob(IHyracksClientConnection hcc,
IStatementCompiler compiler, IMetadataLocker locker,
ResultDelivery resultDelivery, String clientContextId,
IStatementExecutorContext ctx,
ResultSetId resultSetId, MutableBoolean printed) {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
index 0bba635..e1040ba 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/ResultExtractor.java
@@ -20,6 +20,7 @@
import java.io.InputStream;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.logging.Logger;
@@ -134,4 +135,23 @@
}
return null;
}
+
+ public static InputStream extractMetrics(InputStream resultStream) throws
Exception {
+ ObjectMapper om = new ObjectMapper();
+ String resultStr = IOUtils.toString(resultStream,
Charset.defaultCharset());
+ ObjectNode result = om.readValue(resultStr, ObjectNode.class);
+ String metrics = "";
+ String field;
+ for (Iterator<String> sIter = result.fieldNames(); sIter.hasNext(); ) {
+ field = sIter.next();
+ switch (field) {
+ case "metrics":
+ metrics = om.writeValueAsString(result.get(field));
+ break;
+ default:
+ break;
+ }
+ }
+ return IOUtils.toInputStream(metrics, StandardCharsets.UTF_8);
+ }
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index b9b7bda..6dff571 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -67,9 +67,9 @@
import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
import org.apache.asterix.testframework.context.TestFileContext;
import org.apache.asterix.testframework.xml.ComparisonEnum;
-import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.asterix.testframework.xml.TestCase.CompilationUnit;
import org.apache.asterix.testframework.xml.TestCase.CompilationUnit.Parameter;
+import org.apache.asterix.testframework.xml.TestGroup;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
@@ -124,6 +124,7 @@
public static final String DELIVERY_ASYNC = "async";
public static final String DELIVERY_DEFERRED = "deferred";
public static final String DELIVERY_IMMEDIATE = "immediate";
+ private static final String METRICS_QUERY_TYPE = "metrics";
private static Method managixExecuteMethod = null;
private static final HashMap<Integer, ITestServer> runningTestServers =
new HashMap<>();
@@ -888,6 +889,7 @@
case "query":
case "async":
case "deferred":
+ case METRICS_QUERY_TYPE:
// isDmlRecoveryTest: insert Crash and Recovery
if (isDmlRecoveryTest) {
executeScript(pb, pb.environment().get("SCRIPT_HOME") +
File.separator + "dml_recovery"
@@ -1203,7 +1205,11 @@
final URI uri = getEndpoint(Servlets.QUERY_SERVICE);
if (DELIVERY_IMMEDIATE.equals(delivery)) {
resultStream = executeQueryService(statement, fmt, uri,
params, true, null, true);
- resultStream = ResultExtractor.extract(resultStream);
+ if (reqType.equals(METRICS_QUERY_TYPE)) {
+ resultStream =
ResultExtractor.extractMetrics(resultStream);
+ } else {
+ resultStream = ResultExtractor.extract(resultStream);
+ }
} else {
String handleVar = getHandleVariable(statement);
resultStream = executeQueryService(statement, fmt, uri,
upsertParam(params, "mode", delivery), true);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
new file mode 100644
index 0000000..c7fae46
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/MetricsExecutionTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the cluster state runtime tests with the storage parallelism.
+ */
+@RunWith(Parameterized.class)
+public class MetricsExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME =
"asterix-build-configuration.xml";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "MetricsExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.buildTestsInXml("metrics.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public MetricsExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
new file mode 100644
index 0000000..f65ede3
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/metrics.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org"
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+ QueryFileExtension=".sqlpp">
+ <test-group name="metrics">
+ <test-case FilePath="metrics">
+ <compilation-unit name="full-scan">
+ <output-dir compare="Text">full-scan</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="metrics">
+ <compilation-unit name="primary-index">
+ <output-dir compare="Text">primary-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="metrics">
+ <compilation-unit name="secondary-index">
+ <output-dir compare="Text">secondary-index</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
new file mode 100644
index 0000000..441bee4
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+ number : bigint,
+ street : string,
+ city : string
+};
+
+create type test.CustomerType as
+ closed {
+ cid : bigint,
+ name : string,
+ age : bigint?,
+ address : AddressType?,
+ lastorder : {
+ oid : bigint,
+ total : float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
new file mode 100644
index 0000000..2c5d5d9
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+ ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+ (`format`=`adm`));
+
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
new file mode 100644
index 0000000..3671133
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where name = "Marvella Loud";
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
new file mode 100644
index 0000000..4cccd9f
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/full-scan/full-scan.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on full scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
new file mode 100644
index 0000000..5fb58b4
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.1.ddl.sqlpp
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+ number : bigint,
+ street : string,
+ city : string
+};
+
+create type test.CustomerType as
+ closed {
+ cid : bigint,
+ name : string,
+ age : bigint?,
+ address : AddressType?,
+ lastorder : {
+ oid : bigint,
+ total : float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
new file mode 100644
index 0000000..046c40b
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.2.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+ ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+ (`format`=`adm`));
+
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
new file mode 100644
index 0000000..969949e
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where cid = 996;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
new file mode 100644
index 0000000..1dbe56e
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/primary-index/primary-index.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on primary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
new file mode 100644
index 0000000..74f8a19
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.1.ddl.sqlpp
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+
+use test;
+
+create type test.AddressType as
+{
+ number : bigint,
+ street : string,
+ city : string
+};
+
+create type test.CustomerType as
+ closed {
+ cid : bigint,
+ name : string,
+ age : bigint?,
+ address : AddressType?,
+ lastorder : {
+ oid : bigint,
+ total : float
+ }
+};
+
+create dataset Customers(CustomerType) primary key cid;
+create index customer_name_idx on Customers(name);
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
new file mode 100644
index 0000000..40d87f7
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.2.update.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+load dataset Customers using localfs
+ ((`path`=`asterix_nc1://data/custord-tiny/customer-tiny-neg.adm`),
+ (`format`=`adm`));
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
new file mode 100644
index 0000000..1c5126f
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.3.metrics.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+use test;
+
+select count(*) from Customers
+where name = "Marvella Loud";
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
new file mode 100644
index 0000000..1de3b3b
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/metrics/secondary-index/secondary-index.4.ddl.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Description : Processed objects metrics on secondary index scan
+ * Expected Res : Success
+ * Date : 28 Sep 2017
+ */
+
+drop dataverse test;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
new file mode 100644
index 0000000..aca314d
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/full-scan/full-scan.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":10.*
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
new file mode 100644
index 0000000..91a2dfd
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/primary-index/primary-index.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":1.*
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
new file mode 100644
index 0000000..8578d41
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/metrics/secondary-index/secondary-index.3.regexadm
@@ -0,0 +1 @@
+.*"processedObjects":2.*
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index df693b2..10bb336 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
@@ -52,4 +53,6 @@
Object getSharedObject();
Set<JobFlag> getJobFlags();
+
+ IStatsCollector getStatsCollector();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
new file mode 100644
index 0000000..35ae8b6
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job.profiling;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.io.IWritable;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+
+public interface IOperatorStats extends IWritable, Serializable {
+
+ /**
+ * @return The name of the operator
+ */
+ String getName();
+
+ /**
+ * @return A counter used to track the number of tuples
+ * accessed by an operator
+ */
+ ICounter getTupleCounter();
+
+ /**
+ * @return A counter used to track the execution time
+ * of an operator
+ */
+ ICounter getTimeCounter();
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
new file mode 100644
index 0000000..1e73146
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IStatsCollector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.job.profiling;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IWritable;
+
+public interface IStatsCollector extends IWritable, Serializable {
+
+ /**
+ * Adds {@link IOperatorStats} to the stats collections
+ *
+ * @param operatorStats
+ * @throws HyracksDataException when an operator with the same was already
added.
+ */
+ void add(IOperatorStats operatorStats) throws HyracksDataException;
+
+ /**
+ * @param operatorName
+ * @return {@link IOperatorStats} for the operator with name
<code>operatorName</code>
+ * if one exists or else null.
+ */
+ IOperatorStats getOperatorStats(String operatorName);
+
+ /**
+ * @return A special {@link IOperatorStats} that has the aggregated stats
+ * from all operators in the collection.
+ */
+ IOperatorStats getAggregatedStats();
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
new file mode 100644
index 0000000..718ac3d
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/OperatorStats.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.job.profiling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.counters.ICounter;
+import org.apache.hyracks.control.common.job.profiling.counters.Counter;
+
+public class OperatorStats implements IOperatorStats {
+
+ public final String operatorName;
+ public final ICounter tupleCounter;
+ public final ICounter timeCounter;
+
+ public OperatorStats(String operatorName) {
+ if (operatorName == null || operatorName.isEmpty()) {
+ throw new IllegalArgumentException("operatorName must not be null
or empty");
+ }
+ this.operatorName = operatorName;
+ tupleCounter = new Counter("tupleCounter");
+ timeCounter = new Counter("timeCounter");
+ }
+
+ public static IOperatorStats create(DataInput input) throws IOException {
+ String name = input.readUTF();
+ OperatorStats operatorStats = new OperatorStats(name);
+ operatorStats.readFields(input);
+ return operatorStats;
+ }
+
+ @Override
+ public String getName() {
+ return operatorName;
+ }
+
+ @Override
+ public ICounter getTupleCounter() {
+ return tupleCounter;
+ }
+
+ @Override
+ public ICounter getTimeCounter() {
+ return timeCounter;
+ }
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeUTF(operatorName);
+ output.writeLong(tupleCounter.get());
+ output.writeLong(timeCounter.get());
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ tupleCounter.set(input.readLong());
+ timeCounter.set(input.readLong());
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
new file mode 100644
index 0000000..90cdc72
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/StatsCollector.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.common.job.profiling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+
+public class StatsCollector implements IStatsCollector {
+
+ private final Map<String, IOperatorStats> operatorStatsMap = new
HashMap<>();
+
+ @Override
+ public void add(IOperatorStats operatorStats) throws HyracksDataException {
+ if (operatorStatsMap.containsKey(operatorStats.getName())) {
+ throw new IllegalArgumentException("Operator with the same name
already exists");
+ }
+ operatorStatsMap.put(operatorStats.getName(), operatorStats);
+ }
+
+ @Override
+ public IOperatorStats getOperatorStats(String operatorName) {
+ return operatorStatsMap.get(operatorName);
+ }
+
+ public static StatsCollector create(DataInput input) throws IOException {
+ StatsCollector statsCollector = new StatsCollector();
+ statsCollector.readFields(input);
+ return statsCollector;
+ }
+
+ @Override
+ public IOperatorStats getAggregatedStats() {
+ IOperatorStats aggregatedStats = new OperatorStats("aggregated");
+ for (IOperatorStats stats : operatorStatsMap.values()) {
+
aggregatedStats.getTupleCounter().update(stats.getTupleCounter().get());
+
aggregatedStats.getTimeCounter().update(stats.getTupleCounter().get());
+ }
+ return aggregatedStats;
+ }
+
+ @Override
+ public void writeFields(DataOutput output) throws IOException {
+ output.writeInt(operatorStatsMap.size());
+ for (IOperatorStats operatorStats : operatorStatsMap.values()) {
+ operatorStats.writeFields(output);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int operatorCount = input.readInt();
+ for (int i = 0; i < operatorCount; i++) {
+ IOperatorStats opStats = OperatorStats.create(input);
+ operatorStatsMap.put(opStats.getName(), opStats);
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
index 680d2f9..5cac38c 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/TaskProfile.java
@@ -25,12 +25,15 @@
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
+import org.apache.hyracks.api.partitions.PartitionId;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
+import
org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.hyracks.api.dataflow.TaskAttemptId;
-import org.apache.hyracks.api.partitions.PartitionId;
-import
org.apache.hyracks.control.common.job.profiling.counters.MultiResolutionEventProfiler;
public class TaskProfile extends AbstractProfile {
private static final long serialVersionUID = 1L;
@@ -38,6 +41,8 @@
private TaskAttemptId taskAttemptId;
private Map<PartitionId, PartitionProfile> partitionSendProfile;
+
+ private IStatsCollector statsCollector;
public static TaskProfile create(DataInput dis) throws IOException {
TaskProfile taskProfile = new TaskProfile();
@@ -49,9 +54,11 @@
}
- public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId,
PartitionProfile> partitionSendProfile) {
+ public TaskProfile(TaskAttemptId taskAttemptId, Map<PartitionId,
PartitionProfile> partitionSendProfile,
+ IStatsCollector statsCollector) {
this.taskAttemptId = taskAttemptId;
this.partitionSendProfile = new HashMap<PartitionId,
PartitionProfile>(partitionSendProfile);
+ this.statsCollector = statsCollector;
}
public TaskAttemptId getTaskId() {
@@ -104,6 +111,10 @@
return json;
}
+ public IStatsCollector getStatsCollector() {
+ return statsCollector;
+ }
+
@Override
public void readFields(DataInput input) throws IOException {
super.readFields(input);
@@ -115,6 +126,7 @@
PartitionProfile value = PartitionProfile.create(input);
partitionSendProfile.put(key, value);
}
+ statsCollector = StatsCollector.create(input);
}
@Override
@@ -126,5 +138,6 @@
entry.getKey().writeFields(output);
entry.getValue().writeFields(output);
}
+ statsCollector.writeFields(output);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index d7b4be0..6dc9619 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -53,6 +53,7 @@
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.job.PartitionRequest;
import org.apache.hyracks.control.common.job.PartitionState;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.common.job.profiling.counters.Counter;
import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -190,7 +191,7 @@
}
for (Task task : taskMap.values()) {
TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
- new Hashtable<>(task.getPartitionSendProfile()));
+ new Hashtable<>(task.getPartitionSendProfile()), new
StatsCollector());
task.dumpProfile(taskProfile);
jProfile.getTaskProfiles().put(task.getTaskAttemptId(),
taskProfile);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index bff2794..fcd4bde 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -53,12 +53,14 @@
import org.apache.hyracks.api.io.IWorkspaceFileFactory;
import org.apache.hyracks.api.job.IOperatorEnvironment;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.counters.ICounter;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.api.resources.IDeallocatable;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.common.job.PartitionState;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.common.job.profiling.counters.Counter;
import org.apache.hyracks.control.common.job.profiling.om.PartitionProfile;
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -107,6 +109,8 @@
private final Set<JobFlag> jobFlags;
+ private final IStatsCollector statsCollector;
+
public Task(Joblet joblet, Set<JobFlag> jobFlags, TaskAttemptId taskId,
String displayName,
ExecutorService executor, NodeControllerService ncs,
List<List<PartitionChannel>> inputChannelsFromConnectors) {
@@ -124,6 +128,7 @@
exceptions = new CopyOnWriteArrayList<>(); // Multiple threads could
add exceptions to this list.
this.ncs = ncs;
this.inputChannelsFromConnectors = inputChannelsFromConnectors;
+ statsCollector = new StatsCollector();
}
public void setTaskRuntime(IPartitionCollector[] collectors,
IOperatorNodePushable operator) {
@@ -453,4 +458,9 @@
public Set<JobFlag> getJobFlags() {
return jobFlags;
}
+
+ @Override
+ public IStatsCollector getStatsCollector() {
+ return statsCollector;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 7728d16..675926e 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -38,8 +38,8 @@
@Override
public void run() {
- TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(),
task.getPartitionSendProfile());
- task.dumpProfile(taskProfile);
+ TaskProfile taskProfile =
+ new TaskProfile(task.getTaskAttemptId(),
task.getPartitionSendProfile(), task.getStatsCollector());
try {
ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(),
task.getTaskAttemptId(),
ncs.getId(), taskProfile);
@@ -53,4 +53,4 @@
public String toString() {
return getClass().getSimpleName() + ":" + task.getTaskAttemptId();
}
-}
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 07d07c3..85b6ff3 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -30,6 +30,8 @@
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.profiling.IOperatorStats;
+import org.apache.hyracks.control.common.job.profiling.OperatorStats;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -79,6 +81,7 @@
protected ArrayTupleBuilder nonFilterTupleBuild;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
protected boolean failed = false;
+ private final IOperatorStats stats;
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx,
RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
IIndexDataflowHelperFactory indexHelperFactory,
@@ -105,6 +108,8 @@
maxFilterKey = new PermutingFrameTupleReference();
maxFilterKey.setFieldPermutation(maxFilterFieldIndexes);
}
+ stats = new OperatorStats(getDisplayName());
+ ctx.getStatsCollector().add(stats);
}
protected abstract ISearchPredicate createSearchPredicate();
@@ -154,9 +159,9 @@
}
protected void writeSearchResults(int tupleIndex) throws Exception {
- boolean matched = false;
+ long matchingTupleCount = 0;
while (cursor.hasNext()) {
- matched = true;
+ matchingTupleCount++;
tb.reset();
cursor.next();
if (retainInput) {
@@ -174,8 +179,9 @@
}
FrameUtils.appendToWriter(writer, appender,
tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
}
+ stats.getTupleCounter().update(matchingTupleCount);
- if (!matched && retainInput && retainMissing) {
+ if (matchingTupleCount == 0 && retainInput && retainMissing) {
FrameUtils.appendConcatToWriter(writer, appender, accessor,
tupleIndex,
nonMatchTupleBuild.getFieldEndOffsets(),
nonMatchTupleBuild.getByteArray(), 0,
nonMatchTupleBuild.getSize());
diff --git
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index b100300..3d13cf9 100644
---
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -36,8 +36,10 @@
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.profiling.IStatsCollector;
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatable;
+import org.apache.hyracks.control.common.job.profiling.StatsCollector;
import org.apache.hyracks.control.nc.io.WorkspaceFileFactory;
public class TestTaskContext implements IHyracksTaskContext {
@@ -46,6 +48,7 @@
private WorkspaceFileFactory fileFactory;
private Map<Object, IStateObject> stateObjectMap = new HashMap<>();
private Object sharedObject;
+ private final IStatsCollector statsCollector = new StatsCollector();
public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId
taskId) {
this.jobletContext = jobletContext;
@@ -163,4 +166,9 @@
public Set<JobFlag> getJobFlags() {
return EnumSet.noneOf(JobFlag.class);
}
+
+ @Override
+ public IStatsCollector getStatsCollector() {
+ return statsCollector;
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2032
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie4afe6a676ef0b8a31d36d7dafc13a4023ebf177
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>