This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 84e3297  [CARBONDATA-3736] Support show segment by query
84e3297 is described below

commit 84e3297332fb8a14e5bdb94806de36f44395a54c
Author: Jacky Li <jacky.li...@qq.com>
AuthorDate: Wed Mar 18 13:06:04 2020 +0800

    [CARBONDATA-3736] Support show segment by query
    
    Why is this PR needed?
    There are many fields in a segment that are not shown by the SHOW SEGMENTS command.
    
    What changes were proposed in this PR?
    This PR changes the SHOW SEGMENTS command to:
    
     SHOW [HISTORY] SEGMENTS
     [FOR TABLE | ON] [db_name.]table_name
     [AS select_query]
    
    Users can query the segments as if they were a table. A sample query and its output:
    
    show segments on source as
    select id, status, dataSize from source_segments
    where status = 'Success' order by dataSize
    
    +------------------------+
    |output                  |
    +------------------------+
    |id  | status  | dataSize|
    |4.1 | Success | 1762    |
    |0.2 | Success | 3524    |
    +------------------------+
    
    Does this PR introduce any user interface change?
    Yes
    
    Is any new testcase added?
    Yes
    
    This closes #3657
---
 .../java/org/apache/carbondata/common/Strings.java |  24 +--
 docs/segment-management-on-carbondata.md           |  56 +++++-
 .../org/apache/carbondata/api/CarbonStore.scala    | 209 ++++++++++-----------
 .../management/CarbonShowLoadsCommand.scala        |  76 --------
 .../CarbonShowSegmentsAsSelectCommand.scala        | 137 ++++++++++++++
 .../management/CarbonShowSegmentsCommand.scala     |  97 ++++++++++
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  44 +++--
 .../bloom/BloomCoarseGrainIndexFunctionSuite.scala |   1 -
 .../testsuite/addsegment/AddSegmentTestCase.scala  |  12 +-
 .../testsuite/segment/ShowSegmentTestCase.scala    | 198 +++++++++++++++++++
 .../segmentreading/TestSegmentReading.scala        |   8 +-
 .../StandardPartitionTableLoadingTestCase.scala    |   4 +-
 .../carbondata/TestStreamingTableOpName.scala      |  12 +-
 .../TestStreamingTableWithRowParser.scala          |   8 +-
 .../org/apache/spark/util/CarbonCommandSuite.scala |  59 ------
 15 files changed, 651 insertions(+), 294 deletions(-)

diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java 
b/common/src/main/java/org/apache/carbondata/common/Strings.java
index 4bb9dc8..0a3eea0 100644
--- a/common/src/main/java/org/apache/carbondata/common/Strings.java
+++ b/common/src/main/java/org/apache/carbondata/common/Strings.java
@@ -43,26 +43,26 @@ public class Strings {
 
   /**
    * Append KB/MB/GB/TB to the input size and return
-   * @param size data size
+   * @param sizeInBytes data size in bytes
    * @return data size with unit
    */
-  public static String formatSize(float size) {
+  public static String formatSize(float sizeInBytes) {
     long KB = 1024L;
     long MB = KB << 10;
     long GB = MB << 10;
     long TB = GB << 10;
-    if (size < 0) {
+    if (sizeInBytes < 0) {
       return "NA";
-    } else if (size >= 0 && size < KB) {
-      return String.format("%sB", size);
-    } else if (size >= KB && size < MB) {
-      return String.format("%.2fKB", size / KB);
-    } else if (size >= MB && size < GB) {
-      return String.format("%.2fMB", size / MB);
-    } else if (size >= GB && size < TB) {
-      return String.format("%.2fGB", size / GB);
+    } else if (sizeInBytes >= 0 && sizeInBytes < KB) {
+      return String.format("%sB", sizeInBytes);
+    } else if (sizeInBytes >= KB && sizeInBytes < MB) {
+      return String.format("%.2fKB", sizeInBytes / KB);
+    } else if (sizeInBytes >= MB && sizeInBytes < GB) {
+      return String.format("%.2fMB", sizeInBytes / MB);
+    } else if (sizeInBytes >= GB && sizeInBytes < TB) {
+      return String.format("%.2fGB", sizeInBytes / GB);
     } else {
-      return String.format("%.2fTB", size / TB);
+      return String.format("%.2fTB", sizeInBytes / TB);
     }
   }
 }
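
For reference, a minimal sketch (not part of this patch) of how the renamed formatSize
helper behaves for a few byte counts; the object name is illustrative only:

```
import org.apache.carbondata.common.Strings

// Illustrative only: exercises the renamed Strings.formatSize(sizeInBytes).
object FormatSizeSketch {
  def main(args: Array[String]): Unit = {
    println(Strings.formatSize(-1f))      // "NA"      (negative size means unknown)
    println(Strings.formatSize(512f))     // "512.0B"  (below 1 KB, printed as-is)
    println(Strings.formatSize(2048f))    // "2.00KB"
    println(Strings.formatSize(3145728f)) // "3.00MB"
  }
}
```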
diff --git a/docs/segment-management-on-carbondata.md 
b/docs/segment-management-on-carbondata.md
index fe0cbd4..d18aca1 100644
--- a/docs/segment-management-on-carbondata.md
+++ b/docs/segment-management-on-carbondata.md
@@ -31,19 +31,69 @@ concept which helps to maintain consistency of data and 
easy transaction managem
   This command is used to list the segments of CarbonData table.
 
   ```
-  SHOW [HISTORY] SEGMENTS FOR TABLE [db_name.]table_name LIMIT 
number_of_segments
+  SHOW [HISTORY] SEGMENTS
+  [FOR TABLE | ON] [db_name.]table_name
+  [AS (select query from table_name_segments)]
   ```
 
+  By default, the SHOW SEGMENTS command returns the following fields:
+
+- Segment ID
+- Segment Status
+- Load Start Time
+- Load Time Taken
+- Partition
+- Data Size
+- Index Size
+
+
   Example:
   Show visible segments
+
   ```
-  SHOW SEGMENTS FOR TABLE CarbonDatabase.CarbonTable LIMIT 4
+  SHOW SEGMENTS ON CarbonDatabase.CarbonTable
   ```
+
   Show all segments, including invisible segments
   ```
-  SHOW HISTORY SEGMENTS FOR TABLE CarbonDatabase.CarbonTable LIMIT 4
+  SHOW HISTORY SEGMENTS ON CarbonDatabase.CarbonTable
+  ```
+
+
+  When more detail about the segments is required, users can issue SHOW SEGMENTS by query.
+
+  The query should be run against the table name with '_segments' appended, and can select from the following fields:
+    
+- id: String, the id of the segment
+- status: String, status of the segment
+- loadStartTime: String, loading start time
+- loadEndTime: String, loading end time
+- timeTakenMs: Long, time taken to load the segment, in milliseconds
+- partitions: String array, partition key and values
+- dataSize: Long, data size in bytes
+- indexSize: Long, index size in bytes
+- mergedToId: String, the target segment that this segment has been compacted into
+- format: String, data format of the segment
+- path: String, for an external segment this is the path of the segment; otherwise it is null
+- segmentFileName: String, name of the segment file
+
+  Example: 
+
+  ```
+  SHOW SEGMENTS ON CarbonTable AS 
+  SELECT * FROM CarbonTable_segments
+  
+  SHOW SEGMENTS ON CarbonTable AS
+  SELECT id, dataSize FROM CarbonTable_segments 
+  WHERE status='Success' 
+  ORDER BY dataSize
+  
+  SHOW SEGMENTS ON CarbonTable AS
+  SELECT avg(timeTakenMs) FROM CarbonTable_segments  
   ```
 
+
+
 ### DELETE SEGMENT BY ID
 
   This command is used to delete segment by using the segment ID. Each segment 
has a unique segment ID associated with it. 
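
Returning to the SHOW SEGMENTS documentation above, a usage sketch from a Spark session
(not part of this patch; the table name "source" and the CarbonExtensions setting are
assumptions about the deployment):

```
import org.apache.spark.sql.SparkSession

// Sketch only: assumes a CarbonData-enabled Spark session and an existing
// Carbon table named "source".
object ShowSegmentsUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("show-segments-usage")
      .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
      .getOrCreate()

    // Default form: ID, Status, Load Start Time, Load Time Taken,
    // Partition, Data Size, Index Size
    spark.sql("SHOW SEGMENTS ON source").show(false)

    // Query form: the table name with "_segments" appended is queryable
    spark.sql(
      """SHOW SEGMENTS ON source AS
        |SELECT id, dataSize FROM source_segments
        |WHERE status = 'Success' ORDER BY dataSize""".stripMargin).show(false)

    spark.stop()
  }
}
```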
diff --git 
a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala 
b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 275d260..f516431 100644
--- 
a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ 
b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -17,17 +17,14 @@
 
 package org.apache.carbondata.api
 
-import java.lang.Long
+import java.time.{Duration, Instant}
 
 import scala.collection.JavaConverters._
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.unsafe.types.UTF8String
 
-import org.apache.carbondata.common.Strings
 import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -38,126 +35,128 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, 
ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus, 
SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{FileFormat, 
LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.segment.StreamSegment
 
 object CarbonStore {
   private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def showSegments(
-      limit: Option[String],
-      tablePath: String,
-      showHistory: Boolean): Seq[Row] = {
+  def readSegments(tablePath: String, showHistory: Boolean): 
Array[LoadMetadataDetails] = {
     val metaFolder = CarbonTablePath.getMetadataPath(tablePath)
-    val loadMetadataDetailsArray = if (showHistory) {
+    val segmentsMetadataDetails = if (showHistory) {
       SegmentStatusManager.readLoadMetadata(metaFolder) ++
       SegmentStatusManager.readLoadHistoryMetadata(metaFolder)
     } else {
       SegmentStatusManager.readLoadMetadata(metaFolder)
     }
+    if (!showHistory) {
+      segmentsMetadataDetails.filter(_.getVisibility.equalsIgnoreCase("true"))
+    } else {
+      segmentsMetadataDetails
+    }
+  }
 
-    if (loadMetadataDetailsArray.nonEmpty) {
-      var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { 
(l1, l2) =>
-        java.lang.Double.parseDouble(l1.getLoadName) > 
java.lang.Double.parseDouble(l2.getLoadName)
-      }
-      if (!showHistory) {
-        loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
-          .filter(_.getVisibility.equalsIgnoreCase("true"))
+  def getPartitions(tablePath: String, load: LoadMetadataDetails): Seq[String] 
= {
+    val segmentFile = SegmentFileStore.readSegmentFile(
+      CarbonTablePath.getSegmentFilePath(tablePath, load.getSegmentFile))
+    if (segmentFile == null) {
+      return Seq.empty
+    }
+    val locationMap = segmentFile.getLocationMap
+    if (locationMap != null) {
+      locationMap.asScala.map {
+        case (_, detail) =>
+          s"{${ detail.getPartitions.asScala.mkString(",") }}"
+      }.toSeq
+    } else {
+      Seq.empty
+    }
+  }
+
+  def getMergeTo(load: LoadMetadataDetails): String = {
+    if (load.getMergedLoadName != null) {
+      load.getMergedLoadName
+    } else {
+      "NA"
+    }
+  }
+
+  def getExternalSegmentPath(load: LoadMetadataDetails): String = {
+    if (StringUtils.isNotEmpty(load.getPath)) {
+      load.getPath
+    } else {
+      "NA"
+    }
+  }
+
+  def getLoadStartTime(load: LoadMetadataDetails): String = {
+    val startTime =
+      if (load.getLoadStartTime == 
CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+        "NA"
+      } else {
+        new java.sql.Timestamp(load.getLoadStartTime).toString
       }
-      if (limit.isDefined) {
-        val limitLoads = limit.get
-        try {
-          val lim = Integer.parseInt(limitLoads)
-          loadMetadataDetailsSortedArray = 
loadMetadataDetailsSortedArray.slice(0, lim)
-        } catch {
-          case _: NumberFormatException =>
-            CarbonException.analysisException("Entered limit is not a valid 
Number")
-        }
+    startTime
+  }
+
+  def getLoadEndTime(load: LoadMetadataDetails): String = {
+    val endTime =
+      if (load.getLoadEndTime == 
CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+        "NA"
+      } else {
+        new java.sql.Timestamp(load.getLoadEndTime).toString
       }
+    endTime
+  }
 
-      loadMetadataDetailsSortedArray
-        .map { load =>
-          val mergedTo =
-            if (load.getMergedLoadName != null) {
-              load.getMergedLoadName
-            } else {
-              "NA"
-            }
-
-          val path =
-            if (StringUtils.isNotEmpty(load.getPath)) {
-              load.getPath
-            } else {
-              "NA"
-            }
-
-          val startTime =
-            if (load.getLoadStartTime == 
CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
-              "NA"
-            } else {
-              new java.sql.Timestamp(load.getLoadStartTime).toString
-            }
-
-          val endTime =
-            if (load.getLoadEndTime == 
CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
-              "NA"
-            } else {
-              new java.sql.Timestamp(load.getLoadEndTime).toString
-            }
-
-          val (dataSize, indexSize) = if 
(load.getFileFormat.equals(FileFormat.ROW_V1)) {
-            // for streaming segment, we should get the actual size from the 
index file
-            // since it is continuously inserting data
-            val segmentDir = CarbonTablePath.getSegmentPath(tablePath, 
load.getLoadName)
-            val indexPath = 
CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir)
-            val indexFile = FileFactory.getCarbonFile(indexPath)
-            if (indexFile.exists()) {
-              val indices =
-                StreamSegment.readIndexFile(indexPath)
-              (indices.asScala.map(_.getFile_size).sum, indexFile.getSize)
-            } else {
-              (-1L, -1L)
-            }
-          } else {
-            // If the added segment is other than carbon segment then we can 
only display the data
-            // size and not index size, we can get the data size from table 
status file directly
-            if (!load.getFileFormat.isCarbonFormat) {
-              (if (load.getDataSize == null) -1L else load.getDataSize.toLong, 
-1L)
-            } else {
-              (if (load.getDataSize == null) -1L else load.getDataSize.toLong,
-                if (load.getIndexSize == null) -1L else 
load.getIndexSize.toLong)
-            }
-          }
+  def getLoadTimeTaken(load: LoadMetadataDetails): String = {
+    if (load.getLoadEndTime == 
CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+      "NA"
+    } else {
+      Duration.between(
+        Instant.ofEpochMilli(load.getLoadStartTime),
+        Instant.ofEpochMilli(load.getLoadEndTime)
+      ).toString
+    }
+  }
 
-          if (showHistory) {
-            Row(
-              load.getLoadName,
-              load.getSegmentStatus.getMessage,
-              startTime,
-              endTime,
-              mergedTo,
-              load.getFileFormat.toString.toUpperCase,
-              load.getVisibility,
-              Strings.formatSize(dataSize.toFloat),
-              Strings.formatSize(indexSize.toFloat),
-              path)
-          } else {
-            Row(
-              load.getLoadName,
-              load.getSegmentStatus.getMessage,
-              startTime,
-              endTime,
-              mergedTo,
-              load.getFileFormat.toString.toUpperCase,
-              Strings.formatSize(dataSize.toFloat),
-              Strings.formatSize(indexSize.toFloat),
-              path)
-          }
-        }.toSeq
+  def getLoadTimeTakenAsMillis(load: LoadMetadataDetails): Long = {
+    if (load.getLoadEndTime == 
CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
+      // loading in progress
+      -1L
     } else {
-      Seq.empty
+      load.getLoadEndTime - load.getLoadStartTime
+    }
+  }
+
+  def getDataAndIndexSize(
+      tablePath: String,
+      load: LoadMetadataDetails): (Long, Long) = {
+    val (dataSize, indexSize) = if 
(load.getFileFormat.equals(FileFormat.ROW_V1)) {
+      // for streaming segment, we should get the actual size from the index 
file
+      // since it is continuously inserting data
+      val segmentDir = CarbonTablePath.getSegmentPath(tablePath, 
load.getLoadName)
+      val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir)
+      val indexFile = FileFactory.getCarbonFile(indexPath)
+      if (indexFile.exists()) {
+        val indices =
+          StreamSegment.readIndexFile(indexPath)
+        (indices.asScala.map(_.getFile_size).sum, indexFile.getSize)
+      } else {
+        (-1L, -1L)
+      }
+    } else {
+      // If the added segment is other than carbon segment then we can only 
display the data
+      // size and not index size, we can get the data size from table status 
file directly
+      if (!load.getFileFormat.isCarbonFormat) {
+        (if (load.getDataSize == null) -1L else load.getDataSize.toLong, -1L)
+      } else {
+        (if (load.getDataSize == null) -1L else load.getDataSize.toLong,
+          if (load.getIndexSize == null) -1L else load.getIndexSize.toLong)
+      }
     }
+    (dataSize, indexSize)
   }
 
   /**
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
deleted file mode 100644
index 9ce9cfb..0000000
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.management
-
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
-import org.apache.spark.sql.execution.command.{Checker, DataCommand}
-import org.apache.spark.sql.types.StringType
-
-import org.apache.carbondata.api.CarbonStore
-import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-
-case class CarbonShowLoadsCommand(
-    databaseNameOp: Option[String],
-    tableName: String,
-    limit: Option[String],
-    showHistory: Boolean = false)
-  extends DataCommand {
-
-  // add new columns of show segments at last
-  override def output: Seq[Attribute] = {
-    if (showHistory) {
-      Seq(AttributeReference("SegmentSequenceId", StringType, nullable = 
false)(),
-        AttributeReference("Status", StringType, nullable = false)(),
-        AttributeReference("Load Start Time", StringType, nullable = false)(),
-        AttributeReference("Load End Time", StringType, nullable = true)(),
-        AttributeReference("Merged To", StringType, nullable = false)(),
-        AttributeReference("File Format", StringType, nullable = false)(),
-        AttributeReference("Visibility", StringType, nullable = false)(),
-        AttributeReference("Data Size", StringType, nullable = false)(),
-        AttributeReference("Index Size", StringType, nullable = false)(),
-        AttributeReference("Path", StringType, nullable = false)())
-    } else {
-      Seq(AttributeReference("SegmentSequenceId", StringType, nullable = 
false)(),
-        AttributeReference("Status", StringType, nullable = false)(),
-        AttributeReference("Load Start Time", StringType, nullable = false)(),
-        AttributeReference("Load End Time", StringType, nullable = true)(),
-        AttributeReference("Merged To", StringType, nullable = false)(),
-        AttributeReference("File Format", StringType, nullable = false)(),
-        AttributeReference("Data Size", StringType, nullable = false)(),
-        AttributeReference("Index Size", StringType, nullable = false)(),
-        AttributeReference("Path", StringType, nullable = false)())
-    }
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
-    setAuditTable(carbonTable)
-    if (!carbonTable.getTableInfo.isTransactionalTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
-    }
-    CarbonStore.showSegments(
-      limit,
-      carbonTable.getTablePath,
-      showHistory
-    )
-  }
-
-  override protected def opName: String = "SHOW SEGMENTS"
-}
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
new file mode 100644
index 0000000..26772dd
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import org.apache.carbondata.api.CarbonStore
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+
+case class SegmentRow(
+    id: String, status: String, loadStartTime: String, timeTakenMs: Long, 
partitions: Seq[String],
+    dataSize: Long, indexSize: Long, mergedToId: String, format: String, path: 
String,
+    loadEndTime: String, segmentFileName: String)
+
+case class CarbonShowSegmentsAsSelectCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    query: String,
+    showHistory: Boolean = false)
+  extends DataCommand {
+
+  private lazy val sparkSession = SparkSession.getActiveSession.get
+  private lazy val carbonTable = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+  }
+  private lazy val df = createDataFrame
+
+  override def output: Seq[Attribute] = {
+    df.queryExecution.analyzed.output.map { attr =>
+      AttributeReference(attr.name, attr.dataType, nullable = false)()
+    }
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    try {
+      setAuditTable(carbonTable)
+      if (!carbonTable.getTableInfo.isTransactionalTable) {
+        throw new MalformedCarbonCommandException(
+          "Unsupported operation on non transactional table")
+      }
+      df.collect()
+    } catch {
+      case ex: Throwable =>
+        throw new MalformedCarbonCommandException("failed to run query: " + 
ex.getMessage)
+    } finally {
+      sparkSession.catalog.dropTempView(makeTempViewName(carbonTable))
+    }
+  }
+
+  override protected def opName: String = "SHOW SEGMENTS"
+
+  private def createDataFrame: DataFrame = {
+    val tablePath = carbonTable.getTablePath
+    val segments = CarbonStore.readSegments(tablePath, showHistory)
+    val tempViewName = makeTempViewName(carbonTable)
+    registerSegmentRowView(sparkSession, tempViewName, carbonTable, segments)
+    try {
+      sparkSession.sql(query)
+    } catch {
+      case t: Throwable =>
+        sparkSession.catalog.dropTempView(tempViewName)
+        throw t
+    }
+  }
+
+  /**
+   * Generate temp view name for the query to execute
+   */
+  private def makeTempViewName(carbonTable: CarbonTable): String = {
+    s"${carbonTable.getTableName}_segments"
+  }
+
+  private def registerSegmentRowView(
+      sparkSession: SparkSession,
+      tempViewName: String,
+      carbonTable: CarbonTable,
+      segments: Array[LoadMetadataDetails]): Unit = {
+
+    // populate a dataframe containing all segment information
+    val tablePath = carbonTable.getTablePath
+    val segmentRows = segments.toSeq.map { segment =>
+      val mergedToId = CarbonStore.getMergeTo(segment)
+      val path = CarbonStore.getExternalSegmentPath(segment)
+      val startTime = CarbonStore.getLoadStartTime(segment)
+      val endTime = CarbonStore.getLoadEndTime(segment)
+      val timeTaken = CarbonStore.getLoadTimeTakenAsMillis(segment)
+      val (dataSize, indexSize) = CarbonStore.getDataAndIndexSize(tablePath, 
segment)
+      val partitions = CarbonStore.getPartitions(tablePath, segment)
+      SegmentRow(
+        segment.getLoadName,
+        segment.getSegmentStatus.toString,
+        startTime,
+        timeTaken,
+        partitions,
+        dataSize,
+        indexSize,
+        mergedToId,
+        segment.getFileFormat.toString,
+        path,
+        endTime,
+        if (segment.getSegmentFile == null) "NA" else segment.getSegmentFile)
+    }
+
+    // create a temp view using the populated dataframe and execute the query 
on it
+    val df = sparkSession.createDataFrame(segmentRows)
+    checkIfTableExist(sparkSession, tempViewName)
+    df.createOrReplaceTempView(tempViewName)
+  }
+
+  private def checkIfTableExist(sparkSession: SparkSession, tempViewName: 
String): Unit = {
+    if (sparkSession.catalog.tableExists(tempViewName)) {
+      throw new MalformedCarbonCommandException(s"$tempViewName already 
exists, " +
+                                                s"can not show segment by 
query")
+    }
+  }
+
+}
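
The command above hinges on a register-query-drop pattern around a temporary view named
`<table>_segments`. A minimal sketch of that pattern in plain Spark (no CarbonData
involved; all names and values are illustrative):

```
import org.apache.spark.sql.SparkSession

// Minimal sketch of the temp-view pattern used by
// CarbonShowSegmentsAsSelectCommand: register segment rows as a view,
// run the user's query, then drop the view so it never leaks.
case class SegmentRowLite(id: String, status: String, dataSize: Long)

object TempViewPatternSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    val rows = Seq(SegmentRowLite("0.2", "Success", 3524L), SegmentRowLite("4.1", "Success", 1762L))
    val viewName = "demo_segments"
    rows.toDF().createOrReplaceTempView(viewName)
    try {
      spark.sql(s"SELECT id, dataSize FROM $viewName WHERE status = 'Success' ORDER BY dataSize")
        .show(false)
    } finally {
      spark.catalog.dropTempView(viewName) // mirrors the finally-drop in processData
    }
    spark.stop()
  }
}
```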
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
new file mode 100644
index 0000000..c3157ca
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+import org.apache.spark.sql.types.StringType
+
+import org.apache.carbondata.api.CarbonStore.{getDataAndIndexSize, 
getLoadStartTime, getLoadTimeTaken, getPartitions, readSegments}
+import org.apache.carbondata.common.Strings
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+
+case class CarbonShowSegmentsCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    showHistory: Boolean = false)
+  extends DataCommand {
+
+  // new columns of the SHOW SEGMENTS output should be added at the end
+  override def output: Seq[Attribute] = {
+    Seq(
+      AttributeReference("ID", StringType, nullable = false)(),
+      AttributeReference("Status", StringType, nullable = false)(),
+      AttributeReference("Load Start Time", StringType, nullable = false)(),
+      AttributeReference("Load Time Taken", StringType, nullable = true)(),
+      AttributeReference("Partition", StringType, nullable = true)(),
+      AttributeReference("Data Size", StringType, nullable = false)(),
+      AttributeReference("Index Size", StringType, nullable = false)())
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
+    setAuditTable(carbonTable)
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
+    }
+    val tablePath = carbonTable.getTablePath
+    val segments = readSegments(tablePath, showHistory)
+    if (segments.nonEmpty) {
+      showBasic(segments, tablePath)
+    } else {
+      Seq.empty
+    }
+  }
+
+  override protected def opName: String = "SHOW SEGMENTS"
+
+  private def showBasic(
+      allSegments: Array[LoadMetadataDetails],
+      tablePath: String): Seq[Row] = {
+    val segments = allSegments.sortWith { (l1, l2) =>
+      java.lang.Double.parseDouble(l1.getLoadName) >
+      java.lang.Double.parseDouble(l2.getLoadName)
+    }
+
+    segments
+      .map { segment =>
+        val startTime = getLoadStartTime(segment)
+        val timeTaken = getLoadTimeTaken(segment)
+        val (dataSize, indexSize) = getDataAndIndexSize(tablePath, segment)
+        val partitions = getPartitions(tablePath, segment)
+        val partitionString = if (partitions.size == 1) {
+          partitions.head
+        } else if (partitions.size > 1) {
+          partitions.head + ", ..."
+        } else {
+          "NA"
+        }
+        Row(
+          segment.getLoadName,
+          segment.getSegmentStatus.getMessage,
+          startTime,
+          timeTaken,
+          partitionString,
+          Strings.formatSize(dataSize.toFloat),
+          Strings.formatSize(indexSize.toFloat))
+      }.toSeq
+  }
+}
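
One non-obvious detail in showBasic above: segment IDs are compared as doubles rather
than strings, so "10" sorts after "9" and a compacted segment like "4.1" lands between
"4" and "5". A standalone sketch with illustrative values:

```
// Illustrative only: replicates the sortWith comparison used in showBasic.
object SegmentOrderingSketch {
  def main(args: Array[String]): Unit = {
    val loadNames = Seq("0", "0.1", "2", "4", "4.1", "9", "10")
    val newestFirst = loadNames.sortWith { (l, r) =>
      java.lang.Double.parseDouble(l) > java.lang.Double.parseDouble(r)
    }
    println(newestFirst.mkString(", ")) // 10, 9, 4.1, 4, 2, 0.1, 0
  }
}
```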
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 84bbd59..c917d4f 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -81,12 +81,13 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     startCommand | extendedSparkSyntax
 
   protected lazy val startCommand: Parser[LogicalPlan] =
-    loadManagement | showLoads | alterTable | restructure | updateTable | 
deleteRecords |
+    segmentManagement | alterTable | restructure | updateTable | deleteRecords 
|
     alterTableFinishStreaming | stream | cli |
     cacheManagement | insertStageData | indexCommands | mvCommands
 
-  protected lazy val loadManagement: Parser[LogicalPlan] =
-    deleteLoadsByID | deleteLoadsByLoadDate | deleteStage | cleanFiles | 
addLoad
+  protected lazy val segmentManagement: Parser[LogicalPlan] =
+    deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | 
addSegment |
+    showSegments
 
   protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn
 
@@ -452,7 +453,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           databaseNameOp, tableName, optionsList, partitions, filePath, 
isOverwrite)
     }
 
-  protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
+  protected lazy val deleteSegmentByID: Parser[LogicalPlan] =
     DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
     (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ 
")" ~
     opt(";") ^^ {
@@ -460,7 +461,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         CarbonDeleteLoadByIdCommand(loadids, dbName, tableName.toLowerCase())
     }
 
-  protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] =
+  protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
     DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
     opt(";") ^^ {
@@ -493,7 +494,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
    * schema list format: column_name:data_type
    * for example: 'partition'='a:int,b:string'
    */
-  protected lazy val addLoad: Parser[LogicalPlan] =
+  protected lazy val addSegment: Parser[LogicalPlan] =
     ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~
     (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")") <~ opt(";") ^^ {
       case dbName ~ tableName ~ segment ~ optionsList =>
@@ -529,16 +530,27 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         }
     }
 
-  protected lazy val showLoads: Parser[LogicalPlan] =
-    (SHOW ~> opt(HISTORY) <~ SEGMENTS <~ FOR <~ TABLE) ~ (ident <~ ".").? ~ 
ident ~
-    (LIMIT ~> numericLit).? <~
-    opt(";") ^^ {
-      case showHistory ~ databaseName ~ tableName ~ limit =>
-        CarbonShowLoadsCommand(
-          CarbonParserUtil.convertDbNameToLowerCase(databaseName),
-          tableName.toLowerCase(),
-          limit,
-          showHistory.isDefined)
+  /**
+   * SHOW [HISTORY] SEGMENTS
+   * [FOR TABLE | ON] [db_name.]table_name
+   * [AS (select query)]
+   */
+  protected lazy val showSegments: Parser[LogicalPlan] =
+    (SHOW ~> opt(HISTORY) <~ SEGMENTS <~ ((FOR <~ TABLE) | ON)) ~ (ident <~ 
".").? ~ ident ~
+    (AS  ~> restInput).? <~ opt(";") ^^ {
+      case showHistory ~ databaseName ~ tableName ~ queryOp =>
+        if (queryOp.isEmpty) {
+          CarbonShowSegmentsCommand(
+            CarbonParserUtil.convertDbNameToLowerCase(databaseName),
+            tableName.toLowerCase(),
+            showHistory.isDefined)
+        } else {
+          CarbonShowSegmentsAsSelectCommand(
+            CarbonParserUtil.convertDbNameToLowerCase(databaseName),
+            tableName.toLowerCase(),
+            queryOp.get,
+            showHistory.isDefined)
+        }
     }
 
   protected lazy val showCache: Parser[LogicalPlan] =
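
For quick reference, statement shapes the new showSegments rule is intended to accept
(a sketch, not part of the patch; table and database names are hypothetical):

```
// Illustrative only: example statements matching the new grammar.
object ShowSegmentsSyntaxExamples {
  val statements: Seq[String] = Seq(
    "SHOW SEGMENTS ON t1",
    "SHOW SEGMENTS FOR TABLE db1.t1",
    "SHOW HISTORY SEGMENTS ON t1",
    "SHOW SEGMENTS ON t1 AS SELECT id, status FROM t1_segments WHERE status = 'Success'"
  )

  def main(args: Array[String]): Unit = statements.foreach(println)
}
```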
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainIndexFunctionSuite.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainIndexFunctionSuite.scala
index 3ddb959..88d9498 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainIndexFunctionSuite.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainIndexFunctionSuite.scala
@@ -41,7 +41,6 @@ class BloomCoarseGrainIndexFunctionSuite  extends QueryTest 
with BeforeAndAfterA
   val indexName = "bloom_dm"
 
   override protected def beforeAll(): Unit = {
-    sqlContext.sparkContext.setLogLevel("info")
     deleteFile(bigFile)
     new File(CarbonProperties.getInstance().getSystemFolderLocation).delete()
     createFile(bigFile, line = 2000)
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 3e9f5de..0fd32c5 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -233,8 +233,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     val descFormattedSize = sql("desc formatted 
addsegment1").collect().filter(_.get(0).toString.startsWith("Table Data 
Size")).head.get(1).toString
     val size = getDataSize(newPath)
     assert(descFormattedSize.split("KB")(0).toDouble > 0.0d)
-    assert(showSeg.get(0).get(6).toString.equalsIgnoreCase(size))
-    assert(showSeg.get(0).get(7).toString.equalsIgnoreCase("NA"))
+    assert(showSeg.get(0).get(5).toString.equalsIgnoreCase(size))
+    assert(showSeg.get(0).get(6).toString.equalsIgnoreCase("NA"))
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
@@ -446,8 +446,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
 
     sql(s"alter table addsegment1 add segment options('path'='$newPath', 
'format'='PARQUET')").show()
-    checkExistence(sql(s"show segments for table addsegment1"), true, 
"spark/target/warehouse/addsegtest")
-    checkExistence(sql(s"show history segments for table addsegment1"), true, 
"spark/target/warehouse/addsegtest")
+    checkExistence(sql(s"show segments for table addsegment1 as select * from 
addsegment1_segments"), true, "spark/target/warehouse/addsegtest")
+    checkExistence(sql(s"show history segments for table addsegment1 as select 
* from addsegment1_segments"), true, "spark/target/warehouse/addsegtest")
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
@@ -555,8 +555,8 @@ class AddSegmentTestCase extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(sql("select * from carbon_table"), sql("select * from 
parquet_table"))
 
     // test show segment
-    checkExistence(sql(s"show segments for table carbon_table"), true, 
"spark/target/warehouse/parquet_table")
-    checkExistence(sql(s"show history segments for table carbon_table"), true, 
"spark/target/warehouse/parquet_table")
+    checkExistence(sql(s"show segments for table carbon_table as select * from 
carbon_table_segments"), true, "spark/target/warehouse/parquet_table")
+    checkExistence(sql(s"show history segments for table carbon_table as 
select * from carbon_table_segments"), true, 
"spark/target/warehouse/parquet_table")
 
     sql("drop table if exists parquet_table")
     sql("drop table if exists carbon_table")
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
new file mode 100644
index 0000000..a6d481f
--- /dev/null
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.segment
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+
+/**
+ * Test Class for SHOW SEGMENTS command
+ */
+class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll {
+
+  test("test show segment by query, success case") {
+    sql("drop table if exists source")
+    sql(
+      """
+        |create table source (age int)
+        |STORED AS carbondata
+        |partitioned by (name string, class string)
+        
|TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')
+        |""".stripMargin)
+    sql("insert into source select 1, 'abc1', 'classA'")
+    sql("insert into source select 2, 'abc2', 'classB'")
+    sql("insert into source select 3, 'abc3', 'classA'")
+    sql("insert into source select 4, 'abc4', 'classB'")
+    sql("insert into source select 5, 'abc5', 'classA'")
+    sql("insert into source select 6, 'abc6', 'classC'")
+    sql("show segments on source").show(false)
+
+    val df = sql(s"""show segments on source""").collect()
+    // validating headers
+    val header = df(0).schema
+    assert(header(0).name.equalsIgnoreCase("ID"))
+    assert(header(1).name.equalsIgnoreCase("Status"))
+    assert(header(2).name.equalsIgnoreCase("Load Start Time"))
+    assert(header(3).name.equalsIgnoreCase("Load Time Taken"))
+    assert(header(4).name.equalsIgnoreCase("Partition"))
+    assert(header(5).name.equalsIgnoreCase("Data Size"))
+    assert(header(6).name.equalsIgnoreCase("Index Size"))
+    val col = df
+      .map(row => Row(row.getString(0), row.getString(1)))
+      .filter(_.getString(1).equals("Success"))
+      .toSeq
+    assert(col.equals(Seq(Row("4.1", "Success"), Row("0.2", "Success"))))
+
+    var rows = sql(
+      """
+        | show segments on source as
+        | select id, status, datasize from source_segments where status = 
'Success' order by dataSize
+        |""".stripMargin).collect()
+
+    assertResult("4.1")(rows(0).get(0))
+    assertResult("Success")(rows(0).get(1))
+    assertResult("0.2")(rows(1).get(0))
+    assertResult("Success")(rows(1).get(1))
+
+    val tables = sql("show tables").collect()
+    assert(!tables.toSeq.exists(_.get(1).equals("source_segments")))
+
+    sql(s"""drop table source""").collect
+  }
+
+  test("Show Segments on empty table") {
+    sql(s"""drop TABLE if exists source""").collect
+    sql(s"""CREATE TABLE source (CUST_ID int,CUST_NAME 
String,ACTIVE_EMUI_VERSION string,DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 
bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),Double_COLUMN1 
double,DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN2 double,INTEGER_COLUMN1 
int) STORED AS carbondata TBLPROPERTIES('table_blocksize'='1')""").collect
+    checkAnswer(sql("show segments on source"), Seq.empty)
+    val result = sql("show segments on source as select * from 
source_segments").collect()
+    assertResult(0)(result.length)
+  }
+
+  test("test show segments on already existing table") {
+    sql("drop TABLE if exists source").collect
+    sql(
+      """
+        |create table source (age int, name string, class string)
+        |STORED AS carbondata
+        |""".stripMargin)
+    sql("insert into source select 1, 'abc1', 'classA'")
+    sql("drop table if exists source_segments")
+    sql("create table source_segments (age int)")
+    val ex = intercept[MalformedCarbonCommandException](sql("show segments on 
source as select * from source_segments"))
+    assert(ex.getMessage.contains("source_segments already exists"))
+    sql("drop TABLE if exists source")
+    sql("drop table if exists source_segments")
+  }
+
+  test(" test show segments by wrong query") {
+    sql("drop TABLE if exists source").collect
+    sql(
+      """
+        |create table source (age int, name string, class string)
+        |STORED AS carbondata
+        |""".stripMargin)
+    sql("insert into source select 1, 'abc1', 'classA'")
+    val ex = intercept[AnalysisException](sql("show segments on source as 
select dsjk from source_segments"))
+    val tables = sql("show tables").collect()
+    assert(!tables.toSeq.exists(_.get(1).equals("source_segments")))
+    sql("drop TABLE if exists source")
+  }
+
+  //Show Segments failing if table name not in same case
+  test("DataLoadManagement001_830") {
+    sql(s"""drop TABLE if exists Case_ShowSegment_196""").collect
+    sql(s"""CREATE TABLE Case_ShowSegment_196 (CUST_ID int,CUST_NAME 
String,ACTIVE_EMUI_VERSION string,DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 
bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),Double_COLUMN1 
double,DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN2 double,INTEGER_COLUMN1 
int) STORED AS carbondata TBLPROPERTIES('table_blocksize'='1')""").collect
+    val df = sql(s"""show segments on 
default.CASE_ShowSegment_196""").collect()
+    val col = df.map {
+      row => Row(row.getString(0), row.getString(1), row.getString(4))
+    }.toSeq
+    assert(col.equals(Seq()))
+    sql(s"""drop table Case_ShowSegment_196""").collect
+  }
+
+  test("separate visible and invisible segments info into two files") {
+    val tableName = "test_tablestatus_history"
+    sql(s"drop table if exists ${tableName}")
+    sql(s"create table ${tableName} (name String, age int) STORED AS 
carbondata "
+        + 
"TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')")
+    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
tableName)(sqlContext.sparkSession)
+    insertTestDataIntoTable(tableName)
+    assert(sql(s"show segments on ${tableName} as select * from 
${tableName}_segments").collect().length == 10)
+    var detail = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    var historyDetail = 
SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath)
+    assert(detail.length == 10)
+    assert(historyDetail.length == 0)
+    sql(s"clean files for table ${tableName}")
+    assert(sql(s"show segments on ${tableName}").collect().length == 2)
+    detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    historyDetail = 
SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath)
+    assert(detail.length == 4)
+    assert(historyDetail.length == 6)
+    dropTable(tableName)
+  }
+
+  test("show history segments") {
+    val tableName = "test_tablestatus_history"
+    sql(s"drop table if exists ${tableName}")
+    sql(s"create table ${tableName} (name String, age int) STORED AS 
carbondata "
+        + 
"TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
tableName)
+    insertTestDataIntoTable(tableName)
+    assert(sql(s"show segments on ${tableName} as select * from 
${tableName}_segments").collect().length == 10)
+    assert(sql(s"show history segments on ${tableName} as select * from 
${tableName}_segments").collect().length == 10)
+    sql(s"clean files for table ${tableName}")
+    assert(sql(s"show segments on ${tableName} as select * from 
${tableName}_segments").collect().length == 2)
+    sql(s"show history segments on ${tableName} as select * from 
${tableName}_segments").show(false)
+    val segmentsHistoryList = sql(s"show history segments on ${tableName} as 
select * from ${tableName}_segments").collect()
+    assert(segmentsHistoryList.length == 10)
+    assertResult("0")(segmentsHistoryList(0).getString(0))
+    assertResult("Compacted")(segmentsHistoryList(0).getString(1))
+    assertResult("0.1")(segmentsHistoryList(0).getString(7))
+    assertResult("0.2")(segmentsHistoryList(1).getString(0))
+    assertResult("Success")(segmentsHistoryList(1).getString(1))
+    assertResult("5")(segmentsHistoryList(2).getString(0))
+    assertResult("Compacted")(segmentsHistoryList(2).getString(1))
+    assertResult("4.1")(segmentsHistoryList(3).getString(0))
+    assertResult("Success")(segmentsHistoryList(3).getString(1))
+    assertResult("1")(segmentsHistoryList(4).getString(0))
+    assertResult("Compacted")(segmentsHistoryList(4).getString(1))
+    assertResult("0.1")(segmentsHistoryList(4).getString(7))
+    assertResult("3")(segmentsHistoryList(7).getString(0))
+    assertResult("Compacted")(segmentsHistoryList(7).getString(1))
+    assertResult("2.1")(segmentsHistoryList(8).getString(0))
+    assertResult("Compacted")(segmentsHistoryList(8).getString(1))
+    assertResult("4")(segmentsHistoryList(9).getString(0))
+    assertResult("Compacted")(segmentsHistoryList(9).getString(1))
+    assert(sql(s"show history segments on ${tableName} as select * from 
${tableName}_segments limit 3").collect().length == 3)
+    dropTable(tableName)
+  }
+
+  private def insertTestDataIntoTable(tableName: String) = {
+    sql(s"insert into ${ tableName } select 'abc1',1")
+    sql(s"insert into ${ tableName } select 'abc2',2")
+    sql(s"insert into ${ tableName } select 'abc3',3")
+    sql(s"insert into ${ tableName } select 'abc4',4")
+    sql(s"insert into ${ tableName } select 'abc5',5")
+    sql(s"insert into ${ tableName } select 'abc6',6")
+  }
+}
\ No newline at end of file
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index 0d4e0b4..8707ec7 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -250,14 +250,14 @@ class TestSegmentReading extends QueryTest with 
BeforeAndAfterAll {
       sql(
         s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE 
carbon_table_show_seg OPTIONS
             |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
-      val df = sql("SHOW SEGMENTS for table carbon_table_show_seg")
+      val df = sql("SHOW SEGMENTS for table carbon_table_show_seg as select * 
from carbon_table_show_seg_segments")
       val col = df.collect().map{
-        row => Row(row.getString(0),row.getString(1),row.getString(4))
+        row => Row(row.getString(0),row.getString(1),row.getString(7))
       }.toSeq
-      assert(col.equals(Seq(Row("2","Success","NA"),
+      assert(col.equals(Seq(Row("0","Compacted","0.1"),
         Row("1","Compacted","0.1"),
         Row("0.1","Success","NA"),
-        Row("0","Compacted","0.1"))))
+        Row("2","Success","NA"))))
     }
     finally {
       sql("SET carbon.input.segments.default.carbon_table=*")
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 4df678b..d05b9ac 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -498,8 +498,8 @@ class StandardPartitionTableLoadingTestCase extends 
QueryTest with BeforeAndAfte
     sql("insert into new_par select 1,'k'")
     val result = sql("show segments for table new_par").collectAsList()
     val dataAndIndexSize = getDataAndIndexSize(s"$storeLocation/new_par/b=k")
-    assert(result.get(0).get(6).equals(dataAndIndexSize._1))
-    assert(result.get(0).get(7).equals(dataAndIndexSize._2))
+    assert(result.get(0).get(5).equals(dataAndIndexSize._1))
+    assert(result.get(0).get(6).equals(dataAndIndexSize._2))
   }
 
   test("test partition with all sort scope") {
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index 3ace9a9..b38626f 100644
--- 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++ 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -721,17 +721,17 @@ class TestStreamingTableOpName extends QueryTest with 
BeforeAndAfterAll {
 
     sql("alter table streaming.stream_table_filter compact 'minor'")
     Thread.sleep(5000)
-    val result1 = sql("show segments for table 
streaming.stream_table_filter").collect()
+    val result1 = sql("show segments for table streaming.stream_table_filter 
as select * from stream_table_filter_segments").collect()
     result1.foreach { row =>
       if (row.getString(0).equals("1")) {
         assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
-        assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase)
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(8).toLowerCase)
       } else if (row.getString(0).equals("0.1")) {
         assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
-        
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
+        
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(8).toLowerCase)
       } else {
         assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
-        
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
+        
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(8).toLowerCase)
       }
     }
 
@@ -1242,7 +1242,7 @@ class TestStreamingTableOpName extends QueryTest with 
BeforeAndAfterAll {
 
     sql("alter table streaming.stream_table_reopen compact 'close_streaming'")
     val newSegments =
-      sql("show segments for table streaming.stream_table_reopen").collect()
+      sql("show segments for table streaming.stream_table_reopen as select * 
from stream_table_reopen_segments").collect()
     assert(newSegments.length == 8 || newSegments.length == 10 || 
newSegments.length == 12)
     assertResult(newSegments.length / 
2)(newSegments.filter(_.getString(1).equals("Success")).length)
     assertResult(newSegments.length / 
2)(newSegments.filter(_.getString(1).equals("Compacted")).length)
@@ -1250,7 +1250,7 @@ class TestStreamingTableOpName extends QueryTest with 
BeforeAndAfterAll {
     //Verify MergeTO column entry for compacted Segments
     newSegments.filter(_.getString(1).equals("Compacted")).foreach{ rw =>
       assertResult("Compacted")(rw.getString(1))
-      assert(Integer.parseInt(rw.getString(0)) < 
Integer.parseInt(rw.getString(4)))
+      assert(Integer.parseInt(rw.getString(0)) < 
Integer.parseInt(rw.getString(7)))
     }
     checkAnswer(
       sql("select count(*) from streaming.stream_table_reopen"),
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index 2286a60..1f861b0 100644
--- 
a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ 
b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -393,17 +393,17 @@ class TestStreamingTableWithRowParser extends QueryTest 
with BeforeAndAfterAll {
 
     sql("alter table streaming1.stream_table_filter compact 'minor'")
     Thread.sleep(5000)
-    val result1 = sql("show segments for table 
streaming1.stream_table_filter").collect()
+    val result1 = sql("show segments for table streaming1.stream_table_filter 
as select * from stream_table_filter_segments").collect()
     result1.foreach { row =>
       if (row.getString(0).equals("1")) {
         assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1))
-        assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase)
+        assertResult(FileFormat.ROW_V1.toString)(row.getString(8).toLowerCase)
       } else if (row.getString(0).equals("0.1")) {
         assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1))
-        
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
+        
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(8).toLowerCase)
       } else {
         assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1))
-        
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase)
+        
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(8).toLowerCase)
       }
     }
 
diff --git 
a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
 
b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index 7634d92..e12c272 100644
--- 
a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ 
b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -143,63 +143,4 @@ class CarbonCommandSuite extends QueryTest with 
BeforeAndAfterAll {
     dropTable(table)
   }
 
-  test("separate visible and invisible segments info into two files") {
-    val tableName = "test_tablestatus_history"
-    sql(s"drop table if exists ${tableName}")
-    sql(s"create table ${tableName} (name String, age int) STORED AS 
carbondata "
-      + 
"TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')")
-    val carbonTable = CarbonEnv.getCarbonTable(Some("default"), 
tableName)(sqlContext.sparkSession)
-    sql(s"insert into ${tableName} select 'abc1',1")
-    sql(s"insert into ${tableName} select 'abc2',2")
-    sql(s"insert into ${tableName} select 'abc3',3")
-    sql(s"insert into ${tableName} select 'abc4',4")
-    sql(s"insert into ${tableName} select 'abc5',5")
-    sql(s"insert into ${tableName} select 'abc6',6")
-    assert(sql(s"show segments for table ${tableName}").collect().length == 10)
-    var detail = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-    var historyDetail = 
SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath)
-    assert(detail.length == 10)
-    assert(historyDetail.length == 0)
-    sql(s"clean files for table ${tableName}")
-    assert(sql(s"show segments for table ${tableName}").collect().length == 2)
-    detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-    historyDetail = 
SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath)
-    assert(detail.length == 4)
-    assert(historyDetail.length == 6)
-    dropTable(tableName)
-  }
-
-  test("show history segments") {
-    val tableName = "test_tablestatus_history"
-    sql(s"drop table if exists ${tableName}")
-    sql(s"create table ${tableName} (name String, age int) STORED AS 
carbondata "
-      + 
"TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')")
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", 
tableName)
-    sql(s"insert into ${tableName} select 'abc1',1")
-    sql(s"insert into ${tableName} select 'abc2',2")
-    sql(s"insert into ${tableName} select 'abc3',3")
-    sql(s"insert into ${tableName} select 'abc4',4")
-    sql(s"insert into ${tableName} select 'abc5',5")
-    sql(s"insert into ${tableName} select 'abc6',6")
-    assert(sql(s"show segments for table ${tableName}").collect().length == 10)
-    assert(sql(s"show history segments for table 
${tableName}").collect().length == 10)
-    sql(s"clean files for table ${tableName}")
-    assert(sql(s"show segments for table ${tableName}").collect().length == 2)
-    val segmentsHisotryList = sql(s"show history segments for table 
${tableName}").collect()
-    assert(segmentsHisotryList.length == 10)
-    assert(segmentsHisotryList(0).getString(0).equalsIgnoreCase("5"))
-    assert(segmentsHisotryList(0).getString(6).equalsIgnoreCase("false"))
-    assert(segmentsHisotryList(0).getString(1).equalsIgnoreCase("Compacted"))
-    assert(segmentsHisotryList(1).getString(0).equalsIgnoreCase("4.1"))
-    assert(segmentsHisotryList(1).getString(6).equalsIgnoreCase("true"))
-    assert(segmentsHisotryList(1).getString(1).equalsIgnoreCase("Success"))
-    assert(segmentsHisotryList(3).getString(0).equalsIgnoreCase("3"))
-    assert(segmentsHisotryList(3).getString(6).equalsIgnoreCase("false"))
-    assert(segmentsHisotryList(3).getString(1).equalsIgnoreCase("Compacted"))
-    assert(segmentsHisotryList(7).getString(0).equalsIgnoreCase("0.2"))
-    assert(segmentsHisotryList(7).getString(6).equalsIgnoreCase("true"))
-    assert(segmentsHisotryList(7).getString(1).equalsIgnoreCase("Success"))
-    assert(sql(s"show history segments for table ${tableName} limit 
3").collect().length == 3)
-    dropTable(tableName)
-  }
 }
