rangadi commented on code in PR #37972:
URL: https://github.com/apache/spark/pull/37972#discussion_r984057216


##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/CatalystDataToProto.scala:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import com.google.protobuf.DynamicMessage
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
+import org.apache.spark.sql.proto.utils.{ProtoUtils, SchemaConverters}
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+private[proto] case class CatalystDataToProto(
+                                               child: Expression,
+                                               descFilePath: Option[String],
+                                               messageName: Option[String])
+  extends UnaryExpression {
+
+  override def dataType: DataType = BinaryType
+
+  @transient private lazy val protoType = (descFilePath, messageName) match {
+      case (Some(a), Some(b)) => ProtoUtils.buildDescriptor(a, b)
+      case _ => SchemaConverters.toProtoType(child.dataType, child.nullable)

Review Comment:
   I would say let's not support it. Users will use it without realizing the 
limitations and will be confused. Protobufs are never meant to be used without 
access to the schema. 



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDataToCatalyst.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import java.io.ByteArrayInputStream
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.DynamicMessage
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression,
+  SpecificInternalRow, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
CodeGenerator, ExprCode}
+import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, 
PermissiveMode}
+import org.apache.spark.sql.proto.utils.{ProtoOptions, ProtoUtils, 
SchemaConverters}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, 
StructType}
+
+private[proto] case class ProtoDataToCatalyst(child: Expression, descFilePath: 
String,
+                                              messageName: String,
+                                              options: Map[String, String])
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override lazy val dataType: DataType = {
+    val dt = SchemaConverters.toSqlType(expectedSchema).dataType
+    parseMode match {
+      // With PermissiveMode, the output Catalyst row might contain columns of 
null values for
+      // corrupt records, even if some of the columns are not nullable in the 
user-provided schema.
+      // Therefore we force the schema to be all nullable here.
+      case PermissiveMode => dt.asNullable
+      case _ => dt
+    }
+  }
+
+  override def nullable: Boolean = true
+
+  private lazy val protoOptions = ProtoOptions(options)
+
+  @transient private lazy val descriptor = 
ProtoUtils.buildDescriptor(descFilePath, messageName)
+
+  @transient private lazy val expectedSchema = 
protoOptions.schema.getOrElse(descriptor)
+
+  @transient private lazy val deserializer = new 
ProtoDeserializer(expectedSchema, dataType,
+    protoOptions.datetimeRebaseModeInRead)
+
+  @transient private var result: Any = _
+
+  @transient private lazy val parseMode: ParseMode = {
+    val mode = protoOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw new AnalysisException(unacceptableModeMessage(mode.name))
+    }
+    mode
+  }
+
+  private def unacceptableModeMessage(name: String): String = {
+    s"from_proto() doesn't support the $name mode. " +
+      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
+  }
+
+  @transient private lazy val nullResultRow: Any = dataType match {
+    case st: StructType =>
+      val resultRow = new SpecificInternalRow(st.map(_.dataType))
+      for (i <- 0 until st.length) {
+        resultRow.setNullAt(i)
+      }
+      resultRow
+
+    case _ =>
+      null
+  }
+
+  private def handleException(e: Throwable): Any = {
+    parseMode match {
+      case PermissiveMode =>
+        nullResultRow
+      case FailFastMode =>
+        throw new SparkException("Malformed records are detected in record 
parsing. " +
+          s"Current parse Mode: ${FailFastMode.name}. To process malformed 
records as null " +
+          "result, try setting the option 'mode' as 'PERMISSIVE'.", e)
+      case _ =>
+        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+    }
+  }
+
+  override def nullSafeEval(input: Any): Any = {
+    val binary = input.asInstanceOf[Array[Byte]]
+    try {
+      result = DynamicMessage.parseFrom(descriptor, new 
ByteArrayInputStream(binary))
+      val unknownFields = result.asInstanceOf[DynamicMessage].getUnknownFields
+      if (!unknownFields.asMap().isEmpty) {
+        return handleException(new Throwable("UnknownFields encountered"))

Review Comment:
   New fields would be so common that we need to allow this.
   
   One way to detect an invalid schema vs. a newer version: we can check 
whether the field id of any of the unknown fields matches the id of any known 
field. That will catch errors like field 3 being a String in the original 
schema and an int in the incorrect schema.



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/SchemaConverters.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto.utils
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.proto.ScalaReflectionLock
+import org.apache.spark.sql.types._
+
+@DeveloperApi
+object SchemaConverters {
+  /**
+   * Internal wrapper for SQL data type and nullability.
+   *
+   * @since 3.4.0
+   */
+  case class SchemaType(dataType: DataType, nullable: Boolean)
+
+  /**
+   * Converts an Proto schema to a corresponding Spark SQL schema.
+   *
+   * @since 3.4.0
+   */
+  def toSqlType(protoSchema: Descriptor): SchemaType = {
+    toSqlTypeHelper(protoSchema)
+  }
+
+  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = 
ScalaReflectionLock.synchronized {
+    
SchemaType(StructType(descriptor.getFields.asScala.flatMap(structFieldFor).toSeq),
+      nullable = true)
+  }
+
+  def structFieldFor(fd: FieldDescriptor): Option[StructField] = {

Review Comment:
   It should be an error if it does not match the case, right?



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDeserializer.scala:
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import com.google.protobuf.{ByteString, DynamicMessage}
+import com.google.protobuf.Descriptors._
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
+
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, 
UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.proto.utils.ProtoUtils
+import org.apache.spark.sql.proto.utils.ProtoUtils.ProtoMatchedField
+import org.apache.spark.sql.proto.utils.ProtoUtils.toFieldStr
+import 
org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, 
ByteType, DataType,
+  DateType, Decimal, DoubleType, FloatType, IntegerType, LongType, NullType,
+  ShortType, StringType, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+private[sql] class ProtoDeserializer(
+                                      rootProtoType: Descriptor,
+                                      rootCatalystType: DataType,
+                                      positionalFieldMatch: Boolean,
+                                      datetimeRebaseSpec: RebaseSpec,

Review Comment:
   I am not sure actually. I will ask some Spark experts to comment. 



##########
connector/proto/pom.xml:
##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   A) Great. Given how much more widely Python is used than Scala, it would be 
essential. We could get this merged sooner rather than later and then have 
Python support added. 
   B) Confluent schema-registry is good. That is the most prevalent one. 
   C) Map support: Is that yet to be pushed to GitHub? I don't see it here.
   D) proto 2 is very common. We should discuss where it actually matters for 
this PR. Likely, with minimal changes, we could be version agnostic. 



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/SchemaConverters.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto.utils
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.proto.ScalaReflectionLock
+import org.apache.spark.sql.types._
+
+@DeveloperApi
+object SchemaConverters {
+  /**
+   * Internal wrapper for SQL data type and nullability.
+   *
+   * @since 3.4.0
+   */
+  case class SchemaType(dataType: DataType, nullable: Boolean)
+
+  /**
+   * Converts an Proto schema to a corresponding Spark SQL schema.
+   *
+   * @since 3.4.0
+   */
+  def toSqlType(protoSchema: Descriptor): SchemaType = {
+    toSqlTypeHelper(protoSchema)
+  }
+
+  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = 
ScalaReflectionLock.synchronized {
+    
SchemaType(StructType(descriptor.getFields.asScala.flatMap(structFieldFor).toSeq),
+      nullable = true)
+  }
+
+  def structFieldFor(fd: FieldDescriptor): Option[StructField] = {
+    import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
+    val dataType = fd.getJavaType match {
+      case INT => Some(IntegerType)
+      case LONG => Some(LongType)
+      case FLOAT => Some(FloatType)
+      case DOUBLE => Some(DoubleType)
+      case BOOLEAN => Some(BooleanType)
+      case STRING => Some(StringType)
+      case BYTE_STRING => Some(BinaryType)
+      case ENUM => Some(StringType)
+      case MESSAGE =>
+        
Option(fd.getMessageType.getFields.asScala.flatMap(structFieldFor).toSeq)

Review Comment:
   Look at how Avro does it. We can do the same here.



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/CatalystDataToProto.scala:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import com.google.protobuf.DynamicMessage
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
+import org.apache.spark.sql.proto.utils.{ProtoUtils, SchemaConverters}
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+private[proto] case class CatalystDataToProto(
+                                               child: Expression,
+                                               descFilePath: Option[String],
+                                               messageName: Option[String])
+  extends UnaryExpression {
+
+  override def dataType: DataType = BinaryType
+
+  @transient private lazy val protoType = (descFilePath, messageName) match {
+      case (Some(a), Some(b)) => ProtoUtils.buildDescriptor(a, b)
+      case _ => SchemaConverters.toProtoType(child.dataType, child.nullable)

Review Comment:
   The actual arguments for these functions would be a bit different compared 
to the Avro functions. It is OK to have the differences; we just need to 
document the functions well. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to