viirya commented on a change in pull request #28223: [SPARK-31450][SQL] Make 
ExpressionEncoder thread-safe
URL: https://github.com/apache/spark/pull/28223#discussion_r409175280
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ##########
 @@ -332,30 +344,67 @@ case class ExpressionEncoder[T](
   }
 
   /**
-   * Returns an encoded version of `t` as a Spark SQL row.  Note that multiple 
calls to
-   * toRow are allowed to return the same actual [[InternalRow]] object.  
Thus, the caller should
-   * copy the result before making another call if required.
+   * Create a serializer that can convert an object of type `T` to a Spark SQL 
Row.
+   *
+   * Note that the returned [[Serializer]] is not thread safe. Multiple calls 
to
+   * `serializer.apply(..)` are allowed to return the same actual 
[[InternalRow]] object.  Thus,
+   *  the caller should copy the result before making another call if required.
    */
-  def toRow(t: T): InternalRow = try {
-    inputRow(0) = t
-    extractProjection(inputRow)
-  } catch {
-    case e: Exception =>
-      throw new RuntimeException(s"Error while encoding: $e\n" +
+  def createSerializer(): Serializer[T] = new Serializer[T] {
+    @transient
+    private var inputRow: GenericInternalRow = _
+
+    @transient
+    private var extractProjection: UnsafeProjection = _
+
+    private def initialize(): Unit = {
+      inputRow = new GenericInternalRow(1)
+      extractProjection = 
GenerateUnsafeProjection.generate(optimizedSerializer)
+    }
+    initialize()
+
+    override def apply(t: T): InternalRow = try {
+      inputRow(0) = t
+      extractProjection(inputRow)
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(s"Error while encoding: $e\n" +
           
s"${serializer.map(_.simpleString(SQLConf.get.maxToStringFields)).mkString("\n")}",
 e)
+    }
+
+    private def readObject(in: ObjectInputStream): Unit = {
+      in.defaultReadObject()
+      initialize()
+    }
   }
 
   /**
-   * Returns an object of type `T`, extracting the required values from the 
provided row.  Note that
-   * you must `resolveAndBind` an encoder to a specific schema before you can 
call this
-   * function.
+   * Create a deserializer that can convert a Spark SQL Row into an object of 
type `T`.
+   *
+   * Note that you must `resolveAndBind` an encoder to a specific schema 
before you can create a
+   * deserializer.
    */
-  def fromRow(row: InternalRow): T = try {
-    constructProjection(row).get(0, 
ObjectType(clsTag.runtimeClass)).asInstanceOf[T]
-  } catch {
-    case e: Exception =>
-      throw new RuntimeException(s"Error while decoding: $e\n" +
-        s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e)
+  def createDeserializer(): Deserializer[T] = new Deserializer[T] {
+    @transient
+    private var constructProjection: Projection = _
+
+    private def initialize(): Unit = {
+      constructProjection = SafeProjection.create(optimizedDeserializer)
+    }
+    initialize()
+
+    override def apply(row: InternalRow): T = try {
+      constructProjection(row).get(0, 
ObjectType(clsTag.runtimeClass)).asInstanceOf[T]
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(s"Error while decoding: $e\n" +
+          s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e)
 
 Review comment:
   As pointed already, some fields like `deserializer` are in enclosing 
encoder, and so currently looks like we will serialize entire encoder? Actually 
we did serialize entire encoder currently but yea it is better we can get rid 
of unnecessary. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to