Github user lindblombr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21847#discussion_r206746980
  
    --- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
    @@ -165,16 +182,118 @@ class AvroSerializer(rootCatalystType: DataType, 
rootAvroType: Schema, nullable:
           result
       }
     
    -  private def resolveNullableType(avroType: Schema, nullable: Boolean): 
Schema = {
    -    if (nullable) {
    +  // Resolve an Avro union against a supplied DataType, i.e. a LongType 
compared against
    +  // a ["null", "long"] should return a schema of type Schema.Type.LONG
    +  // This function also handles resolving a DataType against unions of 2 
or more types, i.e.
    +  // an IntType resolves against a ["int", "long", "null"] will correctly 
return a schema of
    +  // type Schema.Type.LONG
    +  private def resolveUnionType(avroType: Schema, catalystType: DataType,
    +                                  nullable: Boolean): Schema = {
    +    if (avroType.getType == Type.UNION) {
           // avro uses union to represent nullable type.
    -      val fields = avroType.getTypes.asScala
    -      assert(fields.length == 2)
    -      val actualType = fields.filter(_.getType != NULL)
    -      assert(actualType.length == 1)
    +      val fieldTypes = avroType.getTypes.asScala
    +
    +      // If we're nullable, we need to have at least two types.  Cases 
with more than two types
    +      // are captured in test("read read-write, read-write w/ schema, 
read") w/ test.avro input
    +      if (nullable && fieldTypes.length < 2) {
    +        throw new IncompatibleSchemaException(
    +          s"Cannot resolve nullable ${catalystType} against union type 
${avroType}")
    +      }
    +
    +      val actualType = catalystType match {
    +        case NullType => fieldTypes.filter(_.getType == Type.NULL)
    +        case BooleanType => fieldTypes.filter(_.getType == Type.BOOLEAN)
    +        case ByteType => fieldTypes.filter(_.getType == Type.INT)
    +        case BinaryType =>
    +          val at = fieldTypes.filter(x => x.getType == Type.BYTES || 
x.getType == Type.FIXED)
    +          if (at.length > 1) {
    +            throw new IncompatibleSchemaException(
    +              s"Cannot resolve schema of ${catalystType} against union 
${avroType.toString}")
    +          } else {
    +            at
    +          }
    +        case ShortType | IntegerType => fieldTypes.filter(_.getType == 
Type.INT)
    +        case LongType => fieldTypes.filter(_.getType == Type.LONG)
    +        case FloatType => fieldTypes.filter(_.getType == Type.FLOAT)
    +        case DoubleType => fieldTypes.filter(_.getType == Type.DOUBLE)
    +        case d: DecimalType => fieldTypes.filter(_.getType == Type.STRING)
    +        case StringType => fieldTypes
    +          .filter(x => x.getType == Type.STRING || x.getType == Type.ENUM)
    +        case DateType => fieldTypes.filter(x => x.getType == Type.INT || 
x.getType == Type.LONG)
    +        case TimestampType => fieldTypes.filter(_.getType == Type.LONG)
    +        case ArrayType(et, containsNull) =>
    +          // Find array that matches the element type specified
    +          fieldTypes.filter(x => x.getType == Type.ARRAY
    +            && typeMatchesSchema(et, x.getElementType))
    +        case st: StructType => // Find the matching record!
    +          val recordTypes = fieldTypes.filter(x => x.getType == 
Type.RECORD)
    +          if (recordTypes.length > 1) {
    +            throw new IncompatibleSchemaException(
    +              "Unions of multiple record types are NOT supported with 
user-specified schema")
    +          }
    +          recordTypes
    +        case MapType(kt, vt, valueContainsNull) =>
    +          // Find the map that matches the value type.  Maps in Avro are 
always key type string
    +          fieldTypes.filter(x => x.getType == Type.MAP && 
typeMatchesSchema(vt, x.getValueType))
    --- End diff --
    
    In `SchemaConverters.toAvroType`, the expectation is that maps are keyed 
only with `StringType`:
    
        case MapType(StringType, vt, valueContainsNull) =>
          builder.map().values(toAvroType(vt, valueContainsNull, recordName, 
prevNameSpace))
    
    When you run this trivial test case, it fails:
    ```
    test("SPARK-24855: Maps with kv not string") {
        withTempPath { dir =>
          val someData = Seq(
            Row("a", Map(
              1 -> "foo",
              2 -> "bar",
              3 -> "baz"
              )
            ),
            Row("b", Map(
              1 -> "foo",
              2 -> "bar",
              3 -> "baz"
              )
            )
          )
    
          val someSchema = StructType(Seq(
            StructField("id", StringType, true),
            StructField("map", MapType(IntegerType, StringType), true)
            )
          )
    
          val df = spark.createDataFrame(
            spark.sparkContext.parallelize(someData), someSchema
          )
    
          df.write.format("avro").save("dataset")
        }
    ```
    The exception is as follows:
    ```
    Unexpected type MapType(IntegerType,StringType,true).
    org.apache.spark.sql.avro.IncompatibleSchemaException: Unexpected type 
MapType(IntegerType,StringType,true).
        at 
org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:136)
        at 
org.apache.spark.sql.avro.SchemaConverters$$anonfun$toAvroType$1.apply(SchemaConverters.scala:130)
        at 
org.apache.spark.sql.avro.SchemaConverters$$anonfun$toAvroType$1.apply(SchemaConverters.scala:129)
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to