mposdev21 commented on code in PR #37972: URL: https://github.com/apache/spark/pull/37972#discussion_r983964090
########## connector/proto/src/main/scala/org/apache/spark/sql/proto/CatalystDataToProto.scala: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto Review Comment: That makes sense. I am assuming that this would be a wholesale change wherever proto appears ? ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/CatalystDataToProto.scala: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto + +import com.google.protobuf.DynamicMessage + +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.proto.utils.{ProtoUtils, SchemaConverters} +import org.apache.spark.sql.types.{BinaryType, DataType} + +private[proto] case class CatalystDataToProto( + child: Expression, + descFilePath: Option[String], + messageName: Option[String]) + extends UnaryExpression { + + override def dataType: DataType = BinaryType + + @transient private lazy val protoType = (descFilePath, messageName) match { + case (Some(a), Some(b)) => ProtoUtils.buildDescriptor(a, b) + case _ => SchemaConverters.toProtoType(child.dataType, child.nullable) Review Comment: We support both with and without a descriptor. I agree that deserializing (from_protobuf) without a protobuf descriptor will not work. It was mainly added to maintain symmetry with to_avro which supports without a schema. ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDataToCatalyst.scala: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto + +import java.io.ByteArrayInputStream + +import scala.util.control.NonFatal + +import com.google.protobuf.DynamicMessage + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, + SpecificInternalRow, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} +import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.proto.utils.{ProtoOptions, ProtoUtils, SchemaConverters} +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType} + +private[proto] case class ProtoDataToCatalyst(child: Expression, descFilePath: String, + messageName: String, + options: Map[String, String]) + extends UnaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) + + override lazy val dataType: DataType = { + val dt = SchemaConverters.toSqlType(expectedSchema).dataType + parseMode match { + // With PermissiveMode, the output Catalyst row might contain columns of null values for + // corrupt records, even if some of the columns are not nullable in the user-provided schema. + // Therefore we force the schema to be all nullable here. + case PermissiveMode => dt.asNullable + case _ => dt + } + } + + override def nullable: Boolean = true + + private lazy val protoOptions = ProtoOptions(options) + + @transient private lazy val descriptor = ProtoUtils.buildDescriptor(descFilePath, messageName) + + @transient private lazy val expectedSchema = protoOptions.schema.getOrElse(descriptor) + + @transient private lazy val deserializer = new ProtoDeserializer(expectedSchema, dataType, + protoOptions.datetimeRebaseModeInRead) + + @transient private var result: Any = _ + + @transient private lazy val parseMode: ParseMode = { + val mode = protoOptions.parseMode + if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(unacceptableModeMessage(mode.name)) + } + mode + } + + private def unacceptableModeMessage(name: String): String = { + s"from_proto() doesn't support the $name mode. " + + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." + } + + @transient private lazy val nullResultRow: Any = dataType match { + case st: StructType => + val resultRow = new SpecificInternalRow(st.map(_.dataType)) + for (i <- 0 until st.length) { + resultRow.setNullAt(i) + } + resultRow + + case _ => + null + } + + private def handleException(e: Throwable): Any = { + parseMode match { + case PermissiveMode => + nullResultRow + case FailFastMode => + throw new SparkException("Malformed records are detected in record parsing. " + + s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " + + "result, try setting the option 'mode' as 'PERMISSIVE'.", e) + case _ => + throw new AnalysisException(unacceptableModeMessage(parseMode.name)) + } + } + + override def nullSafeEval(input: Any): Any = { + val binary = input.asInstanceOf[Array[Byte]] + try { + result = DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(binary)) Review Comment: Yes, we will make the change to use the binary directly in parseFrom. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
