[CARBONDATA-2224][File Level Reader Support] External File level reader support

File level reader reads any carbondata file placed in any external file path.

This closes #2055

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7a124ecd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7a124ecd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7a124ecd

Branch: refs/heads/carbonfile
Commit: 7a124ecd87769c0197ae67a0726c5abb4745d3a8
Parents: a386f1f
Author: sounakr <soun...@gmail.com>
Authored: Sat Feb 24 07:55:14 2018 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Fri Mar 16 17:33:43 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/table/CarbonTable.java |   6 +
 .../apache/carbondata/core/util/CarbonUtil.java | 209 +++++-
 .../hadoop/api/CarbonFileInputFormat.java       | 682 +++++++++++++++++++
 .../carbondata/hadoop/util/SchemaReader.java    |  17 +-
 integration/spark-common-test/pom.xml           |   6 +
 ...FileInputFormatWithExternalCarbonTable.scala | 240 +++++++
 ...tCreateTableUsingSparkCarbonFileFormat.scala | 327 +++++++++
 ...tSparkCarbonFileFormatWithSparkSession.scala | 176 +++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  64 +-
 .../VectorizedCarbonRecordReader.java           |  22 +-
 .../management/CarbonLoadDataCommand.scala      |   4 +-
 .../command/table/CarbonDropTableCommand.scala  |   2 +-
 .../datasources/CarbonFileFormat.scala          | 443 ------------
 .../datasources/SparkCarbonFileFormat.scala     | 269 ++++++++
 .../datasources/SparkCarbonTableFormat.scala    | 443 ++++++++++++
 .../sql/execution/strategy/DDLStrategy.scala    |  27 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala    |   2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  47 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  17 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  16 +-
 ....apache.spark.sql.sources.DataSourceRegister |   3 +-
 .../sdk/file/CSVCarbonWriterSuite.java          |   6 +-
 22 files changed, 2522 insertions(+), 506 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index f14672f..278dc96 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -826,6 +826,12 @@ public class CarbonTable implements Serializable {
     return external != null && external.equalsIgnoreCase("true");
   }
 
+  public boolean isFileLevelExternalTable() {
+    String external = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal");
+    return external != null && external.equalsIgnoreCase("true");
+  }
+
+
   public long size() throws IOException {
     Map<String, Long> dataIndexSize = CarbonUtil.calculateDataIndexSize(this);
     Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
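In user-facing terms, the feature works like this: carbondata files written by the SDK
(with no metastore entry at all) can be mounted as an external table and queried directly.
A minimal sketch distilled from the test suites below — the schema, output path, and table
name are illustrative, and sql(...) stands for the Spark session's SQL entry point:

  import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}

  // 1. Write plain carbondata files with the SDK, outside any metastore.
  val schema = Schema.parseJson(
    """[ {"name":"string"}, {"age":"int"}, {"height":"double"} ]""")
  val writer = CarbonWriter.builder()
    .withSchema(schema)
    .outputPath("/tmp/WriterOutput")        // illustrative path
    .buildWriterForCSVInput()
  for (i <- 0 until 100) {
    writer.write(Array[String]("robot" + i, i.toString, (i.toDouble / 2).toString))
  }
  writer.close()

  // 2. Mount the raw files as an external table and query them directly.
  sql("CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION '/tmp/WriterOutput'")
  sql("SELECT count(*) FROM sdkOutputTable WHERE name LIKE 'robot%'").show()

The pieces that make this possible are the schema inference in CarbonUtil/SchemaReader and
the new CarbonFileInputFormat, both added in this patch.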
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b961b60..ff49edf 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.exception.InvalidConfigurationException;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
@@ -52,18 +53,26 @@ import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypeAdapter;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
 import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.CarbonHeaderReader;
 import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.carbondata.core.reader.ThriftReader.TBaseCreator;
 import org.apache.carbondata.core.scan.model.ProjectionDimension;
@@ -77,6 +86,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.BlockletHeader;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.FileHeader;
+
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -1279,7 +1290,7 @@ public final class CarbonUtil {
     int counter = 0;
     for (int i = 0; i < wrapperColumnSchemaList.size(); i++) {
       if (CarbonUtil.hasEncoding(wrapperColumnSchemaList.get(i).getEncodingList(),
-          org.apache.carbondata.core.metadata.encoder.Encoding.DICTIONARY)) {
+          Encoding.DICTIONARY)) {
         cardinality.add(dictionaryColumnCardinality[counter]);
         counter++;
       } else if (!wrapperColumnSchemaList.get(i).isDimensionColumn()) {
@@ -2068,6 +2079,202 @@ public final class CarbonUtil {
     return tableInfo;
   }
 
+  public static ColumnSchema thriftColumnSchmeaToWrapperColumnSchema(
+      org.apache.carbondata.format.ColumnSchema externalColumnSchema) {
+    ColumnSchema wrapperColumnSchema = new ColumnSchema();
+    wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id());
+    wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name());
+    wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar());
+    DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type);
+    if (DataTypes.isDecimal(dataType)) {
+      DecimalType decimalType = (DecimalType) dataType;
+      decimalType.setPrecision(externalColumnSchema.getPrecision());
+      decimalType.setScale(externalColumnSchema.getScale());
+    }
+    wrapperColumnSchema.setDataType(dataType);
+    wrapperColumnSchema.setDimensionColumn(externalColumnSchema.isDimension());
+    List<Encoding> encoders = new ArrayList<Encoding>();
+    for (org.apache.carbondata.format.Encoding encoder : externalColumnSchema.getEncoders()) {
+      encoders.add(fromExternalToWrapperEncoding(encoder));
+    }
+    wrapperColumnSchema.setEncodingList(encoders);
+    wrapperColumnSchema.setNumberOfChild(externalColumnSchema.getNum_child());
+    wrapperColumnSchema.setPrecision(externalColumnSchema.getPrecision());
+    wrapperColumnSchema.setColumnGroup(externalColumnSchema.getColumn_group_id());
+    wrapperColumnSchema.setScale(externalColumnSchema.getScale());
+    wrapperColumnSchema.setDefaultValue(externalColumnSchema.getDefault_value());
+    wrapperColumnSchema.setSchemaOrdinal(externalColumnSchema.getSchemaOrdinal());
+    Map<String, String> properties = externalColumnSchema.getColumnProperties();
+    if (properties != null) {
+      if (properties.get(CarbonCommonConstants.SORT_COLUMNS) != null) {
+        wrapperColumnSchema.setSortColumn(true);
+      }
+    }
+    wrapperColumnSchema.setFunction(externalColumnSchema.getAggregate_function());
+    List<org.apache.carbondata.format.ParentColumnTableRelation> parentColumnTableRelation =
+        externalColumnSchema.getParentColumnTableRelations();
+    if (null != parentColumnTableRelation) {
+      wrapperColumnSchema.setParentColumnTableRelations(
+          fromThriftToWrapperParentTableColumnRelations(parentColumnTableRelation));
+    }
+    return wrapperColumnSchema;
+  }
+
+  static List<ParentColumnTableRelation> fromThriftToWrapperParentTableColumnRelations(
+      List<org.apache.carbondata.format.ParentColumnTableRelation> thirftParentColumnRelation) {
+    List<ParentColumnTableRelation> parentColumnTableRelationList = new ArrayList<>();
+    for (org.apache.carbondata.format.ParentColumnTableRelation carbonTableRelation :
+        thirftParentColumnRelation) {
+      RelationIdentifier relationIdentifier =
+          new RelationIdentifier(carbonTableRelation.getRelationIdentifier().getDatabaseName(),
+              carbonTableRelation.getRelationIdentifier().getTableName(),
+              carbonTableRelation.getRelationIdentifier().getTableId());
+      ParentColumnTableRelation parentColumnTableRelation =
+          new ParentColumnTableRelation(relationIdentifier, carbonTableRelation.getColumnId(),
+              carbonTableRelation.getColumnName());
+      parentColumnTableRelationList.add(parentColumnTableRelation);
+    }
+    return parentColumnTableRelationList;
+  }
+
+  static Encoding fromExternalToWrapperEncoding(
+      org.apache.carbondata.format.Encoding encoderThrift) {
+    switch (encoderThrift) {
+      case DICTIONARY:
+        return Encoding.DICTIONARY;
+      case DELTA:
+        return Encoding.DELTA;
+      case RLE:
+        return Encoding.RLE;
+      case INVERTED_INDEX:
+        return Encoding.INVERTED_INDEX;
+      case BIT_PACKED:
+        return Encoding.BIT_PACKED;
+      case DIRECT_DICTIONARY:
+        return Encoding.DIRECT_DICTIONARY;
+      default:
+        throw new IllegalArgumentException(encoderThrift.toString() + " is not supported");
+    }
+  }
+
+  static DataType thriftDataTyopeToWrapperDataType(
+      org.apache.carbondata.format.DataType dataTypeThrift) {
+    switch (dataTypeThrift) {
+      case BOOLEAN:
+        return DataTypes.BOOLEAN;
+      case STRING:
+        return DataTypes.STRING;
+      case SHORT:
+        return DataTypes.SHORT;
+      case INT:
+        return DataTypes.INT;
+      case LONG:
+        return DataTypes.LONG;
+      case DOUBLE:
+        return DataTypes.DOUBLE;
+      case DECIMAL:
+        return DataTypes.createDefaultDecimalType();
+      case DATE:
+        return DataTypes.DATE;
+      case TIMESTAMP:
+        return DataTypes.TIMESTAMP;
+      case ARRAY:
+        return DataTypes.createDefaultArrayType();
+      case STRUCT:
+        return DataTypes.createDefaultStructType();
+      default:
+        return DataTypes.STRING;
+    }
+  }
+
+  public static List<String> getFilePathExternalFilePath(String path) {
+
+    // return the list of carbondata files in the given path.
+    CarbonFile segment = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+    CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+
+        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
+          return true;
+        }
+        return false;
+      }
+    });
+    List<String> filePaths = new ArrayList<>(dataFiles.length);
+    for (CarbonFile dfiles : dataFiles) {
+      filePaths.add(dfiles.getAbsolutePath());
+    }
+    return filePaths;
+  }
+
+  /**
+   * This method infers the table schema for an external table. When no schema file exists,
+   * it reads the file header of the first carbondata file under the given table path;
+   * otherwise it reads the persisted schema file.
+   *
+   * @param carbonDataFilePath table path containing the carbondata files or the schema file
+   * @return the thrift TableInfo describing the external table
+   */
+  public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable(
+      String carbonDataFilePath, AbsoluteTableIdentifier absoluteTableIdentifier,
+      boolean schemaExists) throws IOException {
+    TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+      public org.apache.thrift.TBase<org.apache.carbondata.format.TableInfo,
+          org.apache.carbondata.format.TableInfo._Fields> create() {
+        return new org.apache.carbondata.format.TableInfo();
+      }
+    };
+    if (schemaExists == false) {
+      List<String> filePaths =
+          getFilePathExternalFilePath(carbonDataFilePath + "/Fact/Part0/Segment_null");
+      String fistFilePath = null;
+      try {
+        fistFilePath = filePaths.get(0);
+      } catch (Exception e) {
+        LOGGER.error("CarbonData file is not present in the table location");
+      }
+      CarbonHeaderReader carbonHeaderReader = new CarbonHeaderReader(fistFilePath);
+      FileHeader fileHeader = carbonHeaderReader.readHeader();
+      List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+      List<org.apache.carbondata.format.ColumnSchema> table_columns = fileHeader.getColumn_schema();
+      for (int i = 0; i < table_columns.size(); i++) {
+        ColumnSchema col = thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i));
+        col.setColumnReferenceId(col.getColumnUniqueId());
+        columnSchemaList.add(col);
+      }
+      TableSchema tableSchema = new TableSchema();
+      tableSchema.setTableName(absoluteTableIdentifier.getTableName());
+      tableSchema.setBucketingInfo(null);
+      tableSchema.setSchemaEvalution(null);
+      tableSchema.setTableId(UUID.randomUUID().toString());
+      tableSchema.setListOfColumns(columnSchemaList);
+
+      ThriftWrapperSchemaConverterImpl thriftWrapperSchemaConverter =
+          new ThriftWrapperSchemaConverterImpl();
+      SchemaEvolutionEntry schemaEvolutionEntry = new SchemaEvolutionEntry();
+      schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis());
+      SchemaEvolution schemaEvol = new SchemaEvolution();
+      List<SchemaEvolutionEntry> schEntryList = new ArrayList<>();
+      schEntryList.add(schemaEvolutionEntry);
+      schemaEvol.setSchemaEvolutionEntryList(schEntryList);
+      tableSchema.setSchemaEvalution(schemaEvol);
+
+      org.apache.carbondata.format.TableSchema thriftFactTable =
+          thriftWrapperSchemaConverter.fromWrapperToExternalTableSchema(tableSchema);
+      org.apache.carbondata.format.TableInfo tableInfo =
+          new org.apache.carbondata.format.TableInfo(thriftFactTable,
+              new ArrayList<org.apache.carbondata.format.TableSchema>());
+
+      tableInfo.setDataMapSchemas(null);
+      return tableInfo;
+    } else {
+      ThriftReader thriftReader = new ThriftReader(carbonDataFilePath, createTBase);
+      thriftReader.open();
+      org.apache.carbondata.format.TableInfo tableInfo =
+          (org.apache.carbondata.format.TableInfo) thriftReader.read();
+      thriftReader.close();
+      return tableInfo;
+    }
+  }
+
   public static void dropDatabaseDirectory(String databasePath)
       throws IOException, InterruptedException {
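The inference path above is what SchemaReader.inferSchema (further down in this diff) drives
end to end: list the carbondata files under <path>/Fact/Part0/Segment_null, read the first
file's header, and convert the thrift column schemas into a wrapper TableInfo. A minimal
driver-side sketch of that flow, using only calls added or referenced in this patch — the
path, database, and table names are illustrative:

  import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
  import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  import org.apache.carbondata.hadoop.util.SchemaReader

  // Infer a wrapper TableInfo straight from the carbondata file headers
  // (no schema file, no metastore) and build a queryable CarbonTable.
  val identifier = AbsoluteTableIdentifier.from("/tmp/WriterOutput", "default", "sdkoutputtable")
  val inferredTableInfo = SchemaReader.inferSchema(identifier)
  val carbonTable = CarbonTable.buildFromTableInfo(inferredTableInfo)

CarbonFileInputFormat below performs exactly this when no schema file is found at the table
path.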
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
new file mode 100644
index 0000000..b86b1cc
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.carbondata.hadoop.api; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapLevel; +import org.apache.carbondata.core.datamap.Segment; +import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.exception.InvalidConfigurationException; +import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; +import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.ColumnarFormatVersion; +import org.apache.carbondata.core.metadata.schema.PartitionInfo; +import org.apache.carbondata.core.metadata.schema.partition.PartitionType; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; +import org.apache.carbondata.core.mutate.UpdateVO; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.SingleTableProvider; +import org.apache.carbondata.core.scan.filter.TableProvider; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.stats.QueryStatistic; +import org.apache.carbondata.core.stats.QueryStatisticsConstants; +import org.apache.carbondata.core.stats.QueryStatisticsRecorder; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeConverter; +import org.apache.carbondata.core.util.DataTypeConverterImpl; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonInputSplit; +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit; +import org.apache.carbondata.hadoop.CarbonProjection; +import org.apache.carbondata.hadoop.CarbonRecordReader; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; +import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.hadoop.util.SchemaReader; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import 
org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.security.TokenCache; + +/** + * Input format of CarbonData file. + * + * @param <T> + */ +@InterfaceAudience.User +@InterfaceStability.Evolving +public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable { + + public static final String READ_SUPPORT_CLASS = "carbon.read.support.class"; + // comma separated list of input segment numbers + public static final String INPUT_SEGMENT_NUMBERS = + "mapreduce.input.carboninputformat.segmentnumbers"; + private static final String VALIDATE_INPUT_SEGMENT_IDs = + "mapreduce.input.carboninputformat.validsegments"; + // comma separated list of input files + public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files"; + private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid"; + private static final Log LOG = LogFactory.getLog(CarbonFileInputFormat.class); + private static final String FILTER_PREDICATE = + "mapreduce.input.carboninputformat.filter.predicate"; + private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; + private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; + private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; + private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter"; + private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr"; + public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName"; + public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName"; + private static final String PARTITIONS_TO_PRUNE = + "mapreduce.input.carboninputformat.partitions.to.prune"; + public static final String UPADTE_T = + "mapreduce.input.carboninputformat.partitions.to.prune"; + + // a cache for carbon table, it will be used in task side + private CarbonTable carbonTable; + + /** + * Set the `tableInfo` in `configuration` + */ + public static void setTableInfo(Configuration configuration, TableInfo tableInfo) + throws IOException { + if (null != tableInfo) { + configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize())); + } + } + + /** + * Get TableInfo object from `configuration` + */ + private static TableInfo getTableInfo(Configuration configuration) throws IOException { + String tableInfoStr = configuration.get(TABLE_INFO); + if (tableInfoStr == null) { + return null; + } else { + TableInfo output = new TableInfo(); + output.readFields( + new DataInputStream( + new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr)))); + return output; + } + } + + + public static void setTablePath(Configuration configuration, String tablePath) { + configuration.set(FileInputFormat.INPUT_DIR, tablePath); + } + + public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) { + configuration.set(ALTER_PARTITION_ID, partitionIds.toString()); + } + + + public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob) + throws IOException { + if (dataMapJob != null) 
{ + String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob); + configuration.set(DATA_MAP_DSTR, toString); + } + } + + public static DataMapJob getDataMapJob(Configuration configuration) throws IOException { + String jobString = configuration.get(DATA_MAP_DSTR); + if (jobString != null) { + return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString); + } + return null; + } + + /** + * It sets unresolved filter expression. + * + * @param configuration + * @param filterExpression + */ + public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { + if (filterExpression == null) { + return; + } + try { + String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); + configuration.set(FILTER_PREDICATE, filterString); + } catch (Exception e) { + throw new RuntimeException("Error while setting filter expression to Job", e); + } + } + + public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { + if (projection == null || projection.isEmpty()) { + return; + } + String[] allColumns = projection.getAllColumns(); + StringBuilder builder = new StringBuilder(); + for (String column : allColumns) { + builder.append(column).append(","); + } + String columnString = builder.toString(); + columnString = columnString.substring(0, columnString.length() - 1); + configuration.set(COLUMN_PROJECTION, columnString); + } + + public static String getColumnProjection(Configuration configuration) { + return configuration.get(COLUMN_PROJECTION); + } + + + /** + * Set list of segments to access + */ + public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) { + configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments)); + } + + /** + * Set `CARBON_INPUT_SEGMENTS` from property to configuration + */ + public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) { + String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase(); + String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase(); + String segmentNumbersFromProperty = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." 
+ tbName, "*"); + if (!segmentNumbersFromProperty.trim().equals("*")) { + CarbonFileInputFormat + .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(","))); + } + } + + /** + * set list of segment to access + */ + public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) { + configuration.set(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString()); + } + + /** + * get list of segment to access + */ + public static boolean getValidateSegmentsToAccess(Configuration configuration) { + return configuration.get(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true") + .equalsIgnoreCase("true"); + } + + /** + * set list of partitions to prune + */ + public static void setPartitionsToPrune(Configuration configuration, + List<PartitionSpec> partitions) { + if (partitions == null) { + return; + } + try { + String partitionString = + ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions)); + configuration.set(PARTITIONS_TO_PRUNE, partitionString); + } catch (Exception e) { + throw new RuntimeException("Error while setting patition information to Job", e); + } + } + + /** + * get list of partitions to prune + */ + private static List<PartitionSpec> getPartitionsToPrune(Configuration configuration) + throws IOException { + String partitionString = configuration.get(PARTITIONS_TO_PRUNE); + if (partitionString != null) { + return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString); + } + return null; + } + + public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) + throws IOException { + String tablePath = configuration.get(INPUT_DIR, ""); + try { + return AbsoluteTableIdentifier + .from(tablePath, getDatabaseName(configuration), getTableName(configuration)); + } catch (InvalidConfigurationException e) { + throw new IOException(e); + } + } + + /** + * {@inheritDoc} + * Configurations FileInputFormat.INPUT_DIR + * are used to get table path to read. + * + * @param job + * @return List<InputSplit> list of CarbonInputSplit + * @throws IOException + */ + @Override + public List<InputSplit> getSplits(JobContext job) throws IOException { + AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); + if (null == carbonTable) { + throw new IOException("Missing/Corrupt schema file for table."); + } + // TableDataMap blockletMap = DataMapStoreManager.getInstance() + // .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName()); + + if (getValidateSegmentsToAccess(job.getConfiguration())) { + // get all valid segments and set them into the configuration + // check for externalTable segment (Segment_null) + // process and resolve the expression + Expression filter = getFilterPredicates(job.getConfiguration()); + TableProvider tableProvider = new SingleTableProvider(carbonTable); + // this will be null in case of corrupt schema file. 
+ PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName()); + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, null, null); + + FilterResolverIntf filterInterface = CarbonInputFormatUtil + .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); + + String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null"); + FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); + if (FileFactory.isFileExist(segmentDir, fileType)) { + // if external table Segments are found, add it to the List + List<Segment> externalTableSegments = new ArrayList<Segment>(); + Segment seg = new Segment("null", null); + externalTableSegments.add(seg); + + Map<String, String> indexFiles = + new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir); + + if (indexFiles.size() == 0) { + throw new RuntimeException("Index file not present to read the carbondata file"); + } + // do block filtering and get split + List<InputSplit> splits = + getSplits(job, filterInterface, externalTableSegments, null, partitionInfo, null); + + return splits; + } + } + return null; + } + + + + /** + * {@inheritDoc} + * Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS + * are used to get table path to read. + * + * @return + * @throws IOException + */ + private List<InputSplit> getSplits(JobContext job, FilterResolverIntf filterResolver, + List<Segment> validSegments, BitSet matchedPartitions, PartitionInfo partitionInfo, + List<Integer> oldPartitionIdList) throws IOException { + + List<InputSplit> result = new LinkedList<InputSplit>(); + UpdateVO invalidBlockVOForSegmentId = null; + Boolean isIUDTable = false; + + AbsoluteTableIdentifier absoluteTableIdentifier = + getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); + SegmentUpdateStatusManager updateStatusManager = + new SegmentUpdateStatusManager(absoluteTableIdentifier); + + isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0); + + // for each segment fetch blocks matching filter in Driver BTree + List<CarbonInputSplit> dataBlocksOfSegment = + getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions, + validSegments, partitionInfo, oldPartitionIdList); + for (CarbonInputSplit inputSplit : dataBlocksOfSegment) { + + // Get the UpdateVO for those tables on which IUD operations being performed. + if (isIUDTable) { + invalidBlockVOForSegmentId = + updateStatusManager.getInvalidTimestampRange(inputSplit.getSegmentId()); + } + String[] deleteDeltaFilePath = null; + if (isIUDTable) { + // In case IUD is not performed in this table avoid searching for + // invalidated blocks. 
+ if (CarbonUtil + .isInvalidTableBlock(inputSplit.getSegmentId(), inputSplit.getPath().toString(), + invalidBlockVOForSegmentId, updateStatusManager)) { + continue; + } + // When iud is done then only get delete delta files for a block + try { + deleteDeltaFilePath = updateStatusManager + .getDeleteDeltaFilePath(inputSplit.getPath().toString(), inputSplit.getSegmentId()); + } catch (Exception e) { + throw new IOException(e); + } + } + inputSplit.setDeleteDeltaFiles(deleteDeltaFilePath); + result.add(inputSplit); + } + return result; + } + + protected Expression getFilterPredicates(Configuration configuration) { + try { + String filterExprString = configuration.get(FILTER_PREDICATE); + if (filterExprString == null) { + return null; + } + Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString); + return (Expression) filter; + } catch (IOException e) { + throw new RuntimeException("Error while reading filter expression", e); + } + } + + /** + * get data blocks of given segment + */ + private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job, + AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver, + BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo, + List<Integer> oldPartitionIdList) throws IOException { + + QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); + QueryStatistic statistic = new QueryStatistic(); + + // get tokens for all the required FileSystem for table path + TokenCache.obtainTokensForNamenodes(job.getCredentials(), + new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); + boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, + CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); + DataMapExprWrapper dataMapExprWrapper = + DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver); + DataMapJob dataMapJob = getDataMapJob(job.getConfiguration()); + List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration()); + List<ExtendedBlocklet> prunedBlocklets; + if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) { + DistributableDataMapFormat datamapDstr = + new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, + segmentIds, partitionsToPrune, + BlockletDataMapFactory.class.getName()); + prunedBlocklets = dataMapJob.execute(datamapDstr, resolver); + // Apply expression on the blocklets. + prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets); + } else { + prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune); + } + + List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>(); + int partitionIndex = 0; + List<Integer> partitionIdList = new ArrayList<>(); + if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { + partitionIdList = partitionInfo.getPartitionIds(); + } + for (ExtendedBlocklet blocklet : prunedBlocklets) { + long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo( + CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath())); + + // OldPartitionIdList is only used in alter table partition command because it change + // partition info first and then read data. 
+ // For other normal query should use newest partitionIdList + if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) { + if (oldPartitionIdList != null) { + partitionIndex = oldPartitionIdList.indexOf((int)partitionId); + } else { + partitionIndex = partitionIdList.indexOf((int)partitionId); + } + } + if (partitionIndex != -1) { + // matchedPartitions variable will be null in two cases as follows + // 1. the table is not a partition table + // 2. the table is a partition table, and all partitions are matched by query + // for partition table, the task id of carbaondata file name is the partition id. + // if this partition is not required, here will skip it. + if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) { + CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet); + if (inputSplit != null) { + resultFilterredBlocks.add(inputSplit); + } + } + } + } + statistic + .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); + recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id")); + return resultFilterredBlocks; + } + + private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException { + CarbonInputSplit split = + CarbonInputSplit.from(blocklet.getSegmentId(), + blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0, + blocklet.getLength(), blocklet.getLocations()), + ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()), + blocklet.getDataMapWriterPath()); + split.setDetailInfo(blocklet.getDetailInfo()); + return split; + } + + @Override + public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, + TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + Configuration configuration = taskAttemptContext.getConfiguration(); + QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); + CarbonReadSupport<T> readSupport = getReadSupportClass(configuration); + return new CarbonRecordReader<T>(queryModel, readSupport); + } + + public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException { + Configuration configuration = taskAttemptContext.getConfiguration(); + CarbonTable carbonTable = getOrCreateCarbonTable(configuration); + TableProvider tableProvider = new SingleTableProvider(carbonTable); + + // query plan includes projection column + String projectionString = getColumnProjection(configuration); + String[] projectionColumnNames = null; + if (projectionString != null) { + projectionColumnNames = projectionString.split(","); + } + QueryModel queryModel = carbonTable.createQueryWithProjection( + projectionColumnNames, getDataTypeConverter(configuration)); + + // set the filter to the query model in order to filter blocklet before scan + Expression filter = getFilterPredicates(configuration); + boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()]; + // getAllMeasures returns list of visible and invisible columns + boolean[] isFilterMeasures = + new boolean[carbonTable.getAllMeasures().size()]; + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions, + isFilterMeasures); + queryModel.setIsFilterDimensions(isFilterDimensions); + queryModel.setIsFilterMeasures(isFilterMeasures); + FilterResolverIntf filterIntf = CarbonInputFormatUtil + .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider); + 
queryModel.setFilterExpressionResolverTree(filterIntf); + + // update the file level index store if there are invalid segment + if (inputSplit instanceof CarbonMultiBlockSplit) { + CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; + List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); + if (invalidSegments.size() > 0) { + queryModel.setInvalidSegmentIds(invalidSegments); + } + List<UpdateVO> invalidTimestampRangeList = + split.getAllSplits().get(0).getInvalidTimestampRange(); + if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) { + queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList); + } + } + return queryModel; + } + + private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + CarbonTable carbonTableTemp; + if (carbonTable == null) { + // carbon table should be created either from deserialized table info (schema saved in + // hive metastore) or by reading schema in HDFS (schema saved in HDFS) + TableInfo tableInfo = getTableInfo(configuration); + CarbonTable localCarbonTable; + if (tableInfo != null) { + localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo); + } else { + String schemaPath = CarbonTablePath + .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath()); + if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { + TableInfo tableInfoInfer = + SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration)); + localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer); + } else { + localCarbonTable = + SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration)); + } + } + this.carbonTable = localCarbonTable; + return localCarbonTable; + } else { + carbonTableTemp = this.carbonTable; + return carbonTableTemp; + } + } + + + public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) { + String readSupportClass = configuration.get(CARBON_READ_SUPPORT); + //By default it uses dictionary decoder read class + CarbonReadSupport<T> readSupport = null; + if (readSupportClass != null) { + try { + Class<?> myClass = Class.forName(readSupportClass); + Constructor<?> constructor = myClass.getConstructors()[0]; + Object object = constructor.newInstance(); + if (object instanceof CarbonReadSupport) { + readSupport = (CarbonReadSupport) object; + } + } catch (ClassNotFoundException ex) { + LOG.error("Class " + readSupportClass + "not found", ex); + } catch (Exception ex) { + LOG.error("Error while creating " + readSupportClass, ex); + } + } else { + readSupport = new DictionaryDecodeReadSupport<>(); + } + return readSupport; + } + + @Override + protected boolean isSplitable(JobContext context, Path filename) { + try { + // Don't split the file if it is local file system + FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); + if (fileSystem instanceof LocalFileSystem) { + return false; + } + } catch (Exception e) { + return true; + } + return true; + } + + /** + * return valid segment to access + */ + private String[] getSegmentsToAccess(JobContext job) { + String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); + if (segmentString.trim().isEmpty()) { + return new String[0]; + } + return segmentString.split(","); + } + + public static DataTypeConverter getDataTypeConverter(Configuration configuration) + throws IOException { + String converter = configuration.get(CARBON_CONVERTER); + if (converter == null) { + return new 
DataTypeConverterImpl(); + } + return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter); + } + + public static void setDatabaseName(Configuration configuration, String databaseName) { + if (null != databaseName) { + configuration.set(DATABASE_NAME, databaseName); + } + } + + public static String getDatabaseName(Configuration configuration) + throws InvalidConfigurationException { + String databseName = configuration.get(DATABASE_NAME); + if (null == databseName) { + throw new InvalidConfigurationException("Database name is not set."); + } + return databseName; + } + + public static void setTableName(Configuration configuration, String tableName) { + if (null != tableName) { + configuration.set(TABLE_NAME, tableName); + } + } + + public static String getTableName(Configuration configuration) + throws InvalidConfigurationException { + String tableName = configuration.get(TABLE_NAME); + if (tableName == null) { + throw new InvalidConfigurationException("Table name is not set"); + } + return tableName; + } + + public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader( + org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter) + throws IOException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java index dfa8dd1..ab7c333 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java @@ -28,7 +28,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.core.util.path.CarbonTablePath; /** * TODO: It should be removed after store manager implementation. @@ -59,6 +58,7 @@ public class SchemaReader { throw new IOException("File does not exist: " + schemaFilePath); } } + /** * the method returns the Wrapper TableInfo * @@ -79,4 +79,19 @@ public class SchemaReader { carbonTableIdentifier.getTableName(), identifier.getTablePath()); } + + + public static TableInfo inferSchema(AbsoluteTableIdentifier identifier) + throws IOException { + // This routine is going to infer schema from the carbondata file footer + // Convert the ColumnSchema -> TableSchema -> TableInfo. + // Return the TableInfo. 
+ org.apache.carbondata.format.TableInfo tableInfo = + CarbonUtil.inferSchemaFileExternalTable(identifier.getTablePath(), identifier, false); + SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); + TableInfo wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(tableInfo, identifier.getDatabaseName(), + identifier.getTableName(), identifier.getTablePath()); + return wrapperTableInfo; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml index b7f19fd..1c6cee9 100644 --- a/integration/spark-common-test/pom.xml +++ b/integration/spark-common-test/pom.xml @@ -105,6 +105,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-store-sdk</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala new file mode 100644 index 0000000..8b1f63f --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with BeforeAndAfterAll { + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + writerPath = writerPath.replace("\\", "/"); + + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + // create carbon table and insert data + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the Carbonfile ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //new provider Carbonfile + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + sql("Describe formatted sdkOutputTable").show(false) + + sql("select * from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable limit 3").show(false) + + sql("select name from sdkOutputTable").show(false) + + sql("select age from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false) + + sql("select * from sdkOutputTable where name = 'robot3'").show(200, false) + + sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false) + + sql("select * from sdkOutputTable where name like 
'%obot%' limit 2").show(200, false) + + sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false) + + sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false) + + sql("select count(*) from sdkOutputTable").show(200, false) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("should not allow to alter datasource carbontable ") { + buildTestData(false) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + val exception = intercept[MalformedCarbonCommandException] + { + sql("Alter table sdkOutputTable change age age BIGINT") + } + assert(exception.getMessage() + .contains("Unsupported alter operation on Carbon external fileformat table")) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + test("Read sdk writer output file without index file should fail") { + buildTestData(false) + deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + //org.apache.spark.SparkException: Index file not present to read the carbondata file + val exception = intercept[java.lang.RuntimeException] + { + sql("select * from sdkOutputTable").show(false) + } + assert(exception.getMessage().contains("Index file not present to read the carbondata file")) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + + test("Read sdk writer output file without Carbondata file should fail") { + buildTestData(false) + deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[Exception] { + // data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + } + assert(exception.getMessage() + .contains("Operation not allowed: Invalid table path provided:")) + + + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } + + + test("Read sdk writer output file without any file should fail") { + buildTestData(false) + deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + assert(new File(writerPath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[Exception] { + //data source file format + sql( + s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION + |'$writerPath' """.stripMargin) + + sql("select * from sdkOutputTable").show(false) + } + assert(exception.getMessage() + .contains("Operation not allowed: Invalid table path provided:")) + + // drop table should not delete the files + assert(new File(writerPath).exists()) + cleanTestData() + } +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala new file mode 100644 index 0000000..d284e50 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + +class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAndAfterAll { + + + override def beforeAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + override def afterAll(): Unit = { + sql("DROP TABLE IF EXISTS sdkOutputTable") + } + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
+ writerPath = writerPath.replace("\\", "/"); + + val filePath = writerPath + "/Fact/Part0/Segment_null/" + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + //TO DO, need to remove segment dependency and tableIdentifier Dependency + test("read carbondata files (sdk Writer Output) using the SparkCarbonFileFormat ") { + buildTestData(false) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + + sql("Describe formatted sdkOutputTable").show(false) + + sql("select * from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable limit 3").show(false) + + sql("select name from sdkOutputTable").show(false) + + sql("select age from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false) + + sql("select * from sdkOutputTable where name = 'robot3'").show(200,false) + + sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false) + + sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false) + + sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false) + + sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false) + + sql("select count(*) from sdkOutputTable").show(200,false) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + } + + + test("should not allow to alter datasource carbontable ") { + buildTestData(false) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source 
file format + sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + + val exception = intercept[MalformedCarbonCommandException] + { + sql("Alter table sdkOutputTable change age age BIGINT") + } + assert(exception.getMessage().contains("Unsupported alter operation on hive table")) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + } + + test("Read sdk writer output file without Carbondata file should fail") { + buildTestData(false) + deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[org.apache.spark.SparkException] { + // data source file format + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + } + assert(exception.getMessage() + .contains("CarbonData file is not present in the location mentioned in DDL")) + + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + } + + + test("Read sdk writer output file without any file should fail") { + buildTestData(false) + deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + val exception = intercept[org.apache.spark.SparkException] { + //data source file format + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + + sql("select * from sdkOutputTable").show(false) + } + assert(exception.getMessage() + .contains("CarbonData file is not present in the location mentioned in DDL")) + + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + } + + test("Read sdk writer output file withSchema") { + buildTestData(true) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + //data source file format + sql("DROP TABLE IF EXISTS sdkOutputTable") + + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + + sql("Describe formatted sdkOutputTable").show(false) + + sql("select * from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable limit 3").show(false) + + sql("select name from sdkOutputTable").show(false) + + sql("select age from sdkOutputTable").show(false) + + sql("select * from sdkOutputTable where age > 2 and age < 8").show(200, false) + + sql("select * from 
sdkOutputTable where name = 'robot3'").show(200, false) + + sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200, false) + + sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200, false) + + sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200, false) + + sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200, false) + + sql("select count(*) from sdkOutputTable").show(200, false) + + sql("DROP TABLE sdkOutputTable") + + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + } + + test("Read sdk writer output file without index file should fail") { + buildTestData(false) + deleteIndexFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + assert(new File(filePath).exists()) + sql("DROP TABLE IF EXISTS sdkOutputTable") + + if (sqlContext.sparkContext.version.startsWith("2.1")) { + //data source file format + sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (sqlContext.sparkContext.version.startsWith("2.2")) { + //data source file format + sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + //org.apache.spark.SparkException: Index file not present to read the carbondata file + val exception = intercept[org.apache.spark.SparkException] + { + sql("select * from sdkOutputTable").show(false) + } + assert(exception.getMessage().contains("Index file not present to read the carbondata file")) + + sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala new file mode 100644 index 0000000..9a46676 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.testsuite.createTable + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.SparkSession + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.sdk.file.{CarbonWriter, Schema} + + +object TestSparkCarbonFileFormatWithSparkSession { + + var writerPath = new File(this.getClass.getResource("/").getPath + + + "../." + + "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") + .getCanonicalPath + //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? + writerPath = writerPath.replace("\\", "/"); + + val filePath = writerPath + "/Fact/Part0/Segment_null/" + + def buildTestData(persistSchema:Boolean) = { + + FileUtils.deleteDirectory(new File(writerPath)) + + val schema = new StringBuilder() + .append("[ \n") + .append(" {\"name\":\"string\"},\n") + .append(" {\"age\":\"int\"},\n") + .append(" {\"height\":\"double\"}\n") + .append("]") + .toString() + + try { + val builder = CarbonWriter.builder() + val writer = + if (persistSchema) { + builder.persistSchemaFile(true) + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } else { + builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput() + } + + var i = 0 + while (i < 100) { + writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2))) + i += 1 + } + writer.close() + } catch { + case ex: Exception => None + case _ => None + } + } + + def cleanTestData() = { + FileUtils.deleteDirectory(new File(writerPath)) + } + + def deleteIndexFile(path: String, extension: String) : Unit = { + val file: CarbonFile = FileFactory + .getCarbonFile(path, FileFactory.getFileType(path)) + + for (eachDir <- file.listFiles) { + if (!eachDir.isDirectory) { + if (eachDir.getName.endsWith(extension)) { + CarbonUtil.deleteFoldersAndFilesSilent(eachDir) + } + } else { + deleteIndexFile(eachDir.getPath, extension) + } + } + } + + def main(args: Array[String]): Unit = { + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + val storeLocation = s"$rootPath/examples/spark2/target/store" + val warehouse = s"$rootPath/examples/spark2/target/warehouse" + val metastoredb = s"$rootPath/examples/spark2/target/metastore_db" + + // clean data folder + if (true) { + val clean = (path: String) => FileUtils.deleteDirectory(new File(path)) + clean(storeLocation) + clean(warehouse) + clean(metastoredb) + } + + val spark = SparkSession + .builder() + .master("local") + .appName("TestSparkCarbonFileFormatWithSparkSession") + .enableHiveSupport() + .config("spark.sql.warehouse.dir", warehouse) + .config("javax.jdo.option.ConnectionURL", + s"jdbc:derby:;databaseName=$metastoredb;create=true") + .getOrCreate() + + CarbonProperties.getInstance() + .addProperty("carbon.storelocation", storeLocation) + + spark.sparkContext.setLogLevel("WARN") + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss") + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy/MM/dd") + buildTestData(false) + assert(new File(filePath).exists()) + //data source file format + if (spark.sparkContext.version.startsWith("2.1")) { + //data source 
file format + spark.sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """) + } else if (spark.sparkContext.version.startsWith("2.2")) { + //data source file format + spark.sql( + s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION + |'$filePath' """.stripMargin) + } else{ + // TO DO + } + + spark.sql("Describe formatted sdkOutputTable").show(false) + + spark.sql("select * from sdkOutputTable").show(false) + + spark.sql("select * from sdkOutputTable limit 3").show(false) + + spark.sql("select name from sdkOutputTable").show(false) + + spark.sql("select age from sdkOutputTable").show(false) + + spark.sql("select * from sdkOutputTable where age > 2 and age < 8").show(200,false) + + spark.sql("select * from sdkOutputTable where name = 'robot3'").show(200,false) + + spark.sql("select * from sdkOutputTable where name like 'robo%' limit 5").show(200,false) + + spark.sql("select * from sdkOutputTable where name like '%obot%' limit 2").show(200,false) + + spark.sql("select sum(age) from sdkOutputTable where name like 'robot1%' ").show(200,false) + + spark.sql("select count(*) from sdkOutputTable where name like 'robot%' ").show(200,false) + + spark.sql("select count(*) from sdkOutputTable").show(200,false) + + spark.sql("DROP TABLE sdkOutputTable") + // drop table should not delete the files + assert(new File(filePath).exists()) + cleanTestData() + + spark.stop() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 49a8023..6afd2c0 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -42,7 +42,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.TableInfo +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.scan.expression.Expression import org.apache.carbondata.core.scan.filter.FilterUtil import org.apache.carbondata.core.scan.model.QueryModel @@ -50,7 +50,7 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant import org.apache.carbondata.core.statusmanager.FileFormat import org.apache.carbondata.core.util._ import org.apache.carbondata.hadoop._ -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat +import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat} import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.InitInputMetrics @@ -90,13 +90,21 @@ class CarbonScanRDD( val jobConf = new JobConf(conf) SparkHadoopUtil.get.addCredentials(jobConf) val job = Job.getInstance(jobConf) - val format = prepareInputFormatForDriver(job.getConfiguration) - + val fileLevelExternal = 
tableInfo.getFactTable().getTableProperties().get("_filelevelexternal") + val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) { + prepareFileInputFormatForDriver(job.getConfiguration) + } else { + prepareInputFormatForDriver(job.getConfiguration) + } // initialise query_id for job job.getConfiguration.set("query.id", queryId) // get splits val splits = format.getSplits(job) + if ((splits == null) && format.isInstanceOf[CarbonFileInputFormat[Object]]) { + throw new SparkException( + "CarbonData file not exist in the segment_null (SDK writer Output) path") + } // separate split // 1. for batch splits, invoke distributeSplits method to create partitions @@ -113,7 +121,7 @@ class CarbonScanRDD( } val batchPartitions = distributeColumnarSplits(columnarSplits) // check and remove InExpression from filterExpression - checkAndRemoveInExpressinFromFilterExpression(format, batchPartitions) + checkAndRemoveInExpressinFromFilterExpression(batchPartitions) if (streamSplits.isEmpty) { batchPartitions.toArray } else { @@ -354,7 +362,9 @@ class CarbonScanRDD( case _ => // create record reader for CarbonData file format if (vectorReader) { - val carbonRecordReader = createVectorizedCarbonRecordReader(model, inputMetricsStats) + val carbonRecordReader = createVectorizedCarbonRecordReader(model, + inputMetricsStats, + "true") if (carbonRecordReader == null) { new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration), inputMetricsStats) @@ -431,6 +441,16 @@ class CarbonScanRDD( createInputFormat(conf) } + def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = { + CarbonFileInputFormat.setTableInfo(conf, tableInfo) + CarbonFileInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName) + CarbonFileInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName) + if (partitionNames != null) { + CarbonFileInputFormat.setPartitionsToPrune(conf, partitionNames.asJava) + } + createFileInputFormat(conf) + } + private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = { CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport) val tableInfo1 = getTableInfo @@ -441,6 +461,32 @@ class CarbonScanRDD( createInputFormat(conf) } + private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = { + val format = new CarbonFileInputFormat[Object] + CarbonFileInputFormat.setTablePath(conf, + identifier.appendWithLocalPrefix(identifier.getTablePath)) + CarbonFileInputFormat.setQuerySegment(conf, identifier) + CarbonFileInputFormat.setFilterPredicates(conf, filterExpression) + CarbonFileInputFormat.setColumnProjection(conf, columnProjection) + CarbonFileInputFormat.setDataMapJob(conf, new SparkDataMapJob) + if (CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, + CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) { + CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob) + } + + // when validate segments is disabled in thread local update it to CarbonTableInputFormat + val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo + if (carbonSessionInfo != null) { + CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams + .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + identifier.getCarbonTableIdentifier.getDatabaseName + "." 
+ + identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean) + } + format + } + + private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = { val format = new CarbonTableInputFormat[Object] CarbonTableInputFormat.setTablePath(conf, @@ -485,7 +531,6 @@ class CarbonScanRDD( * @param identifiedPartitions */ private def checkAndRemoveInExpressinFromFilterExpression( - format: CarbonTableInputFormat[Object], identifiedPartitions: mutable.Buffer[Partition]) = { if (null != filterExpression) { if (identifiedPartitions.nonEmpty && @@ -533,12 +578,13 @@ class CarbonScanRDD( } def createVectorizedCarbonRecordReader(queryModel: QueryModel, - inputMetricsStats: InputMetricsStats): RecordReader[Void, Object] = { + inputMetricsStats: InputMetricsStats, enableBatch: String): RecordReader[Void, Object] = { val name = "org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader" try { val cons = Class.forName(name).getDeclaredConstructors cons.head.setAccessible(true) - cons.head.newInstance(queryModel, inputMetricsStats).asInstanceOf[RecordReader[Void, Object]] + cons.head.newInstance(queryModel, inputMetricsStats, enableBatch) + .asInstanceOf[RecordReader[Void, Object]] } catch { case e: Exception => LOGGER.error(e) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index 73da878..903bf44 100644 --- a/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -91,10 +91,21 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { private InputMetricsStats inputMetricsStats; - public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats) { + public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats, + String enableBatch) { this.queryModel = queryModel; this.inputMetricsStats = inputMetricsStats; - enableReturningBatches(); + if (enableBatch.equals("true")) { + enableReturningBatches(); + } + } + + + /* + * Can be called before any rows are returned to enable returning columnar batches directly. + */ + public void enableReturningBatches() { + returnColumnarBatch = true; } /** @@ -273,12 +284,7 @@ class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { if (columnarBatch == null) initBatch(); } - /* - * Can be called before any rows are returned to enable returning columnar batches directly. - */ - private void enableReturningBatches() { - returnColumnarBatch = true; - } + /** * Advances to the next batch of rows. Returns false if there are no more. 
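Taken together, these two hunks define the new read path. CarbonScanRDD consults the `_filelevelexternal` table property to choose between the new CarbonFileInputFormat, which reads the carbondata files directly at the external location instead of going through segment metadata, and the regular CarbonTableInputFormat; a null split list from the file-level format is treated as "no carbondata files present" and surfaced as an error rather than an empty result. A simplified sketch of the dispatch as the patch wires it up (the names are those of CarbonScanRDD; configuration plumbing is elided):

    // Inside CarbonScanRDD partition computation, simplified.
    val fileLevelExternal = tableInfo.getFactTable.getTableProperties.get("_filelevelexternal")
    val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
      prepareFileInputFormatForDriver(job.getConfiguration)  // external file-level table
    } else {
      prepareInputFormatForDriver(job.getConfiguration)      // managed, segment-aware table
    }

    val splits = format.getSplits(job)
    if (splits == null && format.isInstanceOf[CarbonFileInputFormat[Object]]) {
      throw new SparkException(
        "CarbonData file not exist in the segment_null (SDK writer Output) path")
    }

On the reader side, the columnar-batch switch that used to be hard-wired in the VectorizedCarbonRecordReader constructor is now driven by the new `enableBatch` argument. It travels as the string "true"/"false" because the reader is instantiated reflectively through Class.forName(...).getDeclaredConstructors; passing "false" makes the reader return rows instead of columnar batches.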
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7a124ecd/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index eb00ebf..5fd9639 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation, SparkCarbonTableFormat}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
@@ -1015,7 +1015,7 @@ case class CarbonLoadDataCommand(
         partitionSchema = partitionSchema,
         dataSchema = dataSchema,
         bucketSpec = catalogTable.bucketSpec,
-        fileFormat = new CarbonFileFormat,
+        fileFormat = new SparkCarbonTableFormat,
         options = options.toMap)(sparkSession = sparkSession)
       CarbonReflectionUtils.getLogicalRelation(hdfsRelation,