mposdev21 commented on code in PR #37972:
URL: https://github.com/apache/spark/pull/37972#discussion_r983964343


##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDataToCatalyst.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import java.io.ByteArrayInputStream
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.DynamicMessage
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression,
+  SpecificInternalRow, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
CodeGenerator, ExprCode}
+import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, 
PermissiveMode}
+import org.apache.spark.sql.proto.utils.{ProtoOptions, ProtoUtils, 
SchemaConverters}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, 
StructType}
+
+private[proto] case class ProtoDataToCatalyst(child: Expression, descFilePath: 
String,
+                                              messageName: String,
+                                              options: Map[String, String])
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override lazy val dataType: DataType = {
+    val dt = SchemaConverters.toSqlType(expectedSchema).dataType
+    parseMode match {
+      // With PermissiveMode, the output Catalyst row might contain columns of 
null values for
+      // corrupt records, even if some of the columns are not nullable in the 
user-provided schema.
+      // Therefore we force the schema to be all nullable here.
+      case PermissiveMode => dt.asNullable
+      case _ => dt
+    }
+  }
+
+  override def nullable: Boolean = true
+
+  private lazy val protoOptions = ProtoOptions(options)
+
+  @transient private lazy val descriptor = 
ProtoUtils.buildDescriptor(descFilePath, messageName)
+
+  @transient private lazy val expectedSchema = 
protoOptions.schema.getOrElse(descriptor)
+
+  @transient private lazy val deserializer = new 
ProtoDeserializer(expectedSchema, dataType,
+    protoOptions.datetimeRebaseModeInRead)
+
+  @transient private var result: Any = _
+
+  @transient private lazy val parseMode: ParseMode = {
+    val mode = protoOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw new AnalysisException(unacceptableModeMessage(mode.name))
+    }
+    mode
+  }
+
+  private def unacceptableModeMessage(name: String): String = {
+    s"from_proto() doesn't support the $name mode. " +
+      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
+  }
+
+  @transient private lazy val nullResultRow: Any = dataType match {
+    case st: StructType =>
+      val resultRow = new SpecificInternalRow(st.map(_.dataType))
+      for (i <- 0 until st.length) {
+        resultRow.setNullAt(i)
+      }
+      resultRow
+
+    case _ =>
+      null
+  }
+
+  private def handleException(e: Throwable): Any = {
+    parseMode match {
+      case PermissiveMode =>
+        nullResultRow
+      case FailFastMode =>
+        throw new SparkException("Malformed records are detected in record 
parsing. " +
+          s"Current parse Mode: ${FailFastMode.name}. To process malformed 
records as null " +
+          "result, try setting the option 'mode' as 'PERMISSIVE'.", e)
+      case _ =>
+        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+    }
+  }
+
+  override def nullSafeEval(input: Any): Any = {
+    val binary = input.asInstanceOf[Array[Byte]]
+    try {
+      result = DynamicMessage.parseFrom(descriptor, new 
ByteArrayInputStream(binary))
+      val unknownFields = result.asInstanceOf[DynamicMessage].getUnknownFields
+      if (!unknownFields.asMap().isEmpty) {
+        return handleException(new Throwable("UnknownFields encountered"))

Review Comment:
   We wanted to support detecting invalid schemas. When a protobuf is 
deserialized with an invalid descriptor using FAILFAST or PERMISSIVE mode, we 
wanted to raise an assertion or return a null row. However, the message was 
successfully deserialized without any failures even in the case of an invalid 
schema. For example, if a String was deserialized as a Boolean, it would be 
deserialized successfully without any error. The only way to detect such 
mismatches is by using UnknownFields. Note that even under schema evolution, 
you may add new fields but cannot change the type of an existing field. We 
could not find the right API to distinguish an invalid schema from an evolved 
schema.



##########
connector/proto/pom.xml:
##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   A) Yes, we are working on that. We wanted to get the initial review before 
starting on that.
   B) We have done an initial prototype with the Confluent schema registry. Is 
there an open-source schema registry that you had in mind?
   C) Maps are supported.
   D) We have tested with proto3. Do you think proto2 support is important?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to