This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 1b63cec  KYLIN-4730 Add scan bytes metric to the query results
1b63cec is described below

commit 1b63cec0b57f1d5367abd37dd4364f78cf741c42
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Fri Aug 28 19:08:08 2020 +0800

    KYLIN-4730 Add scan bytes metric to the query results
---
 .../apache/spark/sql/execution/datasource/FilePruner.scala   |  4 ----
 .../org/apache/spark/sql/hive/utils/QueryMetricUtils.scala   |  4 ++--
 .../org/apache/kylin/query/pushdown/SparkSqlClient.scala     |  5 +++++
 .../scala/org/apache/kylin/query/runtime/SparkEngine.java    |  8 ++++++--
 .../java/org/apache/kylin/rest/service/QueryService.java     | 12 ++++++++----
 .../java/org/apache/kylin/rest/response/SQLResponseTest.java |  3 ++-
 6 files changed, 23 insertions(+), 13 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 2ee32a0..14d63e1 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -186,7 +186,6 @@ class FilePruner(
   }
 
   var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), 
Seq[PartitionDirectory]]()
-  var totalSize = 0L
 
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: 
Seq[Expression]): Seq[PartitionDirectory] = {
     if (cached.containsKey((partitionFilters, dataFilters))) {
@@ -224,9 +223,6 @@ class FilePruner(
     val totalFileSize = selected.flatMap(partition => 
partition.files).map(_.getLen).sum
     logInfo(s"totalFileSize is ${totalFileSize}")
     setShufflePartitions(totalFileSize, session)
-    totalSize = totalFileSize
-    //    val sourceRows = selected.map(seg => 
cubeInstance.getSegment(seg.segmentID).getLayout(layout.getId).getRows).sum
-    //    QueryContextFacade.current().addAndGetSourceScanRows(sourceRows)
     if (selected.isEmpty) {
       val value = Seq.empty[PartitionDirectory]
       cached.put((partitionFilters, dataFilters), value)
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index 2f89f03..d4f8c50 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -39,8 +39,8 @@ object QueryMetricUtils extends Logging {
                   exec.metrics.apply("metadataTime").value, 
exec.metrics.apply("scanTime").value, -1l)
         case exec: HiveTableScanExec =>
           //(exec.metrics.apply("numOutputRows").value, 
exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 
exec.metrics.apply("numFiles").value,
-                  exec.metrics.apply("metadataTime").value, 
exec.metrics.apply("scanTime").value, -1l)
+          // There is only 'numOutputRows' metric in HiveTableScanExec
+          (exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l)
       }
       val scanRows = metrics.map(metric => 
java.lang.Long.valueOf(metric._1)).toList.asJava
       val scanFiles = metrics.map(metrics => 
java.lang.Long.valueOf(metrics._2)).toList.asJava
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 4cc2802..5ce4f8f 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -87,6 +87,11 @@ object SparkSqlClient {
                        val rowList = 
frame.collect().map(_.toSeq.map(_.asInstanceOf[String]).asJava).toSeq.asJava
                        val fieldList = df.schema.map(field => 
SparkTypeUtil.convertSparkFieldToJavaField(field)).asJava
                        val (scanRows, scanFiles, metadataTime, scanTime, 
scanBytes) = 
QueryMetricUtils.collectScanMetrics(frame.queryExecution.executedPlan)
+                       
QueryContextFacade.current().addAndGetScannedRows(scanRows.asScala.map(Long2long(_)).sum)
+                       
QueryContextFacade.current().addAndGetScanFiles(scanFiles.asScala.map(Long2long(_)).sum)
+                       
QueryContextFacade.current().addAndGetScannedBytes(scanBytes.asScala.map(Long2long(_)).sum)
+                       
QueryContextFacade.current().addAndGetMetadataTime(metadataTime.asScala.map(Long2long(_)).sum)
+                       
QueryContextFacade.current().addAndGetScanTime(scanTime.asScala.map(Long2long(_)).sum)
                        Pair.newPair(rowList, fieldList)
                } catch {
                        case e: Throwable =>
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index 6d444b6..ed51427 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -37,7 +37,9 @@ public class SparkEngine implements QueryEngine {
     @Override
     public Enumerable<Object> computeSCALA(DataContext dataContext, RelNode 
relNode, RelDataType resultType) {
         Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        log.trace("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        if (System.getProperty("calcite.debug") != null) {
+            log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        }
         return ResultPlan.getResult(sparkPlan, resultType, 
ResultType.SCALA()).right().get();
 
     }
@@ -45,7 +47,9 @@ public class SparkEngine implements QueryEngine {
     @Override
     public Enumerable<Object[]> compute(DataContext dataContext, RelNode 
relNode, RelDataType resultType) {
         Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        log.trace("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        if (System.getProperty("calcite.debug") != null) {
+            log.info("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        }
         return ResultPlan.getResult(sparkPlan, resultType, 
ResultType.NORMAL()).left().get();
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index ee79c99..25a4fcf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -1187,10 +1187,14 @@ public class QueryService extends BasicService {
         SQLResponse response = new SQLResponse(columnMetas, results, 
cubeSb.toString(), 0, isException,
                 exceptionMessage, isPartialResult, isPushDown);
         response.setTotalScanCount(queryContext.getScannedRows());
-        response.setTotalScanFiles(queryContext.getScanFiles());
-        response.setMetadataTime(queryContext.getMedataTime());
-        response.setTotalSparkScanTime(queryContext.getScanTime());
-        response.setTotalScanBytes(queryContext.getScannedBytes());
+        response.setTotalScanFiles((queryContext.getScanFiles() < 0) ? -1 :
+                queryContext.getScanFiles());
+        response.setMetadataTime((queryContext.getMedataTime() < 0) ? -1 :
+                queryContext.getMedataTime());
+        response.setTotalSparkScanTime((queryContext.getScanTime() < 0) ? -1 :
+                queryContext.getScanTime());
+        response.setTotalScanBytes((queryContext.getScannedBytes() < 0) ?
+                (queryContext.getSourceScanBytes() < 1 ? -1 : 
queryContext.getSourceScanBytes()) : queryContext.getScannedBytes());
         
response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
         response.setSparkPool(queryContext.getSparkPool());
         if (getConfig().isQueryCacheSignatureEnabled()) {
diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
index f939eb0..48f4791 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -34,7 +34,8 @@ public class SQLResponseTest {
     public void testInterfaceConsistency() throws IOException {
         String[] attrArray = new String[] { "columnMetas", "results", "cube", 
"affectedRowCount", "isException",
                 "exceptionMessage", "duration", "partial", "totalScanCount", 
"hitExceptionCache", "storageCacheUsed",
-                "sparkPool", "pushDown", "traceUrl", "totalScanBytes" };
+                "sparkPool", "pushDown", "traceUrl", "totalScanBytes", 
"totalScanFiles",
+                "metadataTime", "totalSparkScanTime" };
 
         SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 
100, false, null, false, false);
         String jsonStr = JsonUtil.writeValueAsString(sqlResponse);

Reply via email to