Repository: carbondata Updated Branches: refs/heads/master 78e4d0da3 -> cf1e4d4ca
Blockletsize and Blocksize issue fix in sdk writer and other unmanaged table fixes *Decimal datatype issue fix in sdk writer *Drop unmanaged table issue in cluster *Added comment for SDK writer API methods *Query two writer's output at same path issue fix *Block alter table rename for unmanaged table This closes #2141 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cf1e4d4c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cf1e4d4c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cf1e4d4c Branch: refs/heads/master Commit: cf1e4d4ca7cef8bc49b4eb7b811af5c1bd787cba Parents: 78e4d0d Author: ajantha-bhat <ajanthab...@gmail.com> Authored: Fri Apr 6 14:10:38 2018 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Tue Apr 17 12:21:33 2018 +0530 ---------------------------------------------------------------------- .../schema/table/TableSchemaBuilder.java | 26 ++++++- .../executor/impl/AbstractQueryExecutor.java | 7 +- .../scan/executor/util/RestructureUtil.java | 29 ++++++-- .../scan/executor/util/RestructureUtilTest.java | 4 +- .../createTable/TestUnmanagedCarbonTable.scala | 77 +++++++++++++++----- .../schema/CarbonAlterTableRenameCommand.scala | 8 ++ .../spark/sql/hive/CarbonFileMetastore.scala | 21 +----- .../sdk/file/CarbonWriterBuilder.java | 51 +++++++++++-- 8 files changed, 172 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index 2dd5a9e..7c2e54d 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -26,8 +26,10 @@ import java.util.Objects; import java.util.UUID; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.datatype.DecimalType; import org.apache.carbondata.core.metadata.datatype.StructField; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.SchemaEvolution; @@ -47,6 +49,8 @@ public class TableSchemaBuilder { private int blockSize; + private int blockletSize; + private String tableName; public TableSchemaBuilder blockSize(int blockSize) { @@ -57,6 +61,14 @@ public class TableSchemaBuilder { return this; } + public TableSchemaBuilder blockletSize(int blockletSize) { + if (blockletSize <= 0) { + throw new IllegalArgumentException("blockletSize should be greater than 0"); + } + this.blockletSize = blockletSize; + return this; + } + public TableSchemaBuilder tableName(String tableName) { Objects.requireNonNull(tableName); this.tableName = tableName; @@ -76,11 +88,18 @@ public class TableSchemaBuilder { allColumns.addAll(otherColumns); schema.setListOfColumns(allColumns); + Map<String, String> property = new HashMap<>(); if (blockSize > 0) { - Map<String, String> property = new HashMap<>(); property.put(CarbonCommonConstants.TABLE_BLOCKSIZE, String.valueOf(blockSize)); + } + if (blockletSize > 0) { + property.put(CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB, String.valueOf(blockletSize)); + } + // TODO: check other table properties + if (property.size() != 0) { schema.setTableProperties(property); } + return schema; } @@ -103,6 +122,11 @@ public class 
TableSchemaBuilder { newColumn.setColumnUniqueId(UUID.randomUUID().toString()); newColumn.setColumnReferenceId(newColumn.getColumnUniqueId()); newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn)); + if (DataTypes.isDecimal(field.getDataType())) { + DecimalType decimalType = (DecimalType) field.getDataType(); + newColumn.setPrecision(decimalType.getPrecision()); + newColumn.setScale(decimalType.getScale()); + } if (isSortColumn) { sortColumns.add(newColumn); newColumn.setSortColumn(true); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index f0f5bce..d2d458e 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -308,7 +308,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { List<ProjectionDimension> projectDimensions = RestructureUtil .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo, queryModel.getProjectionDimensions(), tableBlockDimensions, - segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size()); + segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(), + queryModel.getTable().getTableInfo().isUnManagedTable()); blockExecutionInfo.setBlockId( CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId, queryModel.getTable().getTableInfo().isUnManagedTable())); @@ -517,7 +518,9 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { // getting the measure info 
which will be used while filling up measure data List<ProjectionMeasure> updatedQueryMeasures = RestructureUtil .createMeasureInfoAndGetCurrentBlockQueryMeasures(executionInfo, - queryModel.getProjectionMeasures(), tableBlock.getSegmentProperties().getMeasures()); + queryModel.getProjectionMeasures(), + tableBlock.getSegmentProperties().getMeasures(), + queryModel.getTable().getTableInfo().isUnManagedTable()); // setting the measure aggregator for all aggregation function selected // in query executionInfo.getMeasureInfo().setMeasureDataTypes(queryProperties.measureDataTypes); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java index 9c1f2f8..3b477ab 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java @@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionary import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; @@ -57,12 +58,13 @@ public class RestructureUtil { * @param queryDimensions * @param tableBlockDimensions * @param tableComplexDimension + * @param isUnManagedTable * @return list of query 
dimension which is present in the table block */ public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQueryDimension( BlockExecutionInfo blockExecutionInfo, List<ProjectionDimension> queryDimensions, List<CarbonDimension> tableBlockDimensions, List<CarbonDimension> tableComplexDimension, - int measureCount) { + int measureCount, boolean isUnManagedTable) { List<ProjectionDimension> presentDimension = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); boolean[] isDimensionExists = new boolean[queryDimensions.size()]; @@ -82,7 +84,7 @@ public class RestructureUtil { queryDimension.getDimension().getDataType(); } else { for (CarbonDimension tableDimension : tableBlockDimensions) { - if (tableDimension.getColumnId().equals(queryDimension.getDimension().getColumnId())) { + if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) { ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension); tableDimension.getColumnSchema() .setPrecision(queryDimension.getDimension().getColumnSchema().getPrecision()); @@ -104,7 +106,7 @@ public class RestructureUtil { continue; } for (CarbonDimension tableDimension : tableComplexDimension) { - if (tableDimension.getColumnId().equals(queryDimension.getDimension().getColumnId())) { + if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) { ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension); // TODO: for complex dimension set scale and precision by traversing // the child dimensions @@ -141,6 +143,22 @@ public class RestructureUtil { } /** + * Match the columns for managed and unmanaged tables + * @param isUnManagedTable + * @param queryColumn + * @param tableColumn + * @return + */ + private static boolean isColumnMatches(boolean isUnManagedTable, + CarbonColumn queryColumn, CarbonColumn tableColumn) { + // If it is unmanaged table just check the column names, no need to validate column 
id as + // multiple sdk's output placed in a single folder doesn't have same column ID but can + // have same column name + return (tableColumn.getColumnId().equals(queryColumn.getColumnId()) || + (isUnManagedTable && tableColumn.getColName().equals(queryColumn.getColName()))); + } + + /** * This method will validate and return the default value to be * filled at the time of result preparation * @@ -337,11 +355,12 @@ public class RestructureUtil { * @param blockExecutionInfo * @param queryMeasures measures present in query * @param currentBlockMeasures current block measures + * @param isUnManagedTable * @return measures present in the block */ public static List<ProjectionMeasure> createMeasureInfoAndGetCurrentBlockQueryMeasures( BlockExecutionInfo blockExecutionInfo, List<ProjectionMeasure> queryMeasures, - List<CarbonMeasure> currentBlockMeasures) { + List<CarbonMeasure> currentBlockMeasures, boolean isUnManagedTable) { MeasureInfo measureInfo = new MeasureInfo(); List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.size()); int numberOfMeasureInQuery = queryMeasures.size(); @@ -354,7 +373,7 @@ public class RestructureUtil { // then setting measure exists is true // otherwise adding a default value of a measure for (CarbonMeasure carbonMeasure : currentBlockMeasures) { - if (carbonMeasure.getColumnId().equals(queryMeasure.getMeasure().getColumnId())) { + if (isColumnMatches(isUnManagedTable, carbonMeasure, queryMeasure.getMeasure())) { ProjectionMeasure currentBlockMeasure = new ProjectionMeasure(carbonMeasure); carbonMeasure.getColumnSchema().setDataType(queryMeasure.getMeasure().getDataType()); carbonMeasure.getColumnSchema().setPrecision(queryMeasure.getMeasure().getPrecision()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java index 163580d..cb80cd3 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java @@ -92,7 +92,7 @@ public class RestructureUtilTest { List<ProjectionDimension> result = null; result = RestructureUtil .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo, queryDimensions, - tableBlockDimensions, tableComplexDimensions, queryMeasures.size()); + tableBlockDimensions, tableComplexDimensions, queryMeasures.size(), false); List<CarbonDimension> resultDimension = new ArrayList<>(result.size()); for (ProjectionDimension queryDimension : result) { resultDimension.add(queryDimension.getDimension()); @@ -127,7 +127,7 @@ public class RestructureUtilTest { List<ProjectionMeasure> queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2, queryMeasure3); BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo(); RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, queryMeasures, - currentBlockMeasures); + currentBlockMeasures, false); MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo(); boolean[] measuresExist = { true, true, false }; assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist))); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala index bfb9471..a6ee807 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala @@ -43,11 +43,24 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? writerPath = writerPath.replace("\\", "/"); - // prepare SDK writer output - def buildTestData(persistSchema: Boolean, outputMultipleFiles: Boolean): Any = { + def buildTestDataSingleFile(): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + buildTestData(3,false) + } + + def buildTestDataMultipleFiles(): Any = { + FileUtils.deleteDirectory(new File(writerPath)) + buildTestData(1000000,false) + } + def buildTestDataTwice(): Any = { FileUtils.deleteDirectory(new File(writerPath)) + buildTestData(3,false) + buildTestData(3,false) + } + // prepare sdk writer output + def buildTestData(rows:Int, persistSchema:Boolean): Any = { val schema = new StringBuilder() .append("[ \n") .append(" {\"name\":\"string\"},\n") @@ -68,15 +81,11 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { } else { builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true) .uniqueIdentifier( - System.currentTimeMillis).withBlockSize(1).withBlockletSize(1) + System.currentTimeMillis).withBlockSize(2) .buildWriterForCSVInput() } var i = 0 - var row = 3 - if (outputMultipleFiles) { - row = 1000000 - } - while (i < row) { + while (i < rows) { writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) i += 1 } @@ -116,7 +125,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { test("test create 
External Table with Schema with partition, should ignore schema and partition") { - buildTestData(false, false) + buildTestDataSingleFile() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -137,7 +146,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { } test("read unmanaged table, files written from sdk Writer Output)") { - buildTestData(false, false) + buildTestDataSingleFile() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable1") @@ -178,7 +187,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { } test("Test Blocked operations for unmanaged table ") { - buildTestData(false, false) + buildTestDataSingleFile() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -244,14 +253,14 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { assert(exception.getMessage() .contains("Unsupported operation on unmanaged table")) - //9. Update Segment + //9. Update column exception = intercept[MalformedCarbonCommandException] { sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false) } assert(exception.getMessage() .contains("Unsupported operation on unmanaged table")) - //10. Delete Segment + //10. Delete column exception = intercept[MalformedCarbonCommandException] { sql("DELETE FROM sdkOutputTable where name='robot1'").show(false) } @@ -266,16 +275,23 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { .contains("Unsupported operation on unmanaged table")) //12. Streaming table creation - // External table don't accept table properties + // No need as External table don't accept table properties + + //13. 
Alter table rename command + exception = intercept[MalformedCarbonCommandException] { + sql("ALTER TABLE sdkOutputTable RENAME to newTable") + } + assert(exception.getMessage() + .contains("Unsupported operation on unmanaged table")) sql("DROP TABLE sdkOutputTable") - // drop table should not delete the files + //drop table should not delete the files assert(new File(writerPath).exists()) cleanTestData() } test("test create External Table With Schema, should ignore the schema provided") { - buildTestData(false, false) + buildTestDataSingleFile() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -296,7 +312,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { } test("Read sdk writer output file without Carbondata file should fail") { - buildTestData(false, false) + buildTestDataSingleFile() deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -317,7 +333,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { test("Read sdk writer output file without any file should fail") { - buildTestData(false, false) + buildTestDataSingleFile() deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) assert(new File(writerPath).exists()) @@ -340,7 +356,7 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { } test("Read sdk writer output multiple files ") { - buildTestData(false, true) + buildTestDataMultipleFiles() assert(new File(writerPath).exists()) val folder = new File(writerPath) val dataFiles = folder.listFiles(new FileFilter() { @@ -365,5 +381,28 @@ class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } + test("Read two sdk writer outputs with same column name placed in same folder") { + buildTestDataTwice() + assert(new File(writerPath).exists()) + + sql("DROP TABLE IF 
EXISTS sdkOutputTable") + + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION + |'$writerPath' """.stripMargin) + + + checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0), + Row("robot0", 0, 0.0), + Row("robot1", 1, 0.5), + Row("robot2", 2, 1.0))) + + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 05c0059..e349e93 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -73,6 +73,14 @@ private[sql] case class CarbonAlterTableRenameCommand( s"Table $oldDatabaseName.$oldTableName does not exist") throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist") } + + var oldCarbonTable: CarbonTable = null + oldCarbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) + .asInstanceOf[CarbonRelation].carbonTable + if (oldCarbonTable.getTableInfo.isUnManagedTable) { + throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table") + } + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK, LockUsage.DELETE_SEGMENT_LOCK, 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 0075c13..36b6d96 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -474,7 +474,7 @@ class CarbonFileMetastore extends CarbonMetaStore { sparkSession.sessionState.catalog.refreshTable(tableIdentifier) DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier) } else { - if (isUnmanagedCarbonTable(absoluteTableIdentifier, sparkSession)) { + if (isUnmanagedCarbonTable(absoluteTableIdentifier)) { removeTableFromMetadata(dbName, tableName) CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession) // discard cached table info in cachedDataSourceTables @@ -486,22 +486,9 @@ class CarbonFileMetastore extends CarbonMetaStore { } - def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier, - sparkSession: SparkSession): Boolean = { - if (sparkSession.sessionState.catalog.listTables(identifier.getDatabaseName) - .exists(_.table.equalsIgnoreCase(identifier.getTableName))) { - - val table = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] - .getCarbonEnv().carbonMetastore - .getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName) - - table match { - case null => false - case _ => table.get.getTableInfo.isUnManagedTable - } - } else { - false - } + def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = { + val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName); + 
table.map(_.getTableInfo.isUnManagedTable).getOrElse(false) } private def getTimestampFileAndType() = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf1e4d4c/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index f70e165..4e09553 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -58,44 +58,76 @@ public class CarbonWriterBuilder { private boolean isUnManagedTable; private long UUID; + /** + * prepares the builder with the schema provided + * @param schema is instance of Schema + * @return updated CarbonWriterBuilder + */ public CarbonWriterBuilder withSchema(Schema schema) { Objects.requireNonNull(schema, "schema should not be null"); this.schema = schema; return this; } + /** + * Sets the output path of the writer builder + * @param path is the absolute path where output files are written + * @return updated CarbonWriterBuilder + */ public CarbonWriterBuilder outputPath(String path) { Objects.requireNonNull(path, "path should not be null"); this.path = path; return this; } + /** + * sets the list of columns that needs to be in sorted order + * @param sortColumns is a string array of columns that needs to be sorted + * @return updated CarbonWriterBuilder + */ public CarbonWriterBuilder sortBy(String[] sortColumns) { this.sortColumns = sortColumns; return this; } - public CarbonWriterBuilder partitionBy(String[] partitionColumns) { - throw new UnsupportedOperationException(); - } - + /** + * If set, create a schema file in metadata folder. 
+ * @param persist is a boolean value, If set, create a schema file in metadata folder + * @return updated CarbonWriterBuilder + */ public CarbonWriterBuilder persistSchemaFile(boolean persist) { this.persistSchemaFile = persist; return this; } + /** + * If set true, writes the carbondata and carbonindex files in a flat folder structure + * @param isUnManagedTable is a boolelan value if set writes + * the carbondata and carbonindex files in a flat folder structure + * @return updated CarbonWriterBuilder + */ public CarbonWriterBuilder unManagedTable(boolean isUnManagedTable) { Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null"); this.isUnManagedTable = isUnManagedTable; return this; } + /** + * to set the timestamp in the carbondata and carbonindex index files + * @param UUID is a timestamp to be used in the carbondata and carbonindex index files + * @return updated CarbonWriterBuilder + */ public CarbonWriterBuilder uniqueIdentifier(long UUID) { Objects.requireNonNull(UUID, "Unique Identifier should not be null"); this.UUID = UUID; return this; } + /** + * To set the carbondata file size in MB between 1MB-2048MB + * @param blockSize is size in MB between 1MB to 2048 MB + * @return updated CarbonWriterBuilder + */ public CarbonWriterBuilder withBlockSize(int blockSize) { if (blockSize <= 0 || blockSize > 2048) { throw new IllegalArgumentException("blockSize should be between 1 MB to 2048 MB"); @@ -104,6 +136,11 @@ public class CarbonWriterBuilder { return this; } + /** + * To set the blocklet size of carbondata file + * @param blockletSize is blocklet size in MB + * @return updated CarbonWriterBuilder + */ public CarbonWriterBuilder withBlockletSize(int blockletSize) { if (blockletSize <= 0) { throw new IllegalArgumentException("blockletSize should be greater than zero"); @@ -151,10 +188,14 @@ public class CarbonWriterBuilder { */ private CarbonTable buildCarbonTable() { TableSchemaBuilder tableSchemaBuilder = TableSchema.builder(); - if 
(blockletSize > 0) { + if (blockSize > 0) { tableSchemaBuilder = tableSchemaBuilder.blockSize(blockSize); } + if (blockletSize > 0) { + tableSchemaBuilder = tableSchemaBuilder.blockletSize(blockletSize); + } + List<String> sortColumnsList; if (sortColumns != null) { sortColumnsList = Arrays.asList(sortColumns);