Sounds like something to do with the serialization/deserialization, and not related to mapGroupsWithState.
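One way to test the serialization theory in isolation is to log every value that passes through a serialize/deserialize pair and assert that a round trip returns the original. Below is a minimal sketch, not the actual UDT: it assumes java.time as a stand-in for Joda so that it runs without extra dependencies, and `LoggingCodec` and `RoundTripCheck` are hypothetical names that are not part of Spark.

```scala
import java.time.{Instant, ZoneOffset, ZonedDateTime}

// Hypothetical stand-in for the UDT's serialize/deserialize pair; not Spark API.
object LoggingCodec {
  // Mirrors serialize: keep only the epoch milliseconds, and log the conversion.
  def serialize(dt: ZonedDateTime): Long = {
    val millis = dt.toInstant.toEpochMilli
    println(s"serialize:   $dt -> $millis")
    millis
  }

  // Mirrors deserialize: rebuild the value in UTC from epoch milliseconds.
  def deserialize(datum: Any): ZonedDateTime = datum match {
    case millis: Long =>
      val dt = Instant.ofEpochMilli(millis).atZone(ZoneOffset.UTC)
      println(s"deserialize: $millis -> $dt")
      dt
    case other =>
      // Fail loudly instead of with a MatchError if something else arrives.
      throw new IllegalArgumentException(s"unexpected datum: $other")
  }
}

object RoundTripCheck {
  def main(args: Array[String]): Unit = {
    // Same instant as the repro: 2020-01-02T03:04:05.006Z (6 ms = 6,000,000 ns).
    val date = ZonedDateTime.of(2020, 1, 2, 3, 4, 5, 6000000, ZoneOffset.UTC)
    val restored = LoggingCodec.deserialize(LoggingCodec.serialize(date))
    assert(restored == date, s"round trip changed the value: $date -> $restored")
  }
}
```

If the pair round-trips cleanly on its own but the logged values differ once the same data goes through the failing groupBy, that would point at what Spark does with the value between the two calls rather than at the conversion itself.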
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala

The docs say that:
1. this is deprecated and therefore should not be used
2. you have to use the annotation `SQLUserDefinedType <https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/types/SQLUserDefinedType.java>` on the class definition.

You don't seem to have done that; maybe that's the reason?

I would debug by printing the values in the serialize/deserialize methods, and then passing the data through the groupBy that is known to fail.

TD

On Fri, Feb 28, 2020 at 2:45 PM Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:

> Tathagata,
>
> The difference is more than hours off. In this instance it's different by
> 4 years. In other instances it's different by tens of years (and other
> smaller durations).
>
> We've considered moving to storage as longs, but this makes code much less
> readable and harder to maintain. The udt serialization bug also causes
> issues outside of stateful streaming, as when executing a simple group by.
>
> Regards,
>
> Bryan Jeffrey
>
> ------------------------------
> *From:* Tathagata Das <tathagata.das1...@gmail.com>
> *Sent:* Friday, February 28, 2020 4:56:07 PM
> *To:* Bryan Jeffrey <bryan.jeff...@gmail.com>
> *Cc:* user <user@spark.apache.org>
> *Subject:* Re: Structured Streaming: mapGroupsWithState UDT serialization
> does not work
>
> You are deserializing by explicitly specifying UTC timezone, but when
> serializing you are not specifying it. Maybe that is reason?
>
> Also, if you can encode it using just long, then I recommend just saving
> the value as long and eliminating some of the serialization overheads.
> Spark will probably better optimize stuff if it sees it as a long rather
> than an opaque UDT.
>
> TD
>
> On Fri, Feb 28, 2020 at 6:39 AM Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
> Hello.
>
> I'm running Scala 2.11 w/ Spark 2.3.0. I've encountered a problem with
> mapGroupsWithState, and was wondering if anyone had insight. We use Joda
> time in a number of data structures, and so we've generated a custom
> serializer for Joda. This works well in most dataset/dataframe structured
> streaming operations. However, when running mapGroupsWithState we observed
> that incorrect dates were being returned from a state.
>
> I created a bug here: https://issues.apache.org/jira/browse/SPARK-30986 in
> an effort to assist tracking of related information.
>
> Simple example:
> 1. Input A has a date D
> 2. Input A updates state in mapGroupsWithState. Date present in state is D
> 3. Input A is added again. Input A has correct date D, but existing state
>    now has invalid date
>
> Here is a simple repro:
>
> Joda Time UDT:
>
> private[sql] class JodaTimeUDT extends UserDefinedType[DateTime] {
>   override def sqlType: DataType = LongType
>   override def serialize(obj: DateTime): Long = obj.getMillis
>   def deserialize(datum: Any): DateTime = datum match {
>     case value: Long => new DateTime(value, DateTimeZone.UTC)
>   }
>   override def userClass: Class[DateTime] = classOf[DateTime]
>   private[spark] override def asNullable: JodaTimeUDT = this
> }
>
> object JodaTimeUDTRegister {
>   def register : Unit = {
>     UDTRegistration.register(classOf[DateTime].getName, classOf[JodaTimeUDT].getName)
>   }
> }
>
>
> Test Leveraging Joda UDT:
>
> case class FooWithDate(date: DateTime, s: String, i: Int)
>
> @RunWith(classOf[JUnitRunner])
> class TestJodaTimeUdt extends FlatSpec with Matchers with MockFactory with BeforeAndAfterAll {
>   val application = this.getClass.getName
>   var session: SparkSession = _
>
>   override def beforeAll(): Unit = {
>     System.setProperty("hadoop.home.dir", getClass.getResource("/").getPath)
>     val sparkConf = new SparkConf()
>       .set("spark.driver.allowMultipleContexts", "true")
>       .set("spark.testing", "true")
>       .set("spark.memory.fraction", "1")
>       .set("spark.ui.enabled", "false")
>       .set("spark.streaming.gracefulStopTimeout", "1000")
>       .setAppName(application).setMaster("local[*]")
>
>     session = SparkSession.builder().config(sparkConf).getOrCreate()
>     session.sparkContext.setCheckpointDir("/")
>     JodaTimeUDTRegister.register
>   }
>
>   override def afterAll(): Unit = {
>     session.stop()
>   }
>
>   it should "work correctly for a streaming input with stateful transformation" in {
>     val date = new DateTime(2020, 1, 2, 3, 4, 5, 6, DateTimeZone.UTC)
>     val sqlContext = session.sqlContext
>     import sqlContext.implicits._
>
>     val input = List(FooWithDate(date, "Foo", 1), FooWithDate(date, "Foo", 3), FooWithDate(date, "Foo", 3))
>     val streamInput: MemoryStream[FooWithDate] = new MemoryStream[FooWithDate](42, session.sqlContext)
>     streamInput.addData(input)
>     val ds: Dataset[FooWithDate] = streamInput.toDS()
>
>     val mapGroupsWithStateFunction: (Int, Iterator[FooWithDate], GroupState[FooWithDate]) => FooWithDate =
>       TestJodaTimeUdt.updateFooState
>     val result: Dataset[FooWithDate] = ds
>       .groupByKey(x => x.i)
>       .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mapGroupsWithStateFunction)
>     val writeTo = s"random_table_name"
>
>     result.writeStream.outputMode(OutputMode.Update).format("memory").queryName(writeTo).trigger(Trigger.Once()).start().awaitTermination()
>     val combinedResults: Array[FooWithDate] = session.sql(sqlText = s"select * from $writeTo").as[FooWithDate].collect()
>     val expected = Array(FooWithDate(date, "Foo", 1), FooWithDate(date, "FooFoo", 6))
>     combinedResults should contain theSameElementsAs(expected)
>   }
> }
>
> object TestJodaTimeUdt {
>   def updateFooState(id: Int, inputs: Iterator[FooWithDate], state: GroupState[FooWithDate]): FooWithDate = {
>     if (state.hasTimedOut) {
>       state.remove()
>       state.getOption.get
>     } else {
>       val inputsSeq: Seq[FooWithDate] = inputs.toSeq
>       val startingState = state.getOption.getOrElse(inputsSeq.head)
>       val toProcess = if (state.getOption.isDefined) inputsSeq else inputsSeq.tail
>       val updatedFoo = toProcess.foldLeft(startingState)(concatFoo)
>
>       state.update(updatedFoo)
>       state.setTimeoutDuration("1 minute")
>       updatedFoo
>     }
>   }
>
>   def concatFoo(a: FooWithDate, b: FooWithDate): FooWithDate =
>     FooWithDate(b.date, a.s + b.s, a.i + b.i)
> }
>
>
> The test output shows the invalid date:
>
> org.scalatest.exceptions.TestFailedException:
> Array(FooWithDate(2021-02-02T19:26:23.374Z,Foo,1),
> FooWithDate(2021-02-02T19:26:23.374Z,FooFoo,6)) did not contain the same
> elements as
> Array(FooWithDate(2020-01-02T03:04:05.006Z,Foo,1),
> FooWithDate(2020-01-02T03:04:05.006Z,FooFoo,6))
>
> Is this something folks have encountered before?
>
> Thank you,
>
> Bryan Jeffrey
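
On the suggestion in this thread to store the value as a long: the readability cost can possibly be reduced by keeping the long as the stored field and exposing the date through an accessor. The following is only a sketch under assumptions: `FooWithMillis` and `from` are hypothetical names, java.time stands in for Joda so the example runs without extra dependencies, and it has not been run against the failing mapGroupsWithState query.

```scala
import java.time.{Instant, ZoneOffset, ZonedDateTime}

// Hypothetical variant of FooWithDate that stores epoch milliseconds directly,
// so the only date-related constructor field is a plain Long.
final case class FooWithMillis(dateMillis: Long, s: String, i: Int) {
  // Readable view over the stored Long, always interpreted as UTC.
  def date: ZonedDateTime = Instant.ofEpochMilli(dateMillis).atZone(ZoneOffset.UTC)
}

object FooWithMillis {
  // Convenience constructor so call sites can keep passing a date-time value.
  def from(date: ZonedDateTime, s: String, i: Int): FooWithMillis =
    FooWithMillis(date.toInstant.toEpochMilli, s, i)
}

object FooWithMillisExample {
  def main(args: Array[String]): Unit = {
    val date = ZonedDateTime.of(2020, 1, 2, 3, 4, 5, 6000000, ZoneOffset.UTC)
    val foo = FooWithMillis.from(date, "Foo", 1)
    // The accessor rebuilds the same UTC date-time from the stored Long.
    assert(foo.date == date)
  }
}
```

Since `date` is a method rather than a constructor field, the expectation is that only `dateMillis`, `s` and `i` would be serialized and no UDT would be involved, but that should be confirmed against the Spark version in use.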