GitHub user liancheng opened a pull request:

    https://github.com/apache/spark/pull/1071

    [SPARK-2094][SQL] Exactly once command

    ## Related JIRA issues
    
    - Main issue:
    
      - [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure 
exactly once semantics for DDL/Commands
    
    - Issues resolved as dependencies:
    
      - [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081): 
Undefine output() from the abstract class Command and implement it in concrete 
subclasses
      - [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan 
for DESCRIBE
      - [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852): 
SparkSQL Queries with Sorts run before the user asks them to
    
    - Other related issue:
    
      - [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE 
thrown while lookup a view
    
        Two test cases, `join_view` and `mergejoin_mixed`, within the 
`HiveCompatibilitySuite` are removed from the whitelist to workaround this 
issue.
    
    ## PR Overview
    
    This PR defines physical plans for DDL statements and commands and wrap 
their side effects in a lazy field `PhysicalCommand.sideEffectResult`, so that 
they are executed eagerly and exactly once.  Also, as a positive side effect, 
now DDL statements and commands can be turned into proper `SchemaRDD`s and let 
user query the execution results.
    
    This PR defines schemas for the following DDL/commands:
    
    - EXPLAIN command
    
      - `plan`: String, the plan explanation
      
    - SET command
    
      - `key`: String, the key(s) of the propert(y/ies) being set or queried
      - `value`: String, the value(s) of the propert(y/ies) being queried
    
    - Other Hive naive command
    
      - `result`: String, execution result returned by Hive
      
      **NOTE**: We should refine schemas for different native commands by 
defining physical plans for them in the future.
    
    ## Examples
    
    ### EXPLAIN command
    
    Take the "EXPLAIN" command as an example, we first execute the command and 
obtain a `SchemaRDD` at the same time, then query the `plan` field with the 
schema DSL:
    
    ```
    scala> loadTestTable("src")
    ...
    
    scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key")
    ...
    q0: org.apache.spark.sql.SchemaRDD =
    SchemaRDD[0] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    ExplainCommandPhysical [plan#11:0]
     Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L]
      Exchange (HashPartitioning [key#4:0], 200)
       Exchange (HashPartitioning [key#4:0], 200)
        Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L]
         HiveTableScan [key#4], (MetastoreRelation default, src, None), None
    
    scala> q0.select('plan).collect()
    ...
    [ExplainCommandPhysical [plan#24:0]
     Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L]
      Exchange (HashPartitioning [key#17:0], 200)
       Exchange (HashPartitioning [key#17:0], 200)
        Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L]
         HiveTableScan [key#17], (MetastoreRelation default, src, None), None]
    
    scala>
    ```
    
    ### SET command
    
    In this example we query all the properties set in `SQLConf`, register the 
result as a table, and then query the table with HiveQL: 
    
    ```
    scala> val q1 = hql("SET")
    ...
    q1: org.apache.spark.sql.SchemaRDD =
    SchemaRDD[7] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    <SET command: executed by Hive, and noted by SQLContext>
    
    scala> q1.registerAsTable("properties")
    
    scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 
10").foreach(println)
    ...
    == Query Plan ==
    TakeOrdered 10, [key#51:0 ASC]
     Project [key#51:0,value#52:1]
      SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no 
missing parents
    14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks 
from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    TakeOrdered 10, [key#51:0 ASC]
     Project [key#51:0,value#52:1]
      SetCommandPhysical None, None, [key#55:0,value#56:1])
    ...
    [datanucleus.autoCreateSchema,true]
    [datanucleus.autoStartMechanismMode,checked]
    [datanucleus.cache.level2,false]
    [datanucleus.cache.level2.type,none]
    [datanucleus.connectionPoolingType,BONECP]
    [datanucleus.fixedDatastore,false]
    [datanucleus.identifierFactory,datanucleus1]
    [datanucleus.plugin.pluginRegistryBundleCheck,LOG]
    [datanucleus.rdbms.useLegacyNativeValueStrategy,true]
    [datanucleus.storeManagerType,rdbms]
    
    scala>
    ```
    
    ### "Exactly once" semantics
    
    At last, an example of the "exactly once" semantics:
    
    ```
    scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)")
    ...
    q2: org.apache.spark.sql.SchemaRDD =
    SchemaRDD[28] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    <Native command: executed by Hive>
    
    scala> table("t1")
    ...
    res9: org.apache.spark.sql.SchemaRDD =
    SchemaRDD[32] at RDD at SchemaRDD.scala:98
    == Query Plan ==
    HiveTableScan [key#58,value#59], (MetastoreRelation default, xxx, None), 
None
    
    scala> q2.collect()
    ...
    res10: Array[org.apache.spark.sql.Row] = Array([])
    
    scala>
    ```
    
    As we can see, the "CREATE TABLE" command is executed eagerly right after 
the `SchemaRDD` is created, and referencing the `SchemaRDD` again won't trigger 
a duplicated execution.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/liancheng/spark exactlyOnceCommand

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1071.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1071
    
----
commit 0ad343a7164a8af74c0f2db14a3df2351335a7bd
Author: Cheng Lian <[email protected]>
Date:   2014-06-11T22:14:24Z

    Added physical plan for DDL and commands to ensure the "exactly once" 
semantics

commit 74789c1197e7ea936b25714f2b8ce131b96ef62d
Author: Cheng Lian <[email protected]>
Date:   2014-06-12T19:33:14Z

    Fixed failing test cases

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to