viirya opened a new pull request #16478:
URL: https://github.com/apache/spark/pull/16478


   ## What changes were proposed in this pull request?
   
   This patch revises the current API for User Defined Types (UDTs).
   
   Currently, `UserDefinedType` requires developers to implement a method that
   writes user data into Spark SQL's internal formats, such as
   `GenericInternalRow` and `UnsafeArrayData`.
   
   One goal of this patch is to simplify writing UDTs. Developers can use plain
   Scala types, instead of internal types such as `ArrayData`, when writing
   UDTs. `UserDefinedType` then uses Spark SQL's encoder to convert the user
   data to the internal format.
   
   For example, the following is the serialization method of `MatrixUDT`.
   
   Before this patch:
   
       override def serialize(obj: Matrix): InternalRow = {
         val row = new GenericInternalRow(7)
         obj match {
           case sm: SparseMatrix =>
             row.setByte(0, 0)
             row.setInt(1, sm.numRows)
             row.setInt(2, sm.numCols)
             row.update(3, UnsafeArrayData.fromPrimitiveArray(sm.colPtrs))
             row.update(4, UnsafeArrayData.fromPrimitiveArray(sm.rowIndices))
             row.update(5, UnsafeArrayData.fromPrimitiveArray(sm.values))
             row.setBoolean(6, sm.isTransposed)
   
           case dm: DenseMatrix =>
             row.setByte(0, 1)
             row.setInt(1, dm.numRows)
             row.setInt(2, dm.numCols)
             row.setNullAt(3)
             row.setNullAt(4)
             row.update(5, UnsafeArrayData.fromPrimitiveArray(dm.values))
             row.setBoolean(6, dm.isTransposed)
         }
         row
       }
   
   
   After this patch:
   
       override def writeRow(obj: Matrix): Row = {
         obj match {
           case sm: SparseMatrix =>
             Row(0.toByte, sm.numRows, sm.numCols, sm.colPtrs, sm.rowIndices,
               sm.values, sm.isTransposed)
   
           case dm: DenseMatrix =>
             Row(1.toByte, dm.numRows, dm.numCols, null, null, dm.values,
               dm.isTransposed)
         }
       }
   
   With the new API, developers work with the external `Row` and plain Scala
   types. The encoder takes care of converting the data to Spark SQL's internal
   format.
   
   
   ### Main API change
   
   Previously, developers had to implement two main methods in any UDT that
   extends `UserDefinedType`:
   
       /** Convert the user type to a SQL datum */
       def serialize(obj: UserType): Any
   
       /** Convert a SQL datum to the user type */
       def deserialize(datum: Any): UserType
   
   With these methods, developers put the data of the user class into, and read
   it back from, an internal Spark SQL type.
   
   After this patch, developers instead implement two methods that put user
   data into, and read it back from, an external Spark SQL row:
   
       /**
        * Convert the object of user type to an external row.
        * Must be implemented in subclasses.
        */
       def writeRow(obj: UserType): Row
   
       /**
        * Convert the external row to an object of user type.
        * Must be implemented in subclasses.
        */
       def readRow(row: Row): UserType
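   
   As a rough illustration of the two methods above, here is a minimal
   round-trip sketch for a simple user class. `RowBackedUDT`, `Point2D`, and
   `Point2DUDT` are hypothetical names used only for this example; the trait is
   a stand-in for the two proposed methods, not the real `UserDefinedType`
   (which also needs `sqlType`, `userClass`, and so on). The only Spark API
   assumed is `org.apache.spark.sql.Row`, so Spark SQL must be on the classpath.

```scala
import org.apache.spark.sql.Row

// Hypothetical stand-in that carries only the two methods proposed in this patch.
trait RowBackedUDT[UserType] {
  def writeRow(obj: UserType): Row
  def readRow(row: Row): UserType
}

// Illustrative user class; not part of Spark.
case class Point2D(x: Double, y: Double)

object Point2DUDT extends RowBackedUDT[Point2D] {
  // User object -> external Row built from plain Scala values.
  override def writeRow(p: Point2D): Row = Row(p.x, p.y)

  // External Row -> user object, reading the fields back by position.
  override def readRow(row: Row): Point2D =
    Point2D(row.getDouble(0), row.getDouble(1))
}

object Point2DRoundTrip {
  def main(args: Array[String]): Unit = {
    val original = Point2D(1.5, -2.0)
    val restored = Point2DUDT.readRow(Point2DUDT.writeRow(original))
    // Writing and then reading should give back an equal object.
    assert(restored == original)
  }
}
```

   The field order in `writeRow` and the positions read in `readRow` have to
   agree, just as the `Row(...)` arguments in the `MatrixUDT` example follow a
   fixed layout.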
   
   ### Benchmark
   
   I ran a benchmark comparing this patch against the previous
   `UserDefinedType` implementation.
   
   Code:
   
       private val random = new Random(100)
       private lazy val pointsRDD = (0 to 1000).map { i =>
         val features = (0 to 100).map { _ =>
           random.nextDouble()
         }
         MyLabeledPoint(i % 10, new UDT.MyDenseVector(features.toArray))
       }.toDF()
   
       test("serialize MyLabeledPoint") {
         val N = 10L << 5
         runBenchmark("serialize MyLabeledPoint", N) {
           pointsRDD.groupBy('label).agg(max('features)).collect()
         }
       }
   
   
   Before this patch:
   
       Java HotSpot(TM) 64-Bit Server VM 1.8.0_102-b14 on Linux 4.4.39-moby
       Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
       serialize MyLabeledPoint:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
       ------------------------------------------------------------------------------------------------
       serialize MyLabeledPoint wholestage off        271 /  328          0.0      845767.7       1.0X
       serialize MyLabeledPoint wholestage on         131 /  261          0.0      408173.6       2.1X
   
   After this patch:
   
       Java HotSpot(TM) 64-Bit Server VM 1.8.0_102-b14 on Linux 4.4.39-moby
       Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
       serialize MyLabeledPoint:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
       ------------------------------------------------------------------------------------------------
       serialize MyLabeledPoint wholestage off        221 /  331          0.0      692100.4       1.0X
       serialize MyLabeledPoint wholestage on         125 /  145          0.0      391120.1       1.8X
   
   In short, after this patch the serialization/deserialization performance of
   `UserDefinedType` is on par with, or slightly better than, the previous
   implementation.
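   
   To put numbers on that comparison, the per-row figures in the two tables
   above can be turned into relative reductions. This is plain arithmetic on
   the reported values, not an additional measurement; the object and method
   names are made up for the sketch.

```scala
object BenchmarkDelta {
  // Per-row times in nanoseconds, copied from the two result tables.
  val before: Map[String, Double] =
    Map("wholestage off" -> 845767.7, "wholestage on" -> 408173.6)
  val after: Map[String, Double] =
    Map("wholestage off" -> 692100.4, "wholestage on" -> 391120.1)

  // Fraction of per-row time saved by the patch: (before - after) / before.
  def reduction(mode: String): Double =
    (before(mode) - after(mode)) / before(mode)

  def main(args: Array[String]): Unit = {
    before.keys.toSeq.sorted.foreach { mode =>
      println(f"$mode: ${reduction(mode) * 100}%.1f%% less time per row")
    }
  }
}
```

   This works out to roughly 18% less time per row with whole-stage codegen
   off and roughly 4% with it on. The per-row figures track the best run; the
   average time with whole-stage codegen off is nearly unchanged (328 ms
   before, 331 ms after), so part of the difference may be run-to-run noise.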
   
   ## How was this patch tested?
   
   Existing Jenkins tests.
   
   




