HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1470538842
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
}
}
}
+
+
+object TransformWithStateExec {
+
+ // Plan logical transformWithState for batch queries
+ def generateSparkPlanForBatchQueries(
+ keyDeserializer: Expression,
+ valueDeserializer: Expression,
+ groupingAttributes: Seq[Attribute],
+ dataAttributes: Seq[Attribute],
+ statefulProcessor: StatefulProcessor[Any, Any, Any],
+ timeoutMode: TimeoutMode,
+ outputMode: OutputMode,
+ outputObjAttr: Attribute,
+ child: SparkPlan): SparkPlan = {
+ val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+ val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+ Utils.createTempDir().getAbsolutePath,
Review Comment:
The code should work with any resource manager (standalone, YARN, K8s, etc.)
and with any custom configuration applied by operators or cloud vendors. The
temp directory can also be overridden per JVM process (`-Djava.io.tmpdir`). It
is too naive to assume that the driver and executor nodes are set up exactly
the same.
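
To illustrate the concern, here is a minimal sketch (the object and method
names are hypothetical, not part of the PR). It only shows that
`java.io.tmpdir` is resolved by the local JVM, so a path created on the driver
has to be re-validated on whichever JVM actually uses it:

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper for illustration only; not part of Spark.
object TmpDirCheck {
  // The temp dir as seen by *this* JVM. It can differ per process,
  // e.g. when the JVM is started with -Djava.io.tmpdir=/some/other/dir.
  def localTmpDir(): String = System.getProperty("java.io.tmpdir")

  // A path string created elsewhere is only usable here if it exists
  // as a writable directory on the local file system.
  def usableOnThisJvm(path: String): Boolean = {
    val p = Paths.get(path)
    Files.isDirectory(p) && Files.isWritable(p)
  }
}
```

A driver-side path that passes this check on the driver may still fail it on
an executor, which is why the location should not be chosen on the driver.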
Maybe I wasn't clear - my ask was mostly about faking the state instance (not
faking the state store implementation - that would mostly mean rewriting the
HDFS-backed state store provider, and we should just use that provider). That
is, abstracting the layer as we did for FMGWS (GroupStateImpl), so that we can
provide an in-memory value for the state. Yes, that may require us to create
such a fake instance per state type, but it won't be hard, as Java/Scala have
native types for the composite types we are going to support.
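
A rough sketch of what such a fake could look like for a single-value state.
The trait below is hypothetical and only loosely modeled on a value-state
handle; it is not Spark's actual interface, and the real abstraction layer
would need to match whatever the PR exposes:

```scala
// Hypothetical interface for illustration; not Spark's actual API.
trait SimpleValueState[S] {
  def exists(): Boolean
  def get(): S
  def update(newState: S): Unit
  def clear(): Unit
}

// In-memory fake: keeps the value in a plain Option, with no state store
// and no checkpoint location involved. One instance per grouping key.
final class InMemoryValueState[S] extends SimpleValueState[S] {
  private var value: Option[S] = None

  override def exists(): Boolean = value.isDefined

  override def get(): S =
    value.getOrElse(throw new NoSuchElementException("state is not set"))

  override def update(newState: S): Unit = {
    value = Some(newState)
  }

  override def clear(): Unit = {
    value = None
  }
}
```

A list- or map-typed state could presumably be faked the same way on top of
`scala.collection.mutable.ArrayBuffer` or `mutable.Map`.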
If it's non-trivial to inject a layer, I'm OK with spinning up the RocksDB
state store provider and leveraging it. I'm pretty sure it would be even better
if we could avoid registration with the state store coordinator.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]