GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19416#discussion_r142302989
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala ---
    @@ -62,26 +60,7 @@ case class FlatMapGroupsWithStateExec(
       import GroupStateImpl._
     
       private val isTimeoutEnabled = timeoutConf != NoTimeout
    -  private val timestampTimeoutAttribute =
    -    AttributeReference("timeoutTimestamp", dataType = IntegerType, 
nullable = false)()
    -  private val stateAttributes: Seq[Attribute] = {
    -    val encSchemaAttribs = stateEncoder.schema.toAttributes
    -    if (isTimeoutEnabled) encSchemaAttribs :+ timestampTimeoutAttribute 
else encSchemaAttribs
    -  }
    -  // Get the serializer for the state, taking into account whether we need 
to save timestamps
    -  private val stateSerializer = {
    -    val encoderSerializer = stateEncoder.namedExpressions
    -    if (isTimeoutEnabled) {
    -      encoderSerializer :+ Literal(GroupStateImpl.NO_TIMESTAMP)
    -    } else {
    -      encoderSerializer
    -    }
    -  }
    -  // Get the deserializer for the state. Note that this must be done in 
the driver, as
    -  // resolving and binding of deserializer expressions to the encoded type 
can be safely done
    -  // only in the driver.
    -  private val stateDeserializer = 
stateEncoder.resolveAndBind().deserializer
    -
    +  val stateManager = new FlatMapGroupsWithState_StateManager(stateEncoder, 
isTimeoutEnabled)
    --- End diff --
    
    I refactored this class to separate state management from processing, which makes the class much simpler.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to