This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-schema-validator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6d22faa9fd916217b764ddebd8cacac54ea4a6db
Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Sep 11 14:37:24 2020 -0700

    Adjust schema validation logic in AvroIngestionSchemaValidator
---
 .../hadoop/data/IngestionSchemaValidatorTest.java  |  51 ++++++++++++++-------
 .../data/test_sample_data_multi_value.avro         | Bin 0 -> 12222227 bytes
 .../avro/AvroIngestionSchemaValidator.java         |  51 +++++++++++++++------
 3 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
index fec3583..8cd0912 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
@@ -29,16 +29,16 @@ import org.testng.annotations.Test;
 
 
 public class IngestionSchemaValidatorTest {
+
   @Test
-  public void testAvroIngestionSchemaValidator()
+  public void testAvroIngestionSchemaValidatorForSingleValueColumns()
       throws Exception {
-    String inputFilePath = new File(
-        
Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
-            .getFile()).toString();
+    String inputFilePath = new File(Preconditions
+        
.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
+        .getFile()).toString();
     String recordReaderClassName = 
"org.apache.pinot.plugin.inputformat.avro.AvroRecordReader";
 
-    Schema pinotSchema = new Schema.SchemaBuilder()
-        .addSingleValueDimension("column1", FieldSpec.DataType.LONG)
+    Schema pinotSchema = new 
Schema.SchemaBuilder().addSingleValueDimension("column1", 
FieldSpec.DataType.LONG)
         .addSingleValueDimension("column2", FieldSpec.DataType.INT)
         .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column7", FieldSpec.DataType.STRING)
@@ -53,8 +53,7 @@ public class IngestionSchemaValidatorTest {
     
Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
 
     // Adding one extra column
-    pinotSchema = new Schema.SchemaBuilder()
-        .addSingleValueDimension("column1", FieldSpec.DataType.LONG)
+    pinotSchema = new 
Schema.SchemaBuilder().addSingleValueDimension("column1", 
FieldSpec.DataType.LONG)
         .addSingleValueDimension("column2", FieldSpec.DataType.INT)
         .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
         .addSingleValueDimension("extra_column", FieldSpec.DataType.STRING)
@@ -69,11 +68,9 @@ public class IngestionSchemaValidatorTest {
     
Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
     
Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
     
Assert.assertNotNull(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
-    
System.out.println(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
 
     // Change the data type of column1 from LONG to STRING
-    pinotSchema = new Schema.SchemaBuilder()
-        .addSingleValueDimension("column1", FieldSpec.DataType.STRING)
+    pinotSchema = new 
Schema.SchemaBuilder().addSingleValueDimension("column1", 
FieldSpec.DataType.STRING)
         .addSingleValueDimension("column2", FieldSpec.DataType.INT)
         .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column7", FieldSpec.DataType.STRING)
@@ -83,14 +80,12 @@ public class IngestionSchemaValidatorTest {
     Assert.assertNotNull(ingestionSchemaValidator);
     
Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
     
Assert.assertNotNull(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
-    
System.out.println(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
     
Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
     
Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
     
Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
 
     // Change column2 from single-value column to multi-value column
-    pinotSchema = new Schema.SchemaBuilder()
-        .addSingleValueDimension("column1", FieldSpec.DataType.LONG)
+    pinotSchema = new 
Schema.SchemaBuilder().addSingleValueDimension("column1", 
FieldSpec.DataType.LONG)
         .addMultiValueDimension("column2", FieldSpec.DataType.INT)
         .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column7", FieldSpec.DataType.STRING)
@@ -101,11 +96,35 @@ public class IngestionSchemaValidatorTest {
     
Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
     
Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
     
Assert.assertNotNull(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
-    
System.out.println(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
     
Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
     
Assert.assertNotNull(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
-    
System.out.println(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
     
Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+  }
+
+  @Test
+  public void testAvroIngestionValidatorForMultiValueColumns()
+      throws Exception {
+    String inputFilePath = new File(Preconditions.checkNotNull(
+        
IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data_multi_value.avro"))
+        .getFile()).toString();
+    String recordReaderClassName = 
"org.apache.pinot.plugin.inputformat.avro.AvroRecordReader";
+
+    // column 2 is of int type in the AVRO.
+    // column3 and column16 are both of array of map structure.
+    // metric_not_found doesn't exist in input AVRO
+    Schema pinotSchema = new 
Schema.SchemaBuilder().addSingleValueDimension("column1", 
FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column2", FieldSpec.DataType.LONG)
+        .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+        .addMultiValueDimension("column16", FieldSpec.DataType.STRING)
+        .addMetric("metric_not_found", FieldSpec.DataType.LONG)
+        .addMetric("metric_nus_impressions", FieldSpec.DataType.LONG).build();
 
+    IngestionSchemaValidator ingestionSchemaValidator =
+        SchemaValidatorFactory.getSchemaValidator(pinotSchema, 
recordReaderClassName, inputFilePath);
+    Assert.assertNotNull(ingestionSchemaValidator);
+    
Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
+    
Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    
Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
+    
Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro
new file mode 100644
index 0000000..4e4a4d8
Binary files /dev/null and b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro differ
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
index d0ee84f..42fa3a5 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
@@ -97,6 +97,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
                 fieldSpec.getDataType().name(), getInputSchemaType()));
         continue;
       }
+      String avroColumnName = avroColumnField.schema().getName();
       org.apache.avro.Schema avroColumnSchema = avroColumnField.schema();
       org.apache.avro.Schema.Type avroColumnType = avroColumnSchema.getType();
       if (avroColumnType == org.apache.avro.Schema.Type.UNION) {
@@ -111,36 +112,56 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
           }
         }
         if (nonNullSchema != null) {
+          avroColumnSchema = nonNullSchema;
           avroColumnType = nonNullSchema.getType();
         }
       }
 
-      if 
(!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
-        _dataTypeMismatch.addMismatchReason(String
-            .format("The Pinot column: (%s: %s) doesn't match with the column 
(%s: %s) in input %s schema.", columnName,
-                fieldSpec.getDataType().name(), avroColumnSchema.getName(), 
avroColumnType.toString(),
-                getInputSchemaType()));
-      }
-
       if (fieldSpec.isSingleValueField()) {
+        // check data type mismatch
+        if 
(!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
+          getDataTypeMismatchResult().addMismatchReason(String
+              .format("The Pinot column: (%s: %s) doesn't match with the 
column (%s: %s) in input %s schema.", columnName,
+                  fieldSpec.getDataType().name(), avroColumnName, 
avroColumnType.toString(),
+                  getInputSchemaType()));
+        }
+        // check single-value multi-value mismatch
         if (avroColumnType.ordinal() < 
org.apache.avro.Schema.Type.STRING.ordinal()) {
           // the column is a complex structure
-          _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
-              "The Pinot column: %s is 'single-value' column but the column: 
%s from input %s is 'multi-value' column.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType()));
+          
getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+              .format(
+                  "The Pinot column: %s is 'single-value' column but the 
column: %s from input %s is 'multi-value' column.",
+                  columnName, avroColumnName, getInputSchemaType()));
         }
       } else {
+        // check data type mismatch
+        FieldSpec.DataType dataTypeForMVColumn = 
AvroUtils.extractFieldDataType(avroColumnField);
+        if (fieldSpec.getDataType() != dataTypeForMVColumn) {
+          getDataTypeMismatchResult().addMismatchReason(String
+              .format("The Pinot column: (%s: %s) doesn't match with the 
column (%s: %s) in input %s schema.",
+                  columnName, fieldSpec.getDataType().name(), avroColumnName, 
dataTypeForMVColumn.name(),
+                  getInputSchemaType()));
+        }
+        // check single-value multi-value mismatch
         if (avroColumnType.ordinal() >= 
org.apache.avro.Schema.Type.STRING.ordinal()) {
           // the column is a complex structure
-          _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
-              "The Pinot column: %s is 'multi-value' column but the column: %s 
from input %s schema is 'single-value' column.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType()));
+          
getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+              .format(
+                  "The Pinot column: %s is 'multi-value' column but the 
column: %s from input %s schema is 'single-value' column.",
+                  columnName, avroColumnName, getInputSchemaType()));
         }
+        // check multi-value column structure mismatch
         if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) {
           // multi-value column should use array structure for now.
-          _multiValueStructureMismatch.addMismatchReason(String.format(
+          
getMultiValueStructureMismatchResult().addMismatchReason(String.format(
               "The Pinot column: %s is 'multi-value' column but the column: %s 
from input %s schema is of '%s' type, which should have been of 'array' type.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType(), 
avroColumnType.getName()));
+              columnName, avroColumnName, getInputSchemaType(), 
avroColumnType.getName()));
+        } else if (avroColumnSchema.getElementType().getType().ordinal() < 
org.apache.avro.Schema.Type.STRING
+            .ordinal()) {
+          // even though the column schema is of array type, the element type 
of that array could be of complex type like array, map, etc.
+          
getMultiValueStructureMismatchResult().addMismatchReason(String.format(
+              "The Pinot column: %s is 'multi-value' column and it's of 
'array' type in input %s schema, but the element type is of '%s' type, which 
should have been of 'primitive' type.",
+              columnName, getInputSchemaType(), 
avroColumnSchema.getElementType().getType()));
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to