Repository: hive Updated Branches: refs/heads/master 758ff4490 -> a15d75b47
HIVE-19766: Show the number of rows inserted when execution engine is Spark (Bharathkrishna Guruvayoor Murali, reviewed by Sahil Takiar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a15d75b4 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a15d75b4 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a15d75b4 Branch: refs/heads/master Commit: a15d75b47af7acbe567d30c865e05f21c5ca7229 Parents: 758ff44 Author: Bharathkrishna Guruvayoor Murali <bhar...@cloudera.com> Authored: Wed Jul 25 14:11:47 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Wed Jul 25 14:11:47 2018 -0500 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/exec/spark/SparkTask.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a15d75b4/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index ad5049a..9277510 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsNames; import org.apache.hadoop.hive.ql.exec.spark.status.impl.SparkMetricsUtils; import org.apache.hadoop.hive.ql.exec.spark.status.SparkStage; +import org.apache.hive.spark.counter.SparkCounter; +import org.apache.hive.spark.counter.SparkCounters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,6 +165,17 @@ public class SparkTask extends Task<SparkWork> { if (rc == 0) { sparkStatistics = sparkJobStatus.getSparkStatistics(); + if (SessionState.get() != null) { + //Set the number of rows written in case of insert queries, to print in the client(beeline). + SparkCounters counters = sparkJobStatus.getCounter(); + if (counters != null) { + SparkCounter counter = counters.getCounter(HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP), + FileSinkOperator.TOTAL_TABLE_ROWS_WRITTEN); + if (counter != null) { + queryState.setNumModifiedRows(counter.getValue()); + } + } + } printConsoleMetrics(); printExcessiveGCWarning(); if (LOG.isInfoEnabled() && sparkStatistics != null) { @@ -500,6 +513,7 @@ public class SparkTask extends Task<SparkWork> { List<String> hiveCounters = new LinkedList<String>(); counters.put(groupName, hiveCounters); hiveCounters.add(Operator.HIVE_COUNTER_CREATED_FILES); + hiveCounters.add(FileSinkOperator.TOTAL_TABLE_ROWS_WRITTEN); // MapOperator is out of SparkWork, SparkMapRecordHandler use it to bridge // Spark transformation and Hive operators in SparkWork. for (MapOperator.Counter counter : MapOperator.Counter.values()) {