Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/956#discussion_r13582230
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -269,6 +271,22 @@ class SQLContext(@transient val sparkContext:
SparkContext)
protected abstract class QueryExecution {
def logical: LogicalPlan
+ def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
+ case SetCommand(key, value) =>
--- End diff --
Hi @concretevitamin, previously I thought the only important thing about a
`SchemaRDD` made from an eagerly executed statement (a Hive native command, a
set command, some insertion command, etc.) was its side effect, not the
content of the RDD itself. But now I do agree that when a user calls
`.collect()` on such an RDD, we should present some meaningful result.
As for the eager evaluation of the RDD content, IMHO, for command
`SchemaRDD`s, the RDD contents are either empty or generated at the same
time as the command is executed (at least I can't find a counterexample
in the current code base), so this shouldn't be an issue.
So I think the constraints presented here are:
1. The side effect of a command `SchemaRDD` should take place eagerly;
1. The side effect of a command `SchemaRDD` should take place once and only
once;
1. When the `.collect()` method is called, something meaningful, usually the
output message lines of the command, should be presented.
Then how about adding a lazy field inside all the physical command nodes to
wrap up the side effect and hold the command output? Take the
`SetCommandPhysical` as an example:
```scala
trait PhysicalCommand {
  // Declared as a `def` because a `lazy val` cannot be abstract;
  // implementations override it with a `lazy val`.
  def commandOutput: Any
}

case class SetCommandPhysical(
    key: Option[String], value: Option[String], output: Seq[Attribute])(
    @transient context: SQLContext)
  extends PhysicalCommand {

  override lazy val commandOutput: Any = {
    // Perform the side effect, and record appropriate output
    ???
  }

  def execute(): RDD[Row] = {
    val row: Row = new GenericRow(Array[Any](commandOutput))
    // `parallelize` expects a `Seq`, so wrap the single row
    context.sparkContext.parallelize(Seq(row), 1)
  }
}
```
In this way, all the constraints are met:
1. Eager evaluation: done by the `toRdd` call in `SchemaRDDLike` (PR #948),
1. Side effect should take place once and only once: ensured by the lazy
`commandOutput` field,
1. Present meaningful output as RDD contents: command output is held by
`commandOutput` and returned in `execute()`.
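
To illustrate the second point without any Spark dependency, here is a minimal
sketch of how a `lazy val` runs its side effect at most once. The names
`Command`, `SetCommand`, `settings`, `runs` and `example.key` are hypothetical
stand-ins for illustration only, not existing Spark classes or configuration
keys:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a physical command node.
trait Command {
  def commandOutput: Any
  // Mirrors `execute()`: every call only reads the cached output.
  def execute(): Seq[Any] = Seq(commandOutput)
}

// `settings` is a hypothetical mutable map standing in for the SQL configuration.
class SetCommand(key: String, value: String, settings: mutable.Map[String, String])
  extends Command {

  var runs = 0 // counts how many times the side effect actually ran

  override lazy val commandOutput: Any = {
    runs += 1
    settings(key) = value
    s"$key=$value"
  }
}

object LazyCommandDemo {
  def main(args: Array[String]): Unit = {
    val settings = mutable.Map.empty[String, String]
    val cmd = new SetCommand("example.key", "10", settings)

    cmd.execute() // first access triggers the side effect (the eager step)
    cmd.execute() // later accesses, e.g. from `.collect()`, reuse the cached value

    assert(cmd.runs == 1)
    assert(settings("example.key") == "10")
    assert(cmd.execute() == Seq("example.key=10"))
  }
}
```

One caveat worth keeping in mind: if the initializer of a `lazy val` throws, the
value is not cached and the body is re-evaluated on the next access, so a
failing command could be retried.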
An additional benefit is that the side effect logic of each command can be
implemented within its own physical command node, instead of adding special
cases inside `SQLContext.toRdd` and/or `HiveContext.toRdd`.
Did I miss something here?
On the other hand, although I think this solution can be more concise and
clean, it may involve some non-trivial changes, so I think we'd better merge
this PR as is and make those changes in another PR when appropriate.