This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5741d38ee27 [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes
5741d38ee27 is described below

commit 5741d38ee272418a919fe7d102514c221a6e741a
Author: SandishKumarHN <sanysand...@gmail.com>
AuthorDate: Fri Nov 4 14:50:12 2022 +0900

    [SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes

    This is the follow-up PR to https://github.com/apache/spark/pull/37972 and https://github.com/apache/spark/pull/38212

    ### What changes were proposed in this pull request?
    1. Move spark-protobuf error classes to the Spark error-classes framework (core/src/main/resources/error/error-classes.json).
    2. Support protobuf imports.
    3. Validate protobuf timestamp and duration types.

    ### Why are the changes needed?
    N/A

    ### Does this PR introduce _any_ user-facing change?
    None

    ### How was this patch tested?
    Existing tests should cover the validation of this PR.

    CC: rangadi mposdev21 gengliangwang

    Closes #38344 from SandishKumarHN/SPARK-40777-ProtoErrorCls.

    Authored-by: SandishKumarHN <sanysand...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/protobuf/ProtobufDataToCatalyst.scala      |  22 +-
 .../spark/sql/protobuf/ProtobufDeserializer.scala  |  31 ++--
 .../spark/sql/protobuf/ProtobufSerializer.scala    |  38 +++--
 .../spark/sql/protobuf/utils/ProtobufUtils.scala   | 117 ++++++++------
 .../sql/protobuf/utils/SchemaConverters.scala      |  32 ++--
 .../src/test/resources/protobuf/basicmessage.proto |  39 +++++
 .../test/resources/protobuf/catalyst_types.proto   |   2 -
 .../src/test/resources/protobuf/duration.proto     |  26 +++
 .../test/resources/protobuf/functions_suite.desc   | Bin 5958 -> 6678 bytes
 .../test/resources/protobuf/functions_suite.proto  |  35 ++--
 .../src/test/resources/protobuf/nestedenum.proto   |  28 ++++
 .../src/test/resources/protobuf/timestamp.proto    |  26 +++
 .../ProtobufCatalystDataConversionSuite.scala      |   2 +-
 .../sql/protobuf/ProtobufFunctionsSuite.scala      |  35 ++--
 .../spark/sql/protobuf/ProtobufSerdeSuite.scala    |  85 +++++++--
 core/src/main/resources/error/error-classes.json   | 107 ++++++++++++-
 project/SparkBuild.scala                           |   2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 177 ++++++++++++++++++++-
 .../spark/sql/errors/QueryExecutionErrors.scala    |   8 +
 .../sql/errors/QueryCompilationErrorsSuite.scala   |   4 +-
 20 files changed, 625 insertions(+), 191 deletions(-)

diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala index cad2442f10c..c0997b1bd06 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala @@ -21,11 +21,10 @@ import scala.util.control.NonFatal import com.google.protobuf.DynamicMessage -import org.apache.spark.SparkException -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters} import
org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType} @@ -71,16 +70,11 @@ private[protobuf] case class ProtobufDataToCatalyst( @transient private lazy val parseMode: ParseMode = { val mode = protobufOptions.parseMode if (mode != PermissiveMode && mode != FailFastMode) { - throw new AnalysisException(unacceptableModeMessage(mode.name)) + throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, mode) } mode } - private def unacceptableModeMessage(name: String): String = { - s"from_protobuf() doesn't support the $name mode. " + - s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." - } - @transient private lazy val nullResultRow: Any = dataType match { case st: StructType => val resultRow = new SpecificInternalRow(st.map(_.dataType)) @@ -98,13 +92,9 @@ private[protobuf] case class ProtobufDataToCatalyst( case PermissiveMode => nullResultRow case FailFastMode => - throw new SparkException( - "Malformed records are detected in record parsing. " + - s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " + - "result, try setting the option 'mode' as 'PERMISSIVE'.", - e) + throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e) case _ => - throw new AnalysisException(unacceptableModeMessage(parseMode.name)) + throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, parseMode) } } @@ -119,8 +109,8 @@ private[protobuf] case class ProtobufDataToCatalyst( case Some(number) => // Unknown fields contain a field with same number as a known field. Must be due to // mismatch of schema between writer and reader here. - throw new IllegalArgumentException(s"Type mismatch encountered for field:" + - s" ${messageDescriptor.getFields.get(number)}") + throw QueryCompilationErrors.protobufFieldTypeMismatchError( + messageDescriptor.getFields.get(number).toString) case None => } diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala index 0403b741ebf..46366ba268b 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala @@ -22,6 +22,7 @@ import com.google.protobuf.{ByteString, DynamicMessage, Message} import com.google.protobuf.Descriptors._ import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._ +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData} @@ -29,7 +30,6 @@ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.protobuf.utils.ProtobufUtils import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoMatchedField import org.apache.spark.sql.protobuf.utils.ProtobufUtils.toFieldStr -import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -61,10 +61,10 @@ private[sql] class ProtobufDeserializer( } } } catch { - case ise: IncompatibleSchemaException => - throw new IncompatibleSchemaException( - s"Cannot convert Protobuf type ${rootDescriptor.getName} " + - s"to SQL type 
${rootCatalystType.sql}.", + case ise: AnalysisException => + throw QueryCompilationErrors.cannotConvertProtobufTypeToCatalystTypeError( + rootDescriptor.getName, + rootCatalystType, ise) } @@ -152,11 +152,6 @@ private[sql] class ProtobufDeserializer( catalystType: DataType, protoPath: Seq[String], catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = { - val errorPrefix = s"Cannot convert Protobuf ${toFieldStr(protoPath)} to " + - s"SQL ${toFieldStr(catalystPath)} because " - val incompatibleMsg = errorPrefix + - s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " + - s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})" (protoType.getJavaType, catalystType) match { @@ -175,8 +170,9 @@ private[sql] class ProtobufDeserializer( case (INT, ShortType) => (updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short]) - case (BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING, - ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated => + case ( + BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING, + ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated => newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull) case (LONG, LongType) => @@ -199,7 +195,8 @@ private[sql] class ProtobufDeserializer( (updater, ordinal, value) => val byte_array = value match { case s: ByteString => s.toByteArray - case _ => throw new Exception("Invalid ByteString format") + case unsupported => + throw QueryCompilationErrors.invalidByteStringFormatError(unsupported) } updater.set(ordinal, byte_array) @@ -244,7 +241,13 @@ private[sql] class ProtobufDeserializer( case (ENUM, StringType) => (updater, ordinal, value) => updater.set(ordinal, UTF8String.fromString(value.toString)) - case _ => throw new IncompatibleSchemaException(incompatibleMsg) + case _ => + throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError( + toFieldStr(protoPath), + catalystPath, + s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" + + s" ${protoType.getType}", + catalystType) } } diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala index 5d9af92c5c0..0f87c640b19 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala @@ -23,13 +23,14 @@ import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._ import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils} import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.protobuf.utils.ProtobufUtils import org.apache.spark.sql.protobuf.utils.ProtobufUtils.{toFieldStr, ProtoMatchedField} -import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException import org.apache.spark.sql.types._ /** @@ -53,10 +54,10 @@ private[sql] class ProtobufSerializer( newStructConverter(st, rootDescriptor, Nil, Nil).asInstanceOf[Any => 
Any] } } catch { - case ise: IncompatibleSchemaException => - throw new IncompatibleSchemaException( - s"Cannot convert SQL type ${rootCatalystType.sql} to Protobuf type " + - s"${rootDescriptor.getName}.", + case ise: AnalysisException => + throw QueryCompilationErrors.cannotConvertSqlTypeToProtobufError( + rootDescriptor.getName, + rootCatalystType, ise) } if (nullable) { (data: Any) => @@ -77,8 +78,6 @@ private[sql] class ProtobufSerializer( fieldDescriptor: FieldDescriptor, catalystPath: Seq[String], protoPath: Seq[String]): Converter = { - val errorPrefix = s"Cannot convert SQL ${toFieldStr(catalystPath)} " + - s"to Protobuf ${toFieldStr(protoPath)} because " (catalystType, fieldDescriptor.getJavaType) match { case (NullType, _) => (getter, ordinal) => null @@ -104,10 +103,11 @@ private[sql] class ProtobufSerializer( (getter, ordinal) => val data = getter.getUTF8String(ordinal).toString if (!enumSymbols.contains(data)) { - throw new IncompatibleSchemaException( - errorPrefix + - s""""$data" cannot be written since it's not defined in enum """ + - enumSymbols.mkString("\"", "\", \"", "\"")) + throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufEnumTypeError( + catalystPath, + toFieldStr(protoPath), + data, + enumSymbols.mkString("\"", "\", \"", "\"")) } fieldDescriptor.getEnumType.findValueByName(data) case (StringType, STRING) => @@ -124,7 +124,8 @@ private[sql] class ProtobufSerializer( case (TimestampType, MESSAGE) => (getter, ordinal) => val millis = DateTimeUtils.microsToMillis(getter.getLong(ordinal)) - Timestamp.newBuilder() + Timestamp + .newBuilder() .setSeconds((millis / 1000)) .setNanos(((millis % 1000) * 1000000).toInt) .build() @@ -201,7 +202,8 @@ private[sql] class ProtobufSerializer( val calendarInterval = IntervalUtils.fromIntervalString(dayTimeIntervalString) val millis = DateTimeUtils.microsToMillis(calendarInterval.microseconds) - val duration = Duration.newBuilder() + val duration = Duration + .newBuilder() .setSeconds((millis / 1000)) .setNanos(((millis % 1000) * 1000000).toInt) @@ -215,10 +217,12 @@ private[sql] class ProtobufSerializer( duration.build() case _ => - throw new IncompatibleSchemaException( - errorPrefix + - s"schema is incompatible (sqlType = ${catalystType.sql}, " + - s"protoType = ${fieldDescriptor.getJavaType})") + throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError( + catalystPath, + toFieldStr(protoPath), + catalystType, + s"${fieldDescriptor} ${fieldDescriptor.toProto.getLabel} ${fieldDescriptor.getJavaType}" + + s" ${fieldDescriptor.getType}") } } diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala index fa2ec9b7cd4..4bd59ddce6c 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala @@ -23,11 +23,12 @@ import java.util.Locale import scala.collection.JavaConverters._ import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException, Message} +import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet} import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} import org.apache.spark.internal.Logging +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf -import 
org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -61,9 +62,9 @@ protoPath: Seq[String], catalystPath: Seq[String]) { if (descriptor.getName == null) { - throw new IncompatibleSchemaException( - s"Attempting to treat ${descriptor.getName} as a RECORD, " + - s"but it was: ${descriptor.getContainingType}") + throw QueryCompilationErrors.unknownProtobufMessageTypeError( + descriptor.getName, + descriptor.getContainingType().getName) } private[this] val protoFieldArray = descriptor.getFields.asScala.toArray @@ -79,30 +80,29 @@ /** * Validate that there are no Catalyst fields which don't have a matching Protobuf field, - * throwing [[IncompatibleSchemaException]] if such extra fields are found. If - * `ignoreNullable` is false, consider nullable Catalyst fields to be eligible to be an extra - * field; otherwise, ignore nullable Catalyst fields when checking for extras. + * throwing [[AnalysisException]] if such extra fields are found. If `ignoreNullable` is + * false, consider nullable Catalyst fields to be eligible to be an extra field; otherwise, + * ignore nullable Catalyst fields when checking for extras. */ def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit = catalystSchema.fields.foreach { sqlField => if (getFieldByName(sqlField.name).isEmpty && (!ignoreNullable || !sqlField.nullable)) { - throw new IncompatibleSchemaException( - s"Cannot find ${toFieldStr(catalystPath :+ sqlField.name)} in Protobuf schema") + throw QueryCompilationErrors.cannotFindCatalystTypeInProtobufSchemaError( + toFieldStr(catalystPath :+ sqlField.name)) } } /** * Validate that there are no Protobuf fields which don't have a matching Catalyst field, - * throwing [[IncompatibleSchemaException]] if such extra fields are found. Only required - * (non-nullable) fields are checked; nullable fields are ignored. + * throwing [[AnalysisException]] if such extra fields are found. Only required (non-nullable) + * fields are checked; nullable fields are ignored. */ def validateNoExtraRequiredProtoFields(): Unit = { val extraFields = protoFieldArray.toSet -- matchedFields.map(_.fieldDescriptor) extraFields.filterNot(isNullable).foreach { extraField => - throw new IncompatibleSchemaException( - s"Found ${toFieldStr(protoPath :+ extraField.getName())} in Protobuf schema " + - "but there is no match in the SQL schema") + throw QueryCompilationErrors.cannotFindProtobufFieldInCatalystError( + toFieldStr(protoPath :+ extraField.getName())) } } @@ -125,10 +125,11 @@ case Seq(protoField) => Some(protoField) case Seq() => None case matches => - throw new IncompatibleSchemaException( - s"Searching for '$name' in " + - s"Protobuf schema at ${toFieldStr(protoPath)} gave ${matches.size} matches. 
" + s"Candidates: " + matches.map(_.getName()).mkString("[", ", ", "]")) + throw QueryCompilationErrors.protobufFieldMatchError( + name, + toFieldStr(protoPath), + s"${matches.size}", + matches.map(_.getName()).mkString("[", ", ", "]")) } } } @@ -157,16 +158,12 @@ val protobufClass = try { Utils.classForName(protobufClassName) } catch { - case _: ClassNotFoundException => - val hasDots = protobufClassName.contains(".") - throw new IllegalArgumentException( - s"Could not load Protobuf class with name '$protobufClassName'" + - (if (hasDots) "" else ". Ensure the class name includes package prefix.") - ) + case e: ClassNotFoundException => + throw QueryCompilationErrors.protobufClassLoadError(protobufClassName, e) } if (!classOf[Message].isAssignableFrom(protobufClass)) { - throw new IllegalArgumentException(s"$protobufClassName is not a Protobuf message type") + throw QueryCompilationErrors.protobufMessageTypeError(protobufClassName) // TODO: Need to support V2. This might work with V2 classes too. } @@ -178,46 +175,70 @@ } def buildDescriptor(descFilePath: String, messageName: String): Descriptor = { - val descriptor = parseFileDescriptor(descFilePath).getMessageTypes.asScala.find { desc => - desc.getName == messageName || desc.getFullName == messageName - } + // Find the first message descriptor that matches the name. + val descriptorOpt = parseFileDescriptorSet(descFilePath) + .flatMap { fileDesc => + fileDesc.getMessageTypes.asScala.find { desc => + desc.getName == messageName || desc.getFullName == messageName + } + }.headOption - descriptor match { + descriptorOpt match { case Some(d) => d - case None => - throw new RuntimeException(s"Unable to locate Message '$messageName' in Descriptor") + case None => throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName) } } - private def parseFileDescriptor(descFilePath: String): Descriptors.FileDescriptor = { + private def parseFileDescriptorSet(descFilePath: String): List[Descriptors.FileDescriptor] = { var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null try { val dscFile = new BufferedInputStream(new FileInputStream(descFilePath)) fileDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(dscFile) } catch { case ex: InvalidProtocolBufferException => - // TODO move all the exceptions to core/src/main/resources/error/error-classes.json - throw new RuntimeException("Error parsing descriptor byte[] into Descriptor object", ex) + throw QueryCompilationErrors.descriptorParseError(descFilePath, ex) case ex: IOException => - throw new RuntimeException( - "Error reading Protobuf descriptor file at path: " + - descFilePath, - ex) + throw QueryCompilationErrors.cannotFindDescriptorFileError(descFilePath, ex) } - - val descriptorProto: DescriptorProtos.FileDescriptorProto = fileDescriptorSet.getFile(0) try { - val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom( - descriptorProto, - new Array[Descriptors.FileDescriptor](0)) - if (fileDescriptor.getMessageTypes().isEmpty()) { - throw new RuntimeException("No MessageTypes returned, " + fileDescriptor.getName()); - } - fileDescriptor + val fileDescriptorProtoIndex = createDescriptorProtoMap(fileDescriptorSet) + val fileDescriptorList: List[Descriptors.FileDescriptor] = + fileDescriptorSet.getFileList.asScala.map( fileDescriptorProto => + buildFileDescriptor(fileDescriptorProto, fileDescriptorProtoIndex) + ).toList +
fileDescriptorList } catch { case e: Descriptors.DescriptorValidationException => - throw new RuntimeException("Error constructing FileDescriptor", e) + throw QueryCompilationErrors.failedParsingDescriptorError(descFilePath, e) + } + } + + /** + * Recursively constructs file descriptors for the given FileDescriptorProto and + * all of its dependencies, and returns the resulting FileDescriptor. + */ + private def buildFileDescriptor( + fileDescriptorProto: FileDescriptorProto, + fileDescriptorProtoMap: Map[String, FileDescriptorProto]): Descriptors.FileDescriptor = { + val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency => + fileDescriptorProtoMap.get(dependency) match { + case Some(dependencyProto) => + buildFileDescriptor(dependencyProto, fileDescriptorProtoMap) + case None => + throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency) + } } + Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, fileDescriptorList.toArray) + } + + /** + * Returns a map from each descriptor proto's name, as found inside the descriptor set, + * to the proto itself. + */ + private def createDescriptorProtoMap( + fileDescriptorSet: FileDescriptorSet): Map[String, FileDescriptorProto] = { + fileDescriptorSet.getFileList().asScala.map { descriptorProto => + descriptorProto.getName() -> descriptorProto + }.toMap[String, FileDescriptorProto] } /** diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala index 4fca06fb5d8..6fcba3b8918 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.protobuf.ScalaReflectionLock import org.apache.spark.sql.types._ @@ -62,14 +63,18 @@ case STRING => Some(StringType) case BYTE_STRING => Some(BinaryType) case ENUM => Some(StringType) - case MESSAGE if fd.getMessageType.getName == "Duration" => + case MESSAGE + if (fd.getMessageType.getName == "Duration" && + fd.getMessageType.getFields.size() == 2 && + fd.getMessageType.getFields.get(0).getName.equals("seconds") && + fd.getMessageType.getFields.get(1).getName.equals("nanos")) => Some(DayTimeIntervalType.defaultConcreteType) - case MESSAGE if fd.getMessageType.getName == "Timestamp" => - Some(TimestampType) - // FIXME: Is the above accurate? Users can have protos named "Timestamp" but are not - // expected to be TimestampType in Spark. How about verifying fields? - // Same for "Duration". Only the Timestamp & Duration protos defined in - // google.protobuf package should default to corresponding Catalylist types. 
+ case MESSAGE + if (fd.getMessageType.getName == "Timestamp" && + fd.getMessageType.getFields.size() == 2 && + fd.getMessageType.getFields.get(0).getName.equals("seconds") && + fd.getMessageType.getFields.get(1).getName.equals("nanos")) => + Some(TimestampType) case MESSAGE if fd.isRepeated && fd.getMessageType.getOptions.hasMapEntry => var keyType: DataType = NullType var valueType: DataType = NullType @@ -88,9 +93,7 @@ object SchemaConverters { nullable = false)) case MESSAGE => if (existingRecordNames.contains(fd.getFullName)) { - throw new IncompatibleSchemaException(s""" - |Found recursive reference in Protobuf schema, which can not be processed by Spark: - |${fd.toString()}""".stripMargin) + throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString()) } val newRecordNames = existingRecordNames + fd.getFullName @@ -100,10 +103,8 @@ object SchemaConverters { .toSeq) .filter(_.nonEmpty) .map(StructType.apply) - case _ => - throw new IncompatibleSchemaException( - s"Cannot convert Protobuf type" + - s" ${fd.getJavaType}") + case other => + throw QueryCompilationErrors.protobufTypeUnsupportedYetError(other.toString) } dataType.map(dt => StructField( @@ -111,7 +112,4 @@ object SchemaConverters { if (fd.isRepeated) ArrayType(dt, containsNull = false) else dt, nullable = !fd.isRequired && !fd.isRepeated)) } - - private[protobuf] class IncompatibleSchemaException(msg: String, ex: Throwable = null) - extends Exception(msg, ex) } diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto new file mode 100644 index 00000000000..4252f349cf0 --- /dev/null +++ b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +// cd connector/protobuf/src/test/resources/protobuf +// protoc --java_out=./ basicmessage.proto +// protoc --include_imports --descriptor_set_out=basicmessage.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto + +syntax = "proto3"; + +package org.apache.spark.sql.protobuf.protos; + +import "nestedenum.proto"; + +option java_outer_classname = "BasicMessageProto"; + +message BasicMessage { + int64 id = 1; + string string_value = 2; + int32 int32_value = 3; + int64 int64_value = 4; + double double_value = 5; + float float_value = 6; + bool bool_value = 7; + bytes bytes_value = 8; + NestedEnum rnested_enum = 9; +} diff --git a/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto b/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto index 1deb193438c..0732de10858 100644 --- a/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto +++ b/connector/protobuf/src/test/resources/protobuf/catalyst_types.proto @@ -22,8 +22,6 @@ syntax = "proto3"; package org.apache.spark.sql.protobuf.protos; option java_outer_classname = "CatalystTypes"; -// TODO: import one or more protobuf files. - message BooleanMsg { bool bool_type = 1; } diff --git a/connector/protobuf/src/test/resources/protobuf/duration.proto b/connector/protobuf/src/test/resources/protobuf/duration.proto new file mode 100644 index 00000000000..2e89a8db5b7 --- /dev/null +++ b/connector/protobuf/src/test/resources/protobuf/duration.proto @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +package org.apache.spark.sql.protobuf.protos; + +option java_outer_classname = "DurationProto"; + +message Duration { + int64 seconds = 1; + int32 nanos = 2; +} diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc index 6e3a3967277..d54ee4337a5 100644 Binary files a/connector/protobuf/src/test/resources/protobuf/functions_suite.desc and b/connector/protobuf/src/test/resources/protobuf/functions_suite.desc differ diff --git a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto index 60f8c262141..2fef8495c5e 100644 --- a/connector/protobuf/src/test/resources/protobuf/functions_suite.proto +++ b/connector/protobuf/src/test/resources/protobuf/functions_suite.proto @@ -15,13 +15,18 @@ * limitations under the License. 
*/ // To compile and create test class: -// protoc --java_out=connector/protobuf/src/test/resources/protobuf/ connector/protobuf/src/test/resources/protobuf/functions_suite.proto -// protoc --descriptor_set_out=connector/protobuf/src/test/resources/protobuf/functions_suite.desc --java_out=connector/protobuf/src/test/resources/protobuf/org/apache/spark/sql/protobuf/ connector/protobuf/src/test/resources/protobuf/functions_suite.proto +// cd connector/protobuf/src/test/resources/protobuf +// protoc --java_out=./ functions_suite.proto +// protoc --include_imports --descriptor_set_out=functions_suite.desc --java_out=org/apache/spark/sql/protobuf/ functions_suite.proto syntax = "proto3"; package org.apache.spark.sql.protobuf.protos; +import "timestamp.proto"; +import "duration.proto"; +import "basicmessage.proto"; + option java_outer_classname = "SimpleMessageProtos"; message SimpleMessageJavaTypes { @@ -58,7 +63,7 @@ message SimpleMessageRepeated { string key = 1; string value = 2; enum NestedEnum { - ESTED_NOTHING = 0; + NESTED_NOTHING = 0; NESTED_FIRST = 1; NESTED_SECOND = 2; } @@ -72,17 +77,6 @@ message SimpleMessageRepeated { repeated NestedEnum rnested_enum = 10; } -message BasicMessage { - int64 id = 1; - string string_value = 2; - int32 int32_value = 3; - int64 int64_value = 4; - double double_value = 5; - float float_value = 6; - bool bool_value = 7; - bytes bytes_value = 8; -} - message RepeatedMessage { repeated BasicMessage basic_message = 1; } @@ -119,7 +113,7 @@ message SimpleMessageEnum { string key = 1; string value = 2; enum NestedEnum { - ESTED_NOTHING = 0; // TODO: Fix the name. + NESTED_NOTHING = 0; NESTED_FIRST = 1; NESTED_SECOND = 2; } @@ -168,21 +162,10 @@ message requiredMsg { int32 col_3 = 4; } -// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/timestamp.proto -message Timestamp { - int64 seconds = 1; - int32 nanos = 2; -} - message timeStampMsg { string key = 1; Timestamp stmp = 2; } -// https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/duration.proto -message Duration { - int64 seconds = 1; - int32 nanos = 2; -} message durationMsg { string key = 1; diff --git a/connector/protobuf/src/test/resources/protobuf/nestedenum.proto b/connector/protobuf/src/test/resources/protobuf/nestedenum.proto new file mode 100644 index 00000000000..20e9005bec0 --- /dev/null +++ b/connector/protobuf/src/test/resources/protobuf/nestedenum.proto @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +syntax = "proto3"; + +package org.apache.spark.sql.protobuf.protos; + +option java_outer_classname = "NestedEnumProto"; + +enum NestedEnum { + NESTED_NOTHING = 0; + NESTED_FIRST = 1; + NESTED_SECOND = 2; +} diff --git a/connector/protobuf/src/test/resources/protobuf/timestamp.proto b/connector/protobuf/src/test/resources/protobuf/timestamp.proto new file mode 100644 index 00000000000..7616cc2ccfc --- /dev/null +++ b/connector/protobuf/src/test/resources/protobuf/timestamp.proto @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto3"; + +package org.apache.spark.sql.protobuf.protos; + +option java_outer_classname = "TimestampProto"; + +message Timestamp { + int64 seconds = 1; + int32 nanos = 2; +} diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala index 19774a2ad07..271c5b0fec8 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala @@ -36,7 +36,7 @@ class ProtobufCatalystDataConversionSuite with SharedSparkSession with ExpressionEvalHelper { - private val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/") + private val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/") private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.CatalystTypes$" private def checkResultWithEval( diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala index 72280fb0d9e..199ef235f14 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala @@ -24,11 +24,11 @@ import scala.collection.JavaConverters._ import com.google.protobuf.{ByteString, DynamicMessage} import org.apache.spark.sql.{Column, QueryTest, Row} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.functions.{lit, struct} import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated import org.apache.spark.sql.protobuf.protos.SimpleMessageProtos.SimpleMessageRepeated.NestedEnum import org.apache.spark.sql.protobuf.utils.ProtobufUtils -import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DayTimeIntervalType, 
IntegerType, StringType, StructField, StructType, TimestampType} @@ -36,7 +36,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri import testImplicits._ - val testFileDesc = testFile("protobuf/functions_suite.desc").replace("file:/", "/") + val testFileDesc = testFile("functions_suite.desc").replace("file:/", "/") private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SimpleMessageProtos$" /** @@ -114,7 +114,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri .addRdoubleValue(1092093.654d) .addRfloatValue(10903.0f) .addRfloatValue(10902.0f) - .addRnestedEnum(NestedEnum.ESTED_NOTHING) + .addRnestedEnum(NestedEnum.NESTED_NOTHING) .addRnestedEnum(NestedEnum.NESTED_FIRST) .build() @@ -324,7 +324,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri .setField(messageEnumDesc.findFieldByName("value"), "value") .setField( messageEnumDesc.findFieldByName("nested_enum"), - messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("ESTED_NOTHING")) + messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("NESTED_NOTHING")) .setField( messageEnumDesc.findFieldByName("nested_enum"), messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("NESTED_FIRST")) @@ -410,7 +410,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri checkWithFileAndClassName("recursiveB") { case (name, descFilePathOpt) => - val e = intercept[IncompatibleSchemaException] { + val e = intercept[AnalysisException] { df.select( from_protobuf_wrapper($"messageB", name, descFilePathOpt).as("messageFromProto")) .show() @@ -446,7 +446,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri checkWithFileAndClassName("recursiveD") { case (name, descFilePathOpt) => - val e = intercept[IncompatibleSchemaException] { + val e = intercept[AnalysisException] { df.select( from_protobuf_wrapper($"messageD", name, descFilePathOpt).as("messageFromProto")) .show() @@ -458,7 +458,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri } test("Handle extra fields : oldProducer -> newConsumer") { - val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/") + val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/") val oldProducer = ProtobufUtils.buildDescriptor(testFileDesc, "oldProducer") val newConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "newConsumer") @@ -498,7 +498,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri } test("Handle extra fields : newProducer -> oldConsumer") { - val testFileDesc = testFile("protobuf/catalyst_types.desc").replace("file:/", "/") + val testFileDesc = testFile("catalyst_types.desc").replace("file:/", "/") val newProducer = ProtobufUtils.buildDescriptor(testFileDesc, "newProducer") val oldConsumer = ProtobufUtils.buildDescriptor(testFileDesc, "oldConsumer") @@ -587,19 +587,16 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri val df = Seq(basicMessage.toByteArray).toDF("value") - checkWithFileAndClassName("BasicMessage") { - case (name, descFilePathOpt) => - val resultFrom = df - .select(from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample) - .where("sample.string_value == \"slam\"") + val resultFrom = df + .select(from_protobuf_wrapper($"value", "BasicMessage", Some(testFileDesc)) as 'sample) + .where("sample.string_value == \"slam\"") - val resultToFrom = 
resultFrom - .select(to_protobuf_wrapper($"sample", name, descFilePathOpt) as 'value) - .select(from_protobuf_wrapper($"value", name, descFilePathOpt) as 'sample) - .where("sample.string_value == \"slam\"") + val resultToFrom = resultFrom + .select(to_protobuf_wrapper($"sample", "BasicMessage", Some(testFileDesc)) as 'value) + .select(from_protobuf_wrapper($"value", "BasicMessage", Some(testFileDesc)) as 'sample) + .where("sample.string_value == \"slam\"") - assert(resultFrom.except(resultToFrom).isEmpty) - } + assert(resultFrom.except(resultToFrom).isEmpty) } test("Handle TimestampType between to_protobuf and from_protobuf") { diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala index efc02524e68..840535654ed 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.protobuf import com.google.protobuf.Descriptors.Descriptor import com.google.protobuf.DynamicMessage +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.NoopFilters +import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType import org.apache.spark.sql.protobuf.utils.ProtobufUtils -import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StructType} @@ -35,7 +36,7 @@ class ProtobufSerdeSuite extends SharedSparkSession { import ProtoSerdeSuite._ import ProtoSerdeSuite.MatchType._ - val testFileDesc = testFile("protobuf/serde_suite.desc").replace("file:/", "/") + val testFileDesc = testFile("serde_suite.desc").replace("file:/", "/") private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SerdeSuiteProtos$" test("Test basic conversion") { @@ -65,22 +66,24 @@ class ProtobufSerdeSuite extends SharedSparkSession { test("Fail to convert with field type mismatch") { val protoFile = ProtobufUtils.buildDescriptor(testFileDesc, "MissMatchTypeInRoot") - withFieldMatchType { fieldMatch => assertFailedConversionMessage( protoFile, Deserializer, fieldMatch, - "Cannot convert Protobuf field 'foo' to SQL field 'foo' because schema is incompatible " + - s"(protoType = org.apache.spark.sql.protobuf.MissMatchTypeInRoot.foo " + - s"LABEL_OPTIONAL LONG INT64, sqlType = ${CATALYST_STRUCT.head.dataType.sql})".stripMargin) + errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE", + params = Map( + "protobufType" -> "MissMatchTypeInRoot", + "toType" -> toSQLType(CATALYST_STRUCT))) assertFailedConversionMessage( protoFile, Serializer, fieldMatch, - s"Cannot convert SQL field 'foo' to Protobuf field 'foo' because schema is incompatible " + - s"""(sqlType = ${CATALYST_STRUCT.head.dataType.sql}, protoType = LONG)""") + errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE", + params = Map( + "protobufType" -> "MissMatchTypeInRoot", + "toType" -> toSQLType(CATALYST_STRUCT))) } } @@ -91,9 +94,22 @@ class ProtobufSerdeSuite extends SharedSparkSession { .add("foo", new StructType().add("bar", IntegerType, nullable = false)) // serialize fails whether or not 'bar' is nullable - val byNameMsg = "Cannot find field 'foo.bar' in Protobuf schema" - assertFailedConversionMessage(protoFile, Serializer, BY_NAME, byNameMsg) - 
assertFailedConversionMessage(protoFile, Serializer, BY_NAME, byNameMsg, nonnullCatalyst) + assertFailedConversionMessage( + protoFile, + Serializer, + BY_NAME, + errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE", + params = Map( + "protobufType" -> "FieldMissingInProto", + "toType" -> toSQLType(CATALYST_STRUCT))) + + assertFailedConversionMessage(protoFile, + Serializer, + BY_NAME, + errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE", + params = Map( + "protobufType" -> "FieldMissingInProto", + "toType" -> toSQLType(nonnullCatalyst))) } test("Fail to convert with deeply nested field type mismatch") { @@ -107,18 +123,21 @@ class ProtobufSerdeSuite extends SharedSparkSession { protoFile, Deserializer, fieldMatch, - s"Cannot convert Protobuf field 'top.foo.bar' to SQL field 'top.foo.bar' because schema " + - s"is incompatible (protoType = org.apache.spark.sql.protobuf.protos.TypeMiss.bar " + - s"LABEL_OPTIONAL LONG INT64, sqlType = INT)", - catalyst) + catalyst, + errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE", + params = Map( + "protobufType" -> "MissMatchTypeInDeepNested", + "toType" -> toSQLType(catalyst))) assertFailedConversionMessage( protoFile, Serializer, fieldMatch, - "Cannot convert SQL field 'top.foo.bar' to Protobuf field 'top.foo.bar' because schema " + - """is incompatible (sqlType = INT, protoType = LONG)""", - catalyst) + catalyst, + errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE", + params = Map( + "protobufType" -> "MissMatchTypeInDeepNested", + "toType" -> toSQLType(catalyst))) } } @@ -130,7 +149,10 @@ class ProtobufSerdeSuite extends SharedSparkSession { protoFile, Serializer, BY_NAME, - "Found field 'boo' in Protobuf schema but there is no match in the SQL schema") + errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE", + params = Map( + "protobufType" -> "FieldMissingInSQLRoot", + "toType" -> toSQLType(CATALYST_STRUCT))) /* deserializing should work regardless of whether the extra field is missing in SQL Schema or not */ @@ -144,7 +166,10 @@ class ProtobufSerdeSuite extends SharedSparkSession { protoNestedFile, Serializer, BY_NAME, - "Found field 'foo.baz' in Protobuf schema but there is no match in the SQL schema") + errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE", + params = Map( + "protobufType" -> "FieldMissingInSQLNested", + "toType" -> toSQLType(CATALYST_STRUCT))) /* deserializing should work regardless of whether the extra field is missing in SQL Schema or not */ @@ -161,20 +186,28 @@ class ProtobufSerdeSuite extends SharedSparkSession { protoSchema: Descriptor, serdeFactory: SerdeFactory[_], fieldMatchType: MatchType, - expectedCauseMessage: String, - catalystSchema: StructType = CATALYST_STRUCT): Unit = { - val e = intercept[IncompatibleSchemaException] { + catalystSchema: StructType = CATALYST_STRUCT, + errorClass: String, + params: Map[String, String]): Unit = { + + val e = intercept[AnalysisException] { serdeFactory.create(catalystSchema, protoSchema, fieldMatchType) } + val expectMsg = serdeFactory match { case Deserializer => - s"Cannot convert Protobuf type ${protoSchema.getName} to SQL type ${catalystSchema.sql}." + s"[CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE] Unable to convert" + + s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}." case Serializer => - s"Cannot convert SQL type ${catalystSchema.sql} to Protobuf type ${protoSchema.getName}." 
+ s"[UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE] Unable to convert SQL type" + + s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}." } assert(e.getMessage === expectMsg) - assert(e.getCause.getMessage === expectedCauseMessage) + checkError( + exception = e, + errorClass = errorClass, + parameters = params) } def withFieldMatchType(f: MatchType => Unit): Unit = { diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 053c2a7af21..7fc806752be 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -17,6 +17,31 @@ ], "sqlState" : "22005" }, + "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR" : { + "message" : [ + "Error constructing FileDescriptor for <descFilePath>" + ] + }, + "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE" : { + "message" : [ + "Cannot convert Protobuf <protobufColumn> to SQL <sqlColumn> because schema is incompatible (protobufType = <protobufType>, sqlType = <sqlType>)." + ] + }, + "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE" : { + "message" : [ + "Unable to convert <protobufType> of Protobuf to SQL type <toType>." + ] + }, + "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE" : { + "message" : [ + "Cannot convert SQL <sqlColumn> to Protobuf <protobufColumn> because <data> cannot be written since it's not defined in ENUM <enumString>" + ] + }, + "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE" : { + "message" : [ + "Cannot convert SQL <sqlColumn> to Protobuf <protobufColumn> because schema is incompatible (protobufType = <protobufType>, sqlType = <sqlType>)." + ] + }, "CANNOT_DECODE_URL" : { "message" : [ "Cannot decode url : <url>." @@ -29,12 +54,22 @@ ], "sqlState" : "22007" }, + "CANNOT_LOAD_PROTOBUF_CLASS" : { + "message" : [ + "Could not load Protobuf class with name <protobufClassName>. Ensure the class name includes package prefix." + ] + }, "CANNOT_PARSE_DECIMAL" : { "message" : [ "Cannot parse decimal" ], "sqlState" : "42000" }, + "CANNOT_PARSE_PROTOBUF_DESCRIPTOR" : { + "message" : [ + "Error parsing file <descFilePath> descriptor byte[] into Descriptor object" + ] + }, "CANNOT_PARSE_TIMESTAMP" : { "message" : [ "<message>. If necessary set <ansiConfig> to \"false\" to bypass this error." @@ -551,6 +586,11 @@ "Invalid bucket file: <path>" ] }, + "INVALID_BYTE_STRING" : { + "message" : [ + "The expected format is ByteString, but was <unsupported> (<class>)." + ] + }, "INVALID_COLUMN_OR_FIELD_DATA_TYPE" : { "message" : [ "Column or field <name> is of type <type> while it's required to be <expectedType>." @@ -600,6 +640,11 @@ "<value> is an invalid property value, please use quotes, e.g. SET <key>=<value>" ] }, + "INVALID_PROTOBUF_MESSAGE_TYPE" : { + "message" : [ + "<protobufClassName> is not a Protobuf message type" + ] + }, "INVALID_SQL_SYNTAX" : { "message" : [ "Invalid SQL syntax: <inputString>" @@ -618,6 +663,11 @@ } } }, + "MALFORMED_PROTOBUF_MESSAGE" : { + "message" : [ + "Malformed Protobuf messages are detected in message deserialization. Parse Mode: <failFastMode>. To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'." + ] + }, "MISSING_STATIC_PARTITION_COLUMN" : { "message" : [ "Unknown static partition column: <columnName>" @@ -669,7 +719,12 @@ "No handler for UDAF '<functionName>'. Use sparkSession.udf.register(...) instead." 
] }, - "NO_UDF_INTERFACE_ERROR" : { + "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA" : { + "message" : [ + "Cannot find <catalystFieldPath> in Protobuf schema" + ] + }, + "NO_UDF_INTERFACE" : { "message" : [ "UDF class <className> doesn't implement any UDF interface" ] @@ -741,6 +796,46 @@ ], "sqlState" : "42000" }, + "PROTOBUF_DEPENDENCY_NOT_FOUND" : { + "message" : [ + "Could not find dependency: <dependencyName>" + ] + }, + "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND" : { + "message" : [ + "Error reading Protobuf descriptor file at path: <filePath>" + ] + }, + "PROTOBUF_FIELD_MISSING" : { + "message" : [ + "Searching for <field> in Protobuf schema at <protobufSchema> gave <matchSize> matches. Candidates: <matches>" + ] + }, + "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA" : { + "message" : [ + "Found <field> in Protobuf schema but there is no match in the SQL schema" + ] + }, + "PROTOBUF_FIELD_TYPE_MISMATCH" : { + "message" : [ + "Type mismatch encountered for field: <field>" + ] + }, + "PROTOBUF_MESSAGE_NOT_FOUND" : { + "message" : [ + "Unable to locate Message <messageName> in Descriptor" + ] + }, + "PROTOBUF_TYPE_NOT_SUPPORT" : { + "message" : [ + "Protobuf type not yet supported: <protobufType>." + ] + }, + "RECURSIVE_PROTOBUF_SCHEMA" : { + "message" : [ + "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>" + ] + }, "RENAME_SRC_PATH_NOT_FOUND" : { "message" : [ "Failed to rename as <sourcePath> was not found" @@ -827,6 +922,16 @@ "Unable to acquire <requestedBytes> bytes of memory, got <receivedBytes>" ] }, + "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE" : { + "message" : [ + "Unable to convert SQL type <toType> to Protobuf type <protobufType>." + ] + }, + "UNKNOWN_PROTOBUF_MESSAGE_TYPE" : { + "message" : [ + "Attempting to treat <descriptorName> as a Message, but it was <containingType>" + ] + }, "UNPIVOT_REQUIRES_ATTRIBUTES" : { "message" : [ "UNPIVOT requires all given <given> expressions to be columns when no <empty> expressions are given. These are not columns: [<expressions>]." 
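
For context on the entries above: the error-classes framework fills each <placeholder> in a message template from the messageParameters map supplied at the throw site, and the error class name travels on the exception itself. A minimal sketch of how one of the new classes surfaces to a caller; the DataFrame `df` with a binary `value` column and the bad descriptor path are hypothetical:

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.protobuf.functions.from_protobuf

    try {
      // Schema resolution parses the descriptor file, so a missing file now
      // surfaces as an AnalysisException carrying an error class instead of
      // a bare RuntimeException.
      df.select(from_protobuf(col("value"), "BasicMessage", "/no/such/file.desc"))
    } catch {
      case e: AnalysisException =>
        assert(e.getErrorClass == "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND")
    }
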
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index f61753f7b1d..18667d1efea 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -723,7 +723,7 @@ object SparkProtobuf { dependencyOverrides += "com.google.protobuf" % "protobuf-java" % protoVersion, - (Test / PB.protoSources) += (Test / sourceDirectory).value / "resources", + (Test / PB.protoSources) += (Test / sourceDirectory).value / "resources" / "protobuf", (Test / PB.targets) := Seq( PB.gens.java -> target.value / "generated-test-sources" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 03d16856755..ea2f961509a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3008,7 +3008,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { def udfClassDoesNotImplementAnyUDFInterfaceError(className: String): Throwable = { new AnalysisException( - errorClass = "NO_UDF_INTERFACE_ERROR", + errorClass = "NO_UDF_INTERFACE", messageParameters = Map("className" -> className)) } @@ -3213,4 +3213,179 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { messageParameters = Map("expression" -> toSQLExpr(expression)) ) } + + def cannotConvertProtobufTypeToSqlTypeError( + protobufColumn: String, + sqlColumn: Seq[String], + protobufType: String, + sqlType: DataType): Throwable = { + new AnalysisException( + errorClass = "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE", + messageParameters = Map( + "protobufColumn" -> protobufColumn, + "sqlColumn" -> toSQLId(sqlColumn), + "protobufType" -> protobufType, + "sqlType" -> toSQLType(sqlType))) + } + + def cannotConvertCatalystTypeToProtobufTypeError( + sqlColumn: Seq[String], + protobufColumn: String, + sqlType: DataType, + protobufType: String): Throwable = { + new AnalysisException( + errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_FIELD_TYPE", + messageParameters = Map( + "sqlColumn" -> toSQLId(sqlColumn), + "protobufColumn" -> protobufColumn, + "sqlType" -> toSQLType(sqlType), + "protobufType" -> protobufType)) + } + + def cannotConvertCatalystTypeToProtobufEnumTypeError( + sqlColumn: Seq[String], + protobufColumn: String, + data: String, + enumString: String): Throwable = { + new AnalysisException( + errorClass = "CANNOT_CONVERT_SQL_TYPE_TO_PROTOBUF_ENUM_TYPE", + messageParameters = Map( + "sqlColumn" -> toSQLId(sqlColumn), + "protobufColumn" -> protobufColumn, + "data" -> data, + "enumString" -> enumString)) + } + + def cannotConvertProtobufTypeToCatalystTypeError( + protobufType: String, + sqlType: DataType, + cause: Throwable): Throwable = { + new AnalysisException( + errorClass = "CANNOT_CONVERT_PROTOBUF_MESSAGE_TYPE_TO_SQL_TYPE", + messageParameters = Map( + "protobufType" -> protobufType, + "toType" -> toSQLType(sqlType)), + cause = Option(cause.getCause)) + } + + def cannotConvertSqlTypeToProtobufError( + protobufType: String, + sqlType: DataType, + cause: Throwable): Throwable = { + new AnalysisException( + errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_MESSAGE_TYPE", + messageParameters = Map( + "protobufType" -> protobufType, + "toType" -> toSQLType(sqlType)), + cause = Option(cause.getCause)) + } + + def protobufTypeUnsupportedYetError(protobufType: String): Throwable = { + new AnalysisException( + errorClass = "PROTOBUF_TYPE_NOT_SUPPORT", + 
messageParameters = Map("protobufType" -> protobufType)) } + + def unknownProtobufMessageTypeError( + descriptorName: String, + containingType: String): Throwable = { + new AnalysisException( + errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE", + messageParameters = Map( + "descriptorName" -> descriptorName, + "containingType" -> containingType)) + } + + def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): Throwable = { + new AnalysisException( + errorClass = "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA", + messageParameters = Map("catalystFieldPath" -> catalystFieldPath)) + } + + def cannotFindProtobufFieldInCatalystError(field: String): Throwable = { + new AnalysisException( + errorClass = "PROTOBUF_FIELD_MISSING_IN_SQL_SCHEMA", + messageParameters = Map("field" -> field)) + } + + def protobufFieldMatchError(field: String, + protobufSchema: String, + matchSize: String, + matches: String): Throwable = { + new AnalysisException( + errorClass = "PROTOBUF_FIELD_MISSING", + messageParameters = Map( + "field" -> field, + "protobufSchema" -> protobufSchema, + "matchSize" -> matchSize, + "matches" -> matches)) + } + + def unableToLocateProtobufMessageError(messageName: String): Throwable = { + new AnalysisException( + errorClass = "PROTOBUF_MESSAGE_NOT_FOUND", + messageParameters = Map("messageName" -> messageName)) + } + + def descriptorParseError(descFilePath: String, cause: Throwable): Throwable = { + new AnalysisException( + errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR", + messageParameters = Map("descFilePath" -> descFilePath), + cause = Option(cause.getCause)) + } + + def cannotFindDescriptorFileError(filePath: String, cause: Throwable): Throwable = { + new AnalysisException( + errorClass = "PROTOBUF_DESCRIPTOR_FILE_NOT_FOUND", + messageParameters = Map("filePath" -> filePath), + cause = Option(cause.getCause)) + } + + def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = { + new AnalysisException( + errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR", + messageParameters = Map("descFilePath" -> descFilePath), + cause = Option(cause.getCause)) + } + + def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = { + new AnalysisException( + errorClass = "RECURSIVE_PROTOBUF_SCHEMA", + messageParameters = Map("fieldDescriptor" -> fieldDescriptor)) + } + + def protobufFieldTypeMismatchError(field: String): Throwable = { + new AnalysisException( + errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH", + messageParameters = Map("field" -> field)) + } + + def protobufClassLoadError( + protobufClassName: String, + cause: Throwable): Throwable = { + new AnalysisException( + errorClass = "CANNOT_LOAD_PROTOBUF_CLASS", + messageParameters = Map("protobufClassName" -> protobufClassName), + cause = Option(cause.getCause)) + } + + def protobufMessageTypeError(protobufClassName: String): Throwable = { + new AnalysisException( + errorClass = "INVALID_PROTOBUF_MESSAGE_TYPE", + messageParameters = Map("protobufClassName" -> protobufClassName)) + } + + def protobufDescriptorDependencyError(dependencyName: String): Throwable = { + new AnalysisException( + errorClass = "PROTOBUF_DEPENDENCY_NOT_FOUND", + messageParameters = Map("dependencyName" -> dependencyName)) + } + + def invalidByteStringFormatError(unsupported: Any): Throwable = { + new AnalysisException( + errorClass = "INVALID_BYTE_STRING", + messageParameters = Map( + "unsupported" -> unsupported.toString, + "class" -> unsupported.getClass.toString)) + } } diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index f78ff23d269..b8febfffba1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2808,4 +2808,12 @@ errorClass = "UNSUPPORTED_EMPTY_LOCATION", messageParameters = Map.empty) } + + def malformedProtobufMessageDetectedInMessageParsingError(e: Throwable): Throwable = { + new SparkException( + errorClass = "MALFORMED_PROTOBUF_MESSAGE", + messageParameters = Map( + "failFastMode" -> FailFastMode.name), + cause = e) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala index 8086a0e97ec..bed647ef49f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala @@ -227,7 +227,7 @@ parameters = Map[String, String]()) } - test("NO_UDF_INTERFACE_ERROR: java udf class does not implement any udf interface") { + test("NO_UDF_INTERFACE: java udf class does not implement any udf interface") { val className = "org.apache.spark.sql.errors.MyCastToString" val e = intercept[AnalysisException]( spark.udf.registerJava( @@ -237,7 +237,7 @@ ) checkError( exception = e, - errorClass = "NO_UDF_INTERFACE_ERROR", + errorClass = "NO_UDF_INTERFACE", parameters = Map("className" -> className)) }
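
The proto reorganization above reads best end to end: BasicMessage now lives in its own basicmessage.proto, functions_suite.proto pulls it in with `import "basicmessage.proto"`, the descriptor set is regenerated with `protoc --include_imports`, and parseFileDescriptorSet()/buildFileDescriptor() resolve that dependency chain when the descriptor file is loaded. A minimal sketch of the round trip against the public API, assuming a DataFrame `df` with a binary `value` column (hypothetical) and the descriptor generated as described in the .proto comments:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.protobuf.functions.{from_protobuf, to_protobuf}

    val descFile = "connector/protobuf/src/test/resources/protobuf/functions_suite.desc"

    // BasicMessage reaches functions_suite.desc only via the new
    // import "basicmessage.proto", so this round trip exercises the
    // dependency resolution added in this commit.
    val parsed = df.select(from_protobuf(col("value"), "BasicMessage", descFile).as("sample"))
    val bytes = parsed.select(to_protobuf(col("sample"), "BasicMessage", descFile).as("value"))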