http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala index e841bc8..74633e2 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala @@ -24,11 +24,13 @@ import org.apache.spark.sql.{CarbonEnv, CarbonRelation} import org.apache.spark.sql.common.util.CarbonHiveContext import org.apache.spark.sql.common.util.CarbonHiveContext.sql import org.apache.spark.sql.common.util.QueryTest + import org.apache.carbondata.core.carbon.CarbonDataLoadSchema import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.spark.load.CarbonLoadModel import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.processing.model.CarbonLoadModel + /** * Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil *
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala index e9c9471..1990070 100644 --- a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala +++ b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala @@ -31,7 +31,8 @@ class LocalSQLContext(val hdfsCarbonBasePath: String) extends CarbonContext(new SparkContext(new SparkConf() .setAppName("CarbonSpark") .setMaster("local[2]") - .set("spark.sql.shuffle.partitions", "20")), + .set("spark.sql.shuffle.partitions", "20") + .set("use_kettle_default", "true")), hdfsCarbonBasePath, hdfsCarbonBasePath) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a06133b..4bdd587 100644 --- a/pom.xml +++ b/pom.xml @@ -109,6 +109,7 @@ <hadoop.version>2.2.0</hadoop.version> <scala.version>2.10.4</scala.version> <kettle.version>4.4.0-stable</kettle.version> + <use.kettle>true</use.kettle> <hadoop.deps.scope>compile</hadoop.deps.scope> <spark.deps.scope>compile</spark.deps.scope> <scala.deps.scope>compile</scala.deps.scope> @@ -336,6 +337,12 @@ <id>include-all</id> </profile> <profile> + <id>no-kettle</id> + <properties> + <use.kettle>false</use.kettle> + </properties> + </profile> + <profile> <id>rat</id> <build> <plugins> http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java ---------------------------------------------------------------------- diff 
--git a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java index cc5c3a2..09e000f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java @@ -20,7 +20,6 @@ package org.apache.carbondata.processing.csvload; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -39,7 +38,8 @@ import org.apache.carbondata.processing.constants.DataProcessorConstants; import org.apache.carbondata.processing.csvreaderstep.CsvInputMeta; import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus; import org.apache.carbondata.processing.etl.DataLoadingException; -import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordslogger; +import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.exception.KettleException; @@ -97,7 +97,7 @@ public class DataGraphExecuter { private String[] getColumnNames(SchemaInfo schemaInfo, String tableName, String partitionId, CarbonDataLoadSchema schema) { - Set<String> columnNames = GraphExecutionUtil.getSchemaColumnNames(schema, tableName); + Set<String> columnNames = CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName); return columnNames.toArray(new String[columnNames.size()]); } @@ -210,7 +210,7 @@ public class DataGraphExecuter { if (trans.getErrors() > 0) { LOGGER.error("Graph Execution had errors"); throw new DataLoadingException("Due to internal errors, please check logs for more details."); - } else if (null != BadRecordslogger.hasBadRecord(key)) { + } else if (null != 
BadRecordsLogger.hasBadRecord(key)) { LOGGER.error("Graph Execution is partcially success"); throw new DataLoadingException(DataProcessorConstants.BAD_REC_FOUND, "Graph Execution is partcially success"); @@ -417,34 +417,6 @@ public class DataGraphExecuter { trans.setLogLevel(LogLevel.NOTHING); } - private void validateHeader(SchemaInfo schemaInfo, String partitionId, - CarbonDataLoadSchema schema) throws DataLoadingException { - String[] columnNames = getColumnNames(schemaInfo, model.getTableName(), partitionId, schema); - String[] csvHeader = model.getCsvHeader().toLowerCase().split(","); - - List<String> csvColumnsList = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN); - - for (String column : csvHeader) { - csvColumnsList.add(column.replaceAll("\"", "").trim()); - } - - int count = 0; - - for (String columns : columnNames) { - if (csvColumnsList.contains(columns.toLowerCase())) { - count++; - } - } - - if (count != columnNames.length) { - LOGGER.error("CSV header provided in DDL is not proper." + - " Column names in schema and CSV header are not the same."); - throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE, - "CSV header provided in DDL is not proper. Column names in schema and CSV header are " + - "not the same."); - } - } - /** * This method will validate the both fact as well as dimension csv files. * @@ -503,7 +475,14 @@ public class DataGraphExecuter { } } else if (model.isDirectLoad()) { if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) { - validateHeader(schemaInfo, partitionId, schema); + if (!CarbonDataProcessorUtil + .isHeaderValid(model.getTableName(), model.getCsvHeader(), schema, ",")) { + LOGGER.error("CSV header provided in DDL is not proper." + + " Column names in schema and CSV header are not the same."); + throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE, + "CSV header provided in DDL is not proper. 
Column names in schema and CSV header are " + + "not the same."); + } } else { for (String file : model.getFilesToProcess()) { try { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java index 6d82bcd..1dd9bdf 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java @@ -34,8 +34,6 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.carbon.CarbonDataLoadSchema; import org.apache.carbondata.core.carbon.CarbonDataLoadSchema.DimensionRelation; -import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter; @@ -194,65 +192,6 @@ public final class GraphExecutionUtil { } /** - * This method update the column Name - * - * @param table - * @param tableName - * @param schema - */ - public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) { - Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - String factTableName = schema.getCarbonTable().getFactTableName(); - if (tableName.equals(factTableName)) { - - List<CarbonDimension> dimensions = - 
schema.getCarbonTable().getDimensionByTableName(factTableName); - - for (CarbonDimension dimension : dimensions) { - - String foreignKey = null; - for (DimensionRelation dimRel : schema.getDimensionRelationList()) { - for (String field : dimRel.getColumns()) { - if (dimension.getColName().equals(field)) { - foreignKey = dimRel.getRelation().getFactForeignKeyColumn(); - break; - } - } - if (null != foreignKey) { - break; - } - } - if (null == foreignKey) { - columnNames.add(dimension.getColName()); - } else { - columnNames.add(foreignKey); - } - } - - List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(factTableName); - for (CarbonMeasure msr : measures) { - if (!msr.getColumnSchema().isInvisible()) { - columnNames.add(msr.getColName()); - } - } - } else { - List<CarbonDimension> dimensions = schema.getCarbonTable().getDimensionByTableName(tableName); - for (CarbonDimension dimension : dimensions) { - columnNames.add(dimension.getColName()); - } - - List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(tableName); - for (CarbonMeasure msr : measures) { - columnNames.add(msr.getColName()); - } - } - - return columnNames; - - } - - /** * @param csvFilePath * @param columnNames * @return http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java index 2faef7b..9843c2e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java @@ -26,8 +26,10 @@ import java.util.ArrayList; import java.util.List; import 
org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.processing.newflow.complexobjects.ArrayObject; import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen; import org.pentaho.di.core.exception.KettleException; @@ -35,7 +37,7 @@ import org.pentaho.di.core.exception.KettleException; /** * Array DataType stateless object used in data loading */ -public class ArrayDataType implements GenericDataType { +public class ArrayDataType implements GenericDataType<ArrayObject> { /** * child columns @@ -177,7 +179,28 @@ public class ArrayDataType implements GenericDataType { } } - /* + @Override + public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream) + throws IOException, DictionaryGenerationException { + if (input == null) { + dataOutputStream.writeInt(1); + children.writeByteArray(null, dataOutputStream); + } else { + Object[] data = input.getData(); + dataOutputStream.writeInt(data.length); + for (Object eachInput : data) { + children.writeByteArray(eachInput, dataOutputStream); + } + } + } + + @Override + public void fillCardinality(List<Integer> dimCardWithComplex) { + dimCardWithComplex.add(0); + children.fillCardinality(dimCardWithComplex); + } + + /** * parse byte array and bit pack */ @Override http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java index 0fdca6e..f8c765b 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen; @@ -35,7 +36,7 @@ import org.pentaho.di.core.exception.KettleException; * Generic DataType interface which will be used while data loading for complex types like Array & * Struct */ -public interface GenericDataType { +public interface GenericDataType<T> { /** * @return name of the column @@ -78,6 +79,14 @@ public interface GenericDataType { CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen) throws KettleException, IOException; /** + * writes to byte stream + * @param dataOutputStream + * @throws IOException + */ + void writeByteArray(T input, DataOutputStream dataOutputStream) + throws IOException, DictionaryGenerationException; + + /** * @return surrogateIndex for primitive column in complex type */ int getSurrogateIndex(); @@ -151,4 +160,11 @@ public interface GenericDataType { * @param maxSurrogateKeyArray */ void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray); + + /** + * Fill the cardinality of the primitive datatypes + * @param dimCardWithComplex + */ + void fillCardinality(List<Integer> dimCardWithComplex); + } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index 9199c51..610366c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -25,10 +25,22 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.devapi.BiDictionary; +import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.util.CarbonUtilException; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.newflow.dictionary.DirectDictionary; +import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary; import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen; import org.pentaho.di.core.exception.KettleException; @@ -36,7 +48,7 @@ import org.pentaho.di.core.exception.KettleException; /** * Primitive DataType stateless object used in data loading */ -public class PrimitiveDataType implements GenericDataType { +public class 
PrimitiveDataType implements GenericDataType<Object> { /** * surrogate index @@ -78,6 +90,10 @@ public class PrimitiveDataType implements GenericDataType { */ private int dataCounter; + private BiDictionary<Integer, Object> dictionaryGenerator; + + private CarbonDimension carbonDimension; + /** * constructor * @@ -92,6 +108,36 @@ public class PrimitiveDataType implements GenericDataType { this.dimensionOrdinal = dimensionOrdinal; } + /** + * constructor + * + * @param name + * @param parentname + * @param columnId + */ + public PrimitiveDataType(String name, String parentname, String columnId, + CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, + CarbonTableIdentifier carbonTableIdentifier) { + this.name = name; + this.parentname = parentname; + this.columnId = columnId; + this.carbonDimension = carbonDimension; + DictionaryColumnUniqueIdentifier identifier = + new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, + carbonDimension.getColumnIdentifier(), carbonDimension.getDataType()); + try { + if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) { + dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(carbonDimension.getDataType())); + } else { + Dictionary dictionary = cache.get(identifier); + dictionaryGenerator = new PreCreatedDictionary(dictionary); + } + } catch (CarbonUtilException e) { + throw new RuntimeException(e); + } + } + /* * primitive column will not have any child column */ @@ -178,9 +224,30 @@ public class PrimitiveDataType implements GenericDataType { dataOutputStream.writeInt(surrogateKey); } + @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream) + throws IOException, DictionaryGenerationException { + String parsedValue = + input == null ? 
null : DataTypeUtil.parseValue(input.toString(), carbonDimension); + Integer surrogateKey; + if (null == parsedValue) { + surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY; + } else { + surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue); + if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) { + surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY; + } + } + dataOutputStream.writeInt(surrogateKey); + } + + @Override + public void fillCardinality(List<Integer> dimCardWithComplex) { + dimCardWithComplex.add(dictionaryGenerator.size()); + } + /* - * parse bytearray and bit pack - */ + * parse bytearray and bit pack + */ @Override public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream, KeyGenerator[] generator) throws IOException, KeyGenException { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java index bcf18c8..f034895 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java @@ -26,8 +26,10 @@ import java.util.ArrayList; import java.util.List; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.processing.newflow.complexobjects.StructObject; import 
org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen; import org.pentaho.di.core.exception.KettleException; @@ -35,7 +37,7 @@ import org.pentaho.di.core.exception.KettleException; /** * Struct DataType stateless object used in data loading */ -public class StructDataType implements GenericDataType { +public class StructDataType implements GenericDataType<StructObject> { /** * children columns @@ -186,9 +188,38 @@ public class StructDataType implements GenericDataType { } } + @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream) + throws IOException, DictionaryGenerationException { + dataOutputStream.writeInt(children.size()); + if (input == null) { + dataOutputStream.writeInt(children.size()); + for (int i = 0; i < children.size(); i++) { + children.get(i).writeByteArray(null, dataOutputStream); + } + } else { + Object[] data = input.getData(); + for (int i = 0; i < data.length && i < children.size(); i++) { + children.get(i).writeByteArray(data[i], dataOutputStream); + } + + // For other children elements which dont have data, write empty + for (int i = data.length; i < children.size(); i++) { + children.get(i).writeByteArray(null, dataOutputStream); + } + } + } + + @Override + public void fillCardinality(List<Integer> dimCardWithComplex) { + dimCardWithComplex.add(0); + for (int i = 0; i < children.size(); i++) { + children.get(i).fillCardinality(dimCardWithComplex); + } + } + /* - * parse bytearray and bit pack - */ + * parse bytearray and bit pack + */ @Override public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream, KeyGenerator[] generator) throws IOException, KeyGenException { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java index 57d74c3..a420d27 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java +++ b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java @@ -363,7 +363,7 @@ public class MDKeyGenStep extends BaseStep { String carbonDataDirectoryPath = getCarbonDataFolderLocation(); finalMerger = new SingleThreadFinalSortFilesMerger(dataFolderLocation, tableName, dimensionCount - meta.getComplexDimsCount(), meta.getComplexDimsCount(), measureCount, - meta.getNoDictionaryCount(), aggType, isNoDictionaryDimension); + meta.getNoDictionaryCount(), aggType, isNoDictionaryDimension, true); CarbonFactDataHandlerModel carbonFactDataHandlerModel = getCarbonFactDataHandlerModel(); carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen); carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java new file mode 100644 index 0000000..10f5197 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * + */ +package org.apache.carbondata.processing.model; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; + +import org.apache.carbondata.core.carbon.CarbonDataLoadSchema; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.load.LoadMetadataDetails; + +public class CarbonLoadModel implements Serializable { + /** + * + */ + private static final long serialVersionUID = 6580168429197697465L; + + private String databaseName; + + private String tableName; + + private String factFilePath; + + private String dimFolderPath; + + private String colDictFilePath; + + private String partitionId; + + private CarbonDataLoadSchema carbonDataLoadSchema; + + private String[] aggTables; + + private String aggTableName; + + private boolean aggLoadRequest; + + private String storePath; + + private boolean isRetentionRequest; + + private List<String> factFilesToProcess; + private String csvHeader; + private String csvDelimiter; + private String complexDelimiterLevel1; + private String complexDelimiterLevel2; + + private boolean isDirectLoad; + private List<LoadMetadataDetails> loadMetadataDetails; + + private String blocksID; + + /** + * Map from carbon dimension to pre defined dict file path + */ + private HashMap<CarbonDimension, String> predefDictMap; + + /** + * task id, each spark task 
has a unique id + */ + private String taskNo; + /** + * new load start time + */ + private String factTimeStamp; + /** + * load Id + */ + private String segmentId; + + private String allDictPath; + + /** + * escape Char + */ + private String escapeChar; + + /** + * quote Char + */ + private String quoteChar; + + /** + * comment Char + */ + private String commentChar; + + private String dateFormat; + + /** + * defines the string that should be treated as null while loadind data + */ + private String serializationNullFormat; + + /** + * defines the string to specify whether the bad record logger should be enabled or not + */ + private String badRecordsLoggerEnable; + + /** + * defines the option to specify the bad record logger action + */ + private String badRecordsAction; + + /** + * Max number of columns that needs to be parsed by univocity parser + */ + private String maxColumns; + + /** + * the key of RDD Iterator in RDD iterator Map + */ + private String rddIteratorKey; + + /** + * get escape char + * @return + */ + public String getEscapeChar() { + return escapeChar; + } + + /** + * set escape char + * @param escapeChar + */ + public void setEscapeChar(String escapeChar) { + this.escapeChar = escapeChar; + } + + /** + * get blocck id + * + * @return + */ + public String getBlocksID() { + return blocksID; + } + + /** + * set block id for carbon load model + * + * @param blocksID + */ + public void setBlocksID(String blocksID) { + this.blocksID = blocksID; + } + + public String getCsvDelimiter() { + return csvDelimiter; + } + + public void setCsvDelimiter(String csvDelimiter) { + this.csvDelimiter = csvDelimiter; + } + + public String getComplexDelimiterLevel1() { + return complexDelimiterLevel1; + } + + public void setComplexDelimiterLevel1(String complexDelimiterLevel1) { + this.complexDelimiterLevel1 = complexDelimiterLevel1; + } + + public String getComplexDelimiterLevel2() { + return complexDelimiterLevel2; + } + + public void 
setComplexDelimiterLevel2(String complexDelimiterLevel2) { + this.complexDelimiterLevel2 = complexDelimiterLevel2; + } + + public boolean isDirectLoad() { + return isDirectLoad; + } + + public void setDirectLoad(boolean isDirectLoad) { + this.isDirectLoad = isDirectLoad; + } + + public String getAllDictPath() { + return allDictPath; + } + + public void setAllDictPath(String allDictPath) { + this.allDictPath = allDictPath; + } + + public List<String> getFactFilesToProcess() { + return factFilesToProcess; + } + + public void setFactFilesToProcess(List<String> factFilesToProcess) { + this.factFilesToProcess = factFilesToProcess; + } + + public String getCsvHeader() { + return csvHeader; + } + + public void setCsvHeader(String csvHeader) { + this.csvHeader = csvHeader; + } + + public void initPredefDictMap() { + predefDictMap = new HashMap<>(); + } + + public String getPredefDictFilePath(CarbonDimension dimension) { + return predefDictMap.get(dimension); + } + + public void setPredefDictMap(CarbonDimension dimension, String predefDictFilePath) { + this.predefDictMap.put(dimension, predefDictFilePath); + } + + /** + * @return carbon dataload schema + */ + public CarbonDataLoadSchema getCarbonDataLoadSchema() { + return carbonDataLoadSchema; + } + + /** + * @param carbonDataLoadSchema + */ + public void setCarbonDataLoadSchema(CarbonDataLoadSchema carbonDataLoadSchema) { + this.carbonDataLoadSchema = carbonDataLoadSchema; + } + + /** + * @return the databaseName + */ + public String getDatabaseName() { + return databaseName; + } + + /** + * @param databaseName the databaseName to set + */ + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + /** + * @return the tableName + */ + public String getTableName() { + return tableName; + } + + /** + * @param tableName the tableName to set + */ + public void setTableName(String tableName) { + this.tableName = tableName; + } + + /** + * @return the factFilePath + */ + public String 
getFactFilePath() { + return factFilePath; + } + + /** + * @param factFilePath the factFilePath to set + */ + public void setFactFilePath(String factFilePath) { + this.factFilePath = factFilePath; + } + + /** + * + * @return external column dictionary file path + */ + public String getColDictFilePath() { + return colDictFilePath; + } + + /** + * set external column dictionary file path + * @param colDictFilePath + */ + public void setColDictFilePath(String colDictFilePath) { + this.colDictFilePath = colDictFilePath; + } + + /** + * @return the dimFolderPath + */ + public String getDimFolderPath() { + return dimFolderPath; + } + + //TODO SIMIAN + + /** + * @param dimFolderPath the dimFolderPath to set + */ + public void setDimFolderPath(String dimFolderPath) { + this.dimFolderPath = dimFolderPath; + } + + /** + * get copy with parition + * + * @param uniqueId + * @return + */ + public CarbonLoadModel getCopyWithPartition(String uniqueId) { + CarbonLoadModel copy = new CarbonLoadModel(); + copy.tableName = tableName; + copy.dimFolderPath = dimFolderPath; + copy.factFilePath = factFilePath + '/' + uniqueId; + copy.databaseName = databaseName; + copy.partitionId = uniqueId; + copy.aggTables = aggTables; + copy.aggTableName = aggTableName; + copy.aggLoadRequest = aggLoadRequest; + copy.loadMetadataDetails = loadMetadataDetails; + copy.isRetentionRequest = isRetentionRequest; + copy.complexDelimiterLevel1 = complexDelimiterLevel1; + copy.complexDelimiterLevel2 = complexDelimiterLevel2; + copy.carbonDataLoadSchema = carbonDataLoadSchema; + copy.blocksID = blocksID; + copy.taskNo = taskNo; + copy.factTimeStamp = factTimeStamp; + copy.segmentId = segmentId; + copy.serializationNullFormat = serializationNullFormat; + copy.badRecordsLoggerEnable = badRecordsLoggerEnable; + copy.badRecordsAction = badRecordsAction; + copy.escapeChar = escapeChar; + copy.quoteChar = quoteChar; + copy.commentChar = commentChar; + copy.maxColumns = maxColumns; + copy.storePath = storePath; + 
return copy; + } + + /** + * get CarbonLoadModel with partition + * + * @param uniqueId + * @param filesForPartition + * @param header + * @param delimiter + * @return + */ + public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition, + String header, String delimiter) { + CarbonLoadModel copyObj = new CarbonLoadModel(); + copyObj.tableName = tableName; + copyObj.dimFolderPath = dimFolderPath; + copyObj.factFilePath = null; + copyObj.databaseName = databaseName; + copyObj.partitionId = uniqueId; + copyObj.aggTables = aggTables; + copyObj.aggTableName = aggTableName; + copyObj.aggLoadRequest = aggLoadRequest; + copyObj.loadMetadataDetails = loadMetadataDetails; + copyObj.isRetentionRequest = isRetentionRequest; + copyObj.carbonDataLoadSchema = carbonDataLoadSchema; + copyObj.csvHeader = header; + copyObj.factFilesToProcess = filesForPartition; + copyObj.isDirectLoad = true; + copyObj.csvDelimiter = delimiter; + copyObj.complexDelimiterLevel1 = complexDelimiterLevel1; + copyObj.complexDelimiterLevel2 = complexDelimiterLevel2; + copyObj.blocksID = blocksID; + copyObj.taskNo = taskNo; + copyObj.factTimeStamp = factTimeStamp; + copyObj.segmentId = segmentId; + copyObj.serializationNullFormat = serializationNullFormat; + copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable; + copyObj.badRecordsAction = badRecordsAction; + copyObj.escapeChar = escapeChar; + copyObj.quoteChar = quoteChar; + copyObj.commentChar = commentChar; + copyObj.dateFormat = dateFormat; + copyObj.maxColumns = maxColumns; + copyObj.storePath = storePath; + return copyObj; + } + + /** + * @return the partitionId + */ + public String getPartitionId() { + return partitionId; + } + + /** + * @param partitionId the partitionId to set + */ + public void setPartitionId(String partitionId) { + this.partitionId = partitionId; + } + + /** + * @return the aggTables + */ + public String[] getAggTables() { + return aggTables; + } + + /** + * @param aggTables the aggTables to set 
+ */ + public void setAggTables(String[] aggTables) { + this.aggTables = aggTables; + } + + /** + * @return the aggLoadRequest + */ + public boolean isAggLoadRequest() { + return aggLoadRequest; + } + + /** + * @param aggLoadRequest the aggLoadRequest to set + */ + public void setAggLoadRequest(boolean aggLoadRequest) { + this.aggLoadRequest = aggLoadRequest; + } + + /** + * @param storePath The storePath to set. + */ + public void setStorePath(String storePath) { + this.storePath = storePath; + } + + /** + * @return Returns the aggTableName. + */ + public String getAggTableName() { + return aggTableName; + } + + /** + * @return Returns the factStoreLocation. + */ + public String getStorePath() { + return storePath; + } + + /** + * @param aggTableName The aggTableName to set. + */ + public void setAggTableName(String aggTableName) { + this.aggTableName = aggTableName; + } + + /** + * isRetentionRequest + * + * @return + */ + public boolean isRetentionRequest() { + return isRetentionRequest; + } + + /** + * @param isRetentionRequest + */ + public void setRetentionRequest(boolean isRetentionRequest) { + this.isRetentionRequest = isRetentionRequest; + } + + /** + * getLoadMetadataDetails. + * + * @return + */ + public List<LoadMetadataDetails> getLoadMetadataDetails() { + return loadMetadataDetails; + } + + /** + * setLoadMetadataDetails. 
+ * + * @param loadMetadataDetails + */ + public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) { + this.loadMetadataDetails = loadMetadataDetails; + } + + /** + * @return + */ + public String getTaskNo() { + return taskNo; + } + + /** + * @param taskNo + */ + public void setTaskNo(String taskNo) { + this.taskNo = taskNo; + } + + /** + * @return + */ + public String getFactTimeStamp() { + return factTimeStamp; + } + + /** + * @param factTimeStamp + */ + public void setFactTimeStamp(String factTimeStamp) { + this.factTimeStamp = factTimeStamp; + } + + public String[] getDelimiters() { + return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 }; + } + + /** + * @return load Id + */ + public String getSegmentId() { + return segmentId; + } + + /** + * @param segmentId + */ + public void setSegmentId(String segmentId) { + this.segmentId = segmentId; + } + + /** + * the method returns the value to be treated as null while data load + * @return + */ + public String getSerializationNullFormat() { + return serializationNullFormat; + } + + /** + * the method sets the value to be treated as null while data load + * @param serializationNullFormat + */ + public void setSerializationNullFormat(String serializationNullFormat) { + this.serializationNullFormat = serializationNullFormat; + } + + /** + * returns the string to enable bad record logger + * @return + */ + public String getBadRecordsLoggerEnable() { + return badRecordsLoggerEnable; + } + + /** + * method sets the string to specify whether to enable or dissable the badrecord logger. 
+ * @param badRecordsLoggerEnable + */ + public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) { + this.badRecordsLoggerEnable = badRecordsLoggerEnable; + } + + public String getQuoteChar() { + return quoteChar; + } + + public void setQuoteChar(String quoteChar) { + this.quoteChar = quoteChar; + } + + public String getCommentChar() { + return commentChar; + } + + public void setCommentChar(String commentChar) { + this.commentChar = commentChar; + } + + public String getDateFormat() { return dateFormat; } + + public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; } + + /** + * @return + */ + public String getMaxColumns() { + return maxColumns; + } + + /** + * @param maxColumns + */ + public void setMaxColumns(String maxColumns) { + this.maxColumns = maxColumns; + } + + /** + * returns option to specify the bad record logger action + * @return + */ + public String getBadRecordsAction() { + return badRecordsAction; + } + + /** + * set option to specify the bad record logger action + * @param badRecordsAction + */ + public void setBadRecordsAction(String badRecordsAction) { + this.badRecordsAction = badRecordsAction; + } + + public String getRddIteratorKey() { + return rddIteratorKey; + } + + public void setRddIteratorKey(String rddIteratorKey) { + this.rddIteratorKey = rddIteratorKey; + + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java index 3e6d63e..c5dc5d1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java @@ -30,10 +30,16 @@ import 
org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColu */ public class DataField implements Serializable { + public DataField(CarbonColumn column) { + this.column = column; + } + private CarbonColumn column; private CompressionCodec compressionCodec; + private String dateFormat; + public boolean hasDictionaryEncoding() { return column.hasEncoding(Encoding.DICTIONARY); } @@ -42,10 +48,6 @@ public class DataField implements Serializable { return column; } - public void setColumn(CarbonColumn column) { - this.column = column; - } - public CompressionCodec getCompressionCodec() { return compressionCodec; } @@ -54,4 +56,11 @@ public class DataField implements Serializable { this.compressionCodec = compressionCodec; } + public String getDateFormat() { + return dateFormat; + } + + public void setDateFormat(String dateFormat) { + this.dateFormat = dateFormat; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java new file mode 100644 index 0000000..746e0f2 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow; + +import java.util.Iterator; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.processing.model.CarbonLoadModel; +import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger; + +/** + * It executes the data load. + */ +public class DataLoadExecutor { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DataLoadExecutor.class.getName()); + + public void execute(CarbonLoadModel loadModel, String storeLocation, + Iterator<Object[]>[] inputIterators) throws Exception { + AbstractDataLoadProcessorStep loadProcessorStep = null; + try { + + loadProcessorStep = + new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators); + // 1. initialize + loadProcessorStep.initialize(); + LOGGER.info("Data Loading is started for table " + loadModel.getTableName()); + // 2. 
execute the step + loadProcessorStep.execute(); + } catch (CarbonDataLoadingException e) { + throw e; + } catch (Exception e) { + LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName()); + throw new CarbonDataLoadingException( + "Data Loading failed for table " + loadModel.getTableName(), e); + } finally { + if (loadProcessorStep != null) { + // 3. Close the step + loadProcessorStep.close(); + } + } + + String key = + new CarbonTableIdentifier(loadModel.getDatabaseName(), loadModel.getTableName(), null) + .getBadRecordLoggerKey(); + if (null != BadRecordsLogger.hasBadRecord(key)) { + LOGGER.error("Data Load is partcially success for table " + loadModel.getTableName()); + throw new BadRecordFoundException("Bad records found during load"); + } else { + LOGGER.info("Data loading is successful for table "+loadModel.getTableName()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java new file mode 100644 index 0000000..92c677c --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.model.CarbonLoadModel; +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl; +import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl; +import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl; +import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +/** + * It builds the pipe 
line of steps for loading data to carbon. + */ +public final class DataLoadProcessBuilder { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName()); + + public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation, + Iterator[] inputIterators) throws Exception { + CarbonDataLoadConfiguration configuration = + createConfiguration(loadModel, storeLocation); + // 1. Reads the data input iterators and parses the data. + AbstractDataLoadProcessorStep inputProcessorStep = + new InputProcessorStepImpl(configuration, inputIterators); + // 2. Converts the data like dictionary or non dictionary or complex objects depends on + // data types and configurations. + AbstractDataLoadProcessorStep converterProcessorStep = + new DataConverterProcessorStepImpl(configuration, inputProcessorStep); + // 3. Sorts the data which are part of key (all dimensions except complex types) + AbstractDataLoadProcessorStep sortProcessorStep = + new SortProcessorStepImpl(configuration, converterProcessorStep); + // 4. Writes the sorted data in carbondata format. 
+ AbstractDataLoadProcessorStep writerProcessorStep = + new DataWriterProcessorStepImpl(configuration, sortProcessorStep); + return writerProcessorStep; + } + + private CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel, + String storeLocation) throws Exception { + if (!new File(storeLocation).mkdirs()) { + LOGGER.error("Error while creating the temp store path: " + storeLocation); + } + CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration(); + String databaseName = loadModel.getDatabaseName(); + String tableName = loadModel.getTableName(); + String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName + + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo(); + CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath()); + + CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); + AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); + configuration.setTableIdentifier(identifier); + String csvHeader = loadModel.getCsvHeader(); + String csvFileName = null; + if (csvHeader != null && !csvHeader.isEmpty()) { + configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ",")); + } else { + CarbonFile csvFile = + CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0)); + csvFileName = csvFile.getName(); + csvHeader = CarbonDataProcessorUtil.getFileHeader(csvFile); + configuration.setHeader( + CarbonDataProcessorUtil.getColumnFields(csvHeader, loadModel.getCsvDelimiter())); + } + if (!CarbonDataProcessorUtil + .isHeaderValid(loadModel.getTableName(), csvHeader, loadModel.getCarbonDataLoadSchema(), + loadModel.getCsvDelimiter())) { + if (csvFileName == null) { + LOGGER.error("CSV header provided in DDL is not proper." 
+ + " Column names in schema and CSV header are not the same."); + throw new CarbonDataLoadingException( + "CSV header provided in DDL is not proper. Column names in schema and CSV header are " + + "not the same."); + } else { + LOGGER.error( + "CSV File provided is not proper. Column names in schema and csv header are not same. " + + "CSVFile Name : " + csvFileName); + throw new CarbonDataLoadingException( + "CSV File provided is not proper. Column names in schema and csv header are not same. " + + "CSVFile Name : " + csvFileName); + } + } + + configuration.setPartitionId(loadModel.getPartitionId()); + configuration.setSegmentId(loadModel.getSegmentId()); + configuration.setTaskNo(loadModel.getTaskNo()); + configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS, + new String[] { loadModel.getComplexDelimiterLevel1(), + loadModel.getComplexDelimiterLevel2() }); + configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT, + loadModel.getSerializationNullFormat().split(",")[1]); + configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP, + loadModel.getFactTimeStamp()); + configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE, + loadModel.getBadRecordsLoggerEnable().split(",")[1]); + configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION, + loadModel.getBadRecordsAction().split(",")[1]); + configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH, + loadModel.getFactFilePath()); + List<CarbonDimension> dimensions = + carbonTable.getDimensionByTableName(carbonTable.getFactTableName()); + List<CarbonMeasure> measures = + carbonTable.getMeasureByTableName(carbonTable.getFactTableName()); + Map<String, String> dateFormatMap = + CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat()); + List<DataField> dataFields = new ArrayList<>(); + List<DataField> complexDataFields = new ArrayList<>(); + + // First add 
dictionary and non dictionary dimensions because these are part of mdk key. + // And then add complex data types and measures. + for (CarbonColumn column : dimensions) { + DataField dataField = new DataField(column); + dataField.setDateFormat(dateFormatMap.get(column.getColName())); + if (column.isComplex()) { + complexDataFields.add(dataField); + } else { + dataFields.add(dataField); + } + } + dataFields.addAll(complexDataFields); + for (CarbonColumn column : measures) { + // This dummy measure is added when no measure was present. We no need to load it. + if (!(column.getColName().equals("default_dummy_measure"))) { + dataFields.add(new DataField(column)); + } + } + configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()])); + return configuration; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java index 15f5b0e..958fd1a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java @@ -34,4 +34,12 @@ public final class DataLoadProcessorConstants { public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS"; + public static final String SERIALIZATION_NULL_FORMAT = "SERIALIZATION_NULL_FORMAT"; + + public static final String BAD_RECORDS_LOGGER_ENABLE = "BAD_RECORDS_LOGGER_ENABLE"; + + public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION"; + + public static final String FACT_FILE_PATH = "FACT_FILE_PATH"; + 
} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java new file mode 100644 index 0000000..2455392 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow.converter; + +/** + * It is holder for reason of bad records. 
+ */ +public class BadRecordLogHolder { + + private String reason; + + private boolean badRecordAdded; + + public String getReason() { + return reason; + } + + public void setReason(String reason) { + this.reason = reason; + badRecordAdded = true; + } + + public boolean isBadRecordNotAdded() { + return badRecordAdded; + } + + public void clear() { + this.badRecordAdded = false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java index e304fbc..8dda65d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java @@ -30,7 +30,9 @@ public interface FieldConverter { /** * It converts the column field and updates the data in same location/index in row. * @param row + * @return the status whether it could be loaded or not, usually when record is added + * to bad records then it returns false. 
* @throws CarbonDataLoadingException */ - void convert(CarbonRow row) throws CarbonDataLoadingException; + void convert(CarbonRow row, BadRecordLogHolder logHolder) throws CarbonDataLoadingException; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java index 44f1116..3b199ab 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java @@ -26,7 +26,11 @@ import org.apache.carbondata.processing.newflow.row.CarbonRow; */ public interface RowConverter { + void initialize(); + CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException; + RowConverter createCopyForNewThread(); + void finish(); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java index 70a900c..790a970 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java @@ -18,10 +18,12 @@ */ package 
org.apache.carbondata.processing.newflow.converter.impl; +import java.util.List; + import org.apache.carbondata.processing.newflow.converter.FieldConverter; public abstract class AbstractDictionaryFieldConverterImpl implements FieldConverter { - public abstract int getColumnCardinality(); + public abstract void fillColumnCardinality(List<Integer> cardinality); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java index 4c18aa7..4b7aa40 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java @@ -19,12 +19,42 @@ package org.apache.carbondata.processing.newflow.converter.impl; -import org.apache.carbondata.processing.newflow.converter.FieldConverter; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.util.List; + +import org.apache.carbondata.processing.datatypes.GenericDataType; +import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; -public class ComplexFieldConverterImpl implements FieldConverter { +public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterImpl { + + private GenericDataType genericDataType; + + private int index; + + public ComplexFieldConverterImpl(GenericDataType genericDataType, int index) { + 
this.genericDataType = genericDataType; + this.index = index; + } @Override - public void convert(CarbonRow row) { + public void convert(CarbonRow row, BadRecordLogHolder logHolder) { + Object object = row.getObject(index); + // TODO Its temporary, needs refactor here. + ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArray); + try { + genericDataType.writeByteArray(object, dataOutputStream); + dataOutputStream.close(); + row.update(byteArray.toByteArray(), index); + } catch (Exception e) { + throw new CarbonDataLoadingException(object+"", e); + } + } + + @Override public void fillColumnCardinality(List<Integer> cardinality) { + genericDataType.fillCardinality(cardinality); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java index 8ca4ff2..3182a37 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java @@ -19,16 +19,22 @@ package org.apache.carbondata.processing.newflow.converter.impl; +import java.util.List; + import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import 
org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.devapi.BiDictionary; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.util.CarbonUtilException; +import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder; import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; @@ -38,14 +44,20 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert private static final LogService LOGGER = LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName()); - private BiDictionary<Integer, String> dictionaryGenerator; + private BiDictionary<Integer, Object> dictionaryGenerator; private int index; + private CarbonDimension carbonDimension; + + private String nullFormat; + public DictionaryFieldConverterImpl(DataField dataField, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, - CarbonTableIdentifier carbonTableIdentifier, int index) { + CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index) { this.index = index; + this.carbonDimension = (CarbonDimension) dataField.getColumn(); + this.nullFormat = nullFormat; DictionaryColumnUniqueIdentifier identifier = new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType()); @@ -58,17 +70,22 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert } } - @Override - public void convert(CarbonRow row) throws 
CarbonDataLoadingException { + @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) + throws CarbonDataLoadingException { try { - row.update(dictionaryGenerator.getOrGenerateKey(row.getString(index)), index); + String parsedValue = DataTypeUtil.parseValue(row.getString(index), carbonDimension); + if(null == parsedValue || parsedValue.equals(nullFormat)) { + row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY, index); + } else { + row.update(dictionaryGenerator.getOrGenerateKey(parsedValue), index); + } } catch (DictionaryGenerationException e) { throw new CarbonDataLoadingException(e); } } @Override - public int getColumnCardinality() { - return dictionaryGenerator.size(); + public void fillColumnCardinality(List<Integer> cardinality) { + cardinality.add(dictionaryGenerator.size()); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java index 8ff110a..c8113ea 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java @@ -19,9 +19,13 @@ package org.apache.carbondata.processing.newflow.converter.impl; +import java.util.List; + +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder; import org.apache.carbondata.processing.newflow.row.CarbonRow; public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl { @@ -30,21 +34,50 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC private int index; - public DirectDictionaryFieldConverterImpl(DataField dataField, int index) { - DirectDictionaryGenerator directDictionaryGenerator = - DirectDictionaryKeyGeneratorFactory - .getDirectDictionaryGenerator(dataField.getColumn().getDataType()); - this.directDictionaryGenerator = directDictionaryGenerator; + private String nullFormat; + + private CarbonColumn column; + + public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index) { + this.nullFormat = nullFormat; + this.column = dataField.getColumn(); + if (dataField.getDateFormat() != null && !dataField.getDateFormat().isEmpty()) { + this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(dataField.getColumn().getDataType(), + dataField.getDateFormat()); + + } else { + this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(dataField.getColumn().getDataType()); + } this.index = index; } @Override - public void convert(CarbonRow row) { - row.update(directDictionaryGenerator.generateDirectSurrogateKey(row.getString(index)), index); + public void convert(CarbonRow row, BadRecordLogHolder logHolder) { + String value = row.getString(index); + if (value == null) { + logHolder.setReason( + "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column + .getColName() + " and column data type " + column.getDataType() + " is not a valid " + + column.getDataType() + " type."); + 
row.update(1, index); + } else if (value.equals(nullFormat)) { + row.update(1, index); + } else { + int key = directDictionaryGenerator.generateDirectSurrogateKey(value); + if (key == 1) { + logHolder.setReason( + "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column + .getColName() + " and column data type " + column.getDataType() + " is not a valid " + + column.getDataType() + " type."); + } + row.update(key, index); + } } @Override - public int getColumnCardinality() { - return Integer.MAX_VALUE; + public void fillColumnCardinality(List<Integer> cardinality) { + cardinality.add(Integer.MAX_VALUE); } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java index a10ad20..a46b9ba 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java @@ -18,11 +18,19 @@ */ package org.apache.carbondata.processing.newflow.converter.impl; +import java.util.List; + import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; +import 
org.apache.carbondata.processing.datatypes.ArrayDataType; +import org.apache.carbondata.processing.datatypes.GenericDataType; +import org.apache.carbondata.processing.datatypes.PrimitiveDataType; +import org.apache.carbondata.processing.datatypes.StructDataType; import org.apache.carbondata.processing.newflow.DataField; import org.apache.carbondata.processing.newflow.converter.FieldConverter; @@ -43,27 +51,83 @@ public class FieldEncoderFactory { /** * Creates the FieldConverter for all dimensions, for measures return null. - * @param dataField column schema - * @param cache dicionary cache. + * + * @param dataField column schema + * @param cache dictionary cache. + * @param carbonTableIdentifier table identifier - * @param index index of column in the row. + * @param index index of column in the row. + * @return */ public FieldConverter createFieldEncoder(DataField dataField, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, - CarbonTableIdentifier carbonTableIdentifier, int index) { + CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat) { // Converters are only needed for dimensions and measures it return null. 
if (dataField.getColumn().isDimesion()) { - if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY)) { - return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, index); - } else if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) { - return new DirectDictionaryFieldConverterImpl(dataField, index); + if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) && + !dataField.getColumn().isComplex()) { + return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index); + } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) && + !dataField.getColumn().isComplex()) { + return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat, + index); } else if (dataField.getColumn().isComplex()) { - return new ComplexFieldConverterImpl(); + return new ComplexFieldConverterImpl( + createComplexType(dataField, cache, carbonTableIdentifier), index); } else { - return new NonDictionaryFieldConverterImpl(dataField, index); + return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index); } + } else { + return new MeasureFieldConverterImpl(dataField, nullFormat, index); + } + } + + /** + * Create parser for the carbon column. + */ + private static GenericDataType createComplexType(DataField dataField, + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, + CarbonTableIdentifier carbonTableIdentifier) { + return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache, + carbonTableIdentifier); + } + + /** + * This method may be called recursively if the carbon column is complex type. 
+ * + * @return GenericDataType + */ + private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName, + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, + CarbonTableIdentifier carbonTableIdentifier) { + switch (carbonColumn.getDataType()) { + case ARRAY: + List<CarbonDimension> listOfChildDimensions = + ((CarbonDimension) carbonColumn).getListOfChildDimensions(); + // Create array parser with complex delimiter + ArrayDataType arrayDataType = + new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId()); + for (CarbonDimension dimension : listOfChildDimensions) { + arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache, + carbonTableIdentifier)); + } + return arrayDataType; + case STRUCT: + List<CarbonDimension> dimensions = + ((CarbonDimension) carbonColumn).getListOfChildDimensions(); + // Create struct parser with complex delimiter + StructDataType structDataType = + new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId()); + for (CarbonDimension dimension : dimensions) { + structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache, + carbonTableIdentifier)); + } + return structDataType; + case MAP: + throw new UnsupportedOperationException("Complex type Map is not supported yet"); + default: + return new PrimitiveDataType(carbonColumn.getColName(), parentName, + carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache, + carbonTableIdentifier); } - return null; } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java new file mode 100644 index 0000000..c419d46 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.processing.newflow.converter.impl; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.carbon.metadata.datatype.DataType; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder; +import org.apache.carbondata.processing.newflow.converter.FieldConverter; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; + +/** + * Converter for measure + */ +public class MeasureFieldConverterImpl implements FieldConverter { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName()); + + private int index; + + private DataType dataType; + + private CarbonMeasure measure; + + private String nullformat; + + public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index) { + this.dataType = dataField.getColumn().getDataType(); + this.measure = (CarbonMeasure) dataField.getColumn(); + this.nullformat = nullformat; + this.index = index; + } + + @Override + public void convert(CarbonRow row, BadRecordLogHolder logHolder) + throws CarbonDataLoadingException { + String value = row.getString(index); + Object output; + if (value == null) { + logHolder.setReason( + "The value " + " \"" + value + "\"" + " with column name " + measure.getColName() + + " and column data type " + dataType + " is not a valid " + dataType + " type."); + row.update(null, index); + } else if(value.equals(nullformat)) { + row.update(null, index); + } else { + try { + output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure); + row.update(output, index); + } 
catch (NumberFormatException e) { + LOGGER.warn( + "Cant not convert : " + value + " to Numeric type value. Value considered as null."); + logHolder.setReason( + "The value " + " \"" + value + "\"" + " with column name " + measure.getColName() + + " and column data type " + dataType + " is not a valid " + dataType + " type."); + output = null; + row.update(output, index); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java index 9540907..c90f1ba 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java @@ -21,9 +21,11 @@ package org.apache.carbondata.processing.newflow.converter.impl; import java.nio.charset.Charset; import org.apache.carbondata.core.carbon.metadata.datatype.DataType; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder; import org.apache.carbondata.processing.newflow.converter.FieldConverter; import org.apache.carbondata.processing.newflow.row.CarbonRow; @@ -33,17 +35,29 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter { private int index; - public 
NonDictionaryFieldConverterImpl(DataField dataField, int index) { + private String nullformat; + + private CarbonColumn column; + + public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index) { this.dataType = dataField.getColumn().getDataType(); + this.column = dataField.getColumn(); this.index = index; + this.nullformat = nullformat; } @Override - public void convert(CarbonRow row) { + public void convert(CarbonRow row, BadRecordLogHolder logHolder) { String dimensionValue = row.getString(index); + if (dimensionValue == null || dimensionValue.equals(nullformat)) { + dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL; + } if (dataType != DataType.STRING) { if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) { - dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL; + logHolder.setReason( + "The value " + " \"" + dimensionValue + "\"" + " with column name " + column + .getColName() + " and column data type " + dataType + " is not a valid " + dataType + + " type."); } } row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),