viirya commented on a change in pull request #28223: [SPARK-31450][SQL] Make
ExpressionEncoder thread-safe
URL: https://github.com/apache/spark/pull/28223#discussion_r409175280
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
##########
@@ -332,30 +344,67 @@ case class ExpressionEncoder[T](
}
/**
- * Returns an encoded version of `t` as a Spark SQL row. Note that multiple
calls to
- * toRow are allowed to return the same actual [[InternalRow]] object.
Thus, the caller should
- * copy the result before making another call if required.
+ * Create a serializer that can convert an object of type `T` to a Spark SQL
Row.
+ *
+ * Note that the returned [[Serializer]] is not thread safe. Multiple calls
to
+ * `serializer.apply(..)` are allowed to return the same actual
[[InternalRow]] object. Thus,
+ * the caller should copy the result before making another call if required.
*/
- def toRow(t: T): InternalRow = try {
- inputRow(0) = t
- extractProjection(inputRow)
- } catch {
- case e: Exception =>
- throw new RuntimeException(s"Error while encoding: $e\n" +
+ def createSerializer(): Serializer[T] = new Serializer[T] {
+ @transient
+ private var inputRow: GenericInternalRow = _
+
+ @transient
+ private var extractProjection: UnsafeProjection = _
+
+ private def initialize(): Unit = {
+ inputRow = new GenericInternalRow(1)
+ extractProjection =
GenerateUnsafeProjection.generate(optimizedSerializer)
+ }
+ initialize()
+
+ override def apply(t: T): InternalRow = try {
+ inputRow(0) = t
+ extractProjection(inputRow)
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException(s"Error while encoding: $e\n" +
s"${serializer.map(_.simpleString(SQLConf.get.maxToStringFields)).mkString("\n")}",
e)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = {
+ in.defaultReadObject()
+ initialize()
+ }
}
/**
- * Returns an object of type `T`, extracting the required values from the
provided row. Note that
- * you must `resolveAndBind` an encoder to a specific schema before you can
call this
- * function.
+ * Create a deserializer that can convert a Spark SQL Row into an object of
type `T`.
+ *
+ * Note that you must `resolveAndBind` an encoder to a specific schema
before you can create a
+ * deserializer.
*/
- def fromRow(row: InternalRow): T = try {
- constructProjection(row).get(0,
ObjectType(clsTag.runtimeClass)).asInstanceOf[T]
- } catch {
- case e: Exception =>
- throw new RuntimeException(s"Error while decoding: $e\n" +
- s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e)
+ def createDeserializer(): Deserializer[T] = new Deserializer[T] {
+ @transient
+ private var constructProjection: Projection = _
+
+ private def initialize(): Unit = {
+ constructProjection = SafeProjection.create(optimizedDeserializer)
+ }
+ initialize()
+
+ override def apply(row: InternalRow): T = try {
+ constructProjection(row).get(0,
ObjectType(clsTag.runtimeClass)).asInstanceOf[T]
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException(s"Error while decoding: $e\n" +
+ s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e)
Review comment:
As pointed out already, some fields such as `deserializer` live in the
enclosing encoder, so it currently looks like we would serialize the entire
encoder? Actually we already serialize the entire encoder today, but it would
be better if we could get rid of the unnecessary serialization.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]