HeartSaVioR commented on a change in pull request #30812:
URL: https://github.com/apache/spark/pull/30812#discussion_r545528442



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##########
@@ -42,6 +43,24 @@ abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
   protected val hadoopConfBroadcast = dataRDD.context.broadcast(
     new SerializableConfiguration(sessionState.newHadoopConf()))
 
+  lazy private val executorMap: mutable.HashMap[String, Int] = {

Review comment:
       Same here.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
##########
@@ -213,15 +214,39 @@ object StreamingSymmetricHashJoinHelper extends Logging {
       @transient private val storeCoordinator: Option[StateStoreCoordinatorRef])
       extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
 
+    lazy private val executorMap: mutable.HashMap[String, Int] = {

Review comment:
       If I understand correctly, a simple round robin would achieve the same 
thing. 
   
   Initialize a new immutable array or list from `context.getExecutorIds()` 
(if what `context.getExecutorIds()` returns is already immutable, even this copy 
is unnecessary), and keep an index variable which moves by one per assignment 
(like `++idx % count`).
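
   A minimal sketch of that round robin, for illustration only. The class name `RoundRobinAssigner` and the executor IDs are hypothetical and not part of Spark; the sketch only assumes that the executor list is available as a `Seq[String]`.

```scala
// Hypothetical helper (not a Spark API): hands out executor IDs in round-robin order.
final class RoundRobinAssigner(executorIds: Seq[String]) {
  require(executorIds.nonEmpty, "need at least one executor")

  // Immutable snapshot, so later changes to the caller's collection do not affect us.
  private val ids: Vector[String] = executorIds.toVector

  // Starts at -1 so that the first call to next() returns ids(0).
  private var idx: Int = -1

  /** Returns the next executor ID, wrapping around at the end of the list. */
  def next(): String = {
    idx = (idx + 1) % ids.length // Scala spelling of `++idx % count`
    ids(idx)
  }
}

object RoundRobinDemo {
  def main(args: Array[String]): Unit = {
    val assigner = new RoundRobinAssigner(Seq("exec-1", "exec-2", "exec-3"))
    // Five assignments over three executors: the sequence wraps after the third.
    val assigned = (0 until 5).map(_ => assigner.next())
    println(assigned.mkString(", "))
  }
}
```

   This keeps only an index as state, at the cost of ignoring where state is already loaded.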

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
##########
@@ -213,15 +214,39 @@ object StreamingSymmetricHashJoinHelper extends Logging {
       @transient private val storeCoordinator: Option[StateStoreCoordinatorRef])
       extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2)) {
 
+    lazy private val executorMap: mutable.HashMap[String, Int] = {

Review comment:
       Btw, what we'd like to achieve is to distribute state across all 
executors, and this PR looks to miss accounting for executors which already 
contain active state. We'll want to increase the count when `stateStoreLocs` is 
available as well. That would probably be better than simple round robin.
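
   A rough sketch of count-based assignment that also accounts for existing state. `LeastLoadedAssigner`, `recordExisting` and `assign` are hypothetical names, not Spark APIs; how `stateStoreLocs` would feed `recordExisting` is an assumption, as is the choice to ignore locations on unknown executors.

```scala
import scala.collection.mutable

// Hypothetical helper (not a Spark API): picks the executor holding the fewest state stores.
final class LeastLoadedAssigner(executorIds: Seq[String]) {
  require(executorIds.nonEmpty, "need at least one executor")

  // Per-executor count of state stores, kept in the order the executors were given.
  private val counts: mutable.LinkedHashMap[String, Int] =
    mutable.LinkedHashMap(executorIds.map(_ -> 0): _*)

  /** Counts a state store that is already active on `executorId`
    * (assumed to be called when a preferred location such as `stateStoreLocs` is known).
    * Executors that are not in the known list are ignored. */
  def recordExisting(executorId: String): Unit =
    if (counts.contains(executorId)) counts(executorId) += 1

  /** Assigns a new state store to an executor with the lowest count and bumps that count. */
  def assign(): String = {
    val (chosen, _) = counts.minBy(_._2)
    counts(chosen) += 1
    chosen
  }
}

object LeastLoadedDemo {
  def main(args: Array[String]): Unit = {
    val assigner = new LeastLoadedAssigner(Seq("exec-1", "exec-2", "exec-3"))
    // exec-1 already hosts two active state stores and exec-2 hosts one.
    Seq("exec-1", "exec-1", "exec-2").foreach(assigner.recordExisting)
    // The first new partition goes to exec-3, the only executor without state.
    println(assigner.assign())
  }
}
```

   Unlike plain round robin, new partitions here avoid executors that already hold state until the counts even out.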






