rangadi commented on code in PR #37972:
URL: https://github.com/apache/spark/pull/37972#discussion_r983737188


##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/CatalystDataToProto.scala:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto

Review Comment:
   Regarding naming: `to_proto` or `to_protobuf`?
   I was thinking the latter. "Protobuf" normally refers to the actual protobuf
messages (serialized or not), and these APIs deal with protobuf messages.
_proto_ is more generic and refers to many things, e.g. the `.proto` files that
define the protobuf schemas.
   In other words, `proto` is the schema/definition and `protobuf` is an
instance of an actual message. WDYT?
   Of course, naming is subjective.



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/CatalystDataToProto.scala:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import com.google.protobuf.DynamicMessage
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.proto.utils.{ProtoUtils, SchemaConverters}
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+private[proto] case class CatalystDataToProto(
+                                               child: Expression,
+                                               descFilePath: Option[String],
+                                               messageName: Option[String])
+  extends UnaryExpression {
+
+  override def dataType: DataType = BinaryType
+
+  @transient private lazy val protoType = (descFilePath, messageName) match {
+      case (Some(a), Some(b)) => ProtoUtils.buildDescriptor(a, b)
+      case _ => SchemaConverters.toProtoType(child.dataType, child.nullable)

Review Comment:
   What does it mean to serialize without a proto definition? How can we
create a protobuf, and how can consumers read it? It looks like it will be
hard to ensure backward or forward compatibility (which is fundamental to
protobufs).
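
   For context on why a shared definition matters (a hypothetical `.proto`,
not from this PR): the field numbers in the definition are the wire-format
contract that producers and consumers evolve against. Without a definition,
there is nothing stable for consumers to match field numbers to.

```proto
// Hypothetical example: field numbers are the wire-format contract.
// A reader holding the v1 definition can still parse v2 messages, because
// the unchanged fields keep their numbers.
syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 results_per_page = 3;  // added in v2; v1 readers simply skip it
}
```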



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDeserializer.scala:
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import com.google.protobuf.{ByteString, DynamicMessage}
+import com.google.protobuf.Descriptors._
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
+
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.proto.utils.ProtoUtils
+import org.apache.spark.sql.proto.utils.ProtoUtils.ProtoMatchedField
+import org.apache.spark.sql.proto.utils.ProtoUtils.toFieldStr
+import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType,
+  DateType, Decimal, DoubleType, FloatType, IntegerType, LongType, NullType,
+  ShortType, StringType, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+private[sql] class ProtoDeserializer(
+                                      rootProtoType: Descriptor,
+                                      rootCatalystType: DataType,
+                                      positionalFieldMatch: Boolean,
+                                      datetimeRebaseSpec: RebaseSpec,
+                                      filters: StructFilters) {
+
+  def this(
+            rootProtoType: Descriptor,
+            rootCatalystType: DataType,
+            datetimeRebaseMode: String) = {
+    this(
+      rootProtoType,
+      rootCatalystType,
+      positionalFieldMatch = false,
+      RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
+      new NoopFilters)
+  }
+
+  private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInRead(
+    datetimeRebaseSpec.mode, "Proto")
+
+  private val converter: Any => Option[Any] = try {
+    rootCatalystType match {
+      // A shortcut for empty schema.
+      case st: StructType if st.isEmpty =>
+        (_: Any) => Some(InternalRow.empty)
+
+      case st: StructType =>
+        val resultRow = new SpecificInternalRow(st.map(_.dataType))
+        val fieldUpdater = new RowUpdater(resultRow)
+        val applyFilters = filters.skipRow(resultRow, _)
+        val writer = getRecordWriter(rootProtoType, st, Nil, Nil, applyFilters)
+        (data: Any) => {
+          val record = data.asInstanceOf[DynamicMessage]
+          val skipRow = writer(fieldUpdater, record)
+          if (skipRow) None else Some(resultRow)
+        }
+    }
+  } catch {
+    case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
+      s"Cannot convert Proto type ${rootProtoType.getName} " +
+        s"to SQL type ${rootCatalystType.sql}.", ise)
+  }
+
+  def deserialize(data: Any): Option[Any] = converter(data)
+
+  private def newArrayWriter(
+                              protoField: FieldDescriptor,
+                              protoPath: Seq[String],
+                              catalystPath: Seq[String],
+                              elementType: DataType,
+                              containsNull: Boolean): (CatalystDataUpdater, Int, Any) => Unit = {
+
+
+    val protoElementPath = protoPath :+ "element"
+    val elementWriter = newWriter(protoField, elementType,
+      protoElementPath, catalystPath :+ "element")
+    (updater, ordinal, value) =>
+      val collection = value.asInstanceOf[java.util.Collection[Any]]
+      val result = createArrayData(elementType, collection.size())
+      val elementUpdater = new ArrayDataUpdater(result)
+
+      var i = 0
+      val iterator = collection.iterator()
+      while (iterator.hasNext) {
+        val element = iterator.next()
+        if (element == null) {
+          if (!containsNull) {
+            throw new RuntimeException(
+              s"Array value at path ${toFieldStr(protoElementPath)} is not allowed to be null")
+          } else {
+            elementUpdater.setNullAt(i)
+          }
+        } else {
+          elementWriter(elementUpdater, i, element)
+        }
+        i += 1
+      }
+
+      updater.set(ordinal, result)
+  }
+
+  /**
+   * Creates a writer to write proto values to Catalyst values at the given ordinal with the given
+   * updater.
+   */
+  private def newWriter(
+                         protoType: FieldDescriptor,
+                         catalystType: DataType,
+                         protoPath: Seq[String],
+                         catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
+    val errorPrefix = s"Cannot convert Proto ${toFieldStr(protoPath)} to " +
+      s"SQL ${toFieldStr(catalystPath)} because "
+    val incompatibleMsg = errorPrefix +
+      s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " +
+      s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})"
+
+    (protoType.getJavaType, catalystType) match {
+
+      case (null, NullType) => (updater, ordinal, _) =>
+        updater.setNullAt(ordinal)
+
+      // TODO: we can avoid boxing if a future version of proto provides primitive accessors.
+      case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
+        updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
+
+      case (BOOLEAN, ArrayType(BooleanType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, BooleanType, containsNull)
+
+      case (INT, IntegerType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, value.asInstanceOf[Int])
+
+      case (INT, ArrayType(IntegerType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, IntegerType, containsNull)
+
+      case (INT, DateType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+
+      case (LONG, LongType) => (updater, ordinal, value) =>
+        updater.setLong(ordinal, value.asInstanceOf[Long])
+
+      case (LONG, ArrayType(LongType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, LongType, containsNull)
+
+      case (FLOAT, FloatType) => (updater, ordinal, value) =>
+        updater.setFloat(ordinal, value.asInstanceOf[Float])
+
+      case (FLOAT, ArrayType(FloatType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, FloatType, containsNull)
+
+      case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
+        updater.setDouble(ordinal, value.asInstanceOf[Double])
+
+      case (DOUBLE, ArrayType(DoubleType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, DoubleType, containsNull)
+
+      case (STRING, StringType) => (updater, ordinal, value) =>
+        val str = value match {
+          case s: String => UTF8String.fromString(s)
+        }
+        updater.set(ordinal, str)
+
+      case (STRING, ArrayType(StringType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, StringType, containsNull)
+
+      case (BYTE_STRING, BinaryType) => (updater, ordinal, value) =>
+        val byte_array = value match {
+          case s: ByteString => s.toByteArray
+          case _ => throw new Exception("Invalid ByteString format")
+        }
+        updater.set(ordinal, byte_array)
+
+      case (BYTE_STRING, ArrayType(BinaryType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, BinaryType, containsNull)
+
+      case (MESSAGE, st: StructType) =>
+        val writeRecord = getRecordWriter(protoType.getMessageType, st, protoPath,
+          catalystPath, applyFilters = _ => false)
+        (updater, ordinal, value) =>
+          val row = new SpecificInternalRow(st)
+          writeRecord(new RowUpdater(row), value.asInstanceOf[DynamicMessage])
+          updater.set(ordinal, row)
+
+      case (MESSAGE, ArrayType(st: StructType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, st, containsNull)
+
+      case (ENUM, StringType) => (updater, ordinal, value) =>
+        updater.set(ordinal, UTF8String.fromString(value.toString))
+
+      case (ENUM, ArrayType(StringType, containsNull)) =>
+        newArrayWriter(protoType, protoPath,
+          catalystPath, StringType, containsNull)
+
+      case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+    }
+  }
+
+
+  private def getRecordWriter(
+                               protoType: Descriptor,
+                               catalystType: StructType,
+                               protoPath: Seq[String],
+                               catalystPath: Seq[String],
+                               applyFilters: Int => Boolean):
+  (CatalystDataUpdater, DynamicMessage) => Boolean = {
+
+    val protoSchemaHelper = new ProtoUtils.ProtoSchemaHelper(
+      protoType, catalystType, protoPath, catalystPath, positionalFieldMatch)
+
+    protoSchemaHelper.validateNoExtraCatalystFields(ignoreNullable = true)
+    // no need to validateNoExtraProtoFields since extra Proto fields are ignored
+
+    val (validFieldIndexes, fieldWriters) = protoSchemaHelper.matchedFields.map {
+      case ProtoMatchedField(catalystField, ordinal, protoField) =>
+        val baseWriter = newWriter(protoField, catalystField.dataType,
+          protoPath :+ protoField.getName, catalystPath :+ catalystField.name)
+        val fieldWriter = (fieldUpdater: CatalystDataUpdater, value: Any) => {
+          if (value == null) {
+            fieldUpdater.setNullAt(ordinal)
+          } else {
+            baseWriter(fieldUpdater, ordinal, value)
+          }
+        }
+        (protoField, fieldWriter)
+    }.toArray.unzip
+
+    (fieldUpdater, record) => {
+      var i = 0
+      var skipRow = false
+      while (i < validFieldIndexes.length && !skipRow) {
+        fieldWriters(i)(fieldUpdater, record.getField(validFieldIndexes(i)))
+        skipRow = applyFilters(i)
+        i += 1
+      }
+      skipRow
+    }
+  }
+
+  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
+    case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
+    case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
+    case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
+    case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
+    case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
+    case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
+    case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
+    case _ => new GenericArrayData(new Array[Any](length))
+  }
+
+  /**
+   * A base interface for updating values inside catalyst data structure like `InternalRow` and
+   * `ArrayData`.
+   */
+  sealed trait CatalystDataUpdater {

Review Comment:
   Could you leave a TODO comment here to share these classes with Avro support?



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/DynamicSchema.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto.utils
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.util.control.Breaks.{break, breakable}
+
+import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet}
+import com.google.protobuf.Descriptors.{Descriptor, FileDescriptor}
+
+class DynamicSchema {
+  var fileDescSet: FileDescriptorSet = null

Review Comment:
   Many fields in this class can be `private` I think.
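
   For instance, the suggestion might look something like this (a hypothetical
sketch with made-up member names, not the PR's actual class):

```scala
// Sketch: keep mutable state private and expose only read-only access,
// instead of public vars. All names here are illustrative.
class DynamicSchemaSketch {
  // private mutable state, built up internally while the schema is assembled
  private val messageNames = scala.collection.mutable.Set.empty[String]

  def addMessage(name: String): Unit = messageNames += name

  // callers get an immutable snapshot only
  def knownMessages: Set[String] = messageNames.toSet
}
```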



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/SchemaConverters.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto.utils
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.proto.ScalaReflectionLock
+import org.apache.spark.sql.types._
+
+@DeveloperApi
+object SchemaConverters {
+  /**
+   * Internal wrapper for SQL data type and nullability.
+   *
+   * @since 3.4.0
+   */
+  case class SchemaType(dataType: DataType, nullable: Boolean)
+
+  /**
+   * Converts a Proto schema to a corresponding Spark SQL schema.
+   *
+   * @since 3.4.0
+   */
+  def toSqlType(protoSchema: Descriptor): SchemaType = {
+    toSqlTypeHelper(protoSchema)
+  }
+
+  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+    SchemaType(StructType(descriptor.getFields.asScala.flatMap(structFieldFor).toSeq),
+      nullable = true)
+  }
+
+  def structFieldFor(fd: FieldDescriptor): Option[StructField] = {
+    import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
+    val dataType = fd.getJavaType match {
+      case INT => Some(IntegerType)
+      case LONG => Some(LongType)
+      case FLOAT => Some(FloatType)
+      case DOUBLE => Some(DoubleType)
+      case BOOLEAN => Some(BooleanType)
+      case STRING => Some(StringType)
+      case BYTE_STRING => Some(BinaryType)
+      case ENUM => Some(StringType)
+      case MESSAGE =>
+        Option(fd.getMessageType.getFields.asScala.flatMap(structFieldFor).toSeq)

Review Comment:
   We need to detect recursive schemas and error out. Otherwise, this will 
recurse without limit. 
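
   One possible shape for that check (a hedged sketch using a simplified
schema model rather than protobuf `Descriptor`s; the real fix would thread
something like `fd.getMessageType.getFullName` through the conversion):

```scala
// Sketch: detect recursion by carrying the set of message names currently
// being expanded down the conversion, and erroring on a repeat.
object RecursionCheckSketch {
  sealed trait FieldType
  case object ScalarField extends FieldType
  // A message-typed field referencing another message by name.
  final case class MessageField(messageName: String) extends FieldType

  // messageName -> its fields (a stand-in for protobuf Descriptors)
  type Schema = Map[String, Seq[FieldType]]

  def checkNoRecursion(schema: Schema, message: String,
                       inProgress: Set[String] = Set.empty): Unit = {
    if (inProgress.contains(message)) {
      throw new IllegalArgumentException(
        s"Found recursive reference to message '$message'; cannot map it to a Spark SQL schema")
    }
    schema.getOrElse(message, Seq.empty).foreach {
      case MessageField(child) => checkNoRecursion(schema, child, inProgress + message)
      case ScalarField => ()
    }
  }
}
```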



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/SchemaConverters.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto.utils
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.proto.ScalaReflectionLock
+import org.apache.spark.sql.types._
+
+@DeveloperApi
+object SchemaConverters {
+  /**
+   * Internal wrapper for SQL data type and nullability.
+   *
+   * @since 3.4.0
+   */
+  case class SchemaType(dataType: DataType, nullable: Boolean)
+
+  /**
+   * Converts a Proto schema to a corresponding Spark SQL schema.
+   *
+   * @since 3.4.0
+   */
+  def toSqlType(protoSchema: Descriptor): SchemaType = {
+    toSqlTypeHelper(protoSchema)
+  }
+
+  def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized {
+    SchemaType(StructType(descriptor.getFields.asScala.flatMap(structFieldFor).toSeq),
+      nullable = true)
+  }
+
+  def structFieldFor(fd: FieldDescriptor): Option[StructField] = {

Review Comment:
   When would this return `None`?



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDeserializer.scala:
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import com.google.protobuf.{ByteString, DynamicMessage}
+import com.google.protobuf.Descriptors._
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
+
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.proto.utils.ProtoUtils
+import org.apache.spark.sql.proto.utils.ProtoUtils.ProtoMatchedField
+import org.apache.spark.sql.proto.utils.ProtoUtils.toFieldStr
+import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType,
+  DateType, Decimal, DoubleType, FloatType, IntegerType, LongType, NullType,
+  ShortType, StringType, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+private[sql] class ProtoDeserializer(
+                                      rootProtoType: Descriptor,
+                                      rootCatalystType: DataType,
+                                      positionalFieldMatch: Boolean,
+                                      datetimeRebaseSpec: RebaseSpec,

Review Comment:
   Are these relevant here? Are they Avro-specific, or do they make sense here too?



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/functions.scala:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.Column
+
+// scalastyle:off: object.name
+object functions {
+// scalastyle:on: object.name
+
+  /**
+   * Converts a binary column of Proto format into its corresponding catalyst value.

Review Comment:
   I think it's better to call this `Protobuf format`.



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDataToCatalyst.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import java.io.ByteArrayInputStream
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.DynamicMessage
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression,
+  SpecificInternalRow, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
+import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.proto.utils.{ProtoOptions, ProtoUtils, SchemaConverters}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}
+
+private[proto] case class ProtoDataToCatalyst(child: Expression, descFilePath: String,
+                                              messageName: String,
+                                              options: Map[String, String])
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override lazy val dataType: DataType = {
+    val dt = SchemaConverters.toSqlType(expectedSchema).dataType
+    parseMode match {
+      // With PermissiveMode, the output Catalyst row might contain columns of null values for
+      // corrupt records, even if some of the columns are not nullable in the user-provided schema.
+      // Therefore we force the schema to be all nullable here.
+      case PermissiveMode => dt.asNullable
+      case _ => dt
+    }
+  }
+
+  override def nullable: Boolean = true
+
+  private lazy val protoOptions = ProtoOptions(options)
+
+  @transient private lazy val descriptor = ProtoUtils.buildDescriptor(descFilePath, messageName)
+
+  @transient private lazy val expectedSchema = protoOptions.schema.getOrElse(descriptor)
+
+  @transient private lazy val deserializer = new ProtoDeserializer(expectedSchema, dataType,
+    protoOptions.datetimeRebaseModeInRead)
+
+  @transient private var result: Any = _
+
+  @transient private lazy val parseMode: ParseMode = {
+    val mode = protoOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw new AnalysisException(unacceptableModeMessage(mode.name))
+    }
+    mode
+  }
+
+  private def unacceptableModeMessage(name: String): String = {
+    s"from_proto() doesn't support the $name mode. " +
+      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
+  }
+
+  @transient private lazy val nullResultRow: Any = dataType match {
+    case st: StructType =>
+      val resultRow = new SpecificInternalRow(st.map(_.dataType))
+      for (i <- 0 until st.length) {
+        resultRow.setNullAt(i)
+      }
+      resultRow
+
+    case _ =>
+      null
+  }
+
+  private def handleException(e: Throwable): Any = {
+    parseMode match {
+      case PermissiveMode =>
+        nullResultRow
+      case FailFastMode =>
+        throw new SparkException("Malformed records are detected in record parsing. " +
+          s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
+          "result, try setting the option 'mode' as 'PERMISSIVE'.", e)
+      case _ =>
+        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+    }
+  }
+
+  override def nullSafeEval(input: Any): Any = {
+    val binary = input.asInstanceOf[Array[Byte]]
+    try {
+      result = DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(binary))
+      val unknownFields = result.asInstanceOf[DynamicMessage].getUnknownFields
+      if (!unknownFields.asMap().isEmpty) {
+        return handleException(new Throwable("UnknownFields encountered"))

Review Comment:
   What is the reason for not allowing unknown fields? This would break
forward compatibility, right? E.g. the input might be written with a newer
version of the protobuf schema while the Spark application uses an older one.
That is not incorrect usage; the Spark job might not need the newer fields.
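
   For illustration, protobuf forward compatibility normally relies on unknown
fields being skipped (or preserved) rather than rejected (hypothetical
definitions, not from this PR):

```proto
// Writer's v2 schema: adds a field that a reader holding the v1 schema
// (id and timestamp only) does not know about. A v1 reader still parses
// the message; `source` just lands in its unknown-field set.
syntax = "proto3";

message Event {
  string id = 1;
  int64 timestamp = 2;
  string source = 3;  // new in v2; unknown to v1 readers
}
```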



##########
connector/proto/pom.xml:
##########
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   Some top-level project-related questions:
   
   * (A) Do you plan to add Python support?
       * The approach using descriptor files here keeps Python support
simpler, I think.
       * The overwhelming majority of applications would be written in Python.
   * (B) Do you plan to add schema-registry support, similar to how Avro does it?
       * This is also a common use case for Spark users.
   * (C) Are maps supported?
       * This is also a common use case.
   * (D) Is there a restriction on protobuf v2 vs v3? Can users provide
either version?



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDataToCatalyst.scala:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import java.io.ByteArrayInputStream
+
+import scala.util.control.NonFatal
+
+import com.google.protobuf.DynamicMessage
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression,
+  SpecificInternalRow, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
+import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.proto.utils.{ProtoOptions, ProtoUtils, SchemaConverters}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}
+
+private[proto] case class ProtoDataToCatalyst(child: Expression, descFilePath: String,
+                                              messageName: String,
+                                              options: Map[String, String])
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override lazy val dataType: DataType = {
+    val dt = SchemaConverters.toSqlType(expectedSchema).dataType
+    parseMode match {
+      // With PermissiveMode, the output Catalyst row might contain columns of null values for
+      // corrupt records, even if some of the columns are not nullable in the user-provided schema.
+      // Therefore we force the schema to be all nullable here.
+      case PermissiveMode => dt.asNullable
+      case _ => dt
+    }
+  }
+
+  override def nullable: Boolean = true
+
+  private lazy val protoOptions = ProtoOptions(options)
+
+  @transient private lazy val descriptor = ProtoUtils.buildDescriptor(descFilePath, messageName)
+
+  @transient private lazy val expectedSchema = protoOptions.schema.getOrElse(descriptor)
+
+  @transient private lazy val deserializer = new ProtoDeserializer(expectedSchema, dataType,
+    protoOptions.datetimeRebaseModeInRead)
+
+  @transient private var result: Any = _
+
+  @transient private lazy val parseMode: ParseMode = {
+    val mode = protoOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw new AnalysisException(unacceptableModeMessage(mode.name))
+    }
+    mode
+  }
+
+  private def unacceptableModeMessage(name: String): String = {
+    s"from_proto() doesn't support the $name mode. " +
+      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
+  }
+
+  @transient private lazy val nullResultRow: Any = dataType match {
+    case st: StructType =>
+      val resultRow = new SpecificInternalRow(st.map(_.dataType))
+      for (i <- 0 until st.length) {
+        resultRow.setNullAt(i)
+      }
+      resultRow
+
+    case _ =>
+      null
+  }
+
+  private def handleException(e: Throwable): Any = {
+    parseMode match {
+      case PermissiveMode =>
+        nullResultRow
+      case FailFastMode =>
+        throw new SparkException("Malformed records are detected in record parsing. " +
+          s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
+          "result, try setting the option 'mode' as 'PERMISSIVE'.", e)
+      case _ =>
+        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+    }
+  }
+
+  override def nullSafeEval(input: Any): Any = {
+    val binary = input.asInstanceOf[Array[Byte]]
+    try {
+      result = DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(binary))

Review Comment:
   minor: can pass `binary` directly to `parseFrom()` (less code).
   
   Is there a benefit to creating the builder once and reusing it here via `builder.mergeFrom()`? Probably not.



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDeserializer.scala:
##########
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto
+
+import com.google.protobuf.{ByteString, DynamicMessage}
+import com.google.protobuf.Descriptors._
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
+
+import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
+import org.apache.spark.sql.proto.utils.ProtoUtils
+import org.apache.spark.sql.proto.utils.ProtoUtils.ProtoMatchedField
+import org.apache.spark.sql.proto.utils.ProtoUtils.toFieldStr
+import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType,
+  DateType, Decimal, DoubleType, FloatType, IntegerType, LongType, NullType,
+  ShortType, StringType, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+private[sql] class ProtoDeserializer(
+                                      rootProtoType: Descriptor,
+                                      rootCatalystType: DataType,
+                                      positionalFieldMatch: Boolean,
+                                      datetimeRebaseSpec: RebaseSpec,
+                                      filters: StructFilters) {
+
+  def this(
+            rootProtoType: Descriptor,
+            rootCatalystType: DataType,
+            datetimeRebaseMode: String) = {
+    this(
+      rootProtoType,
+      rootCatalystType,
+      positionalFieldMatch = false,
+      RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)),
+      new NoopFilters)
+  }
+
+  private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInRead(
+    datetimeRebaseSpec.mode, "Proto")
+
+  private val converter: Any => Option[Any] = try {
+    rootCatalystType match {
+      // A shortcut for empty schema.
+      case st: StructType if st.isEmpty =>
+        (_: Any) => Some(InternalRow.empty)
+
+      case st: StructType =>
+        val resultRow = new SpecificInternalRow(st.map(_.dataType))
+        val fieldUpdater = new RowUpdater(resultRow)
+        val applyFilters = filters.skipRow(resultRow, _)
+        val writer = getRecordWriter(rootProtoType, st, Nil, Nil, applyFilters)
+        (data: Any) => {
+          val record = data.asInstanceOf[DynamicMessage]
+          val skipRow = writer(fieldUpdater, record)
+          if (skipRow) None else Some(resultRow)
+        }
+    }
+  } catch {
+    case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException(
+      s"Cannot convert Proto type ${rootProtoType.getName} " +
+        s"to SQL type ${rootCatalystType.sql}.", ise)
+  }
+
+  def deserialize(data: Any): Option[Any] = converter(data)
+
+  private def newArrayWriter(
+                              protoField: FieldDescriptor,
+                              protoPath: Seq[String],
+                              catalystPath: Seq[String],
+                              elementType: DataType,
+                              containsNull: Boolean): (CatalystDataUpdater, Int, Any) => Unit = {
+
+
+    val protoElementPath = protoPath :+ "element"
+    val elementWriter = newWriter(protoField, elementType,
+      protoElementPath, catalystPath :+ "element")
+    (updater, ordinal, value) =>
+      val collection = value.asInstanceOf[java.util.Collection[Any]]
+      val result = createArrayData(elementType, collection.size())
+      val elementUpdater = new ArrayDataUpdater(result)
+
+      var i = 0
+      val iterator = collection.iterator()
+      while (iterator.hasNext) {
+        val element = iterator.next()
+        if (element == null) {
+          if (!containsNull) {
+            throw new RuntimeException(
+              s"Array value at path ${toFieldStr(protoElementPath)} is not allowed to be null")
+          } else {
+            elementUpdater.setNullAt(i)
+          }
+        } else {
+          elementWriter(elementUpdater, i, element)
+        }
+        i += 1
+      }
+
+      updater.set(ordinal, result)
+  }
+
+  /**
+   * Creates a writer to write proto values to Catalyst values at the given ordinal with the given
+   * updater.
+   */
+  private def newWriter(
+                         protoType: FieldDescriptor,
+                         catalystType: DataType,
+                         protoPath: Seq[String],
+                         catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
+    val errorPrefix = s"Cannot convert Proto ${toFieldStr(protoPath)} to " +
+      s"SQL ${toFieldStr(catalystPath)} because "
+    val incompatibleMsg = errorPrefix +
+      s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " +
+      s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})"
+
+    (protoType.getJavaType, catalystType) match {
+
+      case (null, NullType) => (updater, ordinal, _) =>
+        updater.setNullAt(ordinal)
+
+      // TODO: we can avoid boxing if future versions of proto provide primitive accessors.
+      case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
+        updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
+
+      case (BOOLEAN, ArrayType(BooleanType, containsNull)) =>

Review Comment:
   Where do we verify that `protoType` is repeated? 



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/DynamicSchema.scala:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto.utils
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.util.control.Breaks.{break, breakable}
+
+import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet}
+import com.google.protobuf.Descriptors.{Descriptor, FileDescriptor}
+
+class DynamicSchema {

Review Comment:
   Add doc comment.



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/MessageDefinition.scala:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.proto.utils
+
+import com.google.protobuf.DescriptorProtos.{DescriptorProto, 
FieldDescriptorProto}
+
+class MessageDefinition(val messageType: DescriptorProto = null) {

Review Comment:
   Add doc comment.



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/ProtoUtils.scala:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.proto.utils
+
+import java.io.{BufferedInputStream, FileInputStream, FileNotFoundException, IOException}
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException}
+import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FileSourceOptions
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.proto.utils.ProtoOptions.ignoreExtensionKey
+import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+private[sql] object ProtoUtils extends Logging {
+
+  def inferSchema(
+                   spark: SparkSession,
+                   options: Map[String, String],
+                   files: Seq[FileStatus]): Option[StructType] = {
+    val conf = spark.sessionState.newHadoopConfWithOptions(options)
+    val parsedOptions = new ProtoOptions(options, conf)
+
+    if (parsedOptions.parameters.contains(ignoreExtensionKey)) {
+      logWarning(s"Option $ignoreExtensionKey is deprecated. Please use the " +
+        "general data source option pathGlobFilter for filtering file names.")
+    }
+    // User can specify an optional proto json schema.
+    val protoSchema = parsedOptions.schema
+      .getOrElse {
+        inferProtoSchemaFromFiles(files,
+          new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles)
+      }
+
+    SchemaConverters.toSqlType(protoSchema).dataType match {
+      case t: StructType => Some(t)
+      case _ => throw new RuntimeException(
+        s"""Proto schema cannot be converted to a Spark SQL StructType:
+           |
+           |${protoSchema.toString()}
+           |""".stripMargin)
+    }
+  }
+
+  private def inferProtoSchemaFromFiles(
+                                         files: Seq[FileStatus],
+                                         ignoreCorruptFiles: Boolean): Descriptor = {
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
+    // figure out the schema of the whole dataset.
+    val protoReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!path.getName.endsWith(".pb")) {
+        None
+      } else {
+        Utils.tryWithResource {
+          new FileInputStream("saved_model.pb")

Review Comment:
   What format are these files in? Could you expand on the use case?



##########
connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/ProtoUtils.scala:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.proto.utils
+
+import java.io.{BufferedInputStream, FileInputStream, FileNotFoundException, IOException}
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException}
+import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FileSourceOptions
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.proto.utils.ProtoOptions.ignoreExtensionKey
+import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+private[sql] object ProtoUtils extends Logging {
+
+  def inferSchema(
+                   spark: SparkSession,
+                   options: Map[String, String],
+                   files: Seq[FileStatus]): Option[StructType] = {
+    val conf = spark.sessionState.newHadoopConfWithOptions(options)
+    val parsedOptions = new ProtoOptions(options, conf)
+
+    if (parsedOptions.parameters.contains(ignoreExtensionKey)) {
+      logWarning(s"Option $ignoreExtensionKey is deprecated. Please use the " +
+        "general data source option pathGlobFilter for filtering file names.")
+    }
+    // User can specify an optional proto json schema.
+    val protoSchema = parsedOptions.schema
+      .getOrElse {
+        inferProtoSchemaFromFiles(files,
+          new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles)
+      }
+
+    SchemaConverters.toSqlType(protoSchema).dataType match {
+      case t: StructType => Some(t)
+      case _ => throw new RuntimeException(
+        s"""Proto schema cannot be converted to a Spark SQL StructType:
+           |
+           |${protoSchema.toString()}
+           |""".stripMargin)
+    }
+  }
+
+  private def inferProtoSchemaFromFiles(
+                                         files: Seq[FileStatus],
+                                         ignoreCorruptFiles: Boolean): Descriptor = {
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
+    // figure out the schema of the whole dataset.
+    val protoReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!path.getName.endsWith(".pb")) {
+        None
+      } else {
+        Utils.tryWithResource {
+          new FileInputStream("saved_model.pb")
+        } { in =>
+          try {
+            Some(DescriptorProtos.DescriptorProto.parseFrom(in).getDescriptorForType)
+          } catch {
+            case e: IOException =>
+              if (ignoreCorruptFiles) {
+                logWarning(s"Skipped the footer in the corrupted file: $path", e)
+                None
+              } else {
+                throw new SparkException(s"Could not read file: $path", e)
+              }
+          }
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    protoReader match {
+      case Some(reader) =>
+        reader.getContainingType
+      case None =>
+        throw new FileNotFoundException(
+          "No Proto files found. If files don't have .proto extension, set ignoreExtension to true")
+    }
+  }
+
+  def supportsDataType(dataType: DataType): Boolean = dataType match {
+    case _: AtomicType => true
+
+    case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+
+    case ArrayType(elementType, _) => supportsDataType(elementType)
+
+    case MapType(keyType, valueType, _) =>

Review Comment:
   Do we support maps yet?
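
   For context on the map question (speaking generally about protobuf, not this PR's implementation): on the wire, a map field is defined to be equivalent to a repeated synthetic entry message, so a deserializer that already handles repeated message fields has the building blocks for map support. A sketch of the equivalence, with hypothetical field names:

```proto
syntax = "proto3";

// Hypothetical example: these two declarations have identical wire format.
message WithMap {
  map<string, int32> counts = 3;
}

message WithRepeatedEntry {
  message CountsEntry {
    string key = 1;
    int32 value = 2;
  }
  // The compiler generates this form for map fields (and marks the
  // synthesized entry type with the map_entry option internally).
  repeated CountsEntry counts = 3;
}
```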



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
