Sounds like something to do with the serialization/deserialization, and not
related to mapGroupsWithState.

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala

The docs says that
1. this is deprecated and therefore should not be used
2. you have to use the annotation `SQLUserDefinedType
<https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>`
on the class definition. You dont seem to have done it, maybe thats the
reason?

I would debug by printing the values in the serialize/deserialize methods,
and then passing it through the groupBy that is known to fail.

TD

On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Tathagata,
>
> The difference is more than hours off. In this instance it's different by
> 4 years. In other instances it's different by tens of years (and other
> smaller durations).
>
> We've considered moving to storage as longs, but this makes code much less
> readable and harder to maintain. The udt serialization bug also causes
> issues outside of stateful streaming, as when executing a simple group by.
>
> Regards,
>
> Bryan Jeffrey
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
> ------------------------------
> *From:* Tathagata Das <tathagata.das1...@gmail.com>
> *Sent:* Friday, February 28, 2020 4:56:07 PM
> *To:* Bryan Jeffrey <bryan.jeff...@gmail.com>
> *Cc:* user <user@spark.apache.org>
> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT serialization
> does not work
>
> You are deserializing by explicitly specifying UTC timezone, but when
> serializing you are not specifying it. Maybe that is reason?
>
> Also, if you can encode it using just long, then I recommend just saving
> the value as long and eliminating some of the serialization overheads.
> Spark will probably better optimize stuff if it sees it as a long rather
> than an opaque UDT.
>
> TD
>
> On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
> Hello.
>
> I'm running Scala 2.11 w/ Spark 2.3.0.  I've encountered a problem with
> mapGroupsWithState, and was wondering if anyone had insight.  We use Joda
> time in a number of data structures, and so we've generated a custom
> serializer for Joda.  This works well in most dataset/dataframe structured
> streaming operations. However, when running mapGroupsWithState we observed
> that incorrect dates were being returned from a state.
>
> I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
> an effort to assist tracking of related information.
>
> Simple example:
> 1. Input A has a date D
> 2. Input A updates state in mapGroupsWithState. Date present in state is D
> 3. Input A is added again.  Input A has correct date D, but existing state
> now has invalid date
>
> Here is a simple repro:
>
> Joda Time UDT:
>
> private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
>   override def sqlType: DataType  = LongType
>   override def serialize(obj: DateTime): Long = obj.getMillis
>   def deserialize(datum: Any): DateTime = datum match { case value: Long => 
> new DateTime(value, DateTimeZone.UTC) }
>   override def userClass: Class[DateTime] = classOf[DateTime]
>   private[spark] override def asNullable: JodaTimeUDT = this
> }
>
> object JodaTimeUDTRegister {
>   def register : Unit = { UDTRegistration.register(classOf[DateTime].getName, 
> classOf[JodaTimeUDT].getName)  }
> }
>
>
> Test Leveraging Joda UDT:
>
> case class FooWithDate(date: DateTime, s: String, i: Int)
>
> @RunWith(classOf[JUnitRunner])
> class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with 
> BeforeAndAfterAll {
>   val application = this.getClass.getName
>   var session: SparkSession = _
>
>   override def beforeAll(): Unit = {
>     System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
>     val sparkConf = new SparkConf()
>       .set("spark.driver.allowMultipleContexts", "true")
>       .set("spark.testing", "true")
>       .set("spark.memory.fraction", "1")
>       .set("spark.ui.enabled", "false")
>       .set("spark.streaming.gracefulStopTimeout", "1000")
>       .setAppName(application).setMaster("local[*]")
>
>
>     session = SparkSession.builder().config(sparkConf).getOrCreate()
>     session.sparkContext.setCheckpointDir("/")
>     JodaTimeUDTRegister.register
>   }
>
>   override def afterAll(): Unit = {
>     session.stop()
>   }
>
>   it should "work correctly for a streaming input with stateful 
> transformation" in {
>     val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
>     val sqlContext = session.sqlContext
>     import sqlContext.implicits._
>
>     val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 
> 3), FooWithDate(date, "Foo", 3))
>     val streamInput: MemoryStream[FooWithDate] = new 
> MemoryStream[FooWithDate](42, session.sqlContext)
>     streamInput.addData(input)
>     val ds: Dataset[FooWithDate] = streamInput.toDS()
>
>     val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], 
> GroupState[FooWithDate]) => FooWithDate = TestJodaTimeUdt.updateFooState
>     val result: Dataset[FooWithDate] = ds
>       .groupByKey(x => x.i)
>       
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
>     val writeTo = s"random_table_name"
>
>     
> result.writeStream.outputMode(OutputMode.Update).format("memory").queryName(writeTo).trigger(Trigger.Once()).start().awaitTermination()
>     val combinedResults: Array[FooWithDate] = session.sql(sqlText = s"select 
> * from $writeTo").as[FooWithDate].collect()
>     val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, 
> "FooFoo", 6))
>     combinedResults should contain theSameElementsAs(expected)
>   }
> }
>
> object TestJodaTimeUdt {
>   def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: 
> GroupState[FooWithDate]): FooWithDate = {
>     if (state.hasTimedOut) {
>       state.remove()
>       state.getOption.get
>     } else {
>       val inputsSeq: Seq[FooWithDate] = inputs.toSeq
>       val startingState = state.getOption.getOrElse(inputsSeq.head)
>       val toProcess = if (state.getOption.isDefined) inputsSeq else 
> inputsSeq.tail
>       val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)
>
>       state.update(updatedFoo)
>       state.setTimeoutDuration("1 minute")
>       updatedFoo
>     }
>   }
>
>   def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate = 
> FooWithDate(b.date, a.s + b.s, a.i + b.i)
> }
>
>
> The test output shows the invalid date:
>
> org.scalatest.exceptions.TestFailedException:
> Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1),
> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same
> elements as
> Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>
> Is this something folks have encountered before?
>
> Thank you,
>
> Bryan Jeffrey
>
>
>
>
>
>
>

Reply via email to