http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala deleted file mode 100644 index 5e944fb..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGIndexDataMapTestCase.scala +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.spark.testsuite.datamap - -import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} -import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} -import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory} -import org.apache.carbondata.core.datastore.FileReader -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.compression.SnappyCompressor -import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.Blocklet -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression -import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf -import org.apache.carbondata.core.util.ByteUtil -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.events.Event -import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest - -class 
CGIndexDataMapFactory extends AbstractCoarseGrainIndexDataMapFactory { - var identifier: AbsoluteTableIdentifier = _ - var dataMapSchema: DataMapSchema = _ - - /** - * Initialization of Datamap factory with the identifier and datamap name - */ - override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier - this.dataMapSchema = dataMapSchema - } - - /** - * Return a new write for this datamap - */ - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) - } - - /** - * Get the datamap for segmentid - */ - override def getDataMaps(segmentId: String) = { - val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) - - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) - files.map {f => - val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap() - dataMap.init(new DataMapModel(f.getCanonicalPath)) - dataMap - }.toList.asJava - } - - - /** - * Get datamaps for distributable object. - */ - override def getDataMaps( - distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = { - val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] - val dataMap: AbstractCoarseGrainIndexDataMap = new CGIndexDataMap() - dataMap.init(new DataMapModel(mapDistributable.getFilePath)) - Seq(dataMap).asJava - } - - /** - * - * @param event - */ - override def fireEvent(event: Event): Unit = { - ??? 
- } - - /** - * Get all distributable objects of a segmentid - * - * @return - */ - override def toDistributable(segmentId: String) = { - val file = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) - - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) - files.map { f => - val d:DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) - d - }.toList.asJava - } - - - /** - * Clears datamap of the segment - */ - override def clear(segmentId: String): Unit = { - - } - - /** - * Clear all datamaps from memory - */ - override def clear(): Unit = { - - } - - /** - * Return metadata of this datamap - */ - override def getMeta: DataMapMeta = { - new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, - List(ExpressionType.EQUALS, ExpressionType.IN).asJava) - } -} - -class CGIndexDataMap extends AbstractCoarseGrainIndexDataMap { - - var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))] = _ - var FileReader: FileReader = _ - var filePath: String = _ - val compressor = new SnappyCompressor - - /** - * It is called to load the data map to memory or to initialize it. - */ - override def init(dataMapModel: DataMapModel): Unit = { - this.filePath = dataMapModel.getFilePath - val size = FileFactory.getCarbonFile(filePath).getSize - FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) - val footerLen = FileReader.readInt(filePath, size-4) - val bytes = FileReader.readByteArray(filePath, size-footerLen-4, footerLen) - val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) - val obj = new ObjectInputStream(in) - maxMin = obj.readObject().asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]] - } - - /** - * Prune the datamap with filter expression. It returns the list of - * blocklets where these filters can exist. 
- * - * @param filterExp - * @return - */ - override def prune( - filterExp: FilterResolverIntf, - segmentProperties: SegmentProperties, - partitions: java.util.List[String]): java.util.List[Blocklet] = { - val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() - val expression = filterExp.getFilterExpression - getEqualToExpression(expression, buffer) - val value = buffer.map { f => - f.getChildren.get(1).evaluate(null).getString - } - val meta = findMeta(value(0).getBytes) - meta.map { f=> - new Blocklet(f._1, f._2+"") - }.asJava - } - - - private def findMeta(value: Array[Byte]) = { - val tuples = maxMin.filter { f => - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) <= 0 && - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) >= 0 - } - tuples - } - - private def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { - if (expression.isInstanceOf[EqualToExpression]) { - buffer += expression - } else { - if (expression.getChildren != null) { - expression.getChildren.asScala.map { f => - if (f.isInstanceOf[EqualToExpression]) { - buffer += f - } - getEqualToExpression(f, buffer) - } - } - } - } - - /** - * Clear complete index table and release memory. - */ - override def clear() = { - ??? - } - - override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? 
-} - -class CGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, - dataWritePath: String, - dataMapSchema: DataMapSchema) - extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) { - - var currentBlockId: String = null - val cgwritepath = dataWritePath + "/" + - dataMapSchema.getDataMapName + System.nanoTime() + ".datamap" - lazy val stream: DataOutputStream = FileFactory - .getDataOutputStream(cgwritepath, FileFactory.getFileType(cgwritepath)) - val blockletList = new ArrayBuffer[Array[Byte]]() - val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]))]() - val compressor = new SnappyCompressor - - /** - * Start of new block notification. - * - * @param blockId file name of the carbondata file - */ - override def onBlockStart(blockId: String): Unit = { - currentBlockId = blockId - } - - /** - * End of block notification - */ - override def onBlockEnd(blockId: String): Unit = { - - } - - /** - * Start of new blocklet notification. - * - * @param blockletId sequence number of blocklet in the block - */ - override def onBlockletStart(blockletId: Int): Unit = { - - } - - /** - * End of blocklet notification - * - * @param blockletId sequence number of blocklet in the block - */ - override def onBlockletEnd(blockletId: Int): Unit = { - val sorted = blockletList - .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) - maxMin += - ((currentBlockId+"", blockletId, (sorted.last, sorted.head))) - blockletList.clear() - } - - /** - * Add the column pages row to the datamap, order of pages is same as `indexColumns` in - * DataMapMeta returned in IndexDataMapFactory. - * - * Implementation should copy the content of `pages` as needed, because `pages` memory - * may be freed after this method returns, if using unsafe column page. 
- */ - override def onPageAdded(blockletId: Int, - pageId: Int, - pages: Array[ColumnPage]): Unit = { - val size = pages(0).getPageSize - val list = new ArrayBuffer[Array[Byte]]() - var i = 0 - while (i < size) { - val bytes = pages(0).getBytes(i) - val newBytes = new Array[Byte](bytes.length - 2) - System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) - list += newBytes - i = i + 1 - } - // Sort based on the column data in order to create index. - val sorted = list - .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l, r) <= 0) - blockletList += sorted.head - blockletList += sorted.last - } - - - /** - * This is called during closing of writer.So after this call no more data will be sent to this - * class. - */ - override def finish(): Unit = { - val out = new ByteOutputStream() - val outStream = new ObjectOutputStream(out) - outStream.writeObject(maxMin) - outStream.close() - val bytes = compressor.compressByte(out.getBytes) - stream.write(bytes) - stream.writeInt(bytes.length) - stream.close() - commitFile(cgwritepath) - } - - -} - -class CGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll { - - val file2 = resourcesPath + "/compaction/fil2.csv" - override protected def beforeAll(): Unit = { - //n should be about 5000000 of reset if size is default 1024 - val n = 150000 - CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) - sql("DROP TABLE IF EXISTS normal_test") - sql( - """ - | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") - } - - test("test cg datamap") { - sql("DROP TABLE IF EXISTS datamap_test_cg") - sql( - """ - | CREATE TABLE datamap_test_cg(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | 
TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test_cg") - // register datamap writer - sql(s"create datamap cgdatamap on table datamap_test_cg using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test_cg OPTIONS('header'='false')") - checkAnswer(sql("select * from datamap_test_cg where name='n502670'"), - sql("select * from normal_test where name='n502670'")) - } - - test("test cg datamap with 2 datamaps ") { - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") - // register datamap writer - sql(s"create datamap ggdatamap1 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='name')") - sql(s"create datamap ggdatamap2 on table datamap_test using '${classOf[CGIndexDataMapFactory].getName}' DMPROPERTIES('indexcolumns'='city')") - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), - sql("select * from normal_test where name='n502670' and city='c2670'")) - } - - override protected def afterAll(): Unit = { - CompactionSupportGlobalSortBigFileTest.deleteFile(file2) - sql("DROP TABLE IF EXISTS normal_test") - sql("DROP TABLE IF EXISTS datamap_test_cg") - } -}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala new file mode 100644 index 0000000..6c13ae9 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.datamap + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.dev.DataMapWriter +import org.apache.carbondata.core.datamap.dev.cgdatamap.{CoarseGrainDataMap, CoarseGrainDataMapFactory} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta} +import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.scan.filter.intf.ExpressionType +import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.events.Event + +class C2DataMapFactory() extends CoarseGrainDataMapFactory { + + var identifier: AbsoluteTableIdentifier = _ + + override def init(identifier: AbsoluteTableIdentifier, + dataMapSchema: DataMapSchema): Unit = { + this.identifier = identifier + } + + override def fireEvent(event: Event): Unit = ??? + + override def clear(segmentId: String): Unit = {} + + override def clear(): Unit = {} + + override def getDataMaps(distributable: DataMapDistributable): java.util.List[CoarseGrainDataMap] = ??? + + override def getDataMaps(segmentId: String): util.List[CoarseGrainDataMap] = ??? 
+ + override def createWriter(segmentId: String, dataWritePath: String): DataMapWriter = + DataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath) + + override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava) + + /** + * Get all distributable objects of a segmentid + * + * @return + */ + override def toDistributable(segmentId: String): util.List[DataMapDistributable] = { + ??? + } + +} + +class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { + def buildTestData(numRows: Int): DataFrame = { + import sqlContext.implicits._ + sqlContext.sparkContext.parallelize(1 to numRows, 1) + .map(x => ("a" + x, "b", x)) + .toDF("c1", "c2", "c3") + } + + def dropTable(): Unit = { + sql("DROP TABLE IF EXISTS carbon1") + sql("DROP TABLE IF EXISTS carbon2") + } + + override def beforeAll { + dropTable() + } + + test("test write datamap 2 pages") { + sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'") + // register datamap writer + sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2DataMapFactory].getName}'") + val df = buildTestData(33000) + + // save dataframe to carbon file + df.write + .format("carbondata") + .option("tableName", "carbon1") + .option("tempCSV", "false") + .option("sort_columns","c1") + .mode(SaveMode.Overwrite) + .save() + + assert(DataMapWriterSuite.callbackSeq.head.contains("block start")) + assert(DataMapWriterSuite.callbackSeq.last.contains("block end")) + assert( + DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq( + "blocklet start 0", + "add page data: blocklet 0, page 0", + "add page data: blocklet 0, page 1", + "blocklet end: 0" + )) + DataMapWriterSuite.callbackSeq = Seq() + } + + test("test write datamap 2 blocklet") { + sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'") + sql(s"CREATE DATAMAP test ON TABLE carbon2 USING 
'${classOf[C2DataMapFactory].getName}'") + + CarbonProperties.getInstance() + .addProperty("carbon.blockletgroup.size.in.mb", "1") + CarbonProperties.getInstance() + .addProperty("carbon.number.of.cores.while.loading", + CarbonCommonConstants.NUM_CORES_DEFAULT_VAL) + + val df = buildTestData(300000) + + // save dataframe to carbon file + df.write + .format("carbondata") + .option("tableName", "carbon2") + .option("tempCSV", "false") + .option("sort_columns","c1") + .option("SORT_SCOPE","GLOBAL_SORT") + .mode(SaveMode.Overwrite) + .save() + + assert(DataMapWriterSuite.callbackSeq.head.contains("block start")) + assert(DataMapWriterSuite.callbackSeq.last.contains("block end")) + // corrected test case the min "carbon.blockletgroup.size.in.mb" size could not be less than + // 64 MB + assert( + DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq( + "blocklet start 0", + "add page data: blocklet 0, page 0", + "add page data: blocklet 0, page 1", + "add page data: blocklet 0, page 2", + "add page data: blocklet 0, page 3", + "add page data: blocklet 0, page 4", + "add page data: blocklet 0, page 5", + "add page data: blocklet 0, page 6", + "add page data: blocklet 0, page 7", + "add page data: blocklet 0, page 8", + "add page data: blocklet 0, page 9", + "blocklet end: 0" + )) + DataMapWriterSuite.callbackSeq = Seq() + } + + override def afterAll { + dropTable() + } +} + +object DataMapWriterSuite { + + var callbackSeq: Seq[String] = Seq[String]() + + def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String, + dataWritePath: String) = + new DataMapWriter(identifier, segmentId, dataWritePath) { + + override def onPageAdded( + blockletId: Int, + pageId: Int, + pages: Array[ColumnPage]): Unit = { + assert(pages.length == 1) + assert(pages(0).getDataType == DataTypes.STRING) + val bytes: Array[Byte] = pages(0).getByteArrayPage()(0) + assert(bytes.sameElements(Seq(0, 1, 'b'.toByte))) + callbackSeq :+= s"add page 
data: blocklet $blockletId, page $pageId" + } + + override def onBlockletEnd(blockletId: Int): Unit = { + callbackSeq :+= s"blocklet end: $blockletId" + } + + override def onBlockEnd(blockId: String): Unit = { + callbackSeq :+= s"block end $blockId" + } + + override def onBlockletStart(blockletId: Int): Unit = { + callbackSeq :+= s"blocklet start $blockletId" + } + + /** + * Start of new block notification. + * + * @param blockId file name of the carbondata file + */ + override def onBlockStart(blockId: String) = { + callbackSeq :+= s"block start $blockId" + } + + /** + * This is called during closing of writer.So after this call no more data will be sent to this + * class. + */ + override def finish() = { + + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala new file mode 100644 index 0000000..84384b7 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.datamap + +import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.dev.fgdatamap.{FineGrainBlocklet, FineGrainDataMap, FineGrainDataMapFactory} +import org.apache.carbondata.core.datamap.dev.{DataMapModel, DataMapWriter} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta} +import org.apache.carbondata.core.datastore.FileReader +import org.apache.carbondata.core.datastore.block.SegmentProperties +import org.apache.carbondata.core.datastore.compression.SnappyCompressor +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} +import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression +import org.apache.carbondata.core.scan.filter.intf.ExpressionType +import 
org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf +import org.apache.carbondata.core.util.ByteUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.Event +import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest + +class FGDataMapFactory extends FineGrainDataMapFactory { + var identifier: AbsoluteTableIdentifier = _ + var dataMapSchema: DataMapSchema = _ + + /** + * Initialization of Datamap factory with the identifier and datamap name + */ + override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { + this.identifier = identifier + this.dataMapSchema = dataMapSchema + } + + /** + * Return a new write for this datamap + */ + override def createWriter(segmentId: String, dataWritePath: String): DataMapWriter = { + new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) + } + + /** + * Get the datamap for segmentid + */ + override def getDataMaps(segmentId: String): java.util.List[FineGrainDataMap] = { + val file = FileFactory + .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map { f => + val dataMap: FineGrainDataMap = new FGDataMap() + dataMap.init(new DataMapModel(f.getCanonicalPath)) + dataMap + }.toList.asJava + } + + /** + * Get datamap for distributable object. 
+ */ + override def getDataMaps( + distributable: DataMapDistributable): java.util.List[FineGrainDataMap]= { + val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] + val dataMap: FineGrainDataMap = new FGDataMap() + dataMap.init(new DataMapModel(mapDistributable.getFilePath)) + Seq(dataMap).asJava + } + + /** + * Get all distributable objects of a segmentid + * + * @return + */ + override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { + val file = FileFactory + .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) + + val files = file.listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") + }) + files.map { f => + val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) + d + }.toList.asJava + } + + + /** + * + * @param event + */ + override def fireEvent(event: Event):Unit = { + ??? + } + + /** + * Clears datamap of the segment + */ + override def clear(segmentId: String): Unit = { + } + + /** + * Clear all datamaps from memory + */ + override def clear(): Unit = { + } + + /** + * Return metadata of this datamap + */ + override def getMeta: DataMapMeta = { + new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, + List(ExpressionType.EQUALS, ExpressionType.IN).asJava) + } +} + +class FGDataMap extends FineGrainDataMap { + + var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _ + var FileReader: FileReader = _ + var filePath: String = _ + val compressor = new SnappyCompressor + + /** + * It is called to load the data map to memory or to initialize it. 
+   */
+  override def init(dataMapModel: DataMapModel): Unit = {
+    this.filePath = dataMapModel.getFilePath
+    val size = FileFactory.getCarbonFile(filePath).getSize
+    FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath))
+    // The trailing 4 bytes of the file give the length of the compressed footer that
+    // sits immediately before them; the footer is the serialized `maxMin` index.
+    val footerLen = FileReader.readInt(filePath, size - 4)
+    val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen)
+    val obj = new ObjectInputStream(new ByteArrayInputStream(compressor.unCompressByte(bytes)))
+    try {
+      maxMin = obj.readObject()
+        .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]]
+    } finally {
+      obj.close()
+    }
+  }
+
+  /**
+   * Prunes with the first equality predicate found in the filter and returns the
+   * blocklets (with page and row ids) that contain the compared value.
+   *
+   * Throws IndexOutOfBoundsException when the filter holds no EqualToExpression,
+   * as the original implementation did.
+   *
+   * @param filterExp resolved filter whose expression tree is searched for equalities
+   * @param segmentProperties not used by this implementation
+   * @param partitions not used by this implementation
+   * @return fine grain blocklets that contain the compared value
+   */
+  override def prune(
+      filterExp: FilterResolverIntf,
+      segmentProperties: SegmentProperties,
+      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+    val equalities = new ArrayBuffer[Expression]()
+    getEqualToExpression(filterExp.getFilterExpression, equalities)
+    // Only the first equality drives pruning, so only that one is evaluated; child 1 is
+    // taken as the compared literal.
+    // NOTE(review): getBytes uses the platform default charset - confirm it matches the
+    // encoding of the values the writer stored.
+    val literal = equalities(0).getChildren.get(1).evaluate(null).getString.getBytes
+    findMeta(literal).flatMap { entry => readAndFindData(entry, literal) }.asJava
+  }
+
+  /**
+   * Loads the serialized entries of one blocklet and looks `value` up in them.
+   *
+   * @param meta index entry; `_4`/`_5` are used as the offset and length of the
+   *             compressed data in the datamap file, `_1`/`_2` are passed on to the
+   *             resulting FineGrainBlocklet
+   * @param value bytes of the value being searched for
+   * @return the matching blocklet, or None when the value is not present
+   */
+  private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
+      value: Array[Byte]): Option[FineGrainBlocklet] = {
+    // (value bytes, row ids per page, page ids)
+    type Entry = (Array[Byte], Seq[Seq[Int]], Seq[Int])
+
+    val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
+    val obj = new ObjectInputStream(new ByteArrayInputStream(compressor.unCompressByte(bytes)))
+    val blockletsData =
+      try {
+        obj.readObject().asInstanceOf[ArrayBuffer[Entry]]
+      } finally {
+        obj.close()
+      }
+
+    import scala.collection.Searching._
+    val byValue = new Ordering[Entry] {
+      override def compare(x: Entry, y: Entry): Int = {
+        ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1)
+      }
+    }
+    // Only the first tuple element takes part in the comparison; the rest is filler.
+    val probe: Entry = (value, Seq(Seq(0)), Seq(0))
+    // Binary search assumes the entries are sorted by value with the same comparer
+    // - TODO confirm against the writer.
+    blockletsData.search(probe)(byValue) match {
+      case Found(index) =>
+        val (_, rowIdsPerPage, pageIds) = blockletsData(index)
+        val pages = pageIds.zipWithIndex.map { case (pageId, i) =>
+          val pg = new FineGrainBlocklet.Page
+          pg.setPageId(pageId)
+          pg.setRowId(rowIdsPerPage(i).toArray)
+          pg
+        }
+        Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
+      case _ =>
+        // An InsertionPoint result means the value is absent. The previous check
+        // (insertionPoint >= 0) was always true, so it returned a neighbouring entry
+        // or indexed past the end of the buffer.
+        None
+    }
+  }
+
+  /**
+   * Returns the index entries whose bounds satisfy `_3._1 <= value <= _3._2`.
+   */
+  private def findMeta(value: Array[Byte]) = {
+    maxMin.filter { f =>
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 &&
+      ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0
+    }
+  }
+
+  /**
+   * Collects every EqualToExpression reachable from `expression` into `buffer`, in
+   * depth-first order. The children of an EqualToExpression are not visited, and each
+   * match is added exactly once.
+   */
+  def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = {
+    expression match {
+      case _: EqualToExpression => buffer += expression
+      case _ =>
+        Option(expression.getChildren).foreach { children =>
+          children.asScala.foreach(getEqualToExpression(_, buffer))
+        }
+    }
+  }
+
+  /**
+   * Clear complete index table and release memory.
+   * Not implemented: calling it throws NotImplementedError.
+   */
+  override def clear():Unit = {
+    ???
+  }
+
+  // Not implemented: calling it throws NotImplementedError.
+  override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ???
+} + +class FGDataMapWriter(identifier: AbsoluteTableIdentifier, + segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema) + extends DataMapWriter(identifier, segmentId, dataWriterPath) { + + var currentBlockId: String = null + val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() + + ".datamap" + val stream: DataOutputStream = FileFactory + .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath)) + val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]() + val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]() + var position: Long = 0 + val compressor = new SnappyCompressor + + /** + * Start of new block notification. + * + * @param blockId file name of the carbondata file + */ + override def onBlockStart(blockId: String): Unit = { + currentBlockId = blockId + } + + /** + * End of block notification + */ + override def onBlockEnd(blockId: String): Unit = { + + } + + /** + * Start of new blocklet notification. + * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletStart(blockletId: Int): Unit = { + + } + + /** + * End of blocklet notification + * + * @param blockletId sequence number of blocklet in the block + */ + override def onBlockletEnd(blockletId: Int): Unit = { + val sorted = blockletList + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0) + var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null + var addedLast: Boolean = false + val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]() + // Merge all same column values to single row. 
+ sorted.foreach { f => + if (oldValue != null) { + if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) { + oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3) + addedLast = false + } else { + blockletListUpdated += oldValue + oldValue = (f._1, Seq(f._2), f._3) + addedLast = true + } + } else { + oldValue = (f._1, Seq(f._2), f._3) + addedLast = false + } + } + if (!addedLast && oldValue != null) { + blockletListUpdated += oldValue + } + + val out = new ByteOutputStream() + val outStream = new ObjectOutputStream(out) + outStream.writeObject(blockletListUpdated) + outStream.close() + val bytes = compressor.compressByte(out.getBytes) + stream.write(bytes) + maxMin += + ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last + ._1), position, bytes.length)) + position += bytes.length + blockletList.clear() + } + + /** + * Add the column pages row to the datamap, order of pages is same as `indexColumns` in + * DataMapMeta returned in DataMapFactory. + * + * Implementation should copy the content of `pages` as needed, because `pages` memory + * may be freed after this method returns, if using unsafe column page. + */ + override def onPageAdded(blockletId: Int, + pageId: Int, + pages: Array[ColumnPage]): Unit = { + val size = pages(0).getPageSize + val list = new ArrayBuffer[(Array[Byte], Int)]() + var i = 0 + while (i < size) { + val bytes = pages(0).getBytes(i) + val newBytes = new Array[Byte](bytes.length - 2) + System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) + list += ((newBytes, i)) + i = i + 1 + } + // Sort based on the column data in order to create index. + val sorted = list + .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0) + var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null + var addedLast: Boolean = false + // Merge all same column values to single row. 
+ sorted.foreach { f => + if (oldValue != null) { + if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) { + oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3) + addedLast = false + } else { + blockletList += oldValue + oldValue = (f._1, Seq(f._2), Seq(pageId)) + addedLast = true + } + } else { + oldValue = (f._1, Seq(f._2), Seq(pageId)) + addedLast = false + } + } + if (!addedLast && oldValue != null) { + blockletList += oldValue + } + } + + + /** + * This is called during closing of writer.So after this call no more data will be sent to this + * class. + */ + override def finish(): Unit = { + val out = new ByteOutputStream() + val outStream = new ObjectOutputStream(out) + outStream.writeObject(maxMin) + outStream.close() + val bytes = compressor.compressByte(out.getBytes) + stream.write(bytes) + stream.writeInt(bytes.length) + stream.close() + commitFile(fgwritepath) + } +} + +class FGDataMapTestCase extends QueryTest with BeforeAndAfterAll { + + val file2 = resourcesPath + "/compaction/fil2.csv" + + override protected def beforeAll(): Unit = { + //n should be about 5000000 of reset if size is default 1024 + val n = 150000 + CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) + sql("DROP TABLE IF EXISTS normal_test") + sql( + """ + | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") + } + + test("test fg datamap") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") + // register 
datamap writer + sql( + s""" + | CREATE DATAMAP ggdatamap ON TABLE datamap_test + | USING '${classOf[FGDataMapFactory].getName}' + | DMPROPERTIES('indexcolumns'='name') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test where name='n502670'"), + sql("select * from normal_test where name='n502670'")) + } + + test("test fg datamap with 2 datamaps ") { + sql("DROP TABLE IF EXISTS datamap_test") + sql( + """ + | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') + """.stripMargin) + val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") + // register datamap writer + sql( + s""" + | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test + | USING '${classOf[FGDataMapFactory].getName}' + | DMPROPERTIES('indexcolumns'='name') + """.stripMargin) + sql( + s""" + | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test + | USING '${classOf[FGDataMapFactory].getName}' + | DMPROPERTIES('indexcolumns'='city') + """.stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") + checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), + sql("select * from normal_test where name='n502670' and city='c2670'")) + } + + override protected def afterAll(): Unit = { + CompactionSupportGlobalSortBigFileTest.deleteFile(file2) + sql("DROP TABLE IF EXISTS normal_test") + sql("DROP TABLE IF EXISTS datamap_test") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala deleted file mode 100644 index 8ddad75..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGIndexDataMapTestCase.scala +++ /dev/null @@ -1,474 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.spark.testsuite.datamap - -import java.io.{ByteArrayInputStream, DataOutputStream, ObjectInputStream, ObjectOutputStream} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import com.sun.xml.internal.messaging.saaj.util.ByteOutputStream -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainIndexDataMap, AbstractFineGrainIndexDataMapFactory} -import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta} -import org.apache.carbondata.core.datastore.FileReader -import org.apache.carbondata.core.datastore.block.SegmentProperties -import org.apache.carbondata.core.datastore.compression.SnappyCompressor -import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.indexstore.FineGrainBlocklet -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata} -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression -import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf -import org.apache.carbondata.core.util.ByteUtil -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.events.Event -import org.apache.carbondata.spark.testsuite.datacompaction.CompactionSupportGlobalSortBigFileTest - -class FGIndexDataMapFactory extends 
AbstractFineGrainIndexDataMapFactory { - var identifier: AbsoluteTableIdentifier = _ - var dataMapSchema: DataMapSchema = _ - - /** - * Initialization of Datamap factory with the identifier and datamap name - */ - override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier - this.dataMapSchema = dataMapSchema - } - - /** - * Return a new write for this datamap - */ - override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = { - new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapSchema) - } - - /** - * Get the datamap for segmentid - */ - override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainIndexDataMap] = { - val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) - - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) - files.map { f => - val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap() - dataMap.init(new DataMapModel(f.getCanonicalPath)) - dataMap - }.toList.asJava - } - - /** - * Get datamap for distributable object. 
- */ - override def getDataMaps( - distributable: DataMapDistributable): java.util.List[AbstractFineGrainIndexDataMap]= { - val mapDistributable = distributable.asInstanceOf[BlockletDataMapDistributable] - val dataMap: AbstractFineGrainIndexDataMap = new FGIndexDataMap() - dataMap.init(new DataMapModel(mapDistributable.getFilePath)) - Seq(dataMap).asJava - } - - /** - * Get all distributable objects of a segmentid - * - * @return - */ - override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = { - val file = FileFactory - .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId)) - - val files = file.listFiles(new CarbonFileFilter { - override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap") - }) - files.map { f => - val d: DataMapDistributable = new BlockletDataMapDistributable(f.getCanonicalPath) - d - }.toList.asJava - } - - - /** - * - * @param event - */ - override def fireEvent(event: Event):Unit = { - ??? - } - - /** - * Clears datamap of the segment - */ - override def clear(segmentId: String): Unit = { - } - - /** - * Clear all datamaps from memory - */ - override def clear(): Unit = { - } - - /** - * Return metadata of this datamap - */ - override def getMeta: DataMapMeta = { - new DataMapMeta(dataMapSchema.getProperties.get("indexcolumns").split(",").toList.asJava, - List(ExpressionType.EQUALS, ExpressionType.IN).asJava) - } -} - -class FGIndexDataMap extends AbstractFineGrainIndexDataMap { - - var maxMin: ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)] = _ - var FileReader: FileReader = _ - var filePath: String = _ - val compressor = new SnappyCompressor - - /** - * It is called to load the data map to memory or to initialize it. 
- */ - override def init(dataMapModel: DataMapModel): Unit = { - this.filePath = dataMapModel.getFilePath - val size = FileFactory.getCarbonFile(filePath).getSize - FileReader = FileFactory.getFileHolder(FileFactory.getFileType(filePath)) - val footerLen = FileReader.readInt(filePath, size - 4) - val bytes = FileReader.readByteArray(filePath, size - footerLen - 4, footerLen) - val in = new ByteArrayInputStream(compressor.unCompressByte(bytes)) - val obj = new ObjectInputStream(in) - maxMin = obj.readObject() - .asInstanceOf[ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]] - } - - /** - * Prune the datamap with filter expression. It returns the list of - * blocklets where these filters can exist. - * - * @param filterExp - * @return - */ - override def prune( - filterExp: FilterResolverIntf, - segmentProperties: SegmentProperties, - partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = { - val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]() - val expression = filterExp.getFilterExpression - getEqualToExpression(expression, buffer) - val value = buffer.map { f => - f.getChildren.get(1).evaluate(null).getString - } - val meta = findMeta(value(0).getBytes) - meta.map { f => - readAndFindData(f, value(0).getBytes()) - }.filter(_.isDefined).map(_.get).asJava - } - - private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int), - value: Array[Byte]): Option[FineGrainBlocklet] = { - val bytes = FileReader.readByteArray(filePath, meta._4, meta._5) - val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes)) - val obj = new ObjectInputStream(outputStream) - val blockletsData = obj.readObject() - .asInstanceOf[ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]] - - import scala.collection.Searching._ - val searching = blockletsData - .search[(Array[Byte], Seq[Seq[Int]], Seq[Int])]((value, Seq(Seq(0)), Seq(0)))(new Ordering[ - (Array[Byte], Seq[Seq[Int]], Seq[Int])] { - override 
def compare(x: (Array[Byte], Seq[Seq[Int]], Seq[Int]), - y: (Array[Byte], Seq[Seq[Int]], Seq[Int])) = { - ByteUtil.UnsafeComparer.INSTANCE.compareTo(x._1, y._1) - } - }) - if (searching.insertionPoint >= 0) { - val f = blockletsData(searching.insertionPoint) - val pages = f._3.zipWithIndex.map { p => - val pg = new FineGrainBlocklet.Page - pg.setPageId(p._1) - pg.setRowId(f._2(p._2).toArray) - pg - } - pages - Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava)) - } else { - None - } - - } - - private def findMeta(value: Array[Byte]) = { - val tuples = maxMin.filter { f => - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._1) >= 0 && - ByteUtil.UnsafeComparer.INSTANCE.compareTo(value, f._3._2) <= 0 - } - tuples - } - - def getEqualToExpression(expression: Expression, buffer: ArrayBuffer[Expression]): Unit = { - if (expression.isInstanceOf[EqualToExpression]) { - buffer += expression - } else { - if (expression.getChildren != null) { - expression.getChildren.asScala.map { f => - if (f.isInstanceOf[EqualToExpression]) { - buffer += f - } - getEqualToExpression(f, buffer) - } - } - } - } - - /** - * Clear complete index table and release memory. - */ - override def clear():Unit = { - ??? - } - - override def isScanRequired(filterExp: FilterResolverIntf): Boolean = ??? 
-} - -class FGDataMapWriter(identifier: AbsoluteTableIdentifier, - segmentId: String, dataWriterPath: String, dataMapSchema: DataMapSchema) - extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) { - - var currentBlockId: String = null - val fgwritepath = dataWriterPath + "/" + dataMapSchema.getDataMapName + System.nanoTime() + - ".datamap" - val stream: DataOutputStream = FileFactory - .getDataOutputStream(fgwritepath, FileFactory.getFileType(fgwritepath)) - val blockletList = new ArrayBuffer[(Array[Byte], Seq[Int], Seq[Int])]() - val maxMin = new ArrayBuffer[(String, Int, (Array[Byte], Array[Byte]), Long, Int)]() - var position: Long = 0 - val compressor = new SnappyCompressor - - /** - * Start of new block notification. - * - * @param blockId file name of the carbondata file - */ - override def onBlockStart(blockId: String): Unit = { - currentBlockId = blockId - } - - /** - * End of block notification - */ - override def onBlockEnd(blockId: String): Unit = { - - } - - /** - * Start of new blocklet notification. - * - * @param blockletId sequence number of blocklet in the block - */ - override def onBlockletStart(blockletId: Int): Unit = { - - } - - /** - * End of blocklet notification - * - * @param blockletId sequence number of blocklet in the block - */ - override def onBlockletEnd(blockletId: Int): Unit = { - val sorted = blockletList - .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0) - var oldValue: (Array[Byte], Seq[Seq[Int]], Seq[Int]) = null - var addedLast: Boolean = false - val blockletListUpdated = new ArrayBuffer[(Array[Byte], Seq[Seq[Int]], Seq[Int])]() - // Merge all same column values to single row. 
- sorted.foreach { f => - if (oldValue != null) { - if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) { - oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3 ++ f._3) - addedLast = false - } else { - blockletListUpdated += oldValue - oldValue = (f._1, Seq(f._2), f._3) - addedLast = true - } - } else { - oldValue = (f._1, Seq(f._2), f._3) - addedLast = false - } - } - if (!addedLast && oldValue != null) { - blockletListUpdated += oldValue - } - - val out = new ByteOutputStream() - val outStream = new ObjectOutputStream(out) - outStream.writeObject(blockletListUpdated) - outStream.close() - val bytes = compressor.compressByte(out.getBytes) - stream.write(bytes) - maxMin += - ((currentBlockId + "", blockletId, (blockletListUpdated.head._1, blockletListUpdated.last - ._1), position, bytes.length)) - position += bytes.length - blockletList.clear() - } - - /** - * Add the column pages row to the datamap, order of pages is same as `indexColumns` in - * DataMapMeta returned in IndexDataMapFactory. - * - * Implementation should copy the content of `pages` as needed, because `pages` memory - * may be freed after this method returns, if using unsafe column page. - */ - override def onPageAdded(blockletId: Int, - pageId: Int, - pages: Array[ColumnPage]): Unit = { - val size = pages(0).getPageSize - val list = new ArrayBuffer[(Array[Byte], Int)]() - var i = 0 - while (i < size) { - val bytes = pages(0).getBytes(i) - val newBytes = new Array[Byte](bytes.length - 2) - System.arraycopy(bytes, 2, newBytes, 0, newBytes.length) - list += ((newBytes, i)) - i = i + 1 - } - // Sort based on the column data in order to create index. - val sorted = list - .sortWith((l, r) => ByteUtil.UnsafeComparer.INSTANCE.compareTo(l._1, r._1) <= 0) - var oldValue: (Array[Byte], Seq[Int], Seq[Int]) = null - var addedLast: Boolean = false - // Merge all same column values to single row. 
- sorted.foreach { f => - if (oldValue != null) { - if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(f._1, oldValue._1) == 0) { - oldValue = (oldValue._1, oldValue._2 ++ Seq(f._2), oldValue._3) - addedLast = false - } else { - blockletList += oldValue - oldValue = (f._1, Seq(f._2), Seq(pageId)) - addedLast = true - } - } else { - oldValue = (f._1, Seq(f._2), Seq(pageId)) - addedLast = false - } - } - if (!addedLast && oldValue != null) { - blockletList += oldValue - } - } - - - /** - * This is called during closing of writer.So after this call no more data will be sent to this - * class. - */ - override def finish(): Unit = { - val out = new ByteOutputStream() - val outStream = new ObjectOutputStream(out) - outStream.writeObject(maxMin) - outStream.close() - val bytes = compressor.compressByte(out.getBytes) - stream.write(bytes) - stream.writeInt(bytes.length) - stream.close() - commitFile(fgwritepath) - } -} - -class FGIndexDataMapTestCase extends QueryTest with BeforeAndAfterAll { - - val file2 = resourcesPath + "/compaction/fil2.csv" - - override protected def beforeAll(): Unit = { - //n should be about 5000000 of reset if size is default 1024 - val n = 150000 - CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n) - sql("DROP TABLE IF EXISTS normal_test") - sql( - """ - | CREATE TABLE normal_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE normal_test OPTIONS('header'='false')") - } - - test("test fg datamap") { - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") - // register 
datamap writer - sql( - s""" - | CREATE DATAMAP ggdatamap ON TABLE datamap_test - | USING '${classOf[FGIndexDataMapFactory].getName}' - | DMPROPERTIES('indexcolumns'='name') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer(sql("select * from datamap_test where name='n502670'"), - sql("select * from normal_test where name='n502670'")) - } - - test("test fg datamap with 2 datamaps ") { - sql("DROP TABLE IF EXISTS datamap_test") - sql( - """ - | CREATE TABLE datamap_test(id INT, name STRING, city STRING, age INT) - | STORED BY 'org.apache.carbondata.format' - | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='LOCAL_SORT') - """.stripMargin) - val table = CarbonMetadata.getInstance().getCarbonTable("default_datamap_test") - // register datamap writer - sql( - s""" - | CREATE DATAMAP ggdatamap1 ON TABLE datamap_test - | USING '${classOf[FGIndexDataMapFactory].getName}' - | DMPROPERTIES('indexcolumns'='name') - """.stripMargin) - sql( - s""" - | CREATE DATAMAP ggdatamap2 ON TABLE datamap_test - | USING '${classOf[FGIndexDataMapFactory].getName}' - | DMPROPERTIES('indexcolumns'='city') - """.stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test OPTIONS('header'='false')") - checkAnswer(sql("select * from datamap_test where name='n502670' and city='c2670'"), - sql("select * from normal_test where name='n502670' and city='c2670'")) - } - - override protected def afterAll(): Unit = { - CompactionSupportGlobalSortBigFileTest.deleteFile(file2) - sql("DROP TABLE IF EXISTS normal_test") - sql("DROP TABLE IF EXISTS datamap_test") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala deleted file mode 100644 index 5fd8ae9..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.testsuite.datamap - -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.test.util.QueryTest -import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter -import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory} -import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta} -import org.apache.carbondata.core.datastore.page.ColumnPage -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.DataMapSchema -import org.apache.carbondata.core.scan.filter.intf.ExpressionType -import org.apache.carbondata.core.metadata.datatype.DataTypes -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.events.Event - -class C2IndexDataMapFactory() extends AbstractCoarseGrainIndexDataMapFactory { - - var identifier: AbsoluteTableIdentifier = _ - - override def init(identifier: AbsoluteTableIdentifier, - dataMapSchema: DataMapSchema): Unit = { - this.identifier = identifier - } - - override def fireEvent(event: Event): Unit = ??? - - override def clear(segmentId: String): Unit = {} - - override def clear(): Unit = {} - - override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = ??? - - override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainIndexDataMap] = ??? 
// NOTE(review): the members below close a factory class whose header precedes this chunk
// (presumably C2IndexDataMapFactory, which the tests below register); reproduced unchanged.

  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter =
    IndexDataMapWriterSuite.dataMapWriterC2Mock(identifier, segmentId, dataWritePath)

  override def getMeta: DataMapMeta =
    new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)

  /**
   * Get all distributable objects of a segmentid
   *
   * @return
   */
  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = {
    ???
  }

}

/**
 * Verifies the order of the callbacks a data map writer receives while a DataFrame is
 * written into a carbon table that has a data map registered on it.
 *
 * The callbacks are recorded by the mock writer in the companion object, which appends one
 * line per callback to the shared `IndexDataMapWriterSuite.callbackSeq`.
 */
class IndexDataMapWriterSuite extends QueryTest with BeforeAndAfterAll {

  /**
   * Builds a single-partition DataFrame with `numRows` rows and the columns
   * c1 = "a" + i, c2 = "b", c3 = i for i in 1..numRows.
   */
  def buildTestData(numRows: Int): DataFrame = {
    import sqlContext.implicits._
    sqlContext.sparkContext.parallelize(1 to numRows, 1)
      .map(x => ("a" + x, "b", x))
      .toDF("c1", "c2", "c3")
  }

  /** Drops both tables used by this suite, if they exist. */
  def dropTable(): Unit = {
    sql("DROP TABLE IF EXISTS carbon1")
    sql("DROP TABLE IF EXISTS carbon2")
  }

  override def beforeAll(): Unit = {
    dropTable()
  }

  /**
   * Checks the callbacks recorded so far and then clears the shared recorder.
   *
   * The first entry must be a "block start", the last a "block end", and everything in
   * between must equal `expectedBlockletEvents`. The recorder is cleared in a `finally`
   * so that a failing assertion cannot leak events into the next test.
   *
   * @param expectedBlockletEvents expected callbacks between block start and block end
   */
  private def verifyCallbacksAndReset(expectedBlockletEvents: Seq[String]): Unit = {
    val recorded = IndexDataMapWriterSuite.callbackSeq
    try {
      // headOption/lastOption: an empty recording fails the assertion instead of throwing
      // NoSuchElementException.
      assert(recorded.headOption.exists(_.contains("block start")))
      assert(recorded.lastOption.exists(_.contains("block end")))
      assert(recorded.slice(1, recorded.length - 1) == expectedBlockletEvents)
    } finally {
      IndexDataMapWriterSuite.callbackSeq = Seq()
    }
  }

  test("test write datamap 2 pages") {
    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
    // register datamap writer
    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2IndexDataMapFactory].getName}'")
    val df = buildTestData(33000)

    // start from a clean recorder even if an earlier test aborted before resetting it
    IndexDataMapWriterSuite.callbackSeq = Seq()

    // save dataframe to carbon file
    df.write
      .format("carbondata")
      .option("tableName", "carbon1")
      .option("tempCSV", "false")
      .option("sort_columns","c1")
      .mode(SaveMode.Overwrite)
      .save()

    verifyCallbacksAndReset(Seq(
      "blocklet start 0",
      "add page data: blocklet 0, page 0",
      "add page data: blocklet 0, page 1",
      "blocklet end: 0"
    ))
  }

  test("test write datamap 2 blocklet") {
    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2IndexDataMapFactory].getName}'")

    // NOTE(review): these global overrides are not restored by this suite — confirm that
    // later suites do not depend on the previous values.
    CarbonProperties.getInstance()
      .addProperty("carbon.blockletgroup.size.in.mb", "1")
    CarbonProperties.getInstance()
      .addProperty("carbon.number.of.cores.while.loading",
        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)

    val df = buildTestData(300000)

    // start from a clean recorder even if an earlier test aborted before resetting it
    IndexDataMapWriterSuite.callbackSeq = Seq()

    // save dataframe to carbon file
    df.write
      .format("carbondata")
      .option("tableName", "carbon2")
      .option("tempCSV", "false")
      .option("sort_columns","c1")
      .option("SORT_SCOPE","GLOBAL_SORT")
      .mode(SaveMode.Overwrite)
      .save()

    // corrected test case the min "carbon.blockletgroup.size.in.mb" size could not be less than
    // 64 MB
    // Expected: one blocklet (id 0) holding pages 0 to 9.
    val expectedPages = (0 to 9).map(page => s"add page data: blocklet 0, page $page")
    verifyCallbacksAndReset(("blocklet start 0" +: expectedPages) :+ "blocklet end: 0")
  }

  override def afterAll(): Unit = {
    dropTable()
  }
}

object IndexDataMapWriterSuite {

  /** Callbacks recorded by the mock writer, in arrival order; cleared by the tests. */
  var callbackSeq: Seq[String] = Seq[String]()

  /**
   * Creates a data map writer that appends one descriptive line to [[callbackSeq]] for
   * every callback it receives and sanity-checks each page it is handed.
   *
   * @param identifier    table identifier passed through to the writer base class
   * @param segmentId     segment id passed through to the writer base class
   * @param dataWritePath write path passed through to the writer base class
   * @return the recording writer
   */
  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
      dataWritePath: String): AbstractDataMapWriter =
    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {

      override def onPageAdded(
          blockletId: Int,
          pageId: Int,
          pages: Array[ColumnPage]): Unit = {
        // exactly one STRING column page is expected per call
        assert(pages.length == 1)
        assert(pages(0).getDataType == DataTypes.STRING)
        val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
        // first value of the page; presumably a 2-byte length prefix (0, 1) followed by
        // the single byte of "b" — TODO confirm the page encoding
        assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
        callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
      }

      override def onBlockletEnd(blockletId: Int): Unit = {
        callbackSeq :+= s"blocklet end: $blockletId"
      }

      override def onBlockEnd(blockId: String): Unit = {
        callbackSeq :+= s"block end $blockId"
      }

      override def onBlockletStart(blockletId: Int): Unit = {
        callbackSeq :+= s"blocklet start $blockletId"
      }

      /**
       * Start of new block notification.
       *
       * @param blockId file name of the carbondata file
       */
      override def onBlockStart(blockId: String): Unit = {
        callbackSeq :+= s"block start $blockId"
      }

      /**
       * This is called during closing of writer.So after this call no more data will be sent to this
       * class.
       */
      override def finish(): Unit = {

      }
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ef3031d0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
new file mode 100644
index 0000000..b020aa9
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -0,0 +1,279 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.
You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.spark.testsuite.datamap

import java.io.{File, FilenameFilter}

import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath

/**
 * Tests for the CREATE DATAMAP, DROP DATAMAP and SHOW DATAMAP commands.
 *
 * Several tests share the table `datamaptest` and assert on the number of data maps
 * accumulated by earlier tests, so they depend on running in declaration order.
 */
class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {

  /** CSV file loaded by the pre-aggregate load test. */
  val testData = s"$resourcesPath/sample.csv"

  /** Class name passed to `USING`; the tests below expect CREATE DATAMAP to reject it. */
  val newClass = "org.apache.spark.sql.CarbonSource"

  override def beforeAll(): Unit = {
    sql("drop table if exists datamaptest")
    sql("drop table if exists datamapshowtest")
    sql("drop table if exists uniqdata")
    sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
  }

  /**
   * Runs `body` with `ENABLE_HIVE_SCHEMA_META_STORE` set to "true" and always resets the
   * property to its default afterwards, whether or not `body` throws.
   *
   * @param body test code to run while the property is enabled
   */
  private def withHiveMetaStoreEnabled(body: => Unit): Unit = {
    val properties = CarbonProperties.getInstance()
    properties.addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
    try {
      body
    } finally {
      properties.addProperty(
        CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
        CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
    }
  }

  test("test datamap create: don't support using non-exist class") {
    intercept[MetadataProcessException] {
      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
    }
  }

  test("test datamap create with dmproperties: don't support using non-exist class") {
    intercept[MetadataProcessException] {
      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
    }
  }

  test("test datamap create with existing name: don't support using non-exist class") {
    intercept[MetadataProcessException] {
      sql(
        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
    }
  }

  test("test datamap create with preagg") {
    sql("drop datamap if exists datamap3 on table datamaptest")
    sql(
      "create datamap datamap3 on table datamaptest using 'preaggregate' as select count(a) from datamaptest")
    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
    assert(table != null)
    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
    assert(dataMapSchemaList.size() == 1)
    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
    assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
  }

  test("check hivemetastore after drop datamap") {
    withHiveMetaStoreEnabled {
      try {
        sql("drop table if exists hiveMetaStoreTable")
        sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")

        sql(
          "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' as select count(a) from hiveMetaStoreTable")
        checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")

        sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
        checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
      } finally {
        // "if exists": when table creation itself failed, the cleanup must not throw and
        // hide the original failure.
        sql("drop table if exists hiveMetaStoreTable")
      }
    }
  }

  test("drop the table having pre-aggregate") {
    withHiveMetaStoreEnabled {
      sql("drop table if exists hiveMetaStoreTable_1")
      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")

      sql(
        "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' as select count(a) from hiveMetaStoreTable_1")

      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
        true,
        "datamap_hiveMetaStoreTable_1")

      sql("drop table hiveMetaStoreTable_1")

      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
    }
  }

  test("test datamap create with preagg with duplicate name") {
    val createDataMap10 =
      s"""
         | CREATE DATAMAP datamap10 ON TABLE datamaptest
         | USING 'preaggregate'
         | AS SELECT COUNT(a) FROM datamaptest
       """.stripMargin
    sql(createDataMap10)
    // creating the same data map a second time must be rejected
    intercept[MalformedDataMapCommandException] {
      sql(createDataMap10)
    }
    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
    assert(table != null)
    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
    // 2 = datamap3 created by "test datamap create with preagg" plus datamap10 above
    assert(dataMapSchemaList.size() == 2)
  }

  test("test drop non-exist datamap") {
    intercept[NoSuchDataMapException] {
      sql("drop datamap nonexist on table datamaptest")
    }
    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
    assert(table != null)
    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
    // the failed drop must leave the data maps created by earlier tests untouched
    assert(dataMapSchemaList.size() == 2)
  }

  test("test show datamap without preaggregate: don't support using non-exist class") {
    // NOTE(review): once a statement inside intercept throws, the statements after it are
    // not executed; presumably the first CREATE DATAMAP ... USING '$newClass' is the one
    // that throws, leaving the rest as a record of the intended check — confirm.
    intercept[MetadataProcessException] {
      sql("drop table if exists datamapshowtest")
      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' ")
      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
      checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass)
    }
  }

  test("test show datamap with preaggregate: don't support using non-exist class") {
    // NOTE(review): statements after the one that throws are not executed (see the
    // "without preaggregate" variant of this test).
    intercept[MetadataProcessException] {
      sql("drop table if exists datamapshowtest")
      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
      val frame = sql("show datamap on table datamapshowtest")
      assert(frame.collect().length == 2)
      checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1")
    }
  }

  test("test show datamap with no datamap") {
    sql("drop table if exists datamapshowtest")
    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
    assert(sql("show datamap on table datamapshowtest").collect().length == 0)
  }

  test("test show datamap after dropping datamap: don't support using non-exist class") {
    // NOTE(review): statements after the one that throws are not executed (see the
    // "without preaggregate" variant of this test).
    intercept[MetadataProcessException] {
      sql("drop table if exists datamapshowtest")
      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
      sql("drop datamap datamap1 on table datamapshowtest")
      val frame = sql("show datamap on table datamapshowtest")
      assert(frame.collect().length == 1)
      checkExistence(frame, true, "datamap2", "(NA)", newClass)
    }
  }

  test("test if preaggregate load is successfull for hivemetastore") {
    withHiveMetaStoreEnabled {
      sql("DROP TABLE IF EXISTS maintable")
      sql(
        """
          | CREATE TABLE maintable(id int, name string, city string, age int)
          | STORED BY 'org.apache.carbondata.format'
        """.stripMargin)
      sql("create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id")
      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
      checkAnswer(sql(s"select * from maintable_preagg_sum"),
        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
    }
  }

  test("test preaggregate load for decimal column for hivemetastore") {
    // Scoped like the other hive-metastore tests so the setting cannot leak into the
    // tests that follow when this one finishes or fails.
    withHiveMetaStoreEnabled {
      sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
      sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
      sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
      checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
      sql("drop datamap if exists uniqdata_agg on table uniqdata")
    }
  }

  test("create pre-agg table with path") {
    sql("drop table if exists main_preagg")
    sql("drop table if exists main ")
    val warehouse = s"$metastoredb/warehouse"
    // nanoTime keeps the directory name distinct between runs
    val path = s"$warehouse/${System.nanoTime}_preAggTestPath"
    sql(
      s"""
         | create table main(
         |     year int,
         |     month int,
         |     name string,
         |     salary int)
         | stored by 'carbondata'
         | tblproperties('sort_columns'='month,year,name')
       """.stripMargin)
    sql("insert into main select 10,11,'amy',12")
    sql("insert into main select 10,11,'amy',14")
    sql(
      s"""
         | create datamap preagg
         | on table main
         | using 'preaggregate'
         | dmproperties ('path'='$path')
         | as select name,avg(salary)
         |    from main
         |    group by name
       """.stripMargin)
    assertResult(true)(new File(path).exists())

    // File.list returns null when the directory is missing or unreadable; map that to
    // "no files" so the test fails on the assertion rather than with an NPE.
    val segmentDir = new File(CarbonTablePath.getSegmentPath(path, "0"))
    val factFiles = Option(segmentDir.list(new FilenameFilter {
      override def accept(dir: File, name: String): Boolean =
        name.contains(CarbonCommonConstants.FACT_FILE_EXT)
    })).getOrElse(Array.empty[String])
    assertResult(true)(factFiles.length > 0)

    checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
    // the child table holds one row per name with the values 26 and 2 (= 12 + 14 and
    // two rows), presumably the sum and count backing avg(salary)
    checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
    sql("drop datamap preagg on table main")
    // dropping the data map must also remove its custom path
    assertResult(false)(new File(path).exists())
    sql("drop table main")
  }

  override def afterAll(): Unit = {
    sql("DROP TABLE IF EXISTS maintable")
    sql("drop table if exists uniqdata")
    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
      CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
    sql("drop table if exists datamaptest")
    sql("drop table if exists datamapshowtest")
  }
}