uncleGen opened a new pull request #26156: Failed to get state store when task number is not determinate
URL: https://github.com/apache/spark/pull/26156
 
 
   ### What changes were proposed in this pull request?
   
   Add a new optimizer rule that adds a join hint to queries containing a stream-stream join.
   
   ### Why are the changes needed?
   
   This is a correctness bug. Currently, Spark uses the task partition ID (`TaskPartitionId`) to determine the StateStore path:
   
   ```
   TaskPartitionId   \ 
   StateStoreVersion  --> StoreProviderId -> StateStore
   StateStoreName    /  
   ```
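   As a rough illustration, the plain-Scala sketch below derives a delta-file path from the inputs in the diagram. The `StoreKey` class, the `deltaFile` function, the extra operator ID, the `"left"` store name, and the `<stateRoot>/<operatorId>/<partitionId>/<storeName>/<version>.delta` layout are all assumptions, loosely modeled on Spark's state checkpoint directory; this is not Spark's API. It shows that looking up the same version of the same store under a different task partition ID resolves to a different file.

   ```scala
   // Hypothetical, simplified model of how a state store delta file is located.
   // Assumed layout: <stateRoot>/<operatorId>/<partitionId>/<storeName>/<version>.delta
   final case class StoreKey(operatorId: Long, partitionId: Int, storeName: String)

   def deltaFile(stateRoot: String, key: StoreKey, version: Long): String =
     s"$stateRoot/${key.operatorId}/${key.partitionId}/${key.storeName}/$version.delta"

   val stateRoot = "/checkpoint/state" // hypothetical checkpoint location

   // The version-5 file written by a task with partition ID 200
   // ("left" is a hypothetical store name) ...
   val written = deltaFile(stateRoot, StoreKey(0L, 200, "left"), 5L)
   // ... and the file the next batch looks for if the same logical
   // partition now runs with task partition ID 1.
   val requested = deltaFile(stateRoot, StoreKey(0L, 1, "left"), 5L)

   println(written)   // /checkpoint/state/0/200/left/5.delta
   println(requested) // /checkpoint/state/0/1/left/5.delta
   ```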
   Within a Spark stage, the task partition ID is assigned according to the tasks in that stage, so it can change when the number of tasks in the stage changes. Because the StateStore file path depends on the task partition ID, if the partition IDs of the stream-stream join tasks differ from those in the previous batch, the query reads the wrong StateStore data or fails because the StateStore data does not exist. This happens in some corner cases, as in the following pseudocode:
   
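   ```scala
   // Stream-stream join, executed as StreamingSymmetricHashJoin
   val df3 = streamDf1.join(streamDf2)
   // Stream-batch join, executed as SortMergeJoin or BroadcastHashJoin
   val df5 = streamDf3.join(batchDf4)
   val df = df3.union(df5)
   df.writeStream...start()
   ```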
   The simplified physical plan (DAG) looks like this:
   
   ```
   DataSourceV2Scan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
    (streamDf3)            |               (streamDf1)        (streamDf2)
        |                  |                   |                 |
     Exchange(200)      Exchange(200)       Exchange(200)     Exchange(200)
        |                  |                   |                 | 
      Sort                Sort                 |                 |
        \                  /                   \                 /
         \                /                     \               /
           SortMergeJoin                    StreamingSymmetricHashJoin
                        \                 /
                          \             /
                            \         /
                               Union
   ```
   The stream-stream join task IDs range from 200 to 399 because these tasks are in the same stage as the `SortMergeJoin`. However, when `streamDf3` has no new incoming data in a batch, it produces an empty `LocalRelation`, and the `SortMergeJoin` is replaced with a `BroadcastHashJoin`. In that case, the stream-stream join task IDs range from 1 to 200. As a result, the `TaskPartitionId` resolves to the wrong StateStore path, and the query fails with an error reading the state store delta file. The plan for such a batch looks like this:
   
   ```
   LocalTableScan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
        |                  |                   |                 |
   BroadcastExchange       |              Exchange(200)     Exchange(200)
        |                  |                   |                 | 
        |                  |                   |                 |
         \                /                     \               /
          \             /                        \             /
         BroadcastHashJoin                 StreamingSymmetricHashJoin
                        \                 /
                          \             /
                            \         /
                               Union
   ```
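   The change in partition IDs can be reproduced with a small plain-Scala model, assuming the union numbers its partitions by concatenating its inputs in order, which matches the ID ranges described above. The `unionRanges` helper is hypothetical, and the input partition counts are assumptions taken from the two plans: 200 partitions for the `SortMergeJoin` side, and 1 partition for the `BroadcastHashJoin` side (the count implied by the 1 to 200 range). The model then replays the previous batch's state, keyed by task partition ID, under the new IDs. It illustrates the mechanism only and is not Spark code.

   ```scala
   // Hypothetical helper: given the partition counts of a union's inputs, return
   // the range of task partition IDs that each input occupies, assuming the union
   // numbers partitions by concatenating its inputs in order.
   def unionRanges(partitionCounts: Seq[Int]): Seq[Range] = {
     val starts = partitionCounts.scanLeft(0)(_ + _) // first ID of each input
     partitionCounts.zip(starts).map { case (n, start) => start until start + n }
   }

   // Batch where streamDf3 has data: SortMergeJoin side (200) + stream-stream join (200).
   val joinIdsBefore: Range = unionRanges(Seq(200, 200))(1)
   // Batch where streamDf3 is empty: BroadcastHashJoin side (assumed 1 partition)
   // + stream-stream join (200).
   val joinIdsAfter: Range = unionRanges(Seq(1, 200))(1)

   // State saved by the earlier batch, keyed by task partition ID:
   // ID -> index of the logical join partition whose state it holds.
   val stored: Map[Int, Int] = joinIdsBefore.zipWithIndex.toMap

   // The later batch: logical partition k looks up its state under joinIdsAfter(k).
   val lookups: Seq[(Int, Option[Int])] =
     joinIdsAfter.zipWithIndex.map { case (id, k) => k -> stored.get(id) }

   val missing = lookups.count { case (_, found) => found.isEmpty }
   val wrong = lookups.count { case (k, found) => found.exists(_ != k) }

   println(s"IDs before: ${joinIdsBefore.head} to ${joinIdsBefore.last}")
   println(s"IDs after: ${joinIdsAfter.head} to ${joinIdsAfter.last}")
   println(s"missing state: $missing, another partition's state: $wrong")
   // IDs before: 200 to 399
   // IDs after: 1 to 200
   // missing state: 199, another partition's state: 1
   ```

   In this model, 199 of the 200 logical join partitions find no saved state, which corresponds to the delta-file read error, and the last one would read the state of logical partition 0.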
   
   ### Does this PR introduce any user-facing change?
   
   No
   
   ### How was this patch tested?
    
   One new unit test was added.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
With regards,
Apache Git Services