rangadi commented on code in PR #37972: URL: https://github.com/apache/spark/pull/37972#discussion_r983737188
########## connector/proto/src/main/scala/org/apache/spark/sql/proto/CatalystDataToProto.scala: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto Review Comment: Regarding naming: `to_proto` or `to_protobuf`? I was thinking the latter. Protobuf normally refers to the actual protobuf messages (serialized or not). These APIs deal with protobufs. _proto_ is more generic and refers to many things: proto files that define the protobuf schemas, etc. In other words, `proto` is the schema/definition and `protobuf` is the instance of actual messages. What do you think? Of course naming is subjective. ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/CatalystDataToProto.scala: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto + +import com.google.protobuf.DynamicMessage + +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.proto.utils.{ProtoUtils, SchemaConverters} +import org.apache.spark.sql.types.{BinaryType, DataType} + +private[proto] case class CatalystDataToProto( + child: Expression, + descFilePath: Option[String], + messageName: Option[String]) + extends UnaryExpression { + + override def dataType: DataType = BinaryType + + @transient private lazy val protoType = (descFilePath, messageName) match { + case (Some(a), Some(b)) => ProtoUtils.buildDescriptor(a, b) + case _ => SchemaConverters.toProtoType(child.dataType, child.nullable) Review Comment: What does it mean to serialize without a proto definition. How can we create a protobuf? How can consumers read it? Looks like it will be hard to ensure backward or forward compatibility (which is fundamental to protobufs). ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDeserializer.scala: ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto + +import com.google.protobuf.{ByteString, DynamicMessage} +import com.google.protobuf.Descriptors._ +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._ + +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} +import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec +import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.proto.utils.ProtoUtils +import org.apache.spark.sql.proto.utils.ProtoUtils.ProtoMatchedField +import org.apache.spark.sql.proto.utils.ProtoUtils.toFieldStr +import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, + DateType, Decimal, DoubleType, FloatType, IntegerType, LongType, NullType, + ShortType, StringType, StructType} +import org.apache.spark.unsafe.types.UTF8String + +private[sql] class ProtoDeserializer( + rootProtoType: Descriptor, + rootCatalystType: DataType, + positionalFieldMatch: Boolean, + datetimeRebaseSpec: RebaseSpec, + filters: StructFilters) { + + def this( + rootProtoType: Descriptor, + rootCatalystType: DataType, + datetimeRebaseMode: String) = { + this( + rootProtoType, + rootCatalystType, + positionalFieldMatch = false, + 
RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)), + new NoopFilters) + } + + private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInRead( + datetimeRebaseSpec.mode, "Proto") + + private val converter: Any => Option[Any] = try { + rootCatalystType match { + // A shortcut for empty schema. + case st: StructType if st.isEmpty => + (_: Any) => Some(InternalRow.empty) + + case st: StructType => + val resultRow = new SpecificInternalRow(st.map(_.dataType)) + val fieldUpdater = new RowUpdater(resultRow) + val applyFilters = filters.skipRow(resultRow, _) + val writer = getRecordWriter(rootProtoType, st, Nil, Nil, applyFilters) + (data: Any) => { + val record = data.asInstanceOf[DynamicMessage] + val skipRow = writer(fieldUpdater, record) + if (skipRow) None else Some(resultRow) + } + } + } catch { + case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException( + s"Cannot convert Proto type ${rootProtoType.getName} " + + s"to SQL type ${rootCatalystType.sql}.", ise) + } + + def deserialize(data: Any): Option[Any] = converter(data) + + private def newArrayWriter( + protoField: FieldDescriptor, + protoPath: Seq[String], + catalystPath: Seq[String], + elementType: DataType, + containsNull: Boolean): (CatalystDataUpdater, Int, Any) => Unit = { + + + val protoElementPath = protoPath :+ "element" + val elementWriter = newWriter(protoField, elementType, + protoElementPath, catalystPath :+ "element") + (updater, ordinal, value) => + val collection = value.asInstanceOf[java.util.Collection[Any]] + val result = createArrayData(elementType, collection.size()) + val elementUpdater = new ArrayDataUpdater(result) + + var i = 0 + val iterator = collection.iterator() + while (iterator.hasNext) { + val element = iterator.next() + if (element == null) { + if (!containsNull) { + throw new RuntimeException( + s"Array value at path ${toFieldStr(protoElementPath)} is not allowed to be null") + } else { + elementUpdater.setNullAt(i) + } + } else { + 
elementWriter(elementUpdater, i, element) + } + i += 1 + } + + updater.set(ordinal, result) + } + + /** + * Creates a writer to write proto values to Catalyst values at the given ordinal with the given + * updater. + */ + private def newWriter( + protoType: FieldDescriptor, + catalystType: DataType, + protoPath: Seq[String], + catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = { + val errorPrefix = s"Cannot convert Proto ${toFieldStr(protoPath)} to " + + s"SQL ${toFieldStr(catalystPath)} because " + val incompatibleMsg = errorPrefix + + s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " + + s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})" + + (protoType.getJavaType, catalystType) match { + + case (null, NullType) => (updater, ordinal, _) => + updater.setNullAt(ordinal) + + // TODO: we can avoid boxing if future version of proto provide primitive accessors. + case (BOOLEAN, BooleanType) => (updater, ordinal, value) => + updater.setBoolean(ordinal, value.asInstanceOf[Boolean]) + + case (BOOLEAN, ArrayType(BooleanType, containsNull)) => + newArrayWriter(protoType, protoPath, + catalystPath, BooleanType, containsNull) + + case (INT, IntegerType) => (updater, ordinal, value) => + updater.setInt(ordinal, value.asInstanceOf[Int]) + + case (INT, ArrayType(IntegerType, containsNull)) => + newArrayWriter(protoType, protoPath, + catalystPath, IntegerType, containsNull) + + case (INT, DateType) => (updater, ordinal, value) => + updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int])) + + case (LONG, LongType) => (updater, ordinal, value) => + updater.setLong(ordinal, value.asInstanceOf[Long]) + + case (LONG, ArrayType(LongType, containsNull)) => + newArrayWriter(protoType, protoPath, + catalystPath, LongType, containsNull) + + case (FLOAT, FloatType) => (updater, ordinal, value) => + updater.setFloat(ordinal, value.asInstanceOf[Float]) + + case (FLOAT, ArrayType(FloatType, containsNull)) 
=> + newArrayWriter(protoType, protoPath, + catalystPath, FloatType, containsNull) + + case (DOUBLE, DoubleType) => (updater, ordinal, value) => + updater.setDouble(ordinal, value.asInstanceOf[Double]) + + case (DOUBLE, ArrayType(DoubleType, containsNull)) => + newArrayWriter(protoType, protoPath, + catalystPath, DoubleType, containsNull) + + case (STRING, StringType) => (updater, ordinal, value) => + val str = value match { + case s: String => UTF8String.fromString(s) + } + updater.set(ordinal, str) + + case (STRING, ArrayType(StringType, containsNull)) => + newArrayWriter(protoType, protoPath, + catalystPath, StringType, containsNull) + + case (BYTE_STRING, BinaryType) => (updater, ordinal, value) => + val byte_array = value match { + case s: ByteString => s.toByteArray + case _ => throw new Exception("Invalid ByteString format") + } + updater.set(ordinal, byte_array) + + case (BYTE_STRING, ArrayType(BinaryType, containsNull)) => + newArrayWriter(protoType, protoPath, + catalystPath, BinaryType, containsNull) + + case (MESSAGE, st: StructType) => + val writeRecord = getRecordWriter(protoType.getMessageType, st, protoPath, + catalystPath, applyFilters = _ => false) + (updater, ordinal, value) => + val row = new SpecificInternalRow(st) + writeRecord(new RowUpdater(row), value.asInstanceOf[DynamicMessage]) + updater.set(ordinal, row) + + case (MESSAGE, ArrayType(st: StructType, containsNull)) => + newArrayWriter(protoType, protoPath, + catalystPath, st, containsNull) + + case (ENUM, StringType) => (updater, ordinal, value) => + updater.set(ordinal, UTF8String.fromString(value.toString)) + + case (ENUM, ArrayType(StringType, containsNull)) => + newArrayWriter(protoType, protoPath, + catalystPath, StringType, containsNull) + + case _ => throw new IncompatibleSchemaException(incompatibleMsg) + } + } + + + private def getRecordWriter( + protoType: Descriptor, + catalystType: StructType, + protoPath: Seq[String], + catalystPath: Seq[String], + applyFilters: Int => 
Boolean): + (CatalystDataUpdater, DynamicMessage) => Boolean = { + + val protoSchemaHelper = new ProtoUtils.ProtoSchemaHelper( + protoType, catalystType, protoPath, catalystPath, positionalFieldMatch) + + protoSchemaHelper.validateNoExtraCatalystFields(ignoreNullable = true) + // no need to validateNoExtraProtoFields since extra Proto fields are ignored + + val (validFieldIndexes, fieldWriters) = protoSchemaHelper.matchedFields.map { + case ProtoMatchedField(catalystField, ordinal, protoField) => + val baseWriter = newWriter(protoField, catalystField.dataType, + protoPath :+ protoField.getName, catalystPath :+ catalystField.name) + val fieldWriter = (fieldUpdater: CatalystDataUpdater, value: Any) => { + if (value == null) { + fieldUpdater.setNullAt(ordinal) + } else { + baseWriter(fieldUpdater, ordinal, value) + } + } + (protoField, fieldWriter) + }.toArray.unzip + + (fieldUpdater, record) => { + var i = 0 + var skipRow = false + while (i < validFieldIndexes.length && !skipRow) { + fieldWriters(i)(fieldUpdater, record.getField(validFieldIndexes(i))) + skipRow = applyFilters(i) + i += 1 + } + skipRow + } + } + + private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match { + case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length)) + case ByteType => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length)) + case ShortType => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length)) + case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length)) + case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length)) + case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length)) + case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length)) + case _ => new GenericArrayData(new Array[Any](length)) + } + + /** + * A base interface for updating values inside catalyst data structure like `InternalRow` and + * `ArrayData`. 
+ */ + sealed trait CatalystDataUpdater { Review Comment: Could you leave a TODO comment here to share these classes with Avro support? ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/DynamicSchema.scala: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto.utils + +import java.util + +import scala.collection.JavaConverters._ +import scala.util.control.Breaks.{break, breakable} + +import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet} +import com.google.protobuf.Descriptors.{Descriptor, FileDescriptor} + +class DynamicSchema { + var fileDescSet: FileDescriptorSet = null Review Comment: Many fields in this class can be `private` I think. ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/SchemaConverters.scala: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto.utils + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.proto.ScalaReflectionLock +import org.apache.spark.sql.types._ + +@DeveloperApi +object SchemaConverters { + /** + * Internal wrapper for SQL data type and nullability. + * + * @since 3.4.0 + */ + case class SchemaType(dataType: DataType, nullable: Boolean) + + /** + * Converts an Proto schema to a corresponding Spark SQL schema. 
+ * + * @since 3.4.0 + */ + def toSqlType(protoSchema: Descriptor): SchemaType = { + toSqlTypeHelper(protoSchema) + } + + def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized { + SchemaType(StructType(descriptor.getFields.asScala.flatMap(structFieldFor).toSeq), + nullable = true) + } + + def structFieldFor(fd: FieldDescriptor): Option[StructField] = { + import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._ + val dataType = fd.getJavaType match { + case INT => Some(IntegerType) + case LONG => Some(LongType) + case FLOAT => Some(FloatType) + case DOUBLE => Some(DoubleType) + case BOOLEAN => Some(BooleanType) + case STRING => Some(StringType) + case BYTE_STRING => Some(BinaryType) + case ENUM => Some(StringType) + case MESSAGE => + Option(fd.getMessageType.getFields.asScala.flatMap(structFieldFor).toSeq) Review Comment: We need to detect recursive schemas and error out. Otherwise, this will recurse without limit. ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/SchemaConverters.scala: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.proto.utils + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.proto.ScalaReflectionLock +import org.apache.spark.sql.types._ + +@DeveloperApi +object SchemaConverters { + /** + * Internal wrapper for SQL data type and nullability. + * + * @since 3.4.0 + */ + case class SchemaType(dataType: DataType, nullable: Boolean) + + /** + * Converts an Proto schema to a corresponding Spark SQL schema. + * + * @since 3.4.0 + */ + def toSqlType(protoSchema: Descriptor): SchemaType = { + toSqlTypeHelper(protoSchema) + } + + def toSqlTypeHelper(descriptor: Descriptor): SchemaType = ScalaReflectionLock.synchronized { + SchemaType(StructType(descriptor.getFields.asScala.flatMap(structFieldFor).toSeq), + nullable = true) + } + + def structFieldFor(fd: FieldDescriptor): Option[StructField] = { Review Comment: When would this return `None`? ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDeserializer.scala: ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.proto + +import com.google.protobuf.{ByteString, DynamicMessage} +import com.google.protobuf.Descriptors._ +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._ + +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} +import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec +import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.proto.utils.ProtoUtils +import org.apache.spark.sql.proto.utils.ProtoUtils.ProtoMatchedField +import org.apache.spark.sql.proto.utils.ProtoUtils.toFieldStr +import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, + DateType, Decimal, DoubleType, FloatType, IntegerType, LongType, NullType, + ShortType, StringType, StructType} +import org.apache.spark.unsafe.types.UTF8String + +private[sql] class ProtoDeserializer( + rootProtoType: Descriptor, + rootCatalystType: DataType, + positionalFieldMatch: Boolean, + datetimeRebaseSpec: RebaseSpec, Review Comment: Are these relevant here? Are these Avro-specific, or do they make sense here too? ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/functions.scala: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto + +import scala.collection.JavaConverters._ + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.Column + +// scalastyle:off: object.name +object functions { +// scalastyle:on: object.name + + /** + * Converts a binary column of Proto format into its corresponding catalyst value. Review Comment: I think better to call this `Protobuf format` ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDataToCatalyst.scala: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.proto + +import java.io.ByteArrayInputStream + +import scala.util.control.NonFatal + +import com.google.protobuf.DynamicMessage + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, + SpecificInternalRow, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} +import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.proto.utils.{ProtoOptions, ProtoUtils, SchemaConverters} +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType} + +private[proto] case class ProtoDataToCatalyst(child: Expression, descFilePath: String, + messageName: String, + options: Map[String, String]) + extends UnaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) + + override lazy val dataType: DataType = { + val dt = SchemaConverters.toSqlType(expectedSchema).dataType + parseMode match { + // With PermissiveMode, the output Catalyst row might contain columns of null values for + // corrupt records, even if some of the columns are not nullable in the user-provided schema. + // Therefore we force the schema to be all nullable here. 
+ case PermissiveMode => dt.asNullable + case _ => dt + } + } + + override def nullable: Boolean = true + + private lazy val protoOptions = ProtoOptions(options) + + @transient private lazy val descriptor = ProtoUtils.buildDescriptor(descFilePath, messageName) + + @transient private lazy val expectedSchema = protoOptions.schema.getOrElse(descriptor) + + @transient private lazy val deserializer = new ProtoDeserializer(expectedSchema, dataType, + protoOptions.datetimeRebaseModeInRead) + + @transient private var result: Any = _ + + @transient private lazy val parseMode: ParseMode = { + val mode = protoOptions.parseMode + if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(unacceptableModeMessage(mode.name)) + } + mode + } + + private def unacceptableModeMessage(name: String): String = { + s"from_proto() doesn't support the $name mode. " + + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." + } + + @transient private lazy val nullResultRow: Any = dataType match { + case st: StructType => + val resultRow = new SpecificInternalRow(st.map(_.dataType)) + for (i <- 0 until st.length) { + resultRow.setNullAt(i) + } + resultRow + + case _ => + null + } + + private def handleException(e: Throwable): Any = { + parseMode match { + case PermissiveMode => + nullResultRow + case FailFastMode => + throw new SparkException("Malformed records are detected in record parsing. " + + s"Current parse Mode: ${FailFastMode.name}. 
To process malformed records as null " + + "result, try setting the option 'mode' as 'PERMISSIVE'.", e) + case _ => + throw new AnalysisException(unacceptableModeMessage(parseMode.name)) + } + } + + override def nullSafeEval(input: Any): Any = { + val binary = input.asInstanceOf[Array[Byte]] + try { + result = DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(binary)) + val unknownFields = result.asInstanceOf[DynamicMessage].getUnknownFields + if (!unknownFields.asMap().isEmpty) { + return handleException(new Throwable("UnknownFields encountered")) Review Comment: What is the reason for not allowing unknown fields? This would break forward compatibility, right? E.g. the input might have newer versions of protobuf, but the Spark application might be using an older version. That is not an incorrect use. The Spark job might not need the newer fields. ########## connector/proto/pom.xml: ########## @@ -0,0 +1,119 @@ +<?xml version="1.0" encoding="UTF-8"?> Review Comment: Some top-level project-related questions: * (A) Do you plan to add Python support? * The approach using descriptor files here keeps Python support simpler I think. * The overwhelming majority of the applications would be written in Python. * (B) Do you plan to add schema-registry support similar to how Avro does it? * This is also a common use case for Spark users. * (C) Are maps supported? * It is also a common use case. * (D) Is there a restriction on protobuf v2 vs v3? Can users provide schemas in either version? ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDataToCatalyst.scala: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto + +import java.io.ByteArrayInputStream + +import scala.util.control.NonFatal + +import com.google.protobuf.DynamicMessage + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, + SpecificInternalRow, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} +import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.proto.utils.{ProtoOptions, ProtoUtils, SchemaConverters} +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType} + +private[proto] case class ProtoDataToCatalyst(child: Expression, descFilePath: String, + messageName: String, + options: Map[String, String]) + extends UnaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) + + override lazy val dataType: DataType = { + val dt = SchemaConverters.toSqlType(expectedSchema).dataType + parseMode match { + // With PermissiveMode, the output Catalyst row might contain columns of null values for + // corrupt records, even if some of the columns are not nullable in the user-provided schema. + // Therefore we force the schema to be all nullable here. 
+ case PermissiveMode => dt.asNullable + case _ => dt + } + } + + override def nullable: Boolean = true + + private lazy val protoOptions = ProtoOptions(options) + + @transient private lazy val descriptor = ProtoUtils.buildDescriptor(descFilePath, messageName) + + @transient private lazy val expectedSchema = protoOptions.schema.getOrElse(descriptor) + + @transient private lazy val deserializer = new ProtoDeserializer(expectedSchema, dataType, + protoOptions.datetimeRebaseModeInRead) + + @transient private var result: Any = _ + + @transient private lazy val parseMode: ParseMode = { + val mode = protoOptions.parseMode + if (mode != PermissiveMode && mode != FailFastMode) { + throw new AnalysisException(unacceptableModeMessage(mode.name)) + } + mode + } + + private def unacceptableModeMessage(name: String): String = { + s"from_proto() doesn't support the $name mode. " + + s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." + } + + @transient private lazy val nullResultRow: Any = dataType match { + case st: StructType => + val resultRow = new SpecificInternalRow(st.map(_.dataType)) + for (i <- 0 until st.length) { + resultRow.setNullAt(i) + } + resultRow + + case _ => + null + } + + private def handleException(e: Throwable): Any = { + parseMode match { + case PermissiveMode => + nullResultRow + case FailFastMode => + throw new SparkException("Malformed records are detected in record parsing. " + + s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " + + "result, try setting the option 'mode' as 'PERMISSIVE'.", e) + case _ => + throw new AnalysisException(unacceptableModeMessage(parseMode.name)) + } + } + + override def nullSafeEval(input: Any): Any = { + val binary = input.asInstanceOf[Array[Byte]] + try { + result = DynamicMessage.parseFrom(descriptor, new ByteArrayInputStream(binary)) Review Comment: minor: can pass `binary` directly to `parseFrom()` (less code). 
Is there a benefit to creating the builder once and reusing it here? builder.mergeFrom()? Probably not. ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/ProtoDeserializer.scala: ########## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.sql.proto + +import com.google.protobuf.{ByteString, DynamicMessage} +import com.google.protobuf.Descriptors._ +import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._ + +import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters} +import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} +import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec +import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.proto.utils.ProtoUtils +import org.apache.spark.sql.proto.utils.ProtoUtils.ProtoMatchedField +import org.apache.spark.sql.proto.utils.ProtoUtils.toFieldStr +import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, + DateType, Decimal, DoubleType, FloatType, IntegerType, LongType, NullType, + ShortType, StringType, StructType} +import org.apache.spark.unsafe.types.UTF8String + +private[sql] class ProtoDeserializer( + rootProtoType: Descriptor, + rootCatalystType: DataType, + positionalFieldMatch: Boolean, + datetimeRebaseSpec: RebaseSpec, + filters: StructFilters) { + + def this( + rootProtoType: Descriptor, + rootCatalystType: DataType, + datetimeRebaseMode: String) = { + this( + rootProtoType, + rootCatalystType, + positionalFieldMatch = false, + RebaseSpec(LegacyBehaviorPolicy.withName(datetimeRebaseMode)), + new NoopFilters) + } + + private val dateRebaseFunc = DataSourceUtils.createDateRebaseFuncInRead( + datetimeRebaseSpec.mode, "Proto") + + private val converter: Any => Option[Any] = try { + rootCatalystType match { + // A shortcut for empty schema. 
+ case st: StructType if st.isEmpty => + (_: Any) => Some(InternalRow.empty) + + case st: StructType => + val resultRow = new SpecificInternalRow(st.map(_.dataType)) + val fieldUpdater = new RowUpdater(resultRow) + val applyFilters = filters.skipRow(resultRow, _) + val writer = getRecordWriter(rootProtoType, st, Nil, Nil, applyFilters) + (data: Any) => { + val record = data.asInstanceOf[DynamicMessage] + val skipRow = writer(fieldUpdater, record) + if (skipRow) None else Some(resultRow) + } + } + } catch { + case ise: IncompatibleSchemaException => throw new IncompatibleSchemaException( + s"Cannot convert Proto type ${rootProtoType.getName} " + + s"to SQL type ${rootCatalystType.sql}.", ise) + } + + def deserialize(data: Any): Option[Any] = converter(data) + + private def newArrayWriter( + protoField: FieldDescriptor, + protoPath: Seq[String], + catalystPath: Seq[String], + elementType: DataType, + containsNull: Boolean): (CatalystDataUpdater, Int, Any) => Unit = { + + + val protoElementPath = protoPath :+ "element" + val elementWriter = newWriter(protoField, elementType, + protoElementPath, catalystPath :+ "element") + (updater, ordinal, value) => + val collection = value.asInstanceOf[java.util.Collection[Any]] + val result = createArrayData(elementType, collection.size()) + val elementUpdater = new ArrayDataUpdater(result) + + var i = 0 + val iterator = collection.iterator() + while (iterator.hasNext) { + val element = iterator.next() + if (element == null) { + if (!containsNull) { + throw new RuntimeException( + s"Array value at path ${toFieldStr(protoElementPath)} is not allowed to be null") + } else { + elementUpdater.setNullAt(i) + } + } else { + elementWriter(elementUpdater, i, element) + } + i += 1 + } + + updater.set(ordinal, result) + } + + /** + * Creates a writer to write proto values to Catalyst values at the given ordinal with the given + * updater. 
+ */ + private def newWriter( + protoType: FieldDescriptor, + catalystType: DataType, + protoPath: Seq[String], + catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = { + val errorPrefix = s"Cannot convert Proto ${toFieldStr(protoPath)} to " + + s"SQL ${toFieldStr(catalystPath)} because " + val incompatibleMsg = errorPrefix + + s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " + + s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})" + + (protoType.getJavaType, catalystType) match { + + case (null, NullType) => (updater, ordinal, _) => + updater.setNullAt(ordinal) + + // TODO: we can avoid boxing if future version of proto provide primitive accessors. + case (BOOLEAN, BooleanType) => (updater, ordinal, value) => + updater.setBoolean(ordinal, value.asInstanceOf[Boolean]) + + case (BOOLEAN, ArrayType(BooleanType, containsNull)) => Review Comment: Where do we verify that `protoType` is repeated? ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/DynamicSchema.scala: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.proto.utils + +import java.util + +import scala.collection.JavaConverters._ +import scala.util.control.Breaks.{break, breakable} + +import com.google.protobuf.DescriptorProtos.{FileDescriptorProto, FileDescriptorSet} +import com.google.protobuf.Descriptors.{Descriptor, FileDescriptor} + +class DynamicSchema { Review Comment: Add doc comment. ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/MessageDefinition.scala: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.proto.utils + +import com.google.protobuf.DescriptorProtos.{DescriptorProto, FieldDescriptorProto} + +class MessageDefinition(val messageType: DescriptorProto = null) { Review Comment: Add doc comment. ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/ProtoUtils.scala: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.proto.utils + +import java.io.{BufferedInputStream, FileInputStream, FileNotFoundException, IOException} +import java.util.Locale + +import scala.collection.JavaConverters._ + +import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException} +import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FileSourceOptions +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.proto.utils.ProtoOptions.ignoreExtensionKey +import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +private[sql] object ProtoUtils extends Logging { + + def inferSchema( + spark: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val conf = spark.sessionState.newHadoopConfWithOptions(options) + val parsedOptions = new ProtoOptions(options, conf) + + if (parsedOptions.parameters.contains(ignoreExtensionKey)) { + logWarning(s"Option $ignoreExtensionKey is deprecated. 
Please use the " + + "general data source option pathGlobFilter for filtering file names.") + } + // User can specify an optional proto json schema. + val protoSchema = parsedOptions.schema + .getOrElse { + inferProtoSchemaFromFiles(files, + new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles) + } + + SchemaConverters.toSqlType(protoSchema).dataType match { + case t: StructType => Some(t) + case _ => throw new RuntimeException( + s"""Proto schema cannot be converted to a Spark SQL StructType: + | + |${protoSchema.toString()} + |""".stripMargin) + } + } + + private def inferProtoSchemaFromFiles( + files: Seq[FileStatus], + ignoreCorruptFiles: Boolean): Descriptor = { + // Schema evolution is not supported yet. Here we only pick first random readable sample file to + // figure out the schema of the whole dataset. + val protoReader = files.iterator.map { f => + val path = f.getPath + if (!path.getName.endsWith(".pb")) { + None + } else { + Utils.tryWithResource { + new FileInputStream("saved_model.pb") Review Comment: What format are these files in? Could you expand on the use case? ########## connector/proto/src/main/scala/org/apache/spark/sql/proto/utils/ProtoUtils.scala: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.proto.utils + +import java.io.{BufferedInputStream, FileInputStream, FileNotFoundException, IOException} +import java.util.Locale + +import scala.collection.JavaConverters._ + +import com.google.protobuf.{DescriptorProtos, Descriptors, InvalidProtocolBufferException} +import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor} +import org.apache.hadoop.fs.FileStatus + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FileSourceOptions +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.proto.utils.ProtoOptions.ignoreExtensionKey +import org.apache.spark.sql.proto.utils.SchemaConverters.IncompatibleSchemaException +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +private[sql] object ProtoUtils extends Logging { + + def inferSchema( + spark: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val conf = spark.sessionState.newHadoopConfWithOptions(options) + val parsedOptions = new ProtoOptions(options, conf) + + if (parsedOptions.parameters.contains(ignoreExtensionKey)) { + logWarning(s"Option $ignoreExtensionKey is deprecated. Please use the " + + "general data source option pathGlobFilter for filtering file names.") + } + // User can specify an optional proto json schema. 
+ val protoSchema = parsedOptions.schema + .getOrElse { + inferProtoSchemaFromFiles(files, + new FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles) + } + + SchemaConverters.toSqlType(protoSchema).dataType match { + case t: StructType => Some(t) + case _ => throw new RuntimeException( + s"""Proto schema cannot be converted to a Spark SQL StructType: + | + |${protoSchema.toString()} + |""".stripMargin) + } + } + + private def inferProtoSchemaFromFiles( + files: Seq[FileStatus], + ignoreCorruptFiles: Boolean): Descriptor = { + // Schema evolution is not supported yet. Here we only pick first random readable sample file to + // figure out the schema of the whole dataset. + val protoReader = files.iterator.map { f => + val path = f.getPath + if (!path.getName.endsWith(".pb")) { + None + } else { + Utils.tryWithResource { + new FileInputStream("saved_model.pb") + } { in => + try { + Some(DescriptorProtos.DescriptorProto.parseFrom(in).getDescriptorForType) + } catch { + case e: IOException => + if (ignoreCorruptFiles) { + logWarning(s"Skipped the footer in the corrupted file: $path", e) + None + } else { + throw new SparkException(s"Could not read file: $path", e) + } + } + } + } + }.collectFirst { + case Some(reader) => reader + } + + protoReader match { + case Some(reader) => + reader.getContainingType + case None => + throw new FileNotFoundException( + "No Proto files found. If files don't have .proto extension, set ignoreExtension to true") + } + } + + def supportsDataType(dataType: DataType): Boolean = dataType match { + case _: AtomicType => true + + case st: StructType => st.forall { f => supportsDataType(f.dataType) } + + case ArrayType(elementType, _) => supportsDataType(elementType) + + case MapType(keyType, valueType, _) => Review Comment: Do we support maps yet? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
