This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 24bf4297f82 [SPARK-43380][SQL] Revert `Fix Avro data type conversion issues`
24bf4297f82 is described below

commit 24bf4297f8280b1db42026197282542863420b98
Author: zeruibao <zerui....@databricks.com>
AuthorDate: Mon Aug 14 19:36:40 2023 -0700

    [SPARK-43380][SQL] Revert `Fix Avro data type conversion issues`

    ### What changes were proposed in this pull request?
    Revert my last PR https://github.com/apache/spark/pull/41052, which caused an Avro read performance regression because it changed the code structure.

    ### Why are the changes needed?
    Removes the performance regression.

    ### How was this patch tested?
    Unit tests.

    Closes #42458 from zeruibao/revert-avro-change.

    Authored-by: zeruibao <zerui....@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
    (cherry picked from commit 46580ab4cb02390ba71dace1235015749f048fff)
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../src/main/resources/error/error-classes.json    |  10 -
 .../apache/spark/sql/avro/AvroDeserializer.scala   | 456 +++++++++------------
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 161 --------
 docs/sql-error-conditions.md                       |  14 -
 docs/sql-migration-guide.md                        |   1 -
 .../spark/sql/errors/QueryCompilationErrors.scala  |  30 --
 .../org/apache/spark/sql/internal/SQLConf.scala    |  12 -
 7 files changed, 189 insertions(+), 495 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 74542f2b914..5fdb2fbe77e 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -69,16 +69,6 @@
       }
     }
   },
-  "AVRO_INCORRECT_TYPE" : {
-    "message" : [
-      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: <key>."
-    ]
-  },
-  "AVRO_LOWER_PRECISION" : {
-    "message" : [
-      "Cannot convert Avro <avroPath> to SQL <sqlPath> because the original encoded data type is <avroType>, however you're trying to read the field as <sqlType>, which leads to data being read as null. Please provide a wider decimal type to get the correct result. To allow reading null to this field, enable the SQL configuration: <key>."
-    ]
-  },
   "BATCH_METADATA_NOT_FOUND" : {
     "message" : [
       "Unable to find batch <batchMetadataFile>."
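The two error classes removed above guarded decimal narrowing. The check behind AVRO_LOWER_PRECISION compared integral capacity, as in `d.getPrecision - d.getScale > dt.precision - dt.scale` in the deserializer hunks below. A minimal standalone sketch of that comparison; `DecimalInfo` and `losesPrecision` are illustrative names, not Spark APIs:

```scala
// Sketch of the precision comparison behind the removed AVRO_LOWER_PRECISION
// error. The names here are illustrative stand-ins, not Spark's actual fields.
object PrecisionCheck extends App {
  final case class DecimalInfo(precision: Int, scale: Int)

  // Mirrors `d.getPrecision - d.getScale > dt.precision - dt.scale` from the
  // reverted AvroDeserializer: compare how many integral digits each type holds.
  def losesPrecision(avro: DecimalInfo, sql: DecimalInfo): Boolean =
    avro.precision - avro.scale > sql.precision - sql.scale

  // 13.1234567890 is encoded as decimal(12,10): two integral digits.
  // DECIMAL(4,3) keeps only one, so the read would yield null (hence the error).
  assert(losesPrecision(DecimalInfo(12, 10), DecimalInfo(4, 3)))
  // DECIMAL(5,3) keeps two integral digits, so 13.123 reads back correctly.
  assert(!losesPrecision(DecimalInfo(12, 10), DecimalInfo(5, 3)))
}
```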
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index d4d34a891e9..a78ee89a3e9 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -35,9 +35,8 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArr
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.LegacyBehaviorPolicy
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -118,268 +117,178 @@ private[sql] class AvroDeserializer(
     val incompatibleMsg = errorPrefix +
       s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"
 
-    val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA
-    val preventReadingIncorrectType = !SQLConf.get.getConf(confKey)
+    (avroType.getType, catalystType) match {
+      case (NULL, NullType) => (updater, ordinal, _) =>
+        updater.setNullAt(ordinal)
 
-    val logicalDataType = SchemaConverters.toSqlType(avroType).dataType
-    avroType.getType match {
-      case NULL =>
-        (logicalDataType, catalystType) match {
-          case (_, NullType) => (updater, ordinal, _) =>
-            updater.setNullAt(ordinal)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
       // TODO: we can avoid boxing if future version of avro provide primitive accessors.
-      case BOOLEAN =>
-        (logicalDataType, catalystType) match {
-          case (_, BooleanType) => (updater, ordinal, value) =>
-            updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
+      case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
+        updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
+
+      case (INT, IntegerType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, value.asInstanceOf[Int])
+
+      case (INT, DateType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+
+      case (LONG, LongType) => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Long])
+
+      case (LONG, TimestampType) => avroType.getLogicalType match {
+        // For backward compatibility, if the Avro type is Long and it is not logical type
+        // (the `null` case), the value is processed as timestamp type with millisecond precision.
+        case null | _: TimestampMillis => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.millisToMicros(millis)
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
+        case _: TimestampMicros => (updater, ordinal, value) =>
+          val micros = value.asInstanceOf[Long]
+          updater.setLong(ordinal, timestampRebaseFunc(micros))
+        case other => throw new IncompatibleSchemaException(errorPrefix +
+          s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
+      }
 
-      case INT =>
-        (logicalDataType, catalystType) match {
-          case (IntegerType, IntegerType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, value.asInstanceOf[Int])
-          case (IntegerType, DateType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
-          case (DateType, DateType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
-          case (_: YearMonthIntervalType, _: YearMonthIntervalType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, value.asInstanceOf[Int])
-          case (_: YearMonthIntervalType, _) if preventReadingIncorrectType =>
-            throw QueryCompilationErrors.avroIncorrectTypeError(
-              toFieldStr(avroPath), toFieldStr(catalystPath),
-              logicalDataType.catalogString, catalystType.catalogString, confKey.key)
-          case _ if !preventReadingIncorrectType => (updater, ordinal, value) =>
-            updater.setInt(ordinal, value.asInstanceOf[Int])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case LONG =>
-        (logicalDataType, catalystType) match {
-          case (LongType, LongType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (TimestampType, LongType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (TimestampNTZType, LongType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (LongType, TimestampType)
-            | (TimestampType, TimestampType)
-            | (TimestampNTZType, TimestampType) => avroType.getLogicalType match {
-            // For backward compatibility, if the Avro type is Long and it is not logical type
-            // (the `null` case), the value is processed as timestamp type with
-            // millisecond precision.
-            case null | _: TimestampMillis => (updater, ordinal, value) =>
-              val millis = value.asInstanceOf[Long]
-              val micros = DateTimeUtils.millisToMicros(millis)
-              updater.setLong(ordinal, timestampRebaseFunc(micros))
-            case _: TimestampMicros => (updater, ordinal, value) =>
-              val micros = value.asInstanceOf[Long]
-              updater.setLong(ordinal, timestampRebaseFunc(micros))
-            case other => throw new IncompatibleSchemaException(errorPrefix +
-              s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
-          }
-          case (LongType, TimestampNTZType)
-            | (TimestampNTZType, TimestampNTZType)
-            | (TimestampType, TimestampNTZType) => avroType.getLogicalType match {
-            // To keep consistent with TimestampType, if the Avro type is Long and it is not
-            // logical type (the `null` case), the value is processed as TimestampNTZ
-            // with millisecond precision.
-            case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
-              val millis = value.asInstanceOf[Long]
-              val micros = DateTimeUtils.millisToMicros(millis)
-              updater.setLong(ordinal, micros)
-            case _: LocalTimestampMicros => (updater, ordinal, value) =>
-              val micros = value.asInstanceOf[Long]
-              updater.setLong(ordinal, micros)
-            case other => throw new IncompatibleSchemaException(errorPrefix +
-              s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
-          }
-          // Before we upgrade Avro to 1.8 for logical type support,
-          // spark-avro converts Long to Date.
-          // For backward compatibility, we still keep this conversion.
-          case (LongType, DateType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
-          case (DateType, DateType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (_: DayTimeIntervalType, _: DayTimeIntervalType) => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case (_: DayTimeIntervalType, _) if preventReadingIncorrectType =>
-            throw QueryCompilationErrors.avroIncorrectTypeError(
-              toFieldStr(avroPath), toFieldStr(catalystPath),
-              logicalDataType.catalogString, catalystType.catalogString, confKey.key)
-          case (_: DayTimeIntervalType, DateType) => (updater, ordinal, value) =>
-            updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
-          case (_, dt: DecimalType) => (updater, ordinal, value) =>
-            val d = avroType.getLogicalType.asInstanceOf[CustomDecimal]
-            updater.setDecimal(ordinal, Decimal(value.asInstanceOf[Long], d.precision, d.scale))
-          case _ if !preventReadingIncorrectType => (updater, ordinal, value) =>
-            updater.setLong(ordinal, value.asInstanceOf[Long])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case FLOAT =>
-        (logicalDataType, catalystType) match {
-          case (_, FloatType) => (updater, ordinal, value) =>
-            updater.setFloat(ordinal, value.asInstanceOf[Float])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case DOUBLE =>
-        (logicalDataType, catalystType) match {
-          case (_, DoubleType) => (updater, ordinal, value) =>
-            updater.setDouble(ordinal, value.asInstanceOf[Double])
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case STRING =>
-        (logicalDataType, catalystType) match {
-          case (_, StringType) => (updater, ordinal, value) =>
-            val str = value match {
-              case s: String => UTF8String.fromString(s)
-              case s: Utf8 =>
-                val bytes = new Array[Byte](s.getByteLength)
-                System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
-                UTF8String.fromBytes(bytes)
-            }
-            updater.set(ordinal, str)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case ENUM =>
-        (logicalDataType, catalystType) match {
-          case (_, StringType) => (updater, ordinal, value) =>
-            updater.set(ordinal, UTF8String.fromString(value.toString))
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case FIXED =>
-        (logicalDataType, catalystType) match {
-          case (_, BinaryType) => (updater, ordinal, value) =>
-            updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())
-          case (_, dt: DecimalType) =>
-            val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-            if (preventReadingIncorrectType &&
-              d.getPrecision - d.getScale > dt.precision - dt.scale) {
-              throw QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath),
-                toFieldStr(catalystPath), logicalDataType.catalogString,
-                dt.catalogString, confKey.key)
-            }
-            (updater, ordinal, value) =>
-              val bigDecimal =
-                decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
-              val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-              updater.setDecimal(ordinal, decimal)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case BYTES =>
-        (logicalDataType, catalystType) match {
-          case (_, BinaryType) => (updater, ordinal, value) =>
-            val bytes = value match {
-              case b: ByteBuffer =>
-                val bytes = new Array[Byte](b.remaining)
-                b.get(bytes)
-                // Do not forget to reset the position
-                b.rewind()
-                bytes
-              case b: Array[Byte] => b
-              case other =>
-                throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.")
-            }
-            updater.set(ordinal, bytes)
-          case (_, dt: DecimalType) =>
-            val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
-            if (preventReadingIncorrectType &&
-              d.getPrecision - d.getScale > dt.precision - dt.scale) {
-              throw QueryCompilationErrors.avroLowerPrecisionError(toFieldStr(avroPath),
-                toFieldStr(catalystPath), logicalDataType.catalogString,
-                dt.catalogString, confKey.key)
-            }
-            (updater, ordinal, value) =>
-              val bigDecimal = decimalConversions
-                .fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
-              val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
-              updater.setDecimal(ordinal, decimal)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      case (LONG, TimestampNTZType) => avroType.getLogicalType match {
+        // To keep consistent with TimestampType, if the Avro type is Long and it is not
+        // logical type (the `null` case), the value is processed as TimestampNTZ
+        // with millisecond precision.
+        case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.millisToMicros(millis)
+          updater.setLong(ordinal, micros)
+        case _: LocalTimestampMicros => (updater, ordinal, value) =>
+          val micros = value.asInstanceOf[Long]
+          updater.setLong(ordinal, micros)
+        case other => throw new IncompatibleSchemaException(errorPrefix +
+          s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
+      }
+
+      // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
+      // For backward compatibility, we still keep this conversion.
+      case (LONG, DateType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
+
+      case (FLOAT, FloatType) => (updater, ordinal, value) =>
+        updater.setFloat(ordinal, value.asInstanceOf[Float])
+
+      case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
+        updater.setDouble(ordinal, value.asInstanceOf[Double])
+
+      case (STRING, StringType) => (updater, ordinal, value) =>
+        val str = value match {
+          case s: String => UTF8String.fromString(s)
+          case s: Utf8 =>
+            val bytes = new Array[Byte](s.getByteLength)
+            System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
+            UTF8String.fromBytes(bytes)
         }
-      case RECORD =>
-        (logicalDataType, catalystType) match {
-          case (_, st: StructType) =>
-            // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
-            // We can always return `false` from `applyFilters` for nested records.
-            val writeRecord =
-              getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false)
-            (updater, ordinal, value) =>
-              val row = new SpecificInternalRow(st)
-              writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
-              updater.set(ordinal, row)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+        updater.set(ordinal, str)
+
+      case (ENUM, StringType) => (updater, ordinal, value) =>
+        updater.set(ordinal, UTF8String.fromString(value.toString))
+
+      case (FIXED, BinaryType) => (updater, ordinal, value) =>
+        updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())
+
+      case (BYTES, BinaryType) => (updater, ordinal, value) =>
+        val bytes = value match {
+          case b: ByteBuffer =>
+            val bytes = new Array[Byte](b.remaining)
+            b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
+            bytes
+          case b: Array[Byte] => b
+          case other =>
+            throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.")
         }
-      case ARRAY =>
-        (logicalDataType, catalystType) match {
-          case (_, ArrayType(elementType, containsNull)) =>
-            val avroElementPath = avroPath :+ "element"
-            val elementWriter = newWriter(avroType.getElementType, elementType,
-              avroElementPath, catalystPath :+ "element")
-            (updater, ordinal, value) =>
-              val collection = value.asInstanceOf[java.util.Collection[Any]]
-              val result = createArrayData(elementType, collection.size())
-              val elementUpdater = new ArrayDataUpdater(result)
-
-              var i = 0
-              val iter = collection.iterator()
-              while (iter.hasNext) {
-                val element = iter.next()
-                if (element == null) {
-                  if (!containsNull) {
-                    throw new RuntimeException(
-                      s"Array value at path ${toFieldStr(avroElementPath)}" +
-                        s" is not allowed to be null")
-                  } else {
-                    elementUpdater.setNullAt(i)
-                  }
-                } else {
-                  elementWriter(elementUpdater, i, element)
-                }
-                i += 1
+        updater.set(ordinal, bytes)
+
+      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
+        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+        updater.setDecimal(ordinal, decimal)
+
+      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
+        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
+        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
+        updater.setDecimal(ordinal, decimal)
+
+      case (RECORD, st: StructType) =>
+        // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
+        // We can always return `false` from `applyFilters` for nested records.
+        val writeRecord =
+          getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false)
+        (updater, ordinal, value) =>
+          val row = new SpecificInternalRow(st)
+          writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
+          updater.set(ordinal, row)
+
+      case (ARRAY, ArrayType(elementType, containsNull)) =>
+        val avroElementPath = avroPath :+ "element"
+        val elementWriter = newWriter(avroType.getElementType, elementType,
+          avroElementPath, catalystPath :+ "element")
+        (updater, ordinal, value) =>
+          val collection = value.asInstanceOf[java.util.Collection[Any]]
+          val result = createArrayData(elementType, collection.size())
+          val elementUpdater = new ArrayDataUpdater(result)
+
+          var i = 0
+          val iter = collection.iterator()
+          while (iter.hasNext) {
+            val element = iter.next()
+            if (element == null) {
+              if (!containsNull) {
+                throw new RuntimeException(
+                  s"Array value at path ${toFieldStr(avroElementPath)} is not allowed to be null")
+              } else {
+                elementUpdater.setNullAt(i)
               }
-              updater.set(ordinal, result)
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case MAP =>
-        (logicalDataType, catalystType) match {
-          case (_, MapType(keyType, valueType, valueContainsNull))
-            if keyType == StringType =>
-            val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType,
-              avroPath :+ "key", catalystPath :+ "key")
-            val valueWriter = newWriter(avroType.getValueType, valueType,
-              avroPath :+ "value", catalystPath :+ "value")
-            (updater, ordinal, value) =>
-              val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
-              val keyArray = createArrayData(keyType, map.size())
-              val keyUpdater = new ArrayDataUpdater(keyArray)
-              val valueArray = createArrayData(valueType, map.size())
-              val valueUpdater = new ArrayDataUpdater(valueArray)
-              val iter = map.entrySet().iterator()
-              var i = 0
-              while (iter.hasNext) {
-                val entry = iter.next()
-                assert(entry.getKey != null)
-                keyWriter(keyUpdater, i, entry.getKey)
-                if (entry.getValue == null) {
-                  if (!valueContainsNull) {
-                    throw new RuntimeException(
-                      s"Map value at path ${toFieldStr(avroPath :+ "value")}" +
-                        s" is not allowed to be null")
-                  } else {
-                    valueUpdater.setNullAt(i)
-                  }
-                } else {
-                  valueWriter(valueUpdater, i, entry.getValue)
-                }
-                i += 1
+            } else {
+              elementWriter(elementUpdater, i, element)
+            }
+            i += 1
+          }
+
+          updater.set(ordinal, result)
+
+      case (MAP, MapType(keyType, valueType, valueContainsNull)) if keyType == StringType =>
+        val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType,
+          avroPath :+ "key", catalystPath :+ "key")
+        val valueWriter = newWriter(avroType.getValueType, valueType,
+          avroPath :+ "value", catalystPath :+ "value")
+        (updater, ordinal, value) =>
+          val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
+          val keyArray = createArrayData(keyType, map.size())
+          val keyUpdater = new ArrayDataUpdater(keyArray)
+          val valueArray = createArrayData(valueType, map.size())
+          val valueUpdater = new ArrayDataUpdater(valueArray)
+          val iter = map.entrySet().iterator()
+          var i = 0
+          while (iter.hasNext) {
+            val entry = iter.next()
+            assert(entry.getKey != null)
+            keyWriter(keyUpdater, i, entry.getKey)
+            if (entry.getValue == null) {
+              if (!valueContainsNull) {
+                throw new RuntimeException(
+                  s"Map value at path ${toFieldStr(avroPath :+ "value")} is not allowed to be null")
+              } else {
+                valueUpdater.setNullAt(i)
               }
-              updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
-          case _ => throw new IncompatibleSchemaException(incompatibleMsg)
-        }
-      case UNION =>
+            } else {
+              valueWriter(valueUpdater, i, entry.getValue)
+            }
+            i += 1
+          }
+
+          // The Avro map will never have null or duplicated map keys, it's safe to create a
+          // ArrayBasedMapData directly here.
+          updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))
+
+      case (UNION, _) =>
         val nonNullTypes = nonNullUnionBranches(avroType)
         val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
         if (nonNullTypes.nonEmpty) {
@@ -388,18 +297,20 @@ private[sql] class AvroDeserializer(
         } else {
           nonNullTypes.map(_.getType).toSeq match {
             case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType =>
-              (updater, ordinal, value) => value match {
-                case null => updater.setNullAt(ordinal)
-                case l: java.lang.Long => updater.setLong(ordinal, l)
-                case i: java.lang.Integer => updater.setLong(ordinal, i.longValue())
-              }
+              (updater, ordinal, value) =>
+                value match {
+                  case null => updater.setNullAt(ordinal)
+                  case l: java.lang.Long => updater.setLong(ordinal, l)
+                  case i: java.lang.Integer => updater.setLong(ordinal, i.longValue())
+                }
             case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && catalystType == DoubleType =>
-              (updater, ordinal, value) => value match {
-                case null => updater.setNullAt(ordinal)
-                case d: java.lang.Double => updater.setDouble(ordinal, d)
-                case f: java.lang.Float => updater.setDouble(ordinal, f.doubleValue())
-              }
+              (updater, ordinal, value) =>
+                value match {
+                  case null => updater.setNullAt(ordinal)
+                  case d: java.lang.Double => updater.setDouble(ordinal, d)
+                  case f: java.lang.Float => updater.setDouble(ordinal, f.doubleValue())
+                }
             case _ =>
               catalystType match {
@@ -423,6 +334,17 @@ private[sql] class AvroDeserializer(
         } else {
           (updater, ordinal, _) => updater.setNullAt(ordinal)
         }
+
+      case (INT, _: YearMonthIntervalType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, value.asInstanceOf[Int])
+
+      case (LONG, _: DayTimeIntervalType) => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Long])
+
+      case (LONG, _: DecimalType) => (updater, ordinal, value) =>
+        val d = avroType.getLogicalType.asInstanceOf[CustomDecimal]
+        updater.setDecimal(ordinal, Decimal(value.asInstanceOf[Long], d.precision, d.scale))
+
       case _ => throw new IncompatibleSchemaException(incompatibleMsg)
     }
   }
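The restored deserializer resolves each (Avro type, Catalyst type) pair once in `newWriter` and returns a small closure that is applied to every decoded value, so per-row work is just the closure call. A minimal sketch of that writer-closure pattern under assumed toy types; none of these names are Spark's:

```scala
// Sketch of the writer-closure pattern restored by this revert: the type pair
// is matched once per field, not once per row. All names here are illustrative.
import java.util.concurrent.TimeUnit

sealed trait AvroKind
case object AvroLong extends AvroKind
case object AvroBoolean extends AvroKind

sealed trait SqlKind
case object SqlLong extends SqlKind
case object SqlTimestamp extends SqlKind
case object SqlBoolean extends SqlKind

object WriterSketch extends App {
  // Resolved once; the returned function is then invoked for every value.
  def newWriter(avro: AvroKind, sql: SqlKind): Any => Any = (avro, sql) match {
    case (AvroLong, SqlLong) => v => v.asInstanceOf[Long]
    case (AvroLong, SqlTimestamp) => v =>
      // Analogous to DateTimeUtils.millisToMicros: millisecond longs widen to micros.
      TimeUnit.MILLISECONDS.toMicros(v.asInstanceOf[Long])
    case (AvroBoolean, SqlBoolean) => v => v.asInstanceOf[Boolean]
    case other => throw new IllegalArgumentException(s"incompatible: $other")
  }

  val write = newWriter(AvroLong, SqlTimestamp)
  assert(write(1000L) == 1000000L) // 1000 ms -> 1,000,000 us
}
```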
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 516c173c118..d22a2d36975 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -32,7 +32,6 @@ import org.apache.avro.file.{DataFileReader, DataFileWriter}
 import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
-import org.apache.commons.lang3.exception.ExceptionUtils
 
 import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException, SparkUpgradeException}
 import org.apache.spark.TestUtils.assertExceptionMsg
@@ -815,166 +814,6 @@ abstract class AvroSuite
     }
   }
 
-  test("SPARK-43380: Fix Avro data type conversion" +
-    " of decimal type to avoid producing incorrect results") {
-    withTempPath { path =>
-      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
-      sql("SELECT 13.1234567890 a").write.format("avro").save(path.toString)
-      // With the flag disabled, we will throw an exception if there is a mismatch
-      withSQLConf(confKey -> "false") {
-        val e = intercept[SparkException] {
-          spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString).collect()
-        }
-        ExceptionUtils.getRootCause(e) match {
-          case ex: AnalysisException =>
-            checkError(
-              exception = ex,
-              errorClass = "AVRO_LOWER_PRECISION",
-              parameters = Map("avroPath" -> "field 'a'",
-                "sqlPath" -> "field 'a'",
-                "avroType" -> "decimal\\(12,10\\)",
-                "sqlType" -> "\"DECIMAL\\(4,3\\)\"",
-                "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
-              matchPVals = true
-            )
-          case other =>
-            fail(s"Received unexpected exception", other)
-        }
-      }
-      // The following used to work, so it should still work with the flag enabled
-      checkAnswer(
-        spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
-        Row(new java.math.BigDecimal("13.123"))
-      )
-      withSQLConf(confKey -> "true") {
-        // With the flag enabled, we return a null silently, which isn't great
-        checkAnswer(
-          spark.read.schema("a DECIMAL(4, 3)").format("avro").load(path.toString),
-          Row(null)
-        )
-        checkAnswer(
-          spark.read.schema("a DECIMAL(5, 3)").format("avro").load(path.toString),
-          Row(new java.math.BigDecimal("13.123"))
-        )
-      }
-    }
-  }
-
-  test("SPARK-43380: Fix Avro data type conversion" +
-    " of DayTimeIntervalType to avoid producing incorrect results") {
-    withTempPath { path =>
-      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
-      val schema = StructType(Array(StructField("a", DayTimeIntervalType(), false)))
-      val data = Seq(Row(java.time.Duration.ofDays(1).plusSeconds(1)))
-
-      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-      df.write.format("avro").save(path.getCanonicalPath)
-
-      withSQLConf(confKey -> "false") {
-        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
-          val e = intercept[SparkException] {
-            spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
-          }
-
-          ExceptionUtils.getRootCause(e) match {
-            case ex: AnalysisException =>
-              checkError(
-                exception = ex,
-                errorClass = "AVRO_INCORRECT_TYPE",
-                parameters = Map("avroPath" -> "field 'a'",
-                  "sqlPath" -> "field 'a'",
-                  "avroType" -> "interval day to second",
-                  "sqlType" -> s""""$sqlType"""",
-                  "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
-                matchPVals = true
-              )
-            case other =>
-              fail(s"Received unexpected exception", other)
-          }
-        }
-      }
-
-      withSQLConf(confKey -> "true") {
-        // Allow conversion and do not need to check result
-        spark.read.schema("a Date").format("avro").load(path.toString)
-        spark.read.schema("a timestamp").format("avro").load(path.toString)
-        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
-      }
-    }
-  }
-
-  test("SPARK-43380: Fix Avro data type conversion" +
-    " of YearMonthIntervalType to avoid producing incorrect results") {
-    withTempPath { path =>
-      val confKey = SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key
-      val schema = StructType(Array(StructField("a", YearMonthIntervalType(), false)))
-      val data = Seq(Row(java.time.Period.of(1, 1, 0)))
-
-      val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-      df.write.format("avro").save(path.getCanonicalPath)
-
-      withSQLConf(confKey -> "false") {
-        Seq("DATE", "TIMESTAMP", "TIMESTAMP_NTZ").foreach { sqlType =>
-          val e = intercept[SparkException] {
-            spark.read.schema(s"a $sqlType").format("avro").load(path.toString).collect()
-          }
-
-          ExceptionUtils.getRootCause(e) match {
-            case ex: AnalysisException =>
-              checkError(
-                exception = ex,
-                errorClass = "AVRO_INCORRECT_TYPE",
-                parameters = Map("avroPath" -> "field 'a'",
-                  "sqlPath" -> "field 'a'",
-                  "avroType" -> "interval year to month",
-                  "sqlType" -> s""""$sqlType"""",
-                  "key" -> SQLConf.LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA.key),
-                matchPVals = true
-              )
-            case other =>
-              fail(s"Received unexpected exception", other)
-          }
-        }
-      }
-
-      withSQLConf(confKey -> "true") {
-        // Allow conversion and do not need to check result
-        spark.read.schema("a Date").format("avro").load(path.toString)
-        spark.read.schema("a timestamp").format("avro").load(path.toString)
-        spark.read.schema("a timestamp_ntz").format("avro").load(path.toString)
-      }
-    }
-  }
-
-  Seq(
-    "time-millis",
-    "time-micros",
-    "timestamp-micros",
-    "timestamp-millis",
-    "local-timestamp-millis",
-    "local-timestamp-micros"
-  ).foreach { timeLogicalType =>
-    test(s"converting $timeLogicalType type to long in avro") {
-      withTempPath { path =>
-        val df = Seq(100L)
-          .toDF("dt")
-        val avroSchema =
-          s"""
-            |{
-            |  "type" : "record",
-            |  "name" : "test_schema",
-            |  "fields" : [
-            |    {"name": "dt", "type": {"type": "long", "logicalType": "$timeLogicalType"}}
-            |  ]
-            |}""".stripMargin
-        df.write.format("avro").option("avroSchema", avroSchema).save(path.getCanonicalPath)
-        checkAnswer(
-          spark.read.schema(s"dt long").format("avro").load(path.toString),
-          Row(100L))
-      }
-    }
-  }
-
   test("converting some specific sparkSQL types to avro") {
     withTempPath { tempDir =>
       val testSchema = StructType(Seq(
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index e1430e94db5..e7df1aa9a4f 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -87,18 +87,6 @@ Invalid as-of join.
 
 For more details see [AS_OF_JOIN](sql-error-conditions-as-of-join-error-class.html)
 
-### AVRO_INCORRECT_TYPE
-
-SQLSTATE: none assigned
-
-Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original encoded data type is `<avroType>`, however you're trying to read the field as `<sqlType>`, which would lead to an incorrect answer. To allow reading this field, enable the SQL configuration: `<key>`.
-
-### AVRO_LOWER_PRECISION
-
-SQLSTATE: none assigned
-
-Cannot convert Avro `<avroPath>` to SQL `<sqlPath>` because the original encoded data type is `<avroType>`, however you're trying to read the field as `<sqlType>`, which leads to data being read as null. Please provide a wider decimal type to get the correct result. To allow reading null to this field, enable the SQL configuration: `<key>`.
-
 ### BATCH_METADATA_NOT_FOUND
 
 [SQLSTATE: 42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -2064,5 +2052,3 @@ The operation `<operation>` requires a `<requiredType>`. But `<objectName>` is a
 The `<functionName>` requires `<expectedNum>` parameters but the actual number is `<actualNum>`.
 
 For more details see [WRONG_NUM_ARGS](sql-error-conditions-wrong-num-args-error-class.html)
-
-
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 59b125cbc82..bc5f442220b 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,7 +26,6 @@ license: |
 - Since Spark 3.5, the JDBC options related to DS V2 pushdown are `true` by default. These options include: `pushDownAggregate`, `pushDownLimit`, `pushDownOffset` and `pushDownTableSample`. To restore the legacy behavior, please set them to `false`. e.g. set `spark.sql.catalog.your_catalog_name.pushDownAggregate` to `false`.
 - Since Spark 3.5, Spark thrift server will interrupt task when canceling a running statement. To restore the previous behavior, set `spark.sql.thriftServer.interruptOnCancel` to `false`.
-- Since Spark 3.5, the Avro will throw `AnalysisException` when reading Interval types as Date or Timestamp types, or reading Decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`
 - Since Spark 3.5, Row's json and prettyJson methods are moved to `ToJsonUtil`.
 - Since Spark 3.5, the `plan` field is moved from `AnalysisException` to `EnhancedAnalysisException`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 91548917b2f..1383827a82b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3661,36 +3661,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     )
   }
 
-  def avroIncorrectTypeError(
-      avroPath: String, sqlPath: String, avroType: String,
-      sqlType: String, key: String): Throwable = {
-    new AnalysisException(
-      errorClass = "AVRO_INCORRECT_TYPE",
-      messageParameters = Map(
-        "avroPath" -> avroPath,
-        "sqlPath" -> sqlPath,
-        "avroType" -> avroType,
-        "sqlType" -> toSQLType(sqlType),
-        "key" -> key
-      )
-    )
-  }
-
-  def avroLowerPrecisionError(
-      avroPath: String, sqlPath: String, avroType: String,
-      sqlType: String, key: String): Throwable = {
-    new AnalysisException(
-      errorClass = "AVRO_LOWER_PRECISION",
-      messageParameters = Map(
-        "avroPath" -> avroPath,
-        "sqlPath" -> sqlPath,
-        "avroType" -> avroType,
-        "sqlType" -> toSQLType(sqlType),
-        "key" -> key
-      )
-    )
-  }
-
   def optionMustBeLiteralString(key: String): Throwable = {
     new AnalysisException(
       errorClass = "INVALID_SQL_SYNTAX.OPTION_IS_INVALID",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9197d99b005..ef77a22e632 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4251,18 +4251,6 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
-  val LEGACY_AVRO_ALLOW_INCOMPATIBLE_SCHEMA =
-    buildConf("spark.sql.legacy.avro.allowIncompatibleSchema")
-      .internal()
-      .doc("When set to false, if types in Avro are encoded in the same format, but " +
-        "the type in the Avro schema explicitly says that the data types are different, " +
-        "reject reading the data type in the format to avoid returning incorrect results. " +
-        "When set to true, it restores the legacy behavior of allow reading the data in the" +
-        " format, which may return incorrect results.")
-      .version("3.5.0")
-      .booleanConf
-      .createWithDefault(false)
-
   val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME =
     buildConf("spark.sql.legacy.v1IdentifierNoCatalog")
       .internal()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org