This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 84e3297 [CARBONDATA-3736] Support show segment by query 84e3297 is described below commit 84e3297332fb8a14e5bdb94806de36f44395a54c Author: Jacky Li <jacky.li...@qq.com> AuthorDate: Wed Mar 18 13:06:04 2020 +0800 [CARBONDATA-3736] Support show segment by query Why is this PR needed? There are many fields in a segment that are not shown in the SHOW SEGMENTS command. What changes were proposed in this PR? This PR changes the SHOW SEGMENTS command to: SHOW [HISTORY] SEGMENTS [FOR TABLE | ON] [db_name.]table_name [AS select_query] User can query the segments as if they were a table. A sample output is as follows: show segments on source as select id, status, dataSize from source_segments where status = 'Success' order by dataSize +------------------------+ |output | +------------------------+ |id | status | dataSize| |4.1 | Success | 1762 | |0.2 | Success | 3524 | +------------------------+ Does this PR introduce any user interface change? Yes Is any new testcase added? 
Yes This closes #3657 --- .../java/org/apache/carbondata/common/Strings.java | 24 +-- docs/segment-management-on-carbondata.md | 56 +++++- .../org/apache/carbondata/api/CarbonStore.scala | 209 ++++++++++----------- .../management/CarbonShowLoadsCommand.scala | 76 -------- .../CarbonShowSegmentsAsSelectCommand.scala | 137 ++++++++++++++ .../management/CarbonShowSegmentsCommand.scala | 97 ++++++++++ .../spark/sql/parser/CarbonSpark2SqlParser.scala | 44 +++-- .../bloom/BloomCoarseGrainIndexFunctionSuite.scala | 1 - .../testsuite/addsegment/AddSegmentTestCase.scala | 12 +- .../testsuite/segment/ShowSegmentTestCase.scala | 198 +++++++++++++++++++ .../segmentreading/TestSegmentReading.scala | 8 +- .../StandardPartitionTableLoadingTestCase.scala | 4 +- .../carbondata/TestStreamingTableOpName.scala | 12 +- .../TestStreamingTableWithRowParser.scala | 8 +- .../org/apache/spark/util/CarbonCommandSuite.scala | 59 ------ 15 files changed, 651 insertions(+), 294 deletions(-) diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java b/common/src/main/java/org/apache/carbondata/common/Strings.java index 4bb9dc8..0a3eea0 100644 --- a/common/src/main/java/org/apache/carbondata/common/Strings.java +++ b/common/src/main/java/org/apache/carbondata/common/Strings.java @@ -43,26 +43,26 @@ public class Strings { /** * Append KB/MB/GB/TB to the input size and return - * @param size data size + * @param sizeInBytes data size in bytes * @return data size with unit */ - public static String formatSize(float size) { + public static String formatSize(float sizeInBytes) { long KB = 1024L; long MB = KB << 10; long GB = MB << 10; long TB = GB << 10; - if (size < 0) { + if (sizeInBytes < 0) { return "NA"; - } else if (size >= 0 && size < KB) { - return String.format("%sB", size); - } else if (size >= KB && size < MB) { - return String.format("%.2fKB", size / KB); - } else if (size >= MB && size < GB) { - return String.format("%.2fMB", size / MB); - } else if (size >= GB && size < 
TB) { - return String.format("%.2fGB", size / GB); + } else if (sizeInBytes >= 0 && sizeInBytes < KB) { + return String.format("%sB", sizeInBytes); + } else if (sizeInBytes >= KB && sizeInBytes < MB) { + return String.format("%.2fKB", sizeInBytes / KB); + } else if (sizeInBytes >= MB && sizeInBytes < GB) { + return String.format("%.2fMB", sizeInBytes / MB); + } else if (sizeInBytes >= GB && sizeInBytes < TB) { + return String.format("%.2fGB", sizeInBytes / GB); } else { - return String.format("%.2fTB", size / TB); + return String.format("%.2fTB", sizeInBytes / TB); } } } diff --git a/docs/segment-management-on-carbondata.md b/docs/segment-management-on-carbondata.md index fe0cbd4..d18aca1 100644 --- a/docs/segment-management-on-carbondata.md +++ b/docs/segment-management-on-carbondata.md @@ -31,19 +31,69 @@ concept which helps to maintain consistency of data and easy transaction managem This command is used to list the segments of CarbonData table. ``` - SHOW [HISTORY] SEGMENTS FOR TABLE [db_name.]table_name LIMIT number_of_segments + SHOW [HISTORY] SEGMENTS + [FOR TABLE | ON] [db_name.]table_name + [AS (select query from table_name_segments)] ``` + By default, SHOW SEGMENT command will return following fields: + +- Segment ID +- Segment Status +- Load Start Time +- Load Time Taken +- Partition +- Data Size +- Index Size + + Example: Show visible segments + ``` - SHOW SEGMENTS FOR TABLE CarbonDatabase.CarbonTable LIMIT 4 + SHOW SEGMENTS ON CarbonDatabase.CarbonTable ``` + Show all segments, include invisible segments ``` - SHOW HISTORY SEGMENTS FOR TABLE CarbonDatabase.CarbonTable LIMIT 4 + SHOW HISTORY SEGMENTS ON CarbonDatabase.CarbonTable + ``` + + + When more detail of the segment is required, user can issue SHOW SEGMENT by query. 
+ + The query should be against the table name with '_segments' appended and can select from the following fields: + +- id: String, the id of the segment +- status: String, status of the segment +- loadStartTime: String, loading start time +- loadEndTime: String, loading end time +- timeTakenMs: Long, time spent in loading of the segment in milliseconds +- partitions: String array, partition key and values +- dataSize: Long, data size in bytes +- indexSize: Long, index size in bytes +- mergedToId: String, the target segment that this segment has been compacted into +- format: String, data format of the segment +- path: String, in case of external segment this will be the path of the segment, otherwise it is null +- segmentFileName: String, name of the segment file + + Example: + + ``` + SHOW SEGMENTS ON CarbonTable AS + SELECT * FROM CarbonTable_segments + + SHOW SEGMENTS ON CarbonTable AS + SELECT id, dataSize FROM CarbonTable_segments + WHERE status='Success' + ORDER BY dataSize + + SHOW SEGMENTS ON CarbonTable AS + SELECT avg(timeTakenMs) FROM CarbonTable_segments ``` + + + ### DELETE SEGMENT BY ID This command is used to delete a segment by using the segment ID. Each segment has a unique segment ID associated with it. 
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index 275d260..f516431 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -17,17 +17,14 @@ package org.apache.carbondata.api -import java.lang.Long +import java.time.{Duration, Instant} import scala.collection.JavaConverters._ import org.apache.commons.lang3.StringUtils -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.util.CarbonException import org.apache.spark.unsafe.types.UTF8String -import org.apache.carbondata.common.Strings import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -38,126 +35,128 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.streaming.segment.StreamSegment object CarbonStore { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def showSegments( - limit: Option[String], - tablePath: String, - showHistory: Boolean): Seq[Row] = { + def readSegments(tablePath: String, showHistory: Boolean): Array[LoadMetadataDetails] = { val metaFolder = 
CarbonTablePath.getMetadataPath(tablePath) - val loadMetadataDetailsArray = if (showHistory) { + val segmentsMetadataDetails = if (showHistory) { SegmentStatusManager.readLoadMetadata(metaFolder) ++ SegmentStatusManager.readLoadHistoryMetadata(metaFolder) } else { SegmentStatusManager.readLoadMetadata(metaFolder) } + if (!showHistory) { + segmentsMetadataDetails.filter(_.getVisibility.equalsIgnoreCase("true")) + } else { + segmentsMetadataDetails + } + } - if (loadMetadataDetailsArray.nonEmpty) { - var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { (l1, l2) => - java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double.parseDouble(l2.getLoadName) - } - if (!showHistory) { - loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray - .filter(_.getVisibility.equalsIgnoreCase("true")) + def getPartitions(tablePath: String, load: LoadMetadataDetails): Seq[String] = { + val segmentFile = SegmentFileStore.readSegmentFile( + CarbonTablePath.getSegmentFilePath(tablePath, load.getSegmentFile)) + if (segmentFile == null) { + return Seq.empty + } + val locationMap = segmentFile.getLocationMap + if (locationMap != null) { + locationMap.asScala.map { + case (_, detail) => + s"{${ detail.getPartitions.asScala.mkString(",") }}" + }.toSeq + } else { + Seq.empty + } + } + + def getMergeTo(load: LoadMetadataDetails): String = { + if (load.getMergedLoadName != null) { + load.getMergedLoadName + } else { + "NA" + } + } + + def getExternalSegmentPath(load: LoadMetadataDetails): String = { + if (StringUtils.isNotEmpty(load.getPath)) { + load.getPath + } else { + "NA" + } + } + + def getLoadStartTime(load: LoadMetadataDetails): String = { + val startTime = + if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) { + "NA" + } else { + new java.sql.Timestamp(load.getLoadStartTime).toString } - if (limit.isDefined) { - val limitLoads = limit.get - try { - val lim = Integer.parseInt(limitLoads) - loadMetadataDetailsSortedArray = 
loadMetadataDetailsSortedArray.slice(0, lim) - } catch { - case _: NumberFormatException => - CarbonException.analysisException("Entered limit is not a valid Number") - } + startTime + } + + def getLoadEndTime(load: LoadMetadataDetails): String = { + val endTime = + if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) { + "NA" + } else { + new java.sql.Timestamp(load.getLoadEndTime).toString } + endTime + } - loadMetadataDetailsSortedArray - .map { load => - val mergedTo = - if (load.getMergedLoadName != null) { - load.getMergedLoadName - } else { - "NA" - } - - val path = - if (StringUtils.isNotEmpty(load.getPath)) { - load.getPath - } else { - "NA" - } - - val startTime = - if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) { - "NA" - } else { - new java.sql.Timestamp(load.getLoadStartTime).toString - } - - val endTime = - if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) { - "NA" - } else { - new java.sql.Timestamp(load.getLoadEndTime).toString - } - - val (dataSize, indexSize) = if (load.getFileFormat.equals(FileFormat.ROW_V1)) { - // for streaming segment, we should get the actual size from the index file - // since it is continuously inserting data - val segmentDir = CarbonTablePath.getSegmentPath(tablePath, load.getLoadName) - val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir) - val indexFile = FileFactory.getCarbonFile(indexPath) - if (indexFile.exists()) { - val indices = - StreamSegment.readIndexFile(indexPath) - (indices.asScala.map(_.getFile_size).sum, indexFile.getSize) - } else { - (-1L, -1L) - } - } else { - // If the added segment is other than carbon segment then we can only display the data - // size and not index size, we can get the data size from table status file directly - if (!load.getFileFormat.isCarbonFormat) { - (if (load.getDataSize == null) -1L else load.getDataSize.toLong, -1L) - } else { - (if (load.getDataSize == null) -1L else 
load.getDataSize.toLong, - if (load.getIndexSize == null) -1L else load.getIndexSize.toLong) - } - } + def getLoadTimeTaken(load: LoadMetadataDetails): String = { + if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) { + "NA" + } else { + Duration.between( + Instant.ofEpochMilli(load.getLoadStartTime), + Instant.ofEpochMilli(load.getLoadEndTime) + ).toString + } + } - if (showHistory) { - Row( - load.getLoadName, - load.getSegmentStatus.getMessage, - startTime, - endTime, - mergedTo, - load.getFileFormat.toString.toUpperCase, - load.getVisibility, - Strings.formatSize(dataSize.toFloat), - Strings.formatSize(indexSize.toFloat), - path) - } else { - Row( - load.getLoadName, - load.getSegmentStatus.getMessage, - startTime, - endTime, - mergedTo, - load.getFileFormat.toString.toUpperCase, - Strings.formatSize(dataSize.toFloat), - Strings.formatSize(indexSize.toFloat), - path) - } - }.toSeq + def getLoadTimeTakenAsMillis(load: LoadMetadataDetails): Long = { + if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) { + // loading in progress + -1L } else { - Seq.empty + load.getLoadEndTime - load.getLoadStartTime + } + } + + def getDataAndIndexSize( + tablePath: String, + load: LoadMetadataDetails): (Long, Long) = { + val (dataSize, indexSize) = if (load.getFileFormat.equals(FileFormat.ROW_V1)) { + // for streaming segment, we should get the actual size from the index file + // since it is continuously inserting data + val segmentDir = CarbonTablePath.getSegmentPath(tablePath, load.getLoadName) + val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir) + val indexFile = FileFactory.getCarbonFile(indexPath) + if (indexFile.exists()) { + val indices = + StreamSegment.readIndexFile(indexPath) + (indices.asScala.map(_.getFile_size).sum, indexFile.getSize) + } else { + (-1L, -1L) + } + } else { + // If the added segment is other than carbon segment then we can only display the data + // size and not index size, we 
can get the data size from table status file directly + if (!load.getFileFormat.isCarbonFormat) { + (if (load.getDataSize == null) -1L else load.getDataSize.toLong, -1L) + } else { + (if (load.getDataSize == null) -1L else load.getDataSize.toLong, + if (load.getIndexSize == null) -1L else load.getIndexSize.toLong) + } } + (dataSize, indexSize) } /** diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala deleted file mode 100644 index 9ce9cfb..0000000 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.command.management - -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.execution.command.{Checker, DataCommand} -import org.apache.spark.sql.types.StringType - -import org.apache.carbondata.api.CarbonStore -import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException - -case class CarbonShowLoadsCommand( - databaseNameOp: Option[String], - tableName: String, - limit: Option[String], - showHistory: Boolean = false) - extends DataCommand { - - // add new columns of show segments at last - override def output: Seq[Attribute] = { - if (showHistory) { - Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(), - AttributeReference("Status", StringType, nullable = false)(), - AttributeReference("Load Start Time", StringType, nullable = false)(), - AttributeReference("Load End Time", StringType, nullable = true)(), - AttributeReference("Merged To", StringType, nullable = false)(), - AttributeReference("File Format", StringType, nullable = false)(), - AttributeReference("Visibility", StringType, nullable = false)(), - AttributeReference("Data Size", StringType, nullable = false)(), - AttributeReference("Index Size", StringType, nullable = false)(), - AttributeReference("Path", StringType, nullable = false)()) - } else { - Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(), - AttributeReference("Status", StringType, nullable = false)(), - AttributeReference("Load Start Time", StringType, nullable = false)(), - AttributeReference("Load End Time", StringType, nullable = true)(), - AttributeReference("Merged To", StringType, nullable = false)(), - AttributeReference("File Format", StringType, nullable = false)(), - AttributeReference("Data Size", StringType, nullable = false)(), - AttributeReference("Index Size", StringType, nullable = false)(), - 
AttributeReference("Path", StringType, nullable = false)()) - } - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - Checker.validateTableExists(databaseNameOp, tableName, sparkSession) - val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - setAuditTable(carbonTable) - if (!carbonTable.getTableInfo.isTransactionalTable) { - throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") - } - CarbonStore.showSegments( - limit, - carbonTable.getTablePath, - showHistory - ) - } - - override protected def opName: String = "SHOW SEGMENTS" -} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala new file mode 100644 index 0000000..26772dd --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsAsSelectCommand.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.management + +import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.execution.command.{Checker, DataCommand} + +import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails + +case class SegmentRow( + id: String, status: String, loadStartTime: String, timeTakenMs: Long, partitions: Seq[String], + dataSize: Long, indexSize: Long, mergedToId: String, format: String, path: String, + loadEndTime: String, segmentFileName: String) + +case class CarbonShowSegmentsAsSelectCommand( + databaseNameOp: Option[String], + tableName: String, + query: String, + showHistory: Boolean = false) + extends DataCommand { + + private lazy val sparkSession = SparkSession.getActiveSession.get + private lazy val carbonTable = { + Checker.validateTableExists(databaseNameOp, tableName, sparkSession) + CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + } + private lazy val df = createDataFrame + + override def output: Seq[Attribute] = { + df.queryExecution.analyzed.output.map { attr => + AttributeReference(attr.name, attr.dataType, nullable = false)() + } + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + try { + setAuditTable(carbonTable) + if (!carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException( + "Unsupported operation on non transactional table") + } + df.collect() + } catch { + case ex: Throwable => + throw new MalformedCarbonCommandException("failed to run query: " + ex.getMessage) + } finally { + sparkSession.catalog.dropTempView(makeTempViewName(carbonTable)) + } + } + + override protected def opName: String = "SHOW 
SEGMENTS" + + private def createDataFrame: DataFrame = { + val tablePath = carbonTable.getTablePath + val segments = CarbonStore.readSegments(tablePath, showHistory) + val tempViewName = makeTempViewName(carbonTable) + registerSegmentRowView(sparkSession, tempViewName, carbonTable, segments) + try { + sparkSession.sql(query) + } catch { + case t: Throwable => + sparkSession.catalog.dropTempView(tempViewName) + throw t + } + } + + /** + * Generate temp view name for the query to execute + */ + private def makeTempViewName(carbonTable: CarbonTable): String = { + s"${carbonTable.getTableName}_segments" + } + + private def registerSegmentRowView( + sparkSession: SparkSession, + tempViewName: String, + carbonTable: CarbonTable, + segments: Array[LoadMetadataDetails]): Unit = { + + // populate a dataframe containing all segment information + val tablePath = carbonTable.getTablePath + val segmentRows = segments.toSeq.map { segment => + val mergedToId = CarbonStore.getMergeTo(segment) + val path = CarbonStore.getExternalSegmentPath(segment) + val startTime = CarbonStore.getLoadStartTime(segment) + val endTime = CarbonStore.getLoadEndTime(segment) + val timeTaken = CarbonStore.getLoadTimeTakenAsMillis(segment) + val (dataSize, indexSize) = CarbonStore.getDataAndIndexSize(tablePath, segment) + val partitions = CarbonStore.getPartitions(tablePath, segment) + SegmentRow( + segment.getLoadName, + segment.getSegmentStatus.toString, + startTime, + timeTaken, + partitions, + dataSize, + indexSize, + mergedToId, + segment.getFileFormat.toString, + path, + endTime, + if (segment.getSegmentFile == null) "NA" else segment.getSegmentFile) + } + + // create a temp view using the populated dataframe and execute the query on it + val df = sparkSession.createDataFrame(segmentRows) + checkIfTableExist(sparkSession, tempViewName) + df.createOrReplaceTempView(tempViewName) + } + + private def checkIfTableExist(sparkSession: SparkSession, tempViewName: String): Unit = { + if 
(sparkSession.catalog.tableExists(tempViewName)) { + throw new MalformedCarbonCommandException(s"$tempViewName already exists, " + + s"can not show segment by query") + } + } + +} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala new file mode 100644 index 0000000..c3157ca --- /dev/null +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowSegmentsCommand.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.management + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.execution.command.{Checker, DataCommand} +import org.apache.spark.sql.types.StringType + +import org.apache.carbondata.api.CarbonStore.{getDataAndIndexSize, getLoadStartTime, getLoadTimeTaken, getPartitions, readSegments} +import org.apache.carbondata.common.Strings +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails + +case class CarbonShowSegmentsCommand( + databaseNameOp: Option[String], + tableName: String, + showHistory: Boolean = false) + extends DataCommand { + + // add new columns of show segments at last + override def output: Seq[Attribute] = { + Seq( + AttributeReference("ID", StringType, nullable = false)(), + AttributeReference("Status", StringType, nullable = false)(), + AttributeReference("Load Start Time", StringType, nullable = false)(), + AttributeReference("Load Time Taken", StringType, nullable = true)(), + AttributeReference("Partition", StringType, nullable = true)(), + AttributeReference("Data Size", StringType, nullable = false)(), + AttributeReference("Index Size", StringType, nullable = false)()) + } + + override def processData(sparkSession: SparkSession): Seq[Row] = { + Checker.validateTableExists(databaseNameOp, tableName, sparkSession) + val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + setAuditTable(carbonTable) + if (!carbonTable.getTableInfo.isTransactionalTable) { + throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") + } + val tablePath = carbonTable.getTablePath + val segments = readSegments(tablePath, showHistory) + if (segments.nonEmpty) { + showBasic(segments, tablePath) + } else { + Seq.empty + } + } + + override 
protected def opName: String = "SHOW SEGMENTS" + + private def showBasic( + allSegments: Array[LoadMetadataDetails], + tablePath: String): Seq[Row] = { + val segments = allSegments.sortWith { (l1, l2) => + java.lang.Double.parseDouble(l1.getLoadName) > + java.lang.Double.parseDouble(l2.getLoadName) + } + + segments + .map { segment => + val startTime = getLoadStartTime(segment) + val timeTaken = getLoadTimeTaken(segment) + val (dataSize, indexSize) = getDataAndIndexSize(tablePath, segment) + val partitions = getPartitions(tablePath, segment) + val partitionString = if (partitions.size == 1) { + partitions.head + } else if (partitions.size > 1) { + partitions.head + ", ..." + } else { + "NA" + } + Row( + segment.getLoadName, + segment.getSegmentStatus.getMessage, + startTime, + timeTaken, + partitionString, + Strings.formatSize(dataSize.toFloat), + Strings.formatSize(indexSize.toFloat)) + }.toSeq + } +} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 84bbd59..c917d4f 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -81,12 +81,13 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { startCommand | extendedSparkSyntax protected lazy val startCommand: Parser[LogicalPlan] = - loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords | + segmentManagement | alterTable | restructure | updateTable | deleteRecords | alterTableFinishStreaming | stream | cli | cacheManagement | insertStageData | indexCommands | mvCommands - protected lazy val loadManagement: Parser[LogicalPlan] = - deleteLoadsByID | deleteLoadsByLoadDate | deleteStage | cleanFiles | addLoad + protected lazy val segmentManagement: Parser[LogicalPlan] = + deleteSegmentByID | 
deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment | + showSegments protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn @@ -452,7 +453,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { databaseNameOp, tableName, optionsList, partitions, filePath, isOverwrite) } - protected lazy val deleteLoadsByID: Parser[LogicalPlan] = + protected lazy val deleteSegmentByID: Parser[LogicalPlan] = DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~ (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",")) <~ ")" ~ opt(";") ^^ { @@ -460,7 +461,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { CarbonDeleteLoadByIdCommand(loadids, dbName, tableName.toLowerCase()) } - protected lazy val deleteLoadsByLoadDate: Parser[LogicalPlan] = + protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] = DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~ (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~ opt(";") ^^ { @@ -493,7 +494,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { * schema list format: column_name:data_type * for example: 'partition'='a:int,b:string' */ - protected lazy val addLoad: Parser[LogicalPlan] = + protected lazy val addSegment: Parser[LogicalPlan] = ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~ (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")") <~ opt(";") ^^ { case dbName ~ tableName ~ segment ~ optionsList => @@ -529,16 +530,27 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } } - protected lazy val showLoads: Parser[LogicalPlan] = - (SHOW ~> opt(HISTORY) <~ SEGMENTS <~ FOR <~ TABLE) ~ (ident <~ ".").? ~ ident ~ - (LIMIT ~> numericLit).? 
<~ - opt(";") ^^ { - case showHistory ~ databaseName ~ tableName ~ limit => - CarbonShowLoadsCommand( - CarbonParserUtil.convertDbNameToLowerCase(databaseName), - tableName.toLowerCase(), - limit, - showHistory.isDefined) + /** + * SHOW [HISTORY] SEGMENTS + * [FOR TABLE | ON] [db_name.]table_name + * [AS (select query)] + */ + protected lazy val showSegments: Parser[LogicalPlan] = + (SHOW ~> opt(HISTORY) <~ SEGMENTS <~ ((FOR <~ TABLE) | ON)) ~ (ident <~ ".").? ~ ident ~ + (AS ~> restInput).? <~ opt(";") ^^ { + case showHistory ~ databaseName ~ tableName ~ queryOp => + if (queryOp.isEmpty) { + CarbonShowSegmentsCommand( + CarbonParserUtil.convertDbNameToLowerCase(databaseName), + tableName.toLowerCase(), + showHistory.isDefined) + } else { + CarbonShowSegmentsAsSelectCommand( + CarbonParserUtil.convertDbNameToLowerCase(databaseName), + tableName.toLowerCase(), + queryOp.get, + showHistory.isDefined) + } } protected lazy val showCache: Parser[LogicalPlan] = diff --git a/integration/spark/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainIndexFunctionSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainIndexFunctionSuite.scala index 3ddb959..88d9498 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainIndexFunctionSuite.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainIndexFunctionSuite.scala @@ -41,7 +41,6 @@ class BloomCoarseGrainIndexFunctionSuite extends QueryTest with BeforeAndAfterA val indexName = "bloom_dm" override protected def beforeAll(): Unit = { - sqlContext.sparkContext.setLogLevel("info") deleteFile(bigFile) new File(CarbonProperties.getInstance().getSystemFolderLocation).delete() createFile(bigFile, line = 2000) diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala 
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala index 3e9f5de..0fd32c5 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala @@ -233,8 +233,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll { val descFormattedSize = sql("desc formatted addsegment1").collect().filter(_.get(0).toString.startsWith("Table Data Size")).head.get(1).toString val size = getDataSize(newPath) assert(descFormattedSize.split("KB")(0).toDouble > 0.0d) - assert(showSeg.get(0).get(6).toString.equalsIgnoreCase(size)) - assert(showSeg.get(0).get(7).toString.equalsIgnoreCase("NA")) + assert(showSeg.get(0).get(5).toString.equalsIgnoreCase(size)) + assert(showSeg.get(0).get(6).toString.equalsIgnoreCase("NA")) FileFactory.deleteAllFilesOfDir(new File(newPath)) } @@ -446,8 +446,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30))) sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='PARQUET')").show() - checkExistence(sql(s"show segments for table addsegment1"), true, "spark/target/warehouse/addsegtest") - checkExistence(sql(s"show history segments for table addsegment1"), true, "spark/target/warehouse/addsegtest") + checkExistence(sql(s"show segments for table addsegment1 as select * from addsegment1_segments"), true, "spark/target/warehouse/addsegtest") + checkExistence(sql(s"show history segments for table addsegment1 as select * from addsegment1_segments"), true, "spark/target/warehouse/addsegtest") FileFactory.deleteAllFilesOfDir(new File(newPath)) } @@ -555,8 +555,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select * from carbon_table"), sql("select * from parquet_table")) // test show 
segment - checkExistence(sql(s"show segments for table carbon_table"), true, "spark/target/warehouse/parquet_table") - checkExistence(sql(s"show history segments for table carbon_table"), true, "spark/target/warehouse/parquet_table") + checkExistence(sql(s"show segments for table carbon_table as select * from carbon_table_segments"), true, "spark/target/warehouse/parquet_table") + checkExistence(sql(s"show history segments for table carbon_table as select * from carbon_table_segments"), true, "spark/target/warehouse/parquet_table") sql("drop table if exists parquet_table") sql("drop table if exists carbon_table") diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala new file mode 100644 index 0000000..a6d481f --- /dev/null +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segment/ShowSegmentTestCase.scala @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.segment + +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.statusmanager.SegmentStatusManager + +/** + * Test Class for SHOW SEGMENTS command + */ +class ShowSegmentTestCase extends QueryTest with BeforeAndAfterAll { + + test("test show segment by query, success case") { + sql("drop table if exists source") + sql( + """ + |create table source (age int) + |STORED AS carbondata + |partitioned by (name string, class string) + |TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2') + |""".stripMargin) + sql("insert into source select 1, 'abc1', 'classA'") + sql("insert into source select 2, 'abc2', 'classB'") + sql("insert into source select 3, 'abc3', 'classA'") + sql("insert into source select 4, 'abc4', 'classB'") + sql("insert into source select 5, 'abc5', 'classA'") + sql("insert into source select 6, 'abc6', 'classC'") + sql("show segments on source").show(false) + + val df = sql(s"""show segments on source""").collect() + // validating headers + val header = df(0).schema + assert(header(0).name.equalsIgnoreCase("ID")) + assert(header(1).name.equalsIgnoreCase("Status")) + assert(header(2).name.equalsIgnoreCase("Load Start Time")) + assert(header(3).name.equalsIgnoreCase("Load Time Taken")) + assert(header(4).name.equalsIgnoreCase("Partition")) + assert(header(5).name.equalsIgnoreCase("Data Size")) + assert(header(6).name.equalsIgnoreCase("Index Size")) + val col = df + .map(row => Row(row.getString(0), row.getString(1))) + .filter(_.getString(1).equals("Success")) + .toSeq + assert(col.equals(Seq(Row("4.1", "Success"), Row("0.2", "Success")))) + + var rows = sql( + """ + | show segments on source as + | select 
id, status, datasize from source_segments where status = 'Success' order by dataSize + |""".stripMargin).collect() + + assertResult("4.1")(rows(0).get(0)) + assertResult("Success")(rows(0).get(1)) + assertResult("0.2")(rows(1).get(0)) + assertResult("Success")(rows(1).get(1)) + + val tables = sql("show tables").collect() + assert(!tables.toSeq.exists(_.get(1).equals("source_segments"))) + + sql(s"""drop table source""").collect + } + + test("Show Segments on empty table") { + sql(s"""drop TABLE if exists source""").collect + sql(s"""CREATE TABLE source (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),Double_COLUMN1 double,DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED AS carbondata TBLPROPERTIES('table_blocksize'='1')""").collect + checkAnswer(sql("show segments on source"), Seq.empty) + val result = sql("show segments on source as select * from source_segments").collect() + assertResult(0)(result.length) + } + + test("test show segments on already existing table") { + sql("drop TABLE if exists source").collect + sql( + """ + |create table source (age int, name string, class string) + |STORED AS carbondata + |""".stripMargin) + sql("insert into source select 1, 'abc1', 'classA'") + sql("drop table if exists source_segments") + sql("create table source_segments (age int)") + val ex = intercept[MalformedCarbonCommandException](sql("show segments on source as select * from source_segments")) + assert(ex.getMessage.contains("source_segments already exists")) + sql("drop TABLE if exists source") + sql("drop table if exists source_segments") + } + + test(" test show segments by wrong query") { + sql("drop TABLE if exists source").collect + sql( + """ + |create table source (age int, name string, class string) + |STORED AS carbondata + |""".stripMargin) + sql("insert into source select 1, 'abc1', 'classA'") + val ex = 
intercept[AnalysisException](sql("show segments on source as select dsjk from source_segments")) + val tables = sql("show tables").collect() + assert(!tables.toSeq.exists(_.get(1).equals("source_segments"))) + sql("drop TABLE if exists source") + } + + //Show Segments failing if table name not in same case + test("DataLoadManagement001_830") { + sql(s"""drop TABLE if exists Case_ShowSegment_196""").collect + sql(s"""CREATE TABLE Case_ShowSegment_196 (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),Double_COLUMN1 double,DECIMAL_COLUMN2 decimal(36,10), Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED AS carbondata TBLPROPERTIES('table_blocksize'='1')""").collect + val df = sql(s"""show segments on default.CASE_ShowSegment_196""").collect() + val col = df.map { + row => Row(row.getString(0), row.getString(1), row.getString(4)) + }.toSeq + assert(col.equals(Seq())) + sql(s"""drop table Case_ShowSegment_196""").collect + } + + test("separate visible and invisible segments info into two files") { + val tableName = "test_tablestatus_history" + sql(s"drop table if exists ${tableName}") + sql(s"create table ${tableName} (name String, age int) STORED AS carbondata " + + "TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')") + val carbonTable = CarbonEnv.getCarbonTable(Some("default"), tableName)(sqlContext.sparkSession) + insertTestDataIntoTable(tableName) + assert(sql(s"show segments on ${tableName} as select * from ${tableName}_segments").collect().length == 10) + var detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + var historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath) + assert(detail.length == 10) + assert(historyDetail.length == 0) + sql(s"clean files for table ${tableName}") + assert(sql(s"show segments on ${tableName}").collect().length == 2) + detail = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) + historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath) + assert(detail.length == 4) + assert(historyDetail.length == 6) + dropTable(tableName) + } + + test("show history segments") { + val tableName = "test_tablestatus_history" + sql(s"drop table if exists ${tableName}") + sql(s"create table ${tableName} (name String, age int) STORED AS carbondata " + + "TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')") + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) + insertTestDataIntoTable(tableName) + assert(sql(s"show segments on ${tableName} as select * from ${tableName}_segments").collect().length == 10) + assert(sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").collect().length == 10) + sql(s"clean files for table ${tableName}") + assert(sql(s"show segments on ${tableName} as select * from ${tableName}_segments").collect().length == 2) + sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").show(false) + val segmentsHistoryList = sql(s"show history segments on ${tableName} as select * from ${tableName}_segments").collect() + assert(segmentsHistoryList.length == 10) + assertResult("0")(segmentsHistoryList(0).getString(0)) + assertResult("Compacted")(segmentsHistoryList(0).getString(1)) + assertResult("0.1")(segmentsHistoryList(0).getString(7)) + assertResult("0.2")(segmentsHistoryList(1).getString(0)) + assertResult("Success")(segmentsHistoryList(1).getString(1)) + assertResult("5")(segmentsHistoryList(2).getString(0)) + assertResult("Compacted")(segmentsHistoryList(2).getString(1)) + assertResult("4.1")(segmentsHistoryList(3).getString(0)) + assertResult("Success")(segmentsHistoryList(3).getString(1)) + assertResult("1")(segmentsHistoryList(4).getString(0)) + assertResult("Compacted")(segmentsHistoryList(4).getString(1)) + 
assertResult("0.1")(segmentsHistoryList(4).getString(7)) + assertResult("3")(segmentsHistoryList(7).getString(0)) + assertResult("Compacted")(segmentsHistoryList(7).getString(1)) + assertResult("2.1")(segmentsHistoryList(8).getString(0)) + assertResult("Compacted")(segmentsHistoryList(8).getString(1)) + assertResult("4")(segmentsHistoryList(9).getString(0)) + assertResult("Compacted")(segmentsHistoryList(9).getString(1)) + assert(sql(s"show history segments on ${tableName} as select * from ${tableName}_segments limit 3").collect().length == 3) + dropTable(tableName) + } + + private def insertTestDataIntoTable(tableName: String) = { + sql(s"insert into ${ tableName } select 'abc1',1") + sql(s"insert into ${ tableName } select 'abc2',2") + sql(s"insert into ${ tableName } select 'abc3',3") + sql(s"insert into ${ tableName } select 'abc4',4") + sql(s"insert into ${ tableName } select 'abc5',5") + sql(s"insert into ${ tableName } select 'abc6',6") + } +} \ No newline at end of file diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala index 0d4e0b4..8707ec7 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala @@ -250,14 +250,14 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll { sql( s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbon_table_show_seg OPTIONS |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin) - val df = sql("SHOW SEGMENTS for table carbon_table_show_seg") + val df = sql("SHOW SEGMENTS for table carbon_table_show_seg as select * from carbon_table_show_seg_segments") val col = df.collect().map{ - row => 
Row(row.getString(0),row.getString(1),row.getString(4)) + row => Row(row.getString(0),row.getString(1),row.getString(7)) }.toSeq - assert(col.equals(Seq(Row("2","Success","NA"), + assert(col.equals(Seq(Row("0","Compacted","0.1"), Row("1","Compacted","0.1"), Row("0.1","Success","NA"), - Row("0","Compacted","0.1")))) + Row("2","Success","NA")))) } finally { sql("SET carbon.input.segments.default.carbon_table=*") diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 4df678b..d05b9ac 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -498,8 +498,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte sql("insert into new_par select 1,'k'") val result = sql("show segments for table new_par").collectAsList() val dataAndIndexSize = getDataAndIndexSize(s"$storeLocation/new_par/b=k") - assert(result.get(0).get(6).equals(dataAndIndexSize._1)) - assert(result.get(0).get(7).equals(dataAndIndexSize._2)) + assert(result.get(0).get(5).equals(dataAndIndexSize._1)) + assert(result.get(0).get(6).equals(dataAndIndexSize._2)) } test("test partition with all sort scope") { diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala index 3ace9a9..b38626f 100644 --- a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala +++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala @@ 
-721,17 +721,17 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll { sql("alter table streaming.stream_table_filter compact 'minor'") Thread.sleep(5000) - val result1 = sql("show segments for table streaming.stream_table_filter").collect() + val result1 = sql("show segments for table streaming.stream_table_filter as select * from stream_table_filter_segments").collect() result1.foreach { row => if (row.getString(0).equals("1")) { assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1)) - assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase) + assertResult(FileFormat.ROW_V1.toString)(row.getString(8).toLowerCase) } else if (row.getString(0).equals("0.1")) { assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1)) - assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(8).toLowerCase) } else { assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1)) - assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(8).toLowerCase) } } @@ -1242,7 +1242,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll { sql("alter table streaming.stream_table_reopen compact 'close_streaming'") val newSegments = - sql("show segments for table streaming.stream_table_reopen").collect() + sql("show segments for table streaming.stream_table_reopen as select * from stream_table_reopen_segments").collect() assert(newSegments.length == 8 || newSegments.length == 10 || newSegments.length == 12) assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Success")).length) assertResult(newSegments.length / 2)(newSegments.filter(_.getString(1).equals("Compacted")).length) @@ -1250,7 +1250,7 @@ class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll { //Verify MergeTO column entry for compacted Segments 
newSegments.filter(_.getString(1).equals("Compacted")).foreach{ rw => assertResult("Compacted")(rw.getString(1)) - assert(Integer.parseInt(rw.getString(0)) < Integer.parseInt(rw.getString(4))) + assert(Integer.parseInt(rw.getString(0)) < Integer.parseInt(rw.getString(7))) } checkAnswer( sql("select count(*) from streaming.stream_table_reopen"), diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala index 2286a60..1f861b0 100644 --- a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala +++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala @@ -393,17 +393,17 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { sql("alter table streaming1.stream_table_filter compact 'minor'") Thread.sleep(5000) - val result1 = sql("show segments for table streaming1.stream_table_filter").collect() + val result1 = sql("show segments for table streaming1.stream_table_filter as select * from stream_table_filter_segments").collect() result1.foreach { row => if (row.getString(0).equals("1")) { assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1)) - assertResult(FileFormat.ROW_V1.toString)(row.getString(5).toLowerCase) + assertResult(FileFormat.ROW_V1.toString)(row.getString(8).toLowerCase) } else if (row.getString(0).equals("0.1")) { assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1)) - assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(8).toLowerCase) } else { assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1)) - assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5).toLowerCase) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(8).toLowerCase) } } diff --git 
a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala index 7634d92..e12c272 100644 --- a/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala +++ b/integration/spark/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala @@ -143,63 +143,4 @@ class CarbonCommandSuite extends QueryTest with BeforeAndAfterAll { dropTable(table) } - test("separate visible and invisible segments info into two files") { - val tableName = "test_tablestatus_history" - sql(s"drop table if exists ${tableName}") - sql(s"create table ${tableName} (name String, age int) STORED AS carbondata " - + "TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')") - val carbonTable = CarbonEnv.getCarbonTable(Some("default"), tableName)(sqlContext.sparkSession) - sql(s"insert into ${tableName} select 'abc1',1") - sql(s"insert into ${tableName} select 'abc2',2") - sql(s"insert into ${tableName} select 'abc3',3") - sql(s"insert into ${tableName} select 'abc4',4") - sql(s"insert into ${tableName} select 'abc5',5") - sql(s"insert into ${tableName} select 'abc6',6") - assert(sql(s"show segments for table ${tableName}").collect().length == 10) - var detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) - var historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath) - assert(detail.length == 10) - assert(historyDetail.length == 0) - sql(s"clean files for table ${tableName}") - assert(sql(s"show segments for table ${tableName}").collect().length == 2) - detail = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) - historyDetail = SegmentStatusManager.readLoadHistoryMetadata(carbonTable.getMetadataPath) - assert(detail.length == 4) - assert(historyDetail.length == 6) - dropTable(tableName) - } - - test("show history segments") { - val tableName = "test_tablestatus_history" - 
sql(s"drop table if exists ${tableName}") - sql(s"create table ${tableName} (name String, age int) STORED AS carbondata " - + "TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,2')") - val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) - sql(s"insert into ${tableName} select 'abc1',1") - sql(s"insert into ${tableName} select 'abc2',2") - sql(s"insert into ${tableName} select 'abc3',3") - sql(s"insert into ${tableName} select 'abc4',4") - sql(s"insert into ${tableName} select 'abc5',5") - sql(s"insert into ${tableName} select 'abc6',6") - assert(sql(s"show segments for table ${tableName}").collect().length == 10) - assert(sql(s"show history segments for table ${tableName}").collect().length == 10) - sql(s"clean files for table ${tableName}") - assert(sql(s"show segments for table ${tableName}").collect().length == 2) - val segmentsHisotryList = sql(s"show history segments for table ${tableName}").collect() - assert(segmentsHisotryList.length == 10) - assert(segmentsHisotryList(0).getString(0).equalsIgnoreCase("5")) - assert(segmentsHisotryList(0).getString(6).equalsIgnoreCase("false")) - assert(segmentsHisotryList(0).getString(1).equalsIgnoreCase("Compacted")) - assert(segmentsHisotryList(1).getString(0).equalsIgnoreCase("4.1")) - assert(segmentsHisotryList(1).getString(6).equalsIgnoreCase("true")) - assert(segmentsHisotryList(1).getString(1).equalsIgnoreCase("Success")) - assert(segmentsHisotryList(3).getString(0).equalsIgnoreCase("3")) - assert(segmentsHisotryList(3).getString(6).equalsIgnoreCase("false")) - assert(segmentsHisotryList(3).getString(1).equalsIgnoreCase("Compacted")) - assert(segmentsHisotryList(7).getString(0).equalsIgnoreCase("0.2")) - assert(segmentsHisotryList(7).getString(6).equalsIgnoreCase("true")) - assert(segmentsHisotryList(7).getString(1).equalsIgnoreCase("Success")) - assert(sql(s"show history segments for table ${tableName} limit 3").collect().length == 3) - dropTable(tableName) - } }