This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new 1b63cec KYLIN-4730 Add scan bytes metric to the query results 1b63cec is described below commit 1b63cec0b57f1d5367abd37dd4364f78cf741c42 Author: Zhichao Zhang <441586...@qq.com> AuthorDate: Fri Aug 28 19:08:08 2020 +0800 KYLIN-4730 Add scan bytes metric to the query results --- .../apache/spark/sql/execution/datasource/FilePruner.scala | 4 ---- .../org/apache/spark/sql/hive/utils/QueryMetricUtils.scala | 4 ++-- .../org/apache/kylin/query/pushdown/SparkSqlClient.scala | 5 +++++ .../scala/org/apache/kylin/query/runtime/SparkEngine.java | 8 ++++++-- .../java/org/apache/kylin/rest/service/QueryService.java | 12 ++++++++---- .../java/org/apache/kylin/rest/response/SQLResponseTest.java | 3 ++- 6 files changed, 23 insertions(+), 13 deletions(-) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala index 2ee32a0..14d63e1 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala @@ -186,7 +186,6 @@ class FilePruner( } var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), Seq[PartitionDirectory]]() - var totalSize = 0L override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { if (cached.containsKey((partitionFilters, dataFilters))) { @@ -224,9 +223,6 @@ class FilePruner( val totalFileSize = selected.flatMap(partition => partition.files).map(_.getLen).sum logInfo(s"totalFileSize is ${totalFileSize}") setShufflePartitions(totalFileSize, session) - totalSize = totalFileSize - // val sourceRows = selected.map(seg => cubeInstance.getSegment(seg.segmentID).getLayout(layout.getId).getRows).sum - // QueryContextFacade.current().addAndGetSourceScanRows(sourceRows) if (selected.isEmpty) { val value = Seq.empty[PartitionDirectory] cached.put((partitionFilters, dataFilters), value) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala index 2f89f03..d4f8c50 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala @@ -39,8 +39,8 @@ object QueryMetricUtils extends Logging { exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l) case exec: HiveTableScanExec => //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value) - (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value, - exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l) + // There is only 'numOutputRows' metric in HiveTableScanExec + (exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l) } val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1)).toList.asJava val scanFiles = metrics.map(metrics => java.lang.Long.valueOf(metrics._2)).toList.asJava diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala index 4cc2802..5ce4f8f 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala @@ -87,6 +87,11 @@ object SparkSqlClient { val rowList = frame.collect().map(_.toSeq.map(_.asInstanceOf[String]).asJava).toSeq.asJava val fieldList = df.schema.map(field => SparkTypeUtil.convertSparkFieldToJavaField(field)).asJava val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(frame.queryExecution.executedPlan) + QueryContextFacade.current().addAndGetScannedRows(scanRows.asScala.map(Long2long(_)).sum) + QueryContextFacade.current().addAndGetScanFiles(scanFiles.asScala.map(Long2long(_)).sum) + QueryContextFacade.current().addAndGetScannedBytes(scanBytes.asScala.map(Long2long(_)).sum) + QueryContextFacade.current().addAndGetMetadataTime(metadataTime.asScala.map(Long2long(_)).sum) + QueryContextFacade.current().addAndGetScanTime(scanTime.asScala.map(Long2long(_)).sum) Pair.newPair(rowList, fieldList) } catch { case e: Throwable => diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java index 6d444b6..ed51427 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java @@ -37,7 +37,9 @@ public class SparkEngine implements QueryEngine { @Override public Enumerable<Object> computeSCALA(DataContext dataContext, RelNode relNode, RelDataType resultType) { Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode); - log.trace("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution()); + if (System.getProperty("calcite.debug") != null) { + log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution()); + } return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get(); } @@ -45,7 +47,9 @@ public class SparkEngine implements QueryEngine { @Override public Enumerable<Object[]> compute(DataContext dataContext, RelNode relNode, RelDataType resultType) { Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode); - log.trace("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution()); + if (System.getProperty("calcite.debug") != null) { + log.info("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution()); + } return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get(); } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index ee79c99..25a4fcf 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -1187,10 +1187,14 @@ public class QueryService extends BasicService { SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException, exceptionMessage, isPartialResult, isPushDown); response.setTotalScanCount(queryContext.getScannedRows()); - response.setTotalScanFiles(queryContext.getScanFiles()); - response.setMetadataTime(queryContext.getMedataTime()); - response.setTotalSparkScanTime(queryContext.getScanTime()); - response.setTotalScanBytes(queryContext.getScannedBytes()); + response.setTotalScanFiles((queryContext.getScanFiles() < 0) ? -1 : + queryContext.getScanFiles()); + response.setMetadataTime((queryContext.getMedataTime() < 0) ? -1 : + queryContext.getMedataTime()); + response.setTotalSparkScanTime((queryContext.getScanTime() < 0) ? -1 : + queryContext.getScanTime()); + response.setTotalScanBytes((queryContext.getScannedBytes() < 0) ? + (queryContext.getSourceScanBytes() < 1 ? -1 : queryContext.getSourceScanBytes()) : queryContext.getScannedBytes()); response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList()); response.setSparkPool(queryContext.getSparkPool()); if (getConfig().isQueryCacheSignatureEnabled()) { diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java index f939eb0..48f4791 100644 --- a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java +++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java @@ -34,7 +34,8 @@ public class SQLResponseTest { public void testInterfaceConsistency() throws IOException { String[] attrArray = new String[] { "columnMetas", "results", "cube", "affectedRowCount", "isException", "exceptionMessage", "duration", "partial", "totalScanCount", "hitExceptionCache", "storageCacheUsed", - "sparkPool", "pushDown", "traceUrl", "totalScanBytes" }; + "sparkPool", "pushDown", "traceUrl", "totalScanBytes", "totalScanFiles", + "metadataTime", "totalSparkScanTime" }; SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false); String jsonStr = JsonUtil.writeValueAsString(sqlResponse);