Github user liancheng commented on a diff in the pull request:

    https://github.com/apache/spark/pull/956#discussion_r13582230
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
    @@ -269,6 +271,22 @@ class SQLContext(@transient val sparkContext: SparkContext)
       protected abstract class QueryExecution {
         def logical: LogicalPlan
     
    +    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
    +      case SetCommand(key, value) =>
    --- End diff ---
    
    Hi @concretevitamin, previously I thought that the only important thing about a `SchemaRDD` made from an eagerly executed statement (a Hive native command, a set command, an insertion command, etc.) was its side effect, not the content of the RDD itself. But now I do agree that when users call `.collect()` on such an RDD, we should present some meaningful result.
    
    As for the eager evaluation of the RDD content: IMHO, for command `SchemaRDD`s, the RDD contents are either empty or generated at the same time as the command is executed (at least I can't find a counterexample in the current code base), so this shouldn't be an issue.
    
    So I think the constraints presented here are:
    
    1. The side effect of a command `SchemaRDD` should take place eagerly;
    1. The side effect of a command `SchemaRDD` should take place once and only once (see the small sketch right after this list);
    1. When the `.collect()` method is called, something meaningful, usually the output message lines of the command, should be presented.
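
    As a side note, here is a tiny, Spark-agnostic sketch of why a `lazy val` naturally gives the "once and only once" behavior in constraint 2: the body runs on first access, and the result is cached for all later accesses.

    ```scala
    class Once {
      lazy val sideEffect: String = {
        println("side effect runs")  // printed exactly once
        "done"
      }
    }

    val o = new Once
    o.sideEffect  // prints "side effect runs", returns "done"
    o.sideEffect  // returns the cached "done"; nothing is printed again
    ```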
    
    Then how about adding a lazy field to all the physical command nodes that wraps up the side effect and holds the command output? Take `SetCommandPhysical` as an example:
    
    ```scala
    trait PhysicalCommand {
      // Abstract member; concrete command nodes override it with a lazy val so
      // that the side effect runs once and only once and its output is cached.
      def commandOutput: Any
    }

    case class SetCommandPhysical(
        key: Option[String], value: Option[String], output: Seq[Attribute])(
        @transient context: SQLContext)
      extends PhysicalCommand {

      override lazy val commandOutput: Any = {
        // Perform the side effect, and record the appropriate output
        ???
      }

      def execute(): RDD[Row] = {
        val row = new GenericRow(Array[Any](commandOutput))
        context.sparkContext.parallelize(Seq(row), 1)
      }
    }
    ```
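
    For what it's worth, here's a tiny usage sketch of the node above (purely illustrative, not part of this PR; it assumes a `SQLContext` named `sqlContext` is in scope and that `commandOutput` has a real implementation instead of `???`):

    ```scala
    // Build the physical command node directly (hypothetical key/value pair).
    val set = SetCommandPhysical(
      Some("spark.sql.shuffle.partitions"), Some("10"), Nil)(sqlContext)

    val first  = set.execute().collect()  // side effect runs here, on first access to commandOutput
    val second = set.execute().collect()  // no second side effect; the cached commandOutput is reused
    ```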
    
    In this way, all the constraints are met:
    
    1. Eager evaluation: done by the `toRdd` call in `SchemaRDDLike` (PR #948; a rough sketch of that trigger follows this list);
    1. Side effect takes place once and only once: ensured by the lazy `commandOutput` field;
    1. Meaningful output as RDD contents: the command output is held by `commandOutput` and returned in `execute()`.
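
    For reference, here is a rough sketch of what the eager trigger on the `SchemaRDDLike` side could look like (this is only an illustration of the idea, not the actual code in PR #948; the `Command` marker trait and the `executePlan` call are assumptions):

    ```scala
    // Rough sketch only: the real wiring lives in PR #948.
    abstract class SchemaRDDLike(
        @transient sqlContext: SQLContext,
        @transient logicalPlan: LogicalPlan) {

      @transient lazy val queryExecution = sqlContext.executePlan(logicalPlan)

      // Command plans are forced to run as soon as the RDD wrapper is constructed,
      // so their side effects take place eagerly (constraint 1); ordinary queries
      // remain lazy.
      logicalPlan match {
        case _: Command => queryExecution.toRdd
        case _ =>
      }
    }
    ```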
    
    An additional benefit is that the side-effect logic of each command can be implemented within its own physical command node, instead of adding special cases to `SQLContext.toRdd` and/or `HiveContext.toRdd`.
    
    Did I miss something here?
    
    On the other hand, although I think this solution is more concise and cleaner, it may involve some non-trivial changes, so we'd better merge this PR as is and make those changes in a follow-up PR when appropriate.


