sadhen edited a comment on pull request #31735:
URL: https://github.com/apache/spark/pull/31735#issuecomment-804539589


   @eddyxu I wrote a UDT with a Timestamp field, but failed to make it work. See the demo PR: https://github.com/eddyxu/spark/pull/4
   
   For ExampleBox, serializing to a list works fine. But for ExamplePointWithTimeUDT, we need to serialize to a dict to make `pa.StructArray.from_pandas` work, as the sketch below illustrates. In the snippets that follow it, the Python part works fine, but I failed to deserialize ExamplePointWithTime properly on the Scala side.
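   
   A minimal standalone illustration of the dict requirement (the pyarrow schema here is my hand-written mirror of `sqlType` below, microsecond precision assumed):
   
   ``` python
   import pandas as pd
   import pyarrow as pa
   
   # Mirrors ExamplePointWithTimeUDT.sqlType (assumed microsecond precision).
   struct_type = pa.struct([
       pa.field("x", pa.float64(), nullable=False),
       pa.field("y", pa.float64()),
       pa.field("ts", pa.timestamp("us")),
   ])
   
   # Serializing each point to a dict makes this conversion work; a list does not.
   series = pd.Series([{"x": 1.0, "y": 2.0, "ts": pd.Timestamp("2021-03-01 12:00:00")}])
   print(pa.StructArray.from_pandas(series, type=struct_type))
   ```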
   
   ``` python
   from pyspark.sql.types import (
       DoubleType, StructField, StructType, TimestampType, UserDefinedType)
   
   
   class ExamplePointWithTimeUDT(UserDefinedType):
       """
       User-defined type (UDT) for ExamplePointWithTime.
       """
   
       @classmethod
       def sqlType(cls):
           return StructType([
               StructField("x", DoubleType(), False),
               StructField("y", DoubleType(), True),
               StructField("ts", TimestampType(), False),
           ])
   
       @classmethod
       def module(cls):
           return 'pyspark.testing.sqlutils'
   
       @classmethod
       def scalaUDT(cls):
           return 'org.apache.spark.sql.test.ExamplePointWithTimeUDT'
   
       def serialize(self, obj):
           return {'x': obj.x, 'y': obj.y, 'ts': obj.ts}
   
       def deserialize(self, datum):
           return ExamplePointWithTime(datum['x'], datum['y'], datum['ts'])
   
   
   class ExamplePointWithTime:
       """
       An example class to demonstrate UDT in Scala, Java, and Python.
       """
   
       __UDT__ = ExamplePointWithTimeUDT()
   
       def __init__(self, x, y, ts):
           self.x = x
           self.y = y
           self.ts = ts
   
       def __repr__(self):
           return "ExamplePointWithTime(%s,%s,%s)" % (self.x, self.y, self.ts)
   
       def __str__(self):
           return "(%s,%s,%s)" % (self.x, self.y, self.ts)
   
       def __eq__(self, other):
           return isinstance(other, self.__class__) \
               and other.x == self.x and other.y == self.y \
               and other.ts == self.ts
   ```
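   
   For reference, the Python-side round trip I'm testing looks roughly like this (a local session with Arrow enabled; the conf and sample data are just my test setup):
   
   ``` python
   import datetime
   from pyspark.sql import SparkSession
   
   spark = SparkSession.builder.master("local[1]").getOrCreate()
   spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
   
   # Schema inference picks up ExamplePointWithTimeUDT via __UDT__.
   pt = ExamplePointWithTime(1.0, 2.0, datetime.datetime(2021, 3, 1, 12, 0))
   df = spark.createDataFrame([(pt,)], ["point"])
   df.show()
   print(df.toPandas()["point"][0])
   ```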
   
   ``` scala
   package org.apache.spark.sql.test
   
   import java.sql.Timestamp
   
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
   import org.apache.spark.sql.types.{DataType, DoubleType, SQLUserDefinedType, StructField, StructType, TimestampType, UserDefinedType}
   
   
   /**
    * An example class to demonstrate UDT in Scala, Java, and Python.
    * @param x x coordinate
    * @param y y coordinate
    * @param ts timestamp
    */
   @SQLUserDefinedType(udt = classOf[ExamplePointWithTimeUDT])
   private[sql] class ExamplePointWithTime(val x: Double, val y: Double, val ts: Timestamp)
     extends Serializable {
   
     override def hashCode(): Int = {
       var hash = 13
       hash = hash * 31 + x.hashCode()
       hash = hash * 31 + y.hashCode()
       hash = hash * 31 + ts.hashCode()
       hash
     }
   
     override def equals(other: Any): Boolean = other match {
       case that: ExamplePointWithTime =>
         this.x == that.x && this.y == that.y && this.ts == that.ts
       case _ => false
     }
   
     override def toString(): String = s"($x, $y, ${ts.toString})"
   }
   
   /**
    * User-defined type (UDT) for [[ExamplePointWithTime]].
    */
   private[sql] class ExamplePointWithTimeUDT extends UserDefinedType[ExamplePointWithTime] {
   
     override def sqlType: DataType = StructType(Array(
       StructField("x", DoubleType, nullable = false),
       StructField("y", DoubleType, nullable = true),
       StructField("ts", TimestampType, nullable = false)
     ))
   
     override def pyUDT: String = "pyspark.testing.sqlutils.ExamplePointWithTimeUDT"
   
     // NOTE: sqlType is a StructType, but this emits MapData; see the sketch
     // after this block for an InternalRow-based alternative.
     override def serialize(p: ExamplePointWithTime): ArrayBasedMapData = {
       ArrayBasedMapData(
         Array("x", "y", "ts"),
         Array(p.x, p.y, p.ts)
       )
     }
   
     override def deserialize(datum: Any): ExamplePointWithTime = {
       datum match {
         case row: InternalRow =>
           new ExamplePointWithTime(
             row.getDouble(0),
             row.getDouble(1),
             row.get(2, TimestampType)  // returns the internal Long (micros); .asInstanceOf[Timestamp] fails with a ClassCastException
           )
       }
     }
   
     override def userClass: Class[ExamplePointWithTime] = classOf[ExamplePointWithTime]
   
     private[spark] override def asNullable: ExamplePointWithTimeUDT = this
   }
   ```
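   
   My current guess at a fix: Catalyst stores TimestampType values as microseconds since the epoch in a Long, so serialize should emit an InternalRow matching sqlType and convert through DateTimeUtils. A sketch of that direction (untested on my side, so treat it as an assumption):
   
   ``` scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
   import org.apache.spark.sql.catalyst.util.DateTimeUtils
   
   // Emit an InternalRow matching sqlType; the java.sql.Timestamp is converted
   // to Catalyst's internal representation (microseconds since the epoch).
   override def serialize(p: ExamplePointWithTime): InternalRow = {
     new GenericInternalRow(Array[Any](p.x, p.y, DateTimeUtils.fromJavaTimestamp(p.ts)))
   }
   
   override def deserialize(datum: Any): ExamplePointWithTime = datum match {
     case row: InternalRow =>
       new ExamplePointWithTime(
         row.getDouble(0),
         row.getDouble(1),
         DateTimeUtils.toJavaTimestamp(row.getLong(2))) // Long micros -> java.sql.Timestamp
   }
   ```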

