HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1470538842


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   The code should work with any resource manager (standalone, YARN, K8s,
etc.) and with any custom configuration applied by operators/cloud vendors. You
can also override the value per executing JVM process (`-Djava.io.tmpdir`). It
is too naive to assume that the driver and executor nodes are set up exactly
the same.
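   To illustrate why a path created on the driver is not portable: the temp
root comes from the local JVM's `java.io.tmpdir`, and creating a directory only
materializes it on that JVM's local filesystem. This is a minimal sketch using
plain JDK APIs; `LocalTempDirs` and its methods are hypothetical names, and it
assumes `Utils.createTempDir()` resolves under `java.io.tmpdir` by default.

```scala
import java.io.File
import java.nio.file.{Files, Path}

// Hypothetical helper object, for illustration only.
object LocalTempDirs {
  // java.io.tmpdir is a per-JVM system property: each process resolves it from
  // its own configuration (e.g. -Djava.io.tmpdir=... on that JVM's command line).
  def tempRoot(): Path = new File(System.getProperty("java.io.tmpdir")).toPath

  // Creates the directory on the local filesystem of the JVM running this call.
  // The returned path says nothing about whether it exists on any other host.
  def createLocalTempDir(prefix: String): Path =
    Files.createTempDirectory(tempRoot(), prefix)
}
```

A path computed this way on the driver and shipped inside
`StatefulOperatorStateInfo` is only guaranteed to exist where it was created.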
   
   Maybe I wasn't clear: my ask was mostly about faking the state instance (not
faking the state store implementation; that would mostly mean rewriting the
HDFS-backed state store provider, and we should just use that provider), i.e.
abstracting the layer as we did for FMGWS (GroupStateImpl) so that we can
provide in-memory values for the state. Yes, that may require us to create such
a fake instance per state type, but it shouldn't be hard, as Java/Scala have
native types for the composite types we are going to support.
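   As a rough sketch of what "faking the state instance" could look like: each
state type gets an in-memory implementation backed by a native Scala type, and a
fresh instance would be created per grouping key in a batch query, with no state
store involved. The trait and class names below are hypothetical and are not
Spark's actual `ValueState`/`ListState`/`MapState` APIs.

```scala
import scala.collection.mutable

// Hypothetical abstraction layer for a single-value state (not Spark's API).
trait FakeValueState[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def clear(): Unit
}

// Value state held in a single optional slot.
final class InMemoryValueState[S] extends FakeValueState[S] {
  private var value: Option[S] = None
  override def exists(): Boolean = value.isDefined
  override def get(): S =
    value.getOrElse(throw new NoSuchElementException("state is not set"))
  override def update(newState: S): Unit = value = Some(newState)
  override def clear(): Unit = value = None
}

// List state backed by a native Scala buffer.
final class InMemoryListState[S] {
  private val buf = mutable.ArrayBuffer.empty[S]
  def get(): Iterator[S] = buf.iterator
  def appendValue(v: S): Unit = buf += v
  // Replaces the whole list with the given values.
  def put(values: Seq[S]): Unit = { buf.clear(); buf ++= values }
  def clear(): Unit = buf.clear()
}

// Map state backed by a native Scala hash map.
final class InMemoryMapState[K, V] {
  private val map = mutable.HashMap.empty[K, V]
  def containsKey(k: K): Boolean = map.contains(k)
  def getValue(k: K): Option[V] = map.get(k)
  def updateValue(k: K, v: V): Unit = map.update(k, v)
  def removeKey(k: K): Unit = map -= k
  def clear(): Unit = map.clear()
}
```

Since nothing here touches a state store, there is no checkpoint directory to
create and no coordinator registration to worry about for batch queries.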
   
   If it's non-trivial to inject such a layer, I'm OK with spinning up the
RocksDB state store provider and leveraging it. I'm pretty sure it would be
even better if we could avoid registering with the state store coordinator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.