Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702730
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped on to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to generic Dataframe.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a Dataframe returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    --- End diff --
    
    do we need this?
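
For context on the question, here is a minimal sketch of why `filterNot` may look redundant. It is not Spark code: a plain Scala `Iterator` stands in for a partition, and the helper names `keep` and `keepNot` are hypothetical, chosen only to mirror the `mapPartitions(_.filter(func))` and `mapPartitions(_.filterNot(func))` bodies in the diff. Under those assumptions, `filterNot(func)` keeps the same elements as `filter` with the predicate negated:

```scala
// Hypothetical stand-ins for the per-partition bodies of Dataset.filter and
// Dataset.filterNot in the diff; an Iterator plays the role of one partition.
def keep[T](part: Iterator[T])(func: T => Boolean): Iterator[T] =
  part.filter(func)

def keepNot[T](part: Iterator[T])(func: T => Boolean): Iterator[T] =
  part.filterNot(func)

val isEven: Int => Boolean = _ % 2 == 0

// filterNot(func) compared with filter applied to the negated predicate.
val viaFilterNot = keepNot(Iterator(1, 2, 3, 4, 5))(isEven).toList
val viaNegatedFilter = keep(Iterator(1, 2, 3, 4, 5))(x => !isEven(x)).toList
```

Both values hold the odd elements, so a caller could presumably write `ds.filter(x => !func(x))` instead. Whether the convenience method is worth the extra API surface is the judgment call the comment raises.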


