GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/18355

    Added StateStoreProviderId with queryRunId to reload StateStoreProviders 
when query is restarted

    ## What changes were proposed in this pull request?
    StateStoreProvider instances are loaded on-demand in an executor when a
query is started. When a query is restarted, the previously loaded provider
instance gets reused. There is a non-trivial chance that a task of the previous
query run is still running while tasks of the restarted run have started, so
two concurrent tasks may operate on the same stateful partition and therefore
use the same provider instance. This can lead to inconsistent results and
possibly random failures, as state store implementations are not designed to
be thread-safe.
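    To make the hazard concrete, below is a minimal sketch (a simplified,
hypothetical stand-in, not the actual Spark internals) of an executor-side
provider cache keyed only by the store's partition identity, which is what
allows tasks from two different runs of the same query to end up with the same
provider instance:

    ```scala
    import scala.collection.mutable

    // Hypothetical stand-in for a state store provider; real implementations
    // are not designed to be used by two tasks at the same time.
    class StateStoreProvider

    object ProviderCache {
      // Keyed only by (operatorId, partitionId): a restarted query resolves
      // to the same cache entry as the previous run of that query.
      private val loadedProviders =
        mutable.HashMap.empty[(Long, Int), StateStoreProvider]

      def getOrCreate(operatorId: Long, partitionId: Int): StateStoreProvider =
        synchronized {
          // A lingering task from the old run and a task from the restarted
          // run both receive this same instance and then use it concurrently.
          loadedProviders.getOrElseUpdate(
            (operatorId, partitionId), new StateStoreProvider)
        }
    }
    ```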
    
    To fix this, I have introduced a StateStoreProviderId that uniquely
identifies a provider loaded in an executor. It contains the query run id,
which ensures that a restarted query forces the executor to load a new
provider instance, preventing two concurrent tasks (from two different runs)
from reusing the same provider instance.
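    A minimal sketch of that key (the field names here are illustrative, not
necessarily the exact ones in the patch): the run id becomes part of the cache
key, so a restart yields a new key and therefore a freshly loaded provider:

    ```scala
    import java.util.UUID

    // Hypothetical sketch of the run-scoped key.
    case class StateStoreId(operatorId: Long, partitionId: Int, storeName: String)
    case class StateStoreProviderId(storeId: StateStoreId, queryRunId: UUID)

    // Re-keying the executor-side cache by StateStoreProviderId instead of
    // StateStoreId means a restarted run (new queryRunId) can never pick up
    // the instance that a still-running task of the previous run may be using.
    ```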
    
    Additional minor bug fixes:
    - All state stores related to a query run are marked as deactivated in the
StateStoreCoordinator, so that the executors can unload them and free resources.
    - Moved the code that determines the checkpoint directory of a state store
from implementation-specific code (HDFSBackedStateStoreProvider) to
implementation-agnostic code (StateStoreId), so that implementations do not
accidentally get it wrong (see the sketch after this list).
      - Also added the store name to the path, to support multiple stores per
SQL operator partition.
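    As a rough sketch of what centralizing the checkpoint location could look
like (the method name and exact path layout are assumptions for illustration,
not a quote of the patch), a fuller version of the StateStoreId sketched above
derives the directory once, instead of each provider implementation computing
it:

    ```scala
    import org.apache.hadoop.fs.Path

    // Hypothetical sketch: the id owns the checkpoint directory layout, so
    // provider implementations cannot each compute it slightly differently.
    case class StateStoreId(
        checkpointRootLocation: String,
        operatorId: Long,
        partitionId: Int,
        storeName: String = "default") {

      // <root>/<operatorId>/<partitionId>[/<storeName>]; including the store
      // name supports multiple stores per operator partition.
      def storeCheckpointLocation(): Path = {
        if (storeName == "default") {
          new Path(checkpointRootLocation, s"$operatorId/$partitionId")
        } else {
          new Path(checkpointRootLocation, s"$operatorId/$partitionId/$storeName")
        }
      }
    }
    ```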
    
    *Note:* This change does not address the scenario where two tasks of the
same run (e.g. speculative tasks) run concurrently in the same executor. The
chance of this is very small, because ideally speculative tasks should never
run in the same executor.
    
    ## How was this patch tested?
    Existing unit tests + new unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark SPARK-21145

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18355.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18355
    
----
commit 3da6b0ff671e86f6074e73d4974852bce50fd347
Author: Tathagata Das <[email protected]>
Date:   2017-06-19T22:46:25Z

    Added StateStoreProviderId with queryRunId to reload providers when query 
restarted

----


