HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1479190035
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
- child.execute().mapPartitionsWithStateStore[InternalRow](
- getStateInfo,
- schemaForKeyRow,
- schemaForValueRow,
- numColsPrefixKey = 0,
- session.sqlContext.sessionState,
- Some(session.sqlContext.streams.stateStoreCoordinator),
- useColumnFamilies = true
- ) {
- case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
- val processorHandle = new StatefulProcessorHandleImpl(store,
getStateInfo.queryRunId,
- keyEncoder)
- assert(processorHandle.getHandleState ==
StatefulProcessorHandleState.CREATED)
- statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
- val result = processDataWithPartition(singleIterator, store,
processorHandle)
- result
+ if (isStreaming) {
+ child.execute().mapPartitionsWithStateStore[InternalRow](
+ getStateInfo,
+ schemaForKeyRow,
+ schemaForValueRow,
+ numColsPrefixKey = 0,
+ session.sqlContext.sessionState,
+ Some(session.sqlContext.streams.stateStoreCoordinator),
+ useColumnFamilies = true
+ ) {
+ case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+ processData(store, singleIterator)
+ }
+ } else {
+ // If the query is running in batch mode, we need to create a new
StateStore and instantiate
+ // a temp directory on the executors in mapPartitionsWithIndex.
+ child.execute().mapPartitionsWithIndex[InternalRow](
+ (i, iter) => {
+ val providerId = {
+ // lazy creation to initialize tempDirPath once
+ lazy val tempDirPath = Utils.createTempDir().getAbsolutePath
+ new StateStoreProviderId(
+ StateStoreId(tempDirPath, 0, i), getStateInfo.queryRunId)
+ }
+
+ val sqlConf = new SQLConf()
+ sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+ classOf[RocksDBStateStoreProvider].getName)
+ val storeConf = new StateStoreConf(sqlConf)
+
+ // Create StateStoreProvider for this partition
+ val stateStoreProvider = StateStoreProvider.createAndInit(
+ providerId,
+ schemaForKeyRow,
+ schemaForValueRow,
+ numColsPrefixKey = 0,
+ useColumnFamilies = true,
+ storeConf = storeConf,
+ hadoopConf = new Configuration())
Review Comment:
We push key-value pairs from the SQL conf into the Hadoop Configuration; if you simply
create a Hadoop Configuration via its constructor, all Hadoop configuration entries
pushed through the Spark conf will be missed.
```
// A Hadoop Configuration can be about 10 KB, which is pretty big, so
broadcast it
private val hadoopConfBroadcast = session.sparkContext.broadcast(
new SerializableConfiguration(session.sessionState.newHadoopConf()))
```
This is how we pass the Hadoop Configuration from the driver to the executors,
using a broadcast. Note that you'll need to test this against a real cluster,
preferably multi-node. Please verify by trying to read a Spark conf value from the
hadoopConf you receive in the executor-side code.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
- child.execute().mapPartitionsWithStateStore[InternalRow](
- getStateInfo,
- schemaForKeyRow,
- schemaForValueRow,
- numColsPrefixKey = 0,
- session.sqlContext.sessionState,
- Some(session.sqlContext.streams.stateStoreCoordinator),
- useColumnFamilies = true
- ) {
- case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
- val processorHandle = new StatefulProcessorHandleImpl(store,
getStateInfo.queryRunId,
- keyEncoder)
- assert(processorHandle.getHandleState ==
StatefulProcessorHandleState.CREATED)
- statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
- val result = processDataWithPartition(singleIterator, store,
processorHandle)
- result
+ if (isStreaming) {
+ child.execute().mapPartitionsWithStateStore[InternalRow](
+ getStateInfo,
+ schemaForKeyRow,
+ schemaForValueRow,
+ numColsPrefixKey = 0,
+ session.sqlContext.sessionState,
+ Some(session.sqlContext.streams.stateStoreCoordinator),
+ useColumnFamilies = true
+ ) {
+ case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+ processData(store, singleIterator)
+ }
+ } else {
+ // If the query is running in batch mode, we need to create a new
StateStore and instantiate
+ // a temp directory on the executors in mapPartitionsWithIndex.
+ child.execute().mapPartitionsWithIndex[InternalRow](
+ (i, iter) => {
+ val providerId = {
+ // lazy creation to initialize tempDirPath once
+ lazy val tempDirPath = Utils.createTempDir().getAbsolutePath
Review Comment:
Out of curiosity - did it really evaluate only once? This part is
serialized on the driver and deserialized on the executor - I see multiple tasks
being executed in the same executor, though I don't know whether tasks can share the
singleton, even when it is scoped within the initialization of providerId.
If you want to confirm this, add logging to the initialization of the lazy val
and run it on a real cluster. Again, the behavior could differ between a
single JVM and multiple JVMs.
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -197,6 +196,18 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}
+ test("transformWithState - batch should succeed") {
+ val inputData = Seq("a", "a", "b")
+ val result = inputData.toDS()
+ .groupByKey(x => x)
+ .transformWithState(new RunningCountStatefulProcessor(),
+ TimeoutMode.NoTimeouts(),
+ OutputMode.Append())
+
+ val df = result.toDF()
+ checkAnswer(df, Seq(("a", "1"), ("b", "1")).toDF())
Review Comment:
Why does "a" count to 1 rather than 2?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]