GitHub user liancheng opened a pull request:
https://github.com/apache/spark/pull/1071
[SPARK-2094][SQL] Exactly once command
## Related JIRA issues
- Main issue:
- [SPARK-2094](https://issues.apache.org/jira/browse/SPARK-2094): Ensure
exactly once semantics for DDL/Commands
- Issues resolved as dependencies:
- [SPARK-2081](https://issues.apache.org/jira/browse/SPARK-2081):
Undefine output() from the abstract class Command and implement it in concrete
subclasses
- [SPARK-2128](https://issues.apache.org/jira/browse/SPARK-2128): No plan
for DESCRIBE
- [SPARK-1852](https://issues.apache.org/jira/browse/SPARK-1852):
SparkSQL Queries with Sorts run before the user asks them to
- Other related issue:
- [SPARK-2129](https://issues.apache.org/jira/browse/SPARK-2129): NPE
thrown while lookup a view
Two test cases, `join_view` and `mergejoin_mixed`, within the
`HiveCompatibilitySuite` are removed from the whitelist to work around this
issue.
## PR Overview
This PR defines physical plans for DDL statements and commands and wraps
their side effects in a lazy field, `PhysicalCommand.sideEffectResult`, so that
each command is executed exactly once. Execution is triggered eagerly, right
after the `SchemaRDD` is created. As a positive side effect, DDL statements and
commands can now be turned into proper `SchemaRDD`s, which lets users query the
execution results.
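The lazy-field idea can be illustrated with a minimal, self-contained Scala sketch. It does not use Spark and is not the code in this PR: `CommandSketch`, `CreateTableSketch`, `run`, and the `executions` counter are hypothetical names introduced only for illustration. The sketch assumes that the entry point (here `run`, standing in for something like `hql`) forces the lazy field once, and that later accesses reuse the cached result.

```scala
// Hypothetical stand-in for a command's physical plan (not the PR's classes).
trait CommandSketch {
  // The side effect itself; subclasses decide what it does.
  protected def runSideEffect(): Seq[String]

  // A lazy val is evaluated on first access and cached afterwards,
  // so the side effect runs at most once per command instance.
  lazy val sideEffectResult: Seq[String] = runSideEffect()
}

object ExactlyOnceSketch {
  // Counts how many times the simulated side effect actually ran.
  var executions = 0

  // Hypothetical command: pretends to create a table and returns no rows.
  final class CreateTableSketch(name: String) extends CommandSketch {
    protected def runSideEffect(): Seq[String] = {
      executions += 1
      Seq.empty
    }
  }

  // Assumed entry point: builds the command and forces the lazy field
  // immediately, which makes the execution eager.
  def run(name: String): CommandSketch = {
    val cmd = new CreateTableSketch(name)
    cmd.sideEffectResult
    cmd
  }

  def main(args: Array[String]): Unit = {
    val q = run("t1")       // side effect runs here
    q.sideEffectResult      // cached, no second execution
    q.sideEffectResult      // cached, no second execution
    println(s"executions = $executions")
  }
}
```

In this sketch the eagerness comes from the caller forcing the field, while the "exactly once" property comes from the `lazy val` itself; the two concerns are kept separate.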
This PR defines schemas for the following DDL/commands:
- EXPLAIN command
- `plan`: String, the plan explanation
- SET command
- `key`: String, the key(s) of the propert(y/ies) being set or queried
- `value`: String, the value(s) of the propert(y/ies) being queried
- Other Hive native commands
- `result`: String, execution result returned by Hive
**NOTE**: We should refine schemas for different native commands by
defining physical plans for them in the future.
## Examples
### EXPLAIN command
Taking the "EXPLAIN" command as an example, we first execute the command,
which also gives us a `SchemaRDD`, and then query its `plan` field with the
schema DSL:
```
scala> loadTestTable("src")
...
scala> val q0 = hql("EXPLAIN SELECT key, COUNT(*) FROM src GROUP BY key")
...
q0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
ExplainCommandPhysical [plan#11:0]
Aggregate false, [key#4], [key#4,SUM(PartialCount#6L) AS c_1#2L]
Exchange (HashPartitioning [key#4:0], 200)
Exchange (HashPartitioning [key#4:0], 200)
Aggregate true, [key#4], [key#4,COUNT(1) AS PartialCount#6L]
HiveTableScan [key#4], (MetastoreRelation default, src, None), None
scala> q0.select('plan).collect()
...
[ExplainCommandPhysical [plan#24:0]
Aggregate false, [key#17], [key#17,SUM(PartialCount#19L) AS c_1#2L]
Exchange (HashPartitioning [key#17:0], 200)
Exchange (HashPartitioning [key#17:0], 200)
Aggregate true, [key#17], [key#17,COUNT(1) AS PartialCount#19L]
HiveTableScan [key#17], (MetastoreRelation default, src, None), None]
scala>
```
### SET command
In this example we query all the properties set in `SQLConf`, register the
result as a table, and then query the table with HiveQL:
```
scala> val q1 = hql("SET")
...
q1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[7] at RDD at SchemaRDD.scala:98
== Query Plan ==
<SET command: executed by Hive, and noted by SQLContext>
scala> q1.registerAsTable("properties")
scala> hql("SELECT key, value FROM properties ORDER BY key LIMIT 10").foreach(println)
...
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
Project [key#51:0,value#52:1]
SetCommandPhysical None, None, [key#55:0,value#56:1]), which has no
missing parents
14/06/12 12:19:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from Stage 5 (SchemaRDD[21] at RDD at SchemaRDD.scala:98
== Query Plan ==
TakeOrdered 10, [key#51:0 ASC]
Project [key#51:0,value#52:1]
SetCommandPhysical None, None, [key#55:0,value#56:1])
...
[datanucleus.autoCreateSchema,true]
[datanucleus.autoStartMechanismMode,checked]
[datanucleus.cache.level2,false]
[datanucleus.cache.level2.type,none]
[datanucleus.connectionPoolingType,BONECP]
[datanucleus.fixedDatastore,false]
[datanucleus.identifierFactory,datanucleus1]
[datanucleus.plugin.pluginRegistryBundleCheck,LOG]
[datanucleus.rdbms.useLegacyNativeValueStrategy,true]
[datanucleus.storeManagerType,rdbms]
scala>
```
### "Exactly once" semantics
Finally, an example of the "exactly once" semantics:
```
scala> val q2 = hql("CREATE TABLE t1(key INT, value STRING)")
...
q2: org.apache.spark.sql.SchemaRDD =
SchemaRDD[28] at RDD at SchemaRDD.scala:98
== Query Plan ==
<Native command: executed by Hive>
scala> table("t1")
...
res9: org.apache.spark.sql.SchemaRDD =
SchemaRDD[32] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#58,value#59], (MetastoreRelation default, xxx, None),
None
scala> q2.collect()
...
res10: Array[org.apache.spark.sql.Row] = Array([])
scala>
```
As we can see, the "CREATE TABLE" command is executed eagerly, right after
the `SchemaRDD` is created, and referencing the `SchemaRDD` again does not
trigger a duplicate execution.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/liancheng/spark exactlyOnceCommand
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1071.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1071
----
commit 0ad343a7164a8af74c0f2db14a3df2351335a7bd
Author: Cheng Lian <[email protected]>
Date: 2014-06-11T22:14:24Z
Added physical plan for DDL and commands to ensure the "exactly once"
semantics
commit 74789c1197e7ea936b25714f2b8ce131b96ef62d
Author: Cheng Lian <[email protected]>
Date: 2014-06-12T19:33:14Z
Fixed failing test cases
----