[CARBONDATA-2452] [CARBONDATA-2451] [CARBONDATA-2450] [CARBONDATA-2453] Fixed issues related to complex types
Issue 1: Dictionary encoding was being added to complex types in the SDK case, which led to data load failure. Issue 2: Sort columns were not being validated against the table schema. Issue 3: Bad record handling was missing for complex types. Issue 4: The parent name was not being prepended to the field name before checking for duplicates, which threw a duplicate column exception. This closes #2278 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6b70b7e4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6b70b7e4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6b70b7e4 Branch: refs/heads/spark-2.3 Commit: 6b70b7e47b05a612ccb5a5ad01ee2d5a05ffa600 Parents: 8e7fceb Author: kunal642 <kunalkapoor...@gmail.com> Authored: Mon May 7 20:58:21 2018 +0530 Committer: kumarvishal09 <kumarvishal1...@gmail.com> Committed: Fri May 11 03:27:36 2018 +0530 ---------------------------------------------------------------------- .../schema/table/TableSchemaBuilder.java | 21 +- .../complexType/TestComplexTypeQuery.scala | 2 + .../TestNonTransactionalCarbonTable.scala | 410 +++++++++++++++++-- .../processing/datatypes/ArrayDataType.java | 11 +- .../processing/datatypes/GenericDataType.java | 3 +- .../processing/datatypes/PrimitiveDataType.java | 41 +- .../processing/datatypes/StructDataType.java | 11 +- .../loading/DataLoadProcessBuilder.java | 9 + .../impl/ComplexFieldConverterImpl.java | 2 +- .../DirectDictionaryFieldConverterImpl.java | 1 - .../loading/model/CarbonLoadModelBuilder.java | 15 +- .../InputProcessorStepWithNoConverterImpl.java | 32 +- .../sdk/file/CarbonWriterBuilder.java | 24 +- 13 files changed, 524 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java index b078400..03d03f8 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java @@ -122,7 +122,13 @@ public class TableSchemaBuilder { private ColumnSchema addColumn(StructField field, String parentName, AtomicInteger valIndex, boolean isSortColumn, boolean isComplexChild) { Objects.requireNonNull(field); - checkRepeatColumnName(field); + if (isComplexChild) { + // if field is complex then append parent name to the child field to check + // if any other field with same name exists + checkRepeatColumnName(field, parentName); + } else { + checkRepeatColumnName(field); + } ColumnSchema newColumn = new ColumnSchema(); if (parentName != null) { newColumn.setColumnName(parentName + "." + field.getFieldName()); @@ -156,7 +162,7 @@ public class TableSchemaBuilder { // SO, this will not have any impact. newColumn.setColumnUniqueId(field.getFieldName()); newColumn.setColumnReferenceId(newColumn.getColumnUniqueId()); - newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn)); + newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn, isComplexChild)); if (field.getDataType().isComplexType()) { if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { newColumn.setNumberOfChild(1); @@ -209,6 +215,12 @@ public class TableSchemaBuilder { /** * Throw exception if {@param field} name is repeated */ + private void checkRepeatColumnName(StructField field, String parentName) { + checkRepeatColumnName( + new StructField(parentName + "." 
+ field.getFieldName(), field.getDataType(), + field.getChildren())); + } + private void checkRepeatColumnName(StructField field) { for (ColumnSchema column : sortColumns) { if (column.getColumnName().equalsIgnoreCase(field.getFieldName())) { @@ -234,9 +246,10 @@ public class TableSchemaBuilder { } } - private List<Encoding> createEncoding(DataType dataType, boolean isSortColumn) { + private List<Encoding> createEncoding(DataType dataType, boolean isSortColumn, + boolean isComplexChild) { List<Encoding> encodings = new LinkedList<>(); - if (dataType == DataTypes.TIMESTAMP || dataType == DataTypes.DATE) { + if (dataType == DataTypes.DATE && !isComplexChild) { encodings.add(Encoding.DIRECT_DICTIONARY); encodings.add(Encoding.DICTIONARY); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala index bc44df0..6728cdf 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexTypeQuery.scala @@ -38,6 +38,8 @@ class TestComplexTypeQuery extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + "force") sql("drop table if exists complexcarbontable") sql("drop table if exists 
complexhivetable") sql("drop table if exists complex_filter") http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index 3c51efe..376501b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -19,6 +19,7 @@ package org.apache.carbondata.spark.testsuite.createTable import java.sql.Timestamp import java.io.{File, FileFilter, IOException} +import java.io.{File, FileFilter} import java.util import org.apache.commons.io.FileUtils @@ -33,6 +34,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.sdk.file.AvroCarbonWriter +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -40,7 +42,7 @@ import org.apache.avro import org.apache.commons.lang.CharEncoding import tech.allegro.schema.json2avro.converter.JsonAvroConverter -import org.apache.carbondata.core.metadata.datatype.DataTypes +import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField} import org.apache.carbondata.sdk.file.{CarbonWriter, CarbonWriterBuilder, Field, Schema} @@ -219,6 +221,9 @@ class TestNonTransactionalCarbonTable extends 
QueryTest with BeforeAndAfterAll { } override def beforeAll(): Unit = { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) sql("DROP TABLE IF EXISTS sdkOutputTable") } @@ -247,8 +252,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - test("test create External Table with Schema with partition, should ignore schema and partition") - { + test("test create External Table with Schema with partition, should ignore schema and partition") { buildTestDataSingleFile() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -270,8 +274,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } - test("test create External Table with insert into feature") - { + test("test create External Table with insert into feature") { buildTestDataSingleFile() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -302,8 +305,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - test("test create External Table with insert overwrite") - { + test("test create External Table with insert overwrite") { buildTestDataSingleFile() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -341,8 +343,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } - test("test create External Table with Load") - { + test("test create External Table with Load") { buildTestDataSingleFile() assert(new File(writerPath).exists()) sql("DROP TABLE IF EXISTS sdkOutputTable") @@ -563,6 +564,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } + test("Read sdk writer output file without any file should fail") { buildTestDataSingleFile() deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT) @@ -748,7 +750,6 @@ 
class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } - test("Read sdk two writer output with same column name but different sort columns") { FileUtils.deleteDirectory(new File(writerPath)) buildTestDataOtherDataType(3, Array[String]("name")) @@ -814,7 +815,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } - private def WriteFilesWithAvroWriter(rows: Int, mySchema: String, json: String): Unit = { + private def WriteFilesWithAvroWriter(rows: Int, + mySchema: String, + json: String, + fields: Array[Field]) = { // conversion to GenericData.Record val nn = new avro.Schema.Parser().parse(mySchema) val converter = new JsonAvroConverter @@ -822,8 +826,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn) try { - val writer = CarbonWriter.builder - .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema)) + val writer = CarbonWriter.builder.withSchema(new Schema(fields)) .outputPath(writerPath).isTransactionalTable(false) .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput var i = 0 @@ -860,7 +863,16 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { """.stripMargin val json = """ {"name":"bob", "age":10, "address" : {"street":"abc", "city":"bang"}} """ - WriteFilesWithAvroWriter(rows, mySchema, json) + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fld.add(new StructField("city", DataTypes.STRING)) + fields(2) = new Field("address", "struct", fld) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataStructType(): Any = { @@ -899,7 +911,17 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { 
""".stripMargin val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """ - WriteFilesWithAvroWriter(rows, mySchema, json) + + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + // fields[1] = new Field("age", DataTypes.INT); + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fields(2) = new Field("address", "array", fld) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataSingleFileArrayType(): Any = { @@ -943,7 +965,18 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { """ {"name":"bob", "age":10, |"address" : {"street":"abc", "city":"bang"}, |"doorNum" : [1,2,3,4]}""".stripMargin - WriteFilesWithAvroWriter(rows, mySchema, json) + + val fields = new Array[Field](4) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fld.add(new StructField("city", DataTypes.STRING)) + fields(2) = new Field("address", "struct", fld) + val fld1 = new util.ArrayList[StructField] + fld1.add(new StructField("eachDoorNum", DataTypes.INT)) + fields(3) = new Field("doorNum", "array", fld1) + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataBothStructArrayType(): Any = { @@ -951,6 +984,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { buildAvroTestDataStructWithArrayType(3, null) } + // ArrayOfStruct test def buildAvroTestDataArrayOfStruct(rows: Int, options: util.Map[String, String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -995,7 +1029,20 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { |{"street":"def","city":"city2"}, |{"street":"ghi","city":"city3"}, |{"street":"jkl","city":"city4"}]} """.stripMargin - 
WriteFilesWithAvroWriter(rows, mySchema, json) + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fld.add(new StructField("city", DataTypes.STRING)) + + val fld2 = new util.ArrayList[StructField] + fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld)) + fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataArrayOfStructType(): Any = { @@ -1003,6 +1050,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { buildAvroTestDataArrayOfStruct(3, null) } + // StructOfArray test def buildAvroTestDataStructOfArray(rows: Int, options: util.Map[String, String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -1064,7 +1112,21 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | ] | } |} """.stripMargin - WriteFilesWithAvroWriter(rows, mySchema, json) + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val fld2 = new util.ArrayList[StructField] + fld2.add(new StructField("street", DataTypes.STRING)) + fld2.add(new StructField("city", DataTypes.STRING)) + + val fld1 = new util.ArrayList[StructField] + fld1.add(new StructField("eachDoorNum", DataTypes.INT)) + fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1)) + + fields(2) = new Field("address","struct",fld2) + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataStructOfArrayType(): Any = { @@ -1072,6 +1134,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { buildAvroTestDataStructOfArray(3, null) } + test("Read sdk writer Avro output Record Type") { 
buildAvroTestDataStructType() assert(new File(writerPath).exists()) @@ -1080,6 +1143,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION |'$writerPath' """.stripMargin) + checkAnswer(sql("select * from sdkOutputTable"), Seq( Row("bob", 10, Row("abc","bang")), Row("bob", 10, Row("abc","bang")), @@ -1138,6 +1202,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } + test("Read sdk writer Avro output with Array of struct") { buildAvroTestDataArrayOfStructType() assert(new File(writerPath).exists()) @@ -1163,6 +1228,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } + // Struct of array test("Read sdk writer Avro output with struct of Array") { buildAvroTestDataStructOfArrayType() @@ -1264,7 +1330,21 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | } | ] |} """.stripMargin - WriteFilesWithAvroWriter(rows, mySchema, json) + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fld.add(new StructField("city", DataTypes.STRING)) + fld.add(new StructField("FloorNum", DataTypes.createArrayType(DataTypes.INT))) + + val fld2 = new util.ArrayList[StructField] + fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld)) + fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataMultiLevel3Type(): Any = { @@ -1302,6 +1382,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } + // test multi level -- 3 levels [array of struct of struct of string, int] def 
buildAvroTestDataMultiLevel3_1(rows: Int, options: util.Map[String, String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -1381,7 +1462,26 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | } | ] |} """.stripMargin - WriteFilesWithAvroWriter(rows, mySchema, json) + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val fld = new util.ArrayList[StructField] + fld.add(new StructField("street", DataTypes.STRING)) + fld.add(new StructField("city", DataTypes.STRING)) + + val subFld = new util.ArrayList[StructField] + subFld.add(new StructField("wing", DataTypes.STRING)) + subFld.add(new StructField("number", DataTypes.INT)) + fld.add(new StructField("FloorNum", DataTypes.createStructType(subFld))) + + // array of struct of struct + val fld2 = new util.ArrayList[StructField] + fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld)) + fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataMultiLevel3_1Type(): Any = { @@ -1461,7 +1561,22 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | "BuildNum": [[[1,2,3],[4,5,6]],[[10,20,30],[40,50,60]]] | } """.stripMargin - WriteFilesWithAvroWriter(rows, mySchema, json) + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val subFld = new util.ArrayList[StructField] + subFld.add(new StructField("EachDoorNum", DataTypes.INT)) + + val fld = new util.ArrayList[StructField] + fld.add(new StructField("DoorNum", DataTypes.createArrayType(DataTypes.INT), subFld)) + // array of struct of struct + val doorNum = new util.ArrayList[StructField] + doorNum.add(new StructField("FloorNum", + DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.INT)), 
fld)) + fields(2) = new Field("BuildNum", "array", doorNum) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataMultiLevel3_2Type(): Any = { @@ -1500,6 +1615,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } + + // test multi level -- 4 levels [array of array of array of struct] def buildAvroTestDataMultiLevel4(rows: Int, options: util.Map[String, String]): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -1578,7 +1695,30 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { | ] | ] |} """.stripMargin - WriteFilesWithAvroWriter(rows, mySchema, json) + + val fields = new Array[Field](3) + fields(0) = new Field("name", DataTypes.STRING) + fields(1) = new Field("age", DataTypes.INT) + + val subFld = new util.ArrayList[StructField] + subFld.add(new StructField("EachDoorNum", DataTypes.INT)) + + val address = new util.ArrayList[StructField] + address.add(new StructField("street", DataTypes.STRING)) + address.add(new StructField("city", DataTypes.STRING)) + + val fld = new util.ArrayList[StructField] + fld.add(new StructField("DoorNum", + DataTypes.createArrayType(DataTypes.createStructType(address)), + subFld)) + // array of struct of struct + val doorNum = new util.ArrayList[StructField] + doorNum.add(new StructField("FloorNum", + DataTypes.createArrayType( + DataTypes.createArrayType(DataTypes.createStructType(address))), fld)) + fields(2) = new Field("BuildNum", "array", doorNum) + + WriteFilesWithAvroWriter(rows, mySchema, json, fields) } def buildAvroTestDataMultiLevel4Type(): Any = { @@ -1604,4 +1744,228 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } -} \ No newline at end of file + test( + "test if exception is thrown when a column which is not in schema is specified in sort columns") + { + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": 
"StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": "int" + | }, + | { + | "name": "course_details", + | "type": { + | "name": "course_details", + | "type": "record", + | "fields": [ + | { + | "name": "course_struct_course_time", + | "type": "string" + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05" }}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val converter = new JsonAvroConverter + val record = converter + .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn) + + val fields = new Array[Field](2) + fields(0) = new Field("id", DataTypes.INT) + val fld_s = new java.util.ArrayList[StructField] + fld_s.add(new StructField("course_struct_course_time", DataTypes.STRING)) + fields(1) = new Field("course_details", "struct", fld_s) + + assert(intercept[RuntimeException] { + val writer = CarbonWriter.builder.withSchema(new Schema(fields)).sortBy(Array("name", "id")) + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput + writer.write(record) + writer.close() + }.getMessage.toLowerCase.contains("column: name specified in sort columns")) + } + + test("test if data load is success with a struct having timestamp column ") { + val schema1 = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": "StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": "int" + | }, + | { + | "name": "course_details", + | "type": { + | "name": "course_details", + | "type": "record", + | "fields": [ + | { + | "name": "course_struct_course_time", + | "type": "string" + | } + | ] + | } + | } + | ] + |}""".stripMargin + + val json1 = + """{"id": 101,"course_details": { "course_struct_course_time":"2014-01-05 00:00:00" }}""".stripMargin + val nn = new org.apache.avro.Schema.Parser().parse(schema1) + val converter = new JsonAvroConverter + val record = converter + 
.convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn) + + val fields = new Array[Field](2) + fields(0) = new Field("id", DataTypes.INT) + val fld_s = new java.util.ArrayList[StructField] + fld_s.add(new StructField("course_struct_course_time", DataTypes.TIMESTAMP)) + fields(1) = new Field("course_details", "struct", fld_s) + + val writer = CarbonWriter.builder.withSchema(new Schema(fields)).sortBy(Array("id")) + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput + writer.write(record) + writer.close() + } + + test( + "test is dataload is successful if childcolumn has same name as one of the other fields(not " + + "complex)") + { + val schema = + """{ + | "type": "record", + | "name": "Order", + | "namespace": "com.apache.schema", + | "fields": [ + | { + | "name": "id", + | "type": "long" + | }, + | { + | "name": "entries", + | "type": { + | "type": "array", + | "items": { + | "type": "record", + | "name": "Entry", + | "fields": [ + | { + | "name": "id", + | "type": "long" + | } + | ] + | } + | } + | } + | ] + |}""".stripMargin + val json1 = + """{"id": 101, "entries": [ {"id":1234}, {"id":3212} ]}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema) + val converter = new JsonAvroConverter + val record = converter + .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn) + + val fields = new Array[Field](2) + fields(0) = new Field("id", DataTypes.LONG) + val fld_s = new java.util.ArrayList[StructField] + fld_s.add(new StructField("id", DataTypes.LONG)) + fields(1) = new Field("entries", DataTypes.createArrayType(DataTypes.createStructType(fld_s))) + val writer = CarbonWriter.builder.withSchema(new Schema(fields)) + .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput + writer.write(record) + writer.close() + } + + test("test if data load with various bad_records_action") { + val schema = + """{ + | "namespace": "com.apache.schema", + | "type": "record", + | "name": 
"StudentActivity", + | "fields": [ + | { + | "name": "id", + | "type": "string" + | }, + | { + | "name": "course_details", + | "type": { + | "name": "course_details", + | "type": "record", + | "fields": [ + | { + | "name": "course_struct_course_string", + | "type": "string" + | } + | ] + | } + | }, + | { + | "name": "salary_string", + | "type": { + | "type": "array", + | "items": "string" + | } + | } + | ] + |}""".stripMargin + val json1 = + """{ + | "id": "cust_1", + | "course_details": { + | "course_struct_course_string": "asd" + | }, + | "salary_string": [ + | "xyz", + | "abc" + | ] + |}""".stripMargin + + val nn = new org.apache.avro.Schema.Parser().parse(schema) + val converter = new JsonAvroConverter + val record = converter + .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn) + + val fields = new Array[Field](3) + fields(0)=new Field("id", DataTypes.STRING) + val fld_s = new java.util.ArrayList[StructField] + fld_s.add(new StructField("carbon_int", DataTypes.INT)) + fields(1)=new Field("course_details", "struct",fld_s) + + val fld_a = new java.util.ArrayList[StructField] + fld_a.add(new StructField("carbon_array", DataTypes.INT)) + fields(2)=new Field("salary_string", "array",fld_a) + + val loadOptions = new util.HashMap[String, String]() + loadOptions.put("bad_records_action", "fail") + assert(intercept[Exception] { + val writer = CarbonWriter.builder.withSchema(new Schema(fields)).outputPath(writerPath) + .isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForAvroInput + writer.write(record) + writer.close() + }.getMessage.contains("Data load failed due to bad record")) + + loadOptions.put("bad_records_action", "FORCE") + val writer = CarbonWriter.builder.withSchema(new Schema(fields)).outputPath(writerPath) + .isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForAvroInput + writer.write(record) + writer.close() + } + +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java index d7d59ce..cc2619e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java @@ -27,7 +27,7 @@ import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.processing.loading.complexobjects.ArrayObject; - +import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; /** * Array DataType stateless object used in data loading @@ -151,17 +151,16 @@ public class ArrayDataType implements GenericDataType<ArrayObject> { return true; } - @Override - public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream) - throws IOException, DictionaryGenerationException { + @Override public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream, + BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException { if (input == null) { dataOutputStream.writeInt(1); - children.writeByteArray(null, dataOutputStream); + children.writeByteArray(null, dataOutputStream, logHolder); } else { Object[] data = input.getData(); dataOutputStream.writeInt(data.length); for (Object eachInput : data) { - children.writeByteArray(eachInput, dataOutputStream); + children.writeByteArray(eachInput, dataOutputStream, logHolder); } } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java index 21ad83d..f48a91d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; /** * Generic DataType interface which will be used while data loading for complex types like Array & @@ -58,7 +59,7 @@ public interface GenericDataType<T> { * @param dataOutputStream * @throws IOException */ - void writeByteArray(T input, DataOutputStream dataOutputStream) + void writeByteArray(T input, DataOutputStream dataOutputStream, BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException; /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index e34c184..fdfc3bb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -48,10 +48,12 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary; import org.apache.carbondata.processing.loading.dictionary.DirectDictionary; import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary; import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; /** * Primitive DataType stateless object used in data loading @@ -265,19 +267,29 @@ public class PrimitiveDataType implements GenericDataType<Object> { return isDictionary; } - @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream) - throws IOException, DictionaryGenerationException { - + @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream, + BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException { String parsedValue = input == null ? 
null : DataTypeUtil.parseValue(input.toString(), carbonDimension); + String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName()); if (this.isDictionary) { Integer surrogateKey; if (null == parsedValue) { surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY; + if (null == message) { + message = CarbonDataProcessorUtil + .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType()); + logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message); + logHolder.setReason(message); + } } else { surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue); if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) { surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY; + message = CarbonDataProcessorUtil + .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType()); + logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message); + logHolder.setReason(message); } } dataOutputStream.writeInt(surrogateKey); @@ -285,15 +297,15 @@ public class PrimitiveDataType implements GenericDataType<Object> { // Transform into ByteArray for No Dictionary. 
// TODO have to refactor and place all the cases present in NonDictionaryFieldConverterImpl if (null == parsedValue && this.carbonDimension.getDataType() != DataTypes.STRING) { - updateNullValue(dataOutputStream); + updateNullValue(dataOutputStream, logHolder); } else if (null == parsedValue || parsedValue.equals(nullformat)) { - updateNullValue(dataOutputStream); + updateNullValue(dataOutputStream, logHolder); } else { String dateFormat = null; if (this.carbonDimension.getDataType() == DataTypes.DATE) { - dateFormat = this.carbonDimension.getDateFormat(); + dateFormat = carbonDimension.getDateFormat(); } else if (this.carbonDimension.getDataType() == DataTypes.TIMESTAMP) { - dateFormat = this.carbonDimension.getTimestampFormat(); + dateFormat = carbonDimension.getTimestampFormat(); } try { @@ -318,9 +330,12 @@ public class PrimitiveDataType implements GenericDataType<Object> { updateValueToByteStream(dataOutputStream, parsedValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); } else { - updateNullValue(dataOutputStream); + updateNullValue(dataOutputStream, logHolder); } } + } catch (NumberFormatException e) { + // Update logHolder for bad record and put null in dataOutputStream. 
+ updateNullValue(dataOutputStream, logHolder); } catch (CarbonDataLoadingException e) { throw e; } catch (Throwable ex) { @@ -338,7 +353,8 @@ public class PrimitiveDataType implements GenericDataType<Object> { dataOutputStream.write(value); } - private void updateNullValue(DataOutputStream dataOutputStream) throws IOException { + private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder) + throws IOException { if (this.carbonDimension.getDataType() == DataTypes.STRING) { dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length); dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY); @@ -346,6 +362,13 @@ public class PrimitiveDataType implements GenericDataType<Object> { dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length); dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY); } + String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName()); + if (null == message) { + message = CarbonDataProcessorUtil + .prepareFailureReason(carbonDimension.getColName(), carbonDimension.getDataType()); + logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message); + logHolder.setReason(message); + } } @Override public void fillCardinality(List<Integer> dimCardWithComplex) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java index 4fe6255..bb3da6c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java @@ -27,6 +27,7 @@ import 
org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; import org.apache.carbondata.processing.loading.complexobjects.StructObject; +import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; /** * Struct DataType stateless object used in data loading @@ -150,22 +151,22 @@ public class StructDataType implements GenericDataType<StructObject> { return true; } - @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream) - throws IOException, DictionaryGenerationException { + @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream, + BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException { dataOutputStream.writeInt(children.size()); if (input == null) { for (int i = 0; i < children.size(); i++) { - children.get(i).writeByteArray(null, dataOutputStream); + children.get(i).writeByteArray(null, dataOutputStream, logHolder); } } else { Object[] data = input.getData(); for (int i = 0; i < data.length && i < children.size(); i++) { - children.get(i).writeByteArray(data[i], dataOutputStream); + children.get(i).writeByteArray(data[i], dataOutputStream, logHolder); } // For other children elements which dont have data, write empty for (int i = data.length; i < children.size(); i++) { - children.get(i).writeByteArray(null, dataOutputStream); + children.get(i).writeByteArray(null, dataOutputStream, logHolder); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index 2f904ed..17d0c76 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -237,6 +237,15 @@ public final class DataLoadProcessBuilder { } if (column.isComplex()) { complexDataFields.add(dataField); + List<CarbonDimension> childDimensions = + ((CarbonDimension) dataField.getColumn()).getListOfChildDimensions(); + for (CarbonDimension childDimension : childDimensions) { + if (childDimension.getDataType() == DataTypes.DATE) { + childDimension.setDateFormat(loadModel.getDateFormat()); + } else if (childDimension.getDataType() == DataTypes.TIMESTAMP) { + childDimension.setTimestampFormat(loadModel.getTimestampformat()); + } + } } else { dataFields.add(dataField); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java index b26ef36..4e46f9f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java @@ -44,7 +44,7 @@ public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterI ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(byteArray); try { - genericDataType.writeByteArray(object, dataOutputStream); + 
genericDataType.writeByteArray(object, dataOutputStream, logHolder); dataOutputStream.close(); row.update(byteArray.toByteArray(), index); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java index b49cd90..64ddf27 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java @@ -54,7 +54,6 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory .getDirectDictionaryGenerator(dataField.getColumn().getDataType(), dataField.getTimestampFormat()); - } else { this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory .getDirectDictionaryGenerator(dataField.getColumn().getDataType()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index 13dd75c..9a9d09e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -79,7 +79,20 @@ public class CarbonLoadModelBuilder { // we have provided 'fileheader', so it hadoopConf can be null build(options, optionsFinal, model, null); - + String timestampFormat = options.get("timestampformat"); + if (timestampFormat == null) { + timestampFormat = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT); + } + String dateFormat = options.get("dateFormat"); + if (dateFormat == null) { + dateFormat = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT); + } + model.setDateFormat(dateFormat); + model.setTimestampformat(timestampFormat); model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false"))); model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null)); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java index 77f5260..c99a413 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -35,11 +35,15 @@ import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.datatypes.GenericDataType; import 
org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.loading.BadRecordsLogger; +import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder; import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory; import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; +import org.apache.carbondata.processing.loading.exception.BadRecordFoundException; import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -134,7 +138,7 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce outIterators[i] = new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(), rowCounter, orderOfData, noDictionaryMapping, dataTypes, - configuration.getDataFields(), dataFieldsWithComplexDataType); + configuration, dataFieldsWithComplexDataType); } return outIterators; } @@ -207,11 +211,13 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce private int[] orderOfData; + private CarbonDataLoadConfiguration configuration; + private Map<Integer, GenericDataType> dataFieldsWithComplexDataType; public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize, boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping, - DataType[] dataTypes, DataField[] dataFields, + DataType[] dataTypes, CarbonDataLoadConfiguration configuration, Map<Integer, GenericDataType> 
dataFieldsWithComplexDataType) { this.inputIterators = inputIterators; this.batchSize = batchSize; @@ -223,7 +229,8 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce this.firstTime = true; this.noDictionaryMapping = noDictionaryMapping; this.dataTypes = dataTypes; - this.dataFields = dataFields; + this.dataFields = configuration.getDataFields(); + this.configuration = configuration; this.orderOfData = orderOfData; this.dataFieldsWithComplexDataType = dataFieldsWithComplexDataType; } @@ -272,6 +279,9 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce private Object[] convertToNoDictionaryToBytes(Object[] data, DataField[] dataFields) { Object[] newData = new Object[data.length]; + BadRecordLogHolder logHolder = new BadRecordLogHolder(); + BadRecordsLogger badRecordLogger = + BadRecordsLoggerProvider.createBadRecordLogger(configuration); for (int i = 0; i < data.length; i++) { if (i < noDictionaryMapping.length && noDictionaryMapping[i]) { newData[i] = DataTypeUtil @@ -284,11 +294,21 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce try { GenericDataType complextType = dataFieldsWithComplexDataType.get(dataFields[i].getColumn().getOrdinal()); - - complextType.writeByteArray(data[orderOfData[i]], dataOutputStream); - + complextType.writeByteArray(data[orderOfData[i]], dataOutputStream, logHolder); + if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) { + badRecordLogger.addBadRecordsToBuilder(data, logHolder.getReason()); + if (badRecordLogger.isDataLoadFail()) { + String error = "Data load failed due to bad record: " + logHolder.getReason(); + if (!badRecordLogger.isBadRecordLoggerEnable()) { + error += "Please enable bad record logger to know the detail reason."; + } + throw new BadRecordFoundException(error); + } + } dataOutputStream.close(); newData[i] = byteArray.toByteArray(); + } catch (BadRecordFoundException e) { + throw new 
CarbonDataLoadingException("Loading Exception: " + e.getMessage(), e); } catch (Exception e) { throw new CarbonDataLoadingException("Loading Exception", e); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6b70b7e4/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index f541dbb..00ba8a5 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -431,6 +431,20 @@ public class CarbonWriterBuilder { // to child of complex array type in the order val1, val2 so that each array type child is // differentiated to any level AtomicInteger valIndex = new AtomicInteger(0); + // Check if any of the columns specified in sort columns are missing from schema. 
+ for (String sortColumn: sortColumnsList) { + boolean exists = false; + for (Field field : fields) { + if (field.getFieldName().equalsIgnoreCase(sortColumn)) { + exists = true; + break; + } + } + if (!exists) { + throw new RuntimeException( + "column: " + sortColumn + " specified in sort columns does not exist in schema"); + } + } for (Field field : fields) { if (null != field) { int isSortColumn = sortColumnsList.indexOf(field.getFieldName()); @@ -442,9 +456,9 @@ public class CarbonWriterBuilder { " sort columns not supported for " + "array, struct, double, float, decimal "); } } - if (field.getChildren() != null && field.getChildren().size() > 0) { if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) { + checkForUnsupportedDataTypes(field.getChildren().get(0).getDataType()); // Loop through the inner columns and for a StructData DataType complexType = DataTypes.createArrayType(field.getChildren().get(0).getDataType()); @@ -455,6 +469,7 @@ public class CarbonWriterBuilder { List<StructField> structFieldsArray = new ArrayList<StructField>(field.getChildren().size()); for (StructField childFld : field.getChildren()) { + checkForUnsupportedDataTypes(childFld.getDataType()); structFieldsArray .add(new StructField(childFld.getFieldName(), childFld.getDataType())); } @@ -475,6 +490,13 @@ public class CarbonWriterBuilder { } } + private void checkForUnsupportedDataTypes(DataType dataType) { + if (dataType == DataTypes.DOUBLE || dataType == DataTypes.DATE || DataTypes + .isDecimal(dataType)) { + throw new RuntimeException("Unsupported data type: " + dataType.getName()); + } + } + /** * Save the schema of the {@param table} to {@param persistFilePath} * @param table table object containing schema