anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473313760


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,116 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,
+              i, 0), getStateInfo.queryRunId)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(
+            providerId,
+            schemaForKeyRow,
+            schemaForValueRow,
+            numColsPrefixKey = 0,
+            useColumnFamilies = true,
+            storeConf = StateStoreConf(getDefaultStateStoreSQLConf),
+            hadoopConf = new Configuration())
+
+          val store = stateStoreProvider.getStore(0)
+          processData(store, iter)
+        }
+      )
     }
   }
+
+  /**
+   * Process the data in the partition using the state store and the stateful processor.
+   * @param store The state store to use
+   * @param singleIterator The iterator of rows to process
+   * @return An iterator of rows that are the result of processing the input rows
+   */
+  private def processData(store: StateStore, singleIterator: Iterator[InternalRow]):
+    CompletionIterator[InternalRow, Iterator[InternalRow]] = {
+    val processorHandle = new StatefulProcessorHandleImpl(
+      store, getStateInfo.queryRunId, isStreaming)
+    assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
+    statefulProcessor.init(processorHandle, outputMode)
+    processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
+    val result = processDataWithPartition(singleIterator, store, processorHandle)
+    result
+  }
+
+  /**
+   * Set the default SQLConf values for the State Store.
+   * This is used for the batch operator, since these SQLConfs are not
+   * automatically populated for batch queries.
+   * @return SQLConf with default values with RocksDBStateStoreProvider
+   */
+  private def getDefaultStateStoreSQLConf: SQLConf = {
+      val sqlConf = new SQLConf()
+      StateStoreConf.sqlConfKeys.foreach {
+        case conf @ SQLConf.STATE_STORE_PROVIDER_CLASS =>
+          sqlConf.setConfString(conf.key, classOf[RocksDBStateStoreProvider].getName)
+        case conf => sqlConf.setConfString(conf.key, conf.defaultValueString)
+      }
+      sqlConf
+  }
+}
+
+

Review Comment:
   nit: extra newline ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to