http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala new file mode 100644 index 0000000..795ef6a --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/IndexDataMapWriterSuite.scala @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, Segment}
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMap, AbstractCoarseGrainIndexDataMapFactory}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+
+/**
+ * Coarse-grain index datamap factory used only by [[IndexDataMapWriterSuite]].
+ *
+ * It declares interest in column `c2` for EQUALS filters and hands out a mock writer that records
+ * every callback it receives. The read-side methods are left unimplemented (`???`) because the
+ * suite only exercises the write path.
+ */
+class C2IndexDataMapFactory() extends AbstractCoarseGrainIndexDataMapFactory {
+
+  var identifier: AbsoluteTableIdentifier = _
+
+  override def init(identifier: AbsoluteTableIdentifier,
+      dataMapSchema: DataMapSchema): Unit = {
+    this.identifier = identifier
+  }
+
+  override def fireEvent(event: Event): Unit = ???
+
+  override def clear(segment: Segment): Unit = {}
+
+  override def clear(): Unit = {}
+
+  override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainIndexDataMap] = ???
+
+  override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainIndexDataMap] = ???
+
+  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter =
+    IndexDataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
+
+  /**
+   * Get all distributable objects of a segment. Not needed by the write-path tests.
+   *
+   * @return never returns; always throws NotImplementedError
+   */
+  override def toDistributable(segmentId: Segment): util.List[DataMapDistributable] = {
+    ???
+  }
+
+}
+
+class IndexDataMapWriterSuite extends QueryTest with BeforeAndAfterAll {
+
+  /** Builds a single-partition DataFrame with rows ("a" + i, "b", i) for i in 1..numRows. */
+  def buildTestData(numRows: Int): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numRows, 1)
+      .map(x => ("a" + x, "b", x))
+      .toDF("c1", "c2", "c3")
+  }
+
+  def dropTable(): Unit = {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
+  }
+
+  override def beforeAll {
+    dropTable()
+  }
+
+  /** Recorded callbacks without the leading "block start" and trailing "block end" entries. */
+  private def innerCallbacks: Seq[String] = {
+    val recorded = IndexDataMapWriterSuite.callbackSeq
+    recorded.slice(1, recorded.length - 1)
+  }
+
+  /** Asserts the recorded callbacks open with a block start and close with a block end. */
+  private def assertBlockBoundaries(): Unit = {
+    // headOption/lastOption: an empty log should fail the assertion, not throw NoSuchElementException
+    assert(IndexDataMapWriterSuite.callbackSeq.headOption.exists(_.contains("block start")))
+    assert(IndexDataMapWriterSuite.callbackSeq.lastOption.exists(_.contains("block end")))
+  }
+
+  test("test write datamap 2 pages") {
+    // start from an empty log so a failure in another test cannot leak into this one
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+    sql(s"CREATE TABLE carbon1(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    // register datamap writer
+    sql(s"CREATE DATAMAP test ON TABLE carbon1 USING '${classOf[C2IndexDataMapFactory].getName}'")
+    val df = buildTestData(33000)
+
+    // save dataframe to carbon file
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon1")
+      .option("tempCSV", "false")
+      .option("sort_columns","c1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    assertBlockBoundaries()
+    assert(
+      innerCallbacks == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "blocklet end: 0"
+      ))
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  test("test write datamap 2 blocklet") {
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+    sql(s"CREATE TABLE carbon2(c1 STRING, c2 STRING, c3 INT) STORED BY 'org.apache.carbondata.format'")
+    sql(s"CREATE DATAMAP test ON TABLE carbon2 USING '${classOf[C2IndexDataMapFactory].getName}'")
+
+    val carbonProperties = CarbonProperties.getInstance()
+    // Remember the global settings this test overrides so they can be restored afterwards.
+    // "64" as the fallback blocklet group size follows the note below about the 64 MB
+    // minimum -- TODO confirm against the project's default constant.
+    val previousBlockletGroupSize =
+      carbonProperties.getProperty("carbon.blockletgroup.size.in.mb", "64")
+    val previousLoadingCores =
+      carbonProperties.getProperty("carbon.number.of.cores.while.loading",
+        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
+
+    try {
+      carbonProperties.addProperty("carbon.blockletgroup.size.in.mb", "1")
+      carbonProperties.addProperty("carbon.number.of.cores.while.loading",
+        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL)
+
+      val df = buildTestData(300000)
+
+      // save dataframe to carbon file
+      df.write
+        .format("carbondata")
+        .option("tableName", "carbon2")
+        .option("tempCSV", "false")
+        .option("sort_columns","c1")
+        .option("SORT_SCOPE","GLOBAL_SORT")
+        .mode(SaveMode.Overwrite)
+        .save()
+    } finally {
+      // restore the process-wide properties so they do not leak into later suites
+      carbonProperties.addProperty("carbon.blockletgroup.size.in.mb", previousBlockletGroupSize)
+      carbonProperties.addProperty("carbon.number.of.cores.while.loading", previousLoadingCores)
+    }
+
+    assertBlockBoundaries()
+    // The configured 1 MB "carbon.blockletgroup.size.in.mb" is below the allowed minimum
+    // (64 MB), so all pages end up in a single blocklet.
+    assert(
+      innerCallbacks == Seq(
+        "blocklet start 0",
+        "add page data: blocklet 0, page 0",
+        "add page data: blocklet 0, page 1",
+        "add page data: blocklet 0, page 2",
+        "add page data: blocklet 0, page 3",
+        "add page data: blocklet 0, page 4",
+        "add page data: blocklet 0, page 5",
+        "add page data: blocklet 0, page 6",
+        "add page data: blocklet 0, page 7",
+        "add page data: blocklet 0, page 8",
+        "add page data: blocklet 0, page 9",
+        "blocklet end: 0"
+      ))
+    IndexDataMapWriterSuite.callbackSeq = Seq()
+  }
+
+  override def afterAll {
+    dropTable()
+  }
+}
+
+object IndexDataMapWriterSuite {
+
+  /** Ordered log of the callbacks received by the mock writer; tests reset it to empty. */
+  var callbackSeq: Seq[String] = Seq[String]()
+
+  /**
+   * Creates a writer that appends a description of every callback to [[callbackSeq]].
+   *
+   * For each added page it also asserts:
+   *  - exactly one STRING column page is passed in;
+   *  - the first row's bytes are (0, 1, 'b'), presumably a 2-byte length prefix followed by
+   *    the value "b" (TODO confirm the encoding).
+   */
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
+      dataWritePath: String): AbstractDataMapWriter =
+    new AbstractDataMapWriter(identifier, segment, dataWritePath) {
+
+      override def onPageAdded(
+          blockletId: Int,
+          pageId: Int,
+          pages: Array[ColumnPage]): Unit = {
+        assert(pages.length == 1)
+        assert(pages(0).getDataType == DataTypes.STRING)
+        val bytes: Array[Byte] = pages(0).getByteArrayPage()(0)
+        assert(bytes.sameElements(Seq(0, 1, 'b'.toByte)))
+        callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId"
+      }
+
+      override def onBlockletEnd(blockletId: Int): Unit = {
+        callbackSeq :+= s"blocklet end: $blockletId"
+      }
+
+      override def onBlockEnd(blockId: String): Unit = {
+        callbackSeq :+= s"block end $blockId"
+      }
+
+      override def onBlockletStart(blockletId: Int): Unit = {
+        callbackSeq :+= s"blocklet start $blockletId"
+      }
+
+      /**
+       * Start of new block notification.
+       *
+       * @param blockId file name of the carbondata file
+       */
+      override def onBlockStart(blockId: String): Unit = {
+        callbackSeq :+= s"block start $blockId"
+      }
+
+      /**
+       * Called when the writer is closed; no more data is sent after this. Nothing to flush here.
+       */
+      override def finish(): Unit = {
+
+      }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala deleted file mode 100644 index 717af6f..0000000 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.spark.testsuite.datamap - -import java.io.{File, FilenameFilter} - -import org.apache.spark.sql.Row -import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll - -import org.apache.carbondata.common.exceptions.MetadataProcessException -import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.CarbonMetadata -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonTablePath - -class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { - - val testData = s"$resourcesPath/sample.csv" - - override def beforeAll { - sql("drop table if exists datamaptest") - sql("drop table if exists datamapshowtest") - sql("drop table if exists uniqdata") - sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'") - } - - val newClass = "org.apache.spark.sql.CarbonSource" - - test("test datamap create: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'") - } - } - - test("test datamap create with dmproperties: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") - } - } - - test("test datamap create with existing name: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql( - s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')") - } - } - - test("test datamap create with preagg") { - sql("drop datamap if exists datamap3 on table datamaptest") - sql( - "create datamap datamap3 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from 
datamaptest") - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 1) - assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3")) - assert(dataMapSchemaList.get(0).getProperties.get("key").equals("value")) - assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3")) - } - - test("check hivemetastore after drop datamap") { - try { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - "true") - sql("drop table if exists hiveMetaStoreTable") - sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'") - - sql( - "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable") - checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable") - - sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable") - checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable") - - } finally { - sql("drop table hiveMetaStoreTable") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) - } - } - - test("drop the table having pre-aggregate") { - try { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - "true") - sql("drop table if exists hiveMetaStoreTable_1") - sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'") - - sql( - "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable_1") - - 
checkExistence(sql("show datamap on table hiveMetaStoreTable_1"), - true, - "datamap_hiveMetaStoreTable_1") - - sql("drop datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1") - checkExistence(sql("show datamap on table hiveMetaStoreTable_1"), - false, - "datamap_hiveMetaStoreTable_1") - assert(sql("show datamap on table hiveMetaStoreTable_1").collect().length == 0) - sql("drop table hiveMetaStoreTable_1") - - checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1") - } - finally { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) - } - } - - test("test datamap create with preagg with duplicate name") { - sql( - s""" - | CREATE DATAMAP datamap10 ON TABLE datamaptest - | USING 'preaggregate' - | DMPROPERTIES('key'='value') - | AS SELECT COUNT(a) FROM datamaptest - """.stripMargin) - intercept[MalformedDataMapCommandException] { - sql( - s""" - | CREATE DATAMAP datamap10 ON TABLE datamaptest - | USING 'preaggregate' - | DMPROPERTIES('key'='value') - | AS SELECT COUNT(a) FROM datamaptest - """.stripMargin) - } - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 2) - } - - test("test drop non-exist datamap") { - intercept[NoSuchDataMapException] { - sql("drop datamap nonexist on table datamaptest") - } - val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest") - assert(table != null) - val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList - assert(dataMapSchemaList.size() == 2) - } - - test("test show datamap without preaggregate: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 
'carbondata'") - sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") - sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") - checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass) - } - } - - test("test show datamap with preaggregate: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") - sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") - val frame = sql("show datamap on table datamapshowtest") - assert(frame.collect().length == 2) - checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1") - } - } - - test("test show datamap with no datamap") { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - assert(sql("show datamap on table datamapshowtest").collect().length == 0) - } - - test("test show datamap after dropping datamap: don't support using non-exist class") { - intercept[MetadataProcessException] { - sql("drop table if exists datamapshowtest") - sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'") - sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest") - sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')") - sql("drop datamap datamap1 on table datamapshowtest") - val frame = sql("show datamap on table datamapshowtest") - assert(frame.collect().length == 1) - checkExistence(frame, true, 
"datamap2", "(NA)", newClass) - } - } - - test("test if preaggregate load is successfull for hivemetastore") { - try { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") - sql("DROP TABLE IF EXISTS maintable") - sql( - """ - | CREATE TABLE maintable(id int, name string, city string, age int) - | STORED BY 'org.apache.carbondata.format' - """.stripMargin) - sql( - s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id""" - - .stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") - checkAnswer(sql(s"select * from maintable_preagg_sum"), - Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) - } finally { - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) - } - } - - test("test preaggregate load for decimal column for hivemetastore") { - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") - sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'") - sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1") - sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1") - checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000))) - sql("drop datamap if exists uniqdata_agg on 
table uniqdata") - } - - test("create pre-agg table with path") { - sql("drop table if exists main_preagg") - sql("drop table if exists main ") - val warehouse = s"$metastoredb/warehouse" - val path = warehouse + "/" + System.nanoTime + "_preAggTestPath" - sql( - s""" - | create table main( - | year int, - | month int, - | name string, - | salary int) - | stored by 'carbondata' - | tblproperties('sort_columns'='month,year,name') - """.stripMargin) - sql("insert into main select 10,11,'amy',12") - sql("insert into main select 10,11,'amy',14") - sql( - s""" - | create datamap preagg - | on table main - | using 'preaggregate' - | dmproperties ('path'='$path') - | as select name,avg(salary) - | from main - | group by name - """.stripMargin) - assertResult(true)(new File(path).exists()) - assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}") - .list(new FilenameFilter { - override def accept(dir: File, name: String): Boolean = { - name.contains(CarbonCommonConstants.FACT_FILE_EXT) - } - }).length > 0) - checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0)) - checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2)) - sql("drop datamap preagg on table main") - assertResult(false)(new File(path).exists()) - sql("drop table main") - } - - override def afterAll { - sql("DROP TABLE IF EXISTS maintable") - sql("drop table if exists uniqdata") - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, - CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) - sql("drop table if exists datamaptest") - sql("drop table if exists datamapshowtest") - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala new file mode 100644 index 0000000..a05a8c2 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Tests for the CREATE / DROP / SHOW DATAMAP commands, covering pre-aggregate datamaps,
+ * tables registered with the hive schema metastore, and rejection of unsupported
+ * datamap provider classes.
+ */
+class TestIndexDataMapCommand extends QueryTest with BeforeAndAfterAll {
+
+  val testData = s"$resourcesPath/sample.csv"
+
+  override def beforeAll {
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+    sql("drop table if exists uniqdata")
+    sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
+  }
+
+  // Provider class that CREATE DATAMAP ... USING is expected to reject with
+  // MetadataProcessException in the "don't support using non-exist class" tests below.
+  val newClass = "org.apache.spark.sql.CarbonSource"
+
+  /** Restores ENABLE_HIVE_SCHEMA_META_STORE to its default value. */
+  private def resetHiveSchemaMetaStore(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+        CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+  }
+
+  test("test datamap create: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
+    }
+  }
+
+  test("test datamap create with dmproperties: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with existing name: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql(
+        s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+    }
+  }
+
+  test("test datamap create with preagg") {
+    sql("drop datamap if exists datamap3 on table datamaptest")
+    sql(
+      "create datamap datamap3 on table datamaptest using 'preaggregate' as select count(a) from datamaptest")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 1)
+    assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
+    assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
+  }
+
+  test("check hivemetastore after drop datamap") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop table if exists hiveMetaStoreTable")
+      sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' as select count(a) from hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
+
+      sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
+
+    } finally {
+      // IF EXISTS: if CREATE TABLE failed, a plain DROP would throw here and hide the real error
+      sql("drop table if exists hiveMetaStoreTable")
+      resetHiveSchemaMetaStore()
+    }
+  }
+
+  test("drop the table having pre-aggregate") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          "true")
+      sql("drop table if exists hiveMetaStoreTable_1")
+      sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
+
+      sql(
+        "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' as select count(a) from hiveMetaStoreTable_1")
+
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        true,
+        "datamap_hiveMetaStoreTable_1")
+
+      sql("drop datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1")
+      checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+        false,
+        "datamap_hiveMetaStoreTable_1")
+      assert(sql("show datamap on table hiveMetaStoreTable_1").collect().length == 0)
+      sql("drop table hiveMetaStoreTable_1")
+
+      checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
+    }
+    finally {
+      resetHiveSchemaMetaStore()
+    }
+  }
+
+  test("test datamap create with preagg with duplicate name") {
+    sql(
+      s"""
+         | CREATE DATAMAP datamap10 ON TABLE datamaptest
+         | USING 'preaggregate'
+         | AS SELECT COUNT(a) FROM datamaptest
+      """.stripMargin)
+    intercept[MalformedDataMapCommandException] {
+      sql(
+        s"""
+           | CREATE DATAMAP datamap10 ON TABLE datamaptest
+           | USING 'preaggregate'
+           | AS SELECT COUNT(a) FROM datamaptest
+        """.stripMargin)
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test drop non-exist datamap") {
+    intercept[NoSuchDataMapException] {
+      sql("drop datamap nonexist on table datamaptest")
+    }
+    val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+    assert(table != null)
+    val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+    assert(dataMapSchemaList.size() == 2)
+  }
+
+  test("test show datamap without preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      // expected to throw; the statements after this one are not reached
+      sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' ")
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass)
+    }
+  }
+
+  test("test show datamap with preaggregate: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+      // expected to throw; the statements after this one are not reached
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 2)
+      checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1")
+    }
+  }
+
+  test("test show datamap with no datamap") {
+    sql("drop table if exists datamapshowtest")
+    sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+    assert(sql("show datamap on table datamapshowtest").collect().length == 0)
+  }
+
+  test("test show datamap after dropping datamap: don't support using non-exist class") {
+    intercept[MetadataProcessException] {
+      sql("drop table if exists datamapshowtest")
+      sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+      sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+      // expected to throw; the statements after this one are not reached
+      sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+      sql("drop datamap datamap1 on table datamapshowtest")
+      val frame = sql("show datamap on table datamapshowtest")
+      assert(frame.collect().length == 1)
+      checkExistence(frame, true, "datamap2", "(NA)", newClass)
+    }
+  }
+
+  test("test if preaggregate load is successfull for hivemetastore") {
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+      sql("DROP TABLE IF EXISTS maintable")
+      sql(
+        """
+          | CREATE TABLE maintable(id int, name string, city string, age int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql(
+        "create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id")
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+      checkAnswer(sql("select * from maintable_preagg_sum"),
+        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    } finally {
+      resetHiveSchemaMetaStore()
+    }
+  }
+
+  test("test preaggregate load for decimal column for hivemetastore") {
+    // try/finally so the process-wide flag is restored even if an assertion fails
+    try {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+      sql("drop table if exists uniqdata")
+      sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
+      sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
+      sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
+      checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
+      sql("drop datamap if exists uniqdata_agg on table uniqdata")
+    } finally {
+      resetHiveSchemaMetaStore()
+    }
+  }
+
+  test("create pre-agg table with path") {
+    sql("drop table if exists main_preagg")
+    sql("drop table if exists main ")
+    val warehouse = s"$metastoredb/warehouse"
+    val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
+    try {
+      sql(
+        s"""
+           | create table main(
+           |  year int,
+           |  month int,
+           |  name string,
+           |  salary int)
+           | stored by 'carbondata'
+           | tblproperties('sort_columns'='month,year,name')
+        """.stripMargin)
+      sql("insert into main select 10,11,'amy',12")
+      sql("insert into main select 10,11,'amy',14")
+      sql(
+        s"""
+           | create datamap preagg
+           | on table main
+           | using 'preaggregate'
+           | dmproperties ('path'='$path')
+           | as select name,avg(salary)
+           | from main
+           | group by name
+        """.stripMargin)
+      assertResult(true)(new File(path).exists())
+      assertResult(true)(new File(CarbonTablePath.getSegmentPath(path, "0"))
+        .list(new FilenameFilter {
+          override def accept(dir: File, name: String): Boolean = {
+            name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+          }
+        }).length > 0)
+      checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
+      checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
+      sql("drop datamap preagg on table main")
+      assertResult(false)(new File(path).exists())
+    } finally {
+      // runs even when an assertion above fails, so `main` does not leak into later tests
+      sql("drop table if exists main")
+    }
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql("drop table if exists uniqdata")
+    sql("drop table if exists hiveMetaStoreTable")
+    sql("drop table if exists hiveMetaStoreTable_1")
+    resetHiveSchemaMetaStore()
+    sql("drop table if exists main")
+    sql("drop table if exists datamaptest")
+    sql("drop table if exists datamapshowtest")
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
new file mode 100644
index 0000000..2b3a306
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; + +import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.PREAGGREGATE; +import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES; + +public class DataMapManager { + + private static DataMapManager INSTANCE; + + private DataMapManager() { } + + public static synchronized DataMapManager get() { + if (INSTANCE == null) { + INSTANCE = new DataMapManager(); + } + return INSTANCE; + } + + /** + * Return a DataMapProvider instance for specified dataMapSchema. 
+ */ + public DataMapProvider getDataMapProvider(DataMapSchema dataMapSchema) { + DataMapProvider provider; + if (dataMapSchema.getClassName().equalsIgnoreCase(PREAGGREGATE.toString())) { + provider = new PreAggregateDataMapProvider(); + } else if (dataMapSchema.getClassName().equalsIgnoreCase(TIMESERIES.toString())) { + provider = new TimeseriesDataMapProvider(); + } else { + provider = new IndexDataMapProvider(); + } + return provider; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java new file mode 100644 index 0000000..0cf0d04 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +/** + * Property that can be specified when creating DataMap + */ +@InterfaceAudience.Internal +public class DataMapProperty { + + /** + * Used to specify the store location of the datamap + */ + public static final String PATH = "path"; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java new file mode 100644 index 0000000..a71e0d8 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.processing.exception.DataLoadingException; + +import org.apache.spark.sql.SparkSession; + +/** + * DataMap is a accelerator for certain type of query. Developer can add new DataMap + * implementation to improve query performance. + * + * Currently two types of DataMap are supported + * <ol> + * <li> MVDataMap: materialized view type of DataMap to accelerate olap style query, + * like SPJG query (select, predicate, join, groupby) </li> + * <li> IndexDataMap: index type of DataMap to accelerate filter query </li> + * </ol> + * + * <p> + * In following command <br> + * {@code CREATE DATAMAP dm ON TABLE main USING 'provider'}, <br> + * the <b>provider</b> string can be a short name or class name of the DataMap implementation. + * + * <br>Currently CarbonData supports following provider: + * <ol> + * <li> preaggregate: one type of MVDataMap that do pre-aggregate of single table </li> + * <li> timeseries: one type of MVDataMap that do pre-aggregate based on time dimension + * of the table </li> + * <li> class name of {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory} + * implementation: Developer can implement new type of IndexDataMap by extending + * {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory} </li> + * </ol> + * + * @since 1.4.0 + */ +@InterfaceAudience.Developer("DataMap") +@InterfaceStability.Unstable +public interface DataMapProvider { + + /** + * Initialize a datamap's metadata. 
+ * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable" + * Implementation should initialize metadata for datamap, like creating table + */ + void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, + SparkSession sparkSession) throws MalformedDataMapCommandException; + + /** + * Initialize a datamap's data. + * This is called when user creates datamap, for example "CREATE DATAMAP dm ON TABLE mainTable" + * Implementation should initialize data for datamap, like creating data folders + */ + void initData(CarbonTable mainTable, SparkSession sparkSession); + + /** + * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String, SparkSession)}. + * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable" + * Implementation should clean all meta for the datamap + */ + void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession); + + /** + * Opposite operation of {@link #initData(CarbonTable, SparkSession)}. + * This is called when user drops datamap, for example "DROP DATAMAP dm ON TABLE mainTable" + * Implementation should clean all data for the datamap + */ + void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession); + + /** + * Rebuild the datamap by loading all existing data from mainTable + * This is called when refreshing the datamap when + * 1. after datamap creation and if `autoRefreshDataMap` is set to true + * 2. 
user manually trigger refresh datamap command + */ + void rebuild(CarbonTable mainTable, SparkSession sparkSession) throws DataLoadingException; + + /** + * Build the datamap incrementally by loading specified segment data + * This is called when user manually trigger refresh datamap + */ + void incrementalBuild(CarbonTable mainTable, String[] segmentIds, SparkSession sparkSession) + throws DataLoadingException; + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java new file mode 100644 index 0000000..e11e522 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.exceptions.MetadataProcessException; +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; +import org.apache.carbondata.core.datamap.DataMapRegistry; +import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; +import org.apache.carbondata.format.TableInfo; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil; + +@InterfaceAudience.Internal +public class IndexDataMapProvider implements DataMapProvider { + + private TableInfo originalTableInfo; + + @Override + public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, + SparkSession sparkSession) throws MalformedDataMapCommandException { + IndexDataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema); + DataMapStoreManager.getInstance().registerDataMap( + mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory); + originalTableInfo = PreAggregateUtil.updateMainTable(mainTable, dataMapSchema, sparkSession); + } + + @Override + public void initData(CarbonTable mainTable, SparkSession sparkSession) { + // Nothing is needed to do by default + } + + @Override + public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, + SparkSession sparkSession) { + PreAggregateUtil.updateSchemaInfo(mainTable, originalTableInfo, sparkSession); + } + + @Override + public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, + SparkSession sparkSession) { + DataMapStoreManager.getInstance().clearDataMap( + mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName()); + } 
+ + @Override + public void rebuild(CarbonTable mainTable, SparkSession sparkSession) { + // Nothing is needed to do by default + } + + @Override + public void incrementalBuild(CarbonTable mainTable, String[] segmentIds, + SparkSession sparkSession) { + throw new UnsupportedOperationException(); + } + + private IndexDataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema) + throws MalformedDataMapCommandException { + IndexDataMapFactory dataMapFactory; + try { + // try to create DataMapProvider instance by taking providerName as class name + Class<? extends IndexDataMapFactory> providerClass = + (Class<? extends IndexDataMapFactory>) Class.forName(dataMapSchema.getClassName()); + dataMapFactory = providerClass.newInstance(); + } catch (ClassNotFoundException e) { + // try to create DataMapProvider instance by taking providerName as short name + dataMapFactory = getDataMapFactoryByShortName(dataMapSchema.getClassName()); + } catch (Throwable e) { + throw new MetadataProcessException( + "failed to create DataMapProvider '" + dataMapSchema.getClassName() + "'", e); + } + return dataMapFactory; + } + + private IndexDataMapFactory getDataMapFactoryByShortName(String providerName) + throws MalformedDataMapCommandException { + IndexDataMapFactory dataMapFactory; + String className = DataMapRegistry.getDataMapClassName(providerName); + if (className != null) { + try { + Class<? extends IndexDataMapFactory> datamapClass = + (Class<? 
extends IndexDataMapFactory>) Class.forName(providerName); + dataMapFactory = datamapClass.newInstance(); + } catch (ClassNotFoundException ex) { + throw new MalformedDataMapCommandException( + "DataMap '" + providerName + "' not found", ex); + } catch (Throwable ex) { + throw new MetadataProcessException( + "failed to create DataMap '" + providerName + "'", ex); + } + } else { + throw new MalformedDataMapCommandException( + "DataMap '" + providerName + "' not found"); + } + return dataMapFactory; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java new file mode 100644 index 0000000..dc53900 --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper; +import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand; +import scala.Some; + +@InterfaceAudience.Internal +public class PreAggregateDataMapProvider implements DataMapProvider { + protected PreAggregateTableHelper helper; + protected CarbonDropTableCommand dropTableCommand; + + @Override + public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, + SparkSession sparkSession) throws MalformedDataMapCommandException { + validateDmProperty(dataMapSchema); + helper = new PreAggregateTableHelper( + mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(), + dataMapSchema.getProperties(), ctasSqlStatement, null, false); + helper.initMeta(sparkSession); + } + + private void validateDmProperty(DataMapSchema dataMapSchema) + throws MalformedDataMapCommandException { + if (!dataMapSchema.getProperties().isEmpty()) { + if (dataMapSchema.getProperties().size() > 1 || + !dataMapSchema.getProperties().containsKey(DataMapProperty.PATH)) { + throw new MalformedDataMapCommandException( + "Only 'path' dmproperty is allowed for this datamap"); + } + } + } + + @Override + public void initData(CarbonTable mainTable, SparkSession sparkSession) { + // Nothing is needed to do by default + } + + @Override + public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, + SparkSession sparkSession) { + dropTableCommand = new CarbonDropTableCommand( + true, + new Some<>(dataMapSchema.getRelationIdentifier().getDatabaseName()), + 
dataMapSchema.getRelationIdentifier().getTableName(), + true); + dropTableCommand.processMetadata(sparkSession); + } + + @Override + public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, + SparkSession sparkSession) { + if (dropTableCommand != null) { + dropTableCommand.processData(sparkSession); + } + } + + @Override + public void rebuild(CarbonTable mainTable, SparkSession sparkSession) { + if (helper != null) { + helper.initData(sparkSession); + } + } + + @Override + public void incrementalBuild(CarbonTable mainTable, String[] segmentIds, + SparkSession sparkSession) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java new file mode 100644 index 0000000..a66f26a --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.datamap; + +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper; +import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil; +import scala.Some; +import scala.Tuple2; + +@InterfaceAudience.Internal +public class TimeseriesDataMapProvider extends PreAggregateDataMapProvider { + + @Override + public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement, + SparkSession sparkSession) { + Map<String, String> dmProperties = dataMapSchema.getProperties(); + String dmProviderName = dataMapSchema.getClassName(); + TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmProviderName); + Tuple2<String, String> details = + TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmProviderName); + dmProperties.remove(details._1()); + helper = new PreAggregateTableHelper( + mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(), + dmProperties, ctasSqlStatement, new Some(details._1()), false); + helper.initMeta(sparkSession); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 39e73ee..9315208 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -94,7 +94,7 @@ class CarbonEnv { properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) } LOGGER.info(s"carbon env initial: $storePath") - // trigger event for CarbonEnv init + // trigger event for CarbonEnv create val operationContext = new OperationContext val carbonEnvInitPreEvent: CarbonEnvInitPreEvent = CarbonEnvInitPreEvent(sparkSession, carbonSessionInfo, storePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index c6d86b3..8532e9d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -19,20 +19,13 @@ package org.apache.spark.sql.execution.command.datamap import scala.collection.JavaConverters._ import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil} -import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil -import org.apache.spark.sql.hive.CarbonRelation -import org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import 
org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider -import org.apache.carbondata.core.datamap.DataMapStoreManager -import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider} /** * Below command class will be used to create datamap on table @@ -47,59 +40,25 @@ case class CarbonCreateDataMapCommand( ifNotExistsSet: Boolean = false) extends AtomicRunnableCommand { - var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _ - var tableIsExists: Boolean = false + private var dataMapProvider: DataMapProvider = _ + private var mainTable: CarbonTable = _ + private var dataMapSchema: DataMapSchema = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { // since streaming segment does not support building index and pre-aggregate yet, // so streaming table does not support create datamap - val carbonTable = - CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession) - if (carbonTable.isStreamingTable) { + mainTable = + CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession) + if (mainTable.isStreamingTable) { throw new MalformedCarbonCommandException("Streaming table does not support creating datamap") } - validateDataMapName(carbonTable) - if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || - dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmClassName) - createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - val details = TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmClassName) - val updatedDmProperties = dmProperties - details._1 - CreatePreAggregateTableCommand( - dataMapName, - tableIdentifier, - 
DataMapProvider.TIMESERIES, - updatedDmProperties, - queryString.get, - Some(details._1)) - } else { - CreatePreAggregateTableCommand( - dataMapName, - tableIdentifier, - DataMapProvider.PREAGGREGATE, - dmProperties, - queryString.get - ) - } - try { - createPreAggregateTableCommands.processMetadata(sparkSession) - } catch { - case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " + - s"'$dataMapName'", e) - } - } else { - val dataMapSchema = new DataMapSchema(dataMapName, dmClassName) - dataMapSchema.setProperties(new java.util.HashMap[String, String](dmProperties.asJava)) - val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession) - val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation( - Some(dbName), - tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable - DataMapStoreManager.getInstance().createAndRegisterDataMap( - carbonTable.getAbsoluteTableIdentifier, dataMapSchema) - // Save DataMapSchema in the schema file of main table - PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession) - } + validateDataMapName(mainTable) + dataMapSchema = new DataMapSchema(dataMapName, dmClassName) + dataMapSchema.setProperties(new java.util.HashMap[String, String](dmProperties.asJava)) + dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema) + dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull, sparkSession) + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}") Seq.empty @@ -115,29 +74,20 @@ case class CarbonCreateDataMapCommand( } override def processData(sparkSession: SparkSession): Seq[Row] = { - if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || - dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - if (!tableIsExists) { - createPreAggregateTableCommands.processData(sparkSession) - } else { 
- Seq.empty + if (dataMapProvider != null) { + dataMapProvider.initData(mainTable, sparkSession) + if (mainTable.isAutoRefreshDataMap) { + dataMapProvider.rebuild(mainTable, sparkSession) } - } else { - Seq.empty } + Seq.empty } override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { - if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) || - dmClassName.equalsIgnoreCase(TIMESERIES.toString)) { - if (!tableIsExists && createPreAggregateTableCommands != null) { - createPreAggregateTableCommands.undoMetadata(sparkSession, exception) - } else { - Seq.empty - } - } else { - throw new MalformedDataMapCommandException("Unknown datamap provider/class " + dmClassName) + if (dataMapProvider != null) { + dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession) } + Seq.empty } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index e89d15a..f773a55 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -25,16 +25,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil -import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand import 
org.apache.carbondata.common.exceptions.MetadataProcessException import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider} import org.apache.carbondata.events._ /** @@ -52,8 +51,10 @@ case class CarbonDropDataMapCommand( forceDrop: Boolean = false) extends AtomicRunnableCommand { - val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - var commandToRun: CarbonDropTableCommand = _ + private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + private var dataMapProvider: DataMapProvider = _ + private var mainTable: CarbonTable = _ + private var dataMapSchema: DataMapSchema = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) @@ -66,7 +67,6 @@ case class CarbonDropDataMapCommand( catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName))) val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer() try { - forceDropTableFromMetaStore(sparkSession) locksToBeAcquired foreach { lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock) } @@ -82,35 +82,33 @@ case class CarbonDropDataMapCommand( // If force drop is true then remove the datamap from hivemetastore. 
No need to remove from // parent as the first condition would have taken care of it. if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) { - val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex. + mainTable = carbonTable.get + val dataMapSchemaOp = mainTable.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex. find(_._1.getDataMapName.equalsIgnoreCase(dataMapName)) - if (dataMapSchema.isDefined) { + if (dataMapSchemaOp.isDefined) { + dataMapSchema = dataMapSchemaOp.get._1 val operationContext = new OperationContext val dropDataMapPreEvent = DropDataMapPreEvent( - Some(dataMapSchema.get._1), + Some(dataMapSchema), ifExistsSet, sparkSession) OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext) - carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2) + mainTable.getTableInfo.getDataMapSchemaList.remove(dataMapSchemaOp.get._2) val schemaConverter = new ThriftWrapperSchemaConverterImpl PreAggregateUtil.updateSchemaInfo( - carbonTable.get, + mainTable, schemaConverter.fromWrapperToExternalTableInfo( - carbonTable.get.getTableInfo, + mainTable.getTableInfo, dbName, tableName))(sparkSession) - commandToRun = CarbonDropTableCommand( - ifExistsSet = true, - Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName), - dataMapSchema.get._1.getRelationIdentifier.getTableName, - dropChildTable = true - ) - commandToRun.processMetadata(sparkSession) + dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema) + dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession) + // fires the event after dropping datamap from main table schema val dropDataMapPostEvent = DropDataMapPostEvent( - Some(dataMapSchema.get._1), + Some(dataMapSchema), ifExistsSet, sparkSession) OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext) @@ -118,7 +116,7 @@ case class CarbonDropDataMapCommand( throw new 
NoSuchDataMapException(dataMapName, tableName) } } else if (carbonTable.isDefined && - carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) { + carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) { if (!ifExistsSet) { throw new NoSuchDataMapException(dataMapName, tableName) } @@ -142,39 +140,10 @@ case class CarbonDropDataMapCommand( Seq.empty } - /** - * Used to drop child datamap from hive metastore if it exists. - * forceDrop will be true only when an exception occurs in main table status updation for - * parent table or in processData from CreatePreAggregateTableCommand. - */ - private def forceDropTableFromMetaStore(sparkSession: SparkSession): Unit = { - if (forceDrop) { - val childTableName = tableName + "_" + dataMapName - LOGGER.info(s"Trying to force drop $childTableName from metastore") - val childCarbonTable: Option[CarbonTable] = try { - Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession)) - } catch { - case _: Exception => - LOGGER.warn(s"Child table $childTableName not found in metastore") - None - } - if (childCarbonTable.isDefined) { - commandToRun = CarbonDropTableCommand( - ifExistsSet = true, - Some(childCarbonTable.get.getDatabaseName), - childCarbonTable.get.getTableName, - dropChildTable = true) - commandToRun.processMetadata(sparkSession) - } - } - } - override def processData(sparkSession: SparkSession): Seq[Row] = { // delete the table folder - if (commandToRun != null) { - DataMapStoreManager.getInstance().clearDataMap( - commandToRun.carbonTable.getAbsoluteTableIdentifier, dataMapName) - commandToRun.processData(sparkSession) + if (dataMapProvider != null) { + dataMapProvider.freeData(mainTable, dataMapSchema, sparkSession) } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala ---------------------------------------------------------------------- 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index c1f86ef..705455f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -93,7 +93,7 @@ case class CarbonAlterTableDropPartitionCommand( partitionInfo.dropPartition(partitionIndex) // read TableInfo - val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession) + val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable) val schemaConverter = new ThriftWrapperSchemaConverterImpl() val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index efd1216..99691d2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -88,7 +88,7 @@ case class CarbonAlterTableSplitPartitionCommand( updatePartitionInfo(partitionInfo, partitionIds) // read TableInfo - val tableInfo = 
carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession) + val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable) val schemaConverter = new ThriftWrapperSchemaConverterImpl() val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/89a12af5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala deleted file mode 100644 index c02ac4f..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.command.preaaggregate - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand -import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand -import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand -import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil -import org.apache.spark.sql.optimizer.CarbonFilters -import org.apache.spark.sql.parser.CarbonSpark2SqlParser - -import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider -import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} - -/** - * Below command class will be used to create pre-aggregate table - * and updating the parent table about the child table information - * It will be either success or nothing happen in case of failure: - * 1. failed to create pre aggregate table. - * 2. 
failed to update main table - * - */ -case class CreatePreAggregateTableCommand( - dataMapName: String, - parentTableIdentifier: TableIdentifier, - dataMapProvider: DataMapProvider, - dmProperties: Map[String, String], - queryString: String, - timeSeriesFunction: Option[String] = None, - ifNotExistsSet: Boolean = false) - extends AtomicRunnableCommand { - - var parentTable: CarbonTable = _ - var loadCommand: CarbonLoadDataCommand = _ - - override def processMetadata(sparkSession: SparkSession): Seq[Row] = { - val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString) - val df = sparkSession.sql(updatedQuery) - val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes( - df.logicalPlan, queryString) - val fields = fieldRelationMap.keySet.toSeq - val tableProperties = mutable.Map[String, String]() - dmProperties.foreach(t => tableProperties.put(t._1, t._2)) - - parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan) - if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) { - throw new MalformedDataMapCommandException( - "Parent table name is different in select and create") - } - var neworder = Seq[String]() - val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala - parentOrder.foreach(parentcol => - fields.filter(col => (fieldRelationMap.get(col).get.aggregateFunction.isEmpty) && - (parentcol.equals(fieldRelationMap.get(col).get. - columnTableRelationList.get(0).parentColumnName))) - .map(cols => neworder :+= cols.column) - ) - tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(",")) - tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable. 
- getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants - .LOAD_SORT_SCOPE_DEFAULT)) - tableProperties - .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString) - val tableIdentifier = - TableIdentifier(parentTableIdentifier.table + "_" + dataMapName, - parentTableIdentifier.database) - // prepare table model of the collected tokens - val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel( - ifNotExistPresent = ifNotExistsSet, - new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database), - tableIdentifier.table.toLowerCase, - fields, - Seq(), - tableProperties, - None, - isAlterFlow = false, - None) - // updating the relation identifier, this will be stored in child table - // which can be used during dropping of pre-aggreate table as parent table will - // also get updated - if(timeSeriesFunction.isDefined) { - TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable) - TimeSeriesUtil.validateEventTimeColumnExitsInSelect( - fieldRelationMap, - dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get) - TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap, - dmProperties.get(TimeSeriesUtil.TIMESERIES_EVENTTIME).get, - timeSeriesFunction.get) - } - tableModel.parentTable = Some(parentTable) - tableModel.dataMapRelation = Some(fieldRelationMap) - val tablePath = if (dmProperties.contains("path")) { - dmProperties("path") - } else { - CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession) - } - CarbonCreateTableCommand(TableNewProcessor(tableModel), - tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession) - - val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession) - val tableInfo = table.getTableInfo - // child schema object which will be updated on parent table about the - val childSchema = tableInfo.getFactTable.buildChildSchema( - dataMapName, - dataMapProvider.getClassName, - 
tableInfo.getDatabaseName, - queryString, - "AGGREGATION") - dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2)) - - // updating the parent table about child table - PreAggregateUtil.updateMainTable( - parentTable, - childSchema, - sparkSession) - // After updating the parent carbon table with data map entry extract the latest table object - // to be used in further create process. - parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database, - parentTableIdentifier.table)(sparkSession) - val updatedLoadQuery = if (timeSeriesFunction.isDefined) { - PreAggregateUtil.createTimeSeriesSelectQueryFromMain(childSchema.getChildSchema, - parentTable.getTableName, - parentTable.getDatabaseName) - } else { - queryString - } - val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction( - updatedLoadQuery)).drop("preAggLoad") - loadCommand = PreAggregateUtil.createLoadCommandForChild( - childSchema.getChildSchema.getListOfColumns, - tableIdentifier, - dataFrame, - false, - sparkSession = sparkSession) - loadCommand.processMetadata(sparkSession) - Seq.empty - } - - override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { - // drop child table and undo the change in table info of main table - CarbonDropDataMapCommand( - dataMapName, - ifExistsSet = true, - parentTableIdentifier.database, - parentTableIdentifier.table, - forceDrop = true).run(sparkSession) - Seq.empty - } - - override def processData(sparkSession: SparkSession): Seq[Row] = { - // load child table if parent table has existing segments - // This will be used to check if the parent table has any segments or not. If not then no - // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT - // table. 
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false) - val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath) - if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS || - load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) { - throw new UnsupportedOperationException( - "Cannot create pre-aggregate table when insert is in progress on main table") - } else if (loadAvailable.nonEmpty) { - // Passing segmentToLoad as * because we want to load all the segments into the - // pre-aggregate table even if the user has set some segments on the parent table. - loadCommand.dataFrame = Some(PreAggregateUtil - .getDataFrame(sparkSession, loadCommand.logicalPlan.get)) - PreAggregateUtil.startDataLoadForDataMap( - TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)), - segmentToLoad = "*", - validateSegments = true, - loadCommand, - isOverwrite = false, - sparkSession) - } - Seq.empty - } -} - -