Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9190#discussion_r42702909
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -0,0 +1,398 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.encoders._
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.execution.QueryExecution
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * A [[Dataset]] is a strongly typed collection of objects that can be transformed in parallel
    + * using functional or relational operations.
    + *
    + * A [[Dataset]] differs from an [[RDD]] in the following ways:
    + *  - Internally, a [[Dataset]] is represented by a Catalyst logical plan and the data is stored
    + *    in the encoded form.  This representation allows for additional logical operations and
    + *    enables many operations (sorting, shuffling, etc.) to be performed without deserializing to
    + *    an object.
    + *  - The creation of a [[Dataset]] requires the presence of an explicit [[Encoder]] that can be
    + *    used to serialize the object into a binary format.  Encoders are also capable of mapping the
    + *    schema of a given object to the Spark SQL type system.  In contrast, RDDs rely on runtime
    + *    reflection-based serialization. Operations that change the type of object stored in the
    + *    dataset also need an encoder for the new type.
    + *
    + * A [[Dataset]] can be thought of as a specialized DataFrame, where the elements map to a specific
    + * JVM object type, instead of to a generic [[Row]] container. A DataFrame can be transformed into a
    + * specific Dataset by calling `df.as[ElementType]`.  Similarly, you can transform a strongly-typed
    + * [[Dataset]] to a generic DataFrame by calling `ds.toDF()`.
    + *
    + * COMPATIBILITY NOTE: Long term we plan to make [[DataFrame]] extend `Dataset[Row]`.  However,
    + * making this change to the class hierarchy would break the function signatures for the existing
    + * functional operations (map, flatMap, etc.).  As such, this class should be considered a preview
    + * of the final API.  Changes will be made to the interface after Spark 1.6.
    + *
    + * @since 1.6.0
    + */
    +@Experimental
    +class Dataset[T] private[sql](
    +    @transient val sqlContext: SQLContext,
    +    @transient val queryExecution: QueryExecution)(
    +    implicit val encoder: Encoder[T]) extends Serializable {
    +
    +  private implicit def classTag = encoder.clsTag
    +
    +  private[sql] def this(sqlContext: SQLContext, plan: LogicalPlan)(implicit encoder: Encoder[T]) =
    +    this(sqlContext, new QueryExecution(sqlContext, plan))
    +
    +  /** Returns the schema of the encoded form of the objects in this [[Dataset]]. */
    +  def schema: StructType = encoder.schema
    +
    +  /* ************* *
    +   *  Conversions  *
    +   * ************* */
    +
    +  /**
    +   * Returns a new `Dataset` where each record has been mapped to the specified type.
    +   * TODO: should bind here...
    +   * TODO: document binding rules
    +   * @since 1.6.0
    +   */
    +  def as[U : Encoder]: Dataset[U] = new Dataset(sqlContext, queryExecution)(implicitly[Encoder[U]])
    +
    +  /**
    +   * Applies a logical alias to this [[Dataset]] that can be used to disambiguate columns that have
    +   * the same name after two Datasets have been joined.
    +   */
    +  def as(alias: String): Dataset[T] = withPlan(Subquery(alias, _))
    +
    +  /**
    +   * Converts this strongly typed collection of data to a generic DataFrame.  In contrast to the
    +   * strongly typed objects that Dataset operations work on, a DataFrame returns generic [[Row]]
    +   * objects that allow fields to be accessed by ordinal or name.
    +   */
    +  def toDF(): DataFrame = DataFrame(sqlContext, logicalPlan)
    +
    +  /**
    +   * Returns this Dataset.
    +   * @since 1.6.0
    +   */
    +  def toDS(): Dataset[T] = this
    +
    +  /**
    +   * Converts this Dataset to an RDD.
    +   * @since 1.6.0
    +   */
    +  def rdd: RDD[T] = {
    +    val tEnc = implicitly[Encoder[T]]
    +    val input = queryExecution.analyzed.output
    +    queryExecution.toRdd.mapPartitions { iter =>
    +      val bound = tEnc.bind(input)
    +      iter.map(bound.fromRow)
    +    }
    +  }
    +
    +  /* *********************** *
    +   *  Functional Operations  *
    +   * *********************** */
    +
    +  /**
    +   * Concise syntax for chaining custom transformations.
    +   * {{{
    +   *   def featurize(ds: Dataset[T]) = ...
    +   *
    +   *   dataset
    +   *     .transform(featurize)
    +   *     .transform(...)
    +   * }}}
    +   *
    +   * @since 1.6.0
    +   */
    +  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `true`.
    +   * @since 1.6.0
    +   */
    +  def filter(func: T => Boolean): Dataset[T] = mapPartitions(_.filter(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that only contains elements where `func` returns `false`.
    +   * @since 1.6.0
    +   */
    +  def filterNot(func: T => Boolean): Dataset[T] = mapPartitions(_.filterNot(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each element.
    +   * @since 1.6.0
    +   */
    +  def map[U : Encoder](func: T => U): Dataset[U] = mapPartitions(_.map(func))
    +
    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each partition.
    +   * @since 1.6.0
    +   */
    +  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    +    new Dataset(
    +      sqlContext,
    +      MapPartitions[T, U](
    +        func,
    +        implicitly[Encoder[T]],
    +        implicitly[Encoder[U]],
    +        implicitly[Encoder[U]].schema.toAttributes,
    +        logicalPlan))
    +  }
    +
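    +  /**
    +   * Returns a new [[Dataset]] that contains the result of applying `func` to each element and
    +   * then flattening the results.
    +   * @since 1.6.0
    +   */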
    +  def flatMap[U : Encoder](func: T => TraversableOnce[U]): Dataset[U] =
    +    mapPartitions(_.flatMap(func))
    +
    +  /* ************** *
    +   *  Side effects  *
    +   * ************** */
    +
    +  /**
    +   * Runs `func` on each element of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreach(func: T => Unit): Unit = rdd.foreach(func)
    +
    +  /**
    +   * Runs `func` on each partition of this Dataset.
    +   * @since 1.6.0
    +   */
    +  def foreachPartition(func: Iterator[T] => Unit): Unit = rdd.foreachPartition(func)
    +
    +  /* ************* *
    +   *  Aggregation  *
    +   * ************* */
    +
    +  /**
    +   * Reduces the elements of this Dataset using the specified binary function.  The given function
    +   * must be commutative and associative or the result may be non-deterministic.
    +   * @since 1.6.0
    +   */
    +  def reduce(func: (T, T) => T): T = rdd.reduce(func)
    +
    +  /**
    +   * Aggregates the elements of each partition, and then the results for all the partitions, using a
    +   * given associative and commutative function and a neutral "zero value".
    +   *
    +   * This behaves somewhat differently from the fold operations implemented for non-distributed
    +   * collections in functional languages like Scala. This fold operation may be applied to
    +   * partitions individually, and then those results will be folded into the final result.
    +   * If `op` is not commutative, then the result may differ from that of a fold applied to a
    +   * non-distributed collection.
    +   * @since 1.6.0
    +   */
    +  def fold(zeroValue: T)(op: (T, T) => T): T = rdd.fold(zeroValue)(op)
    +
    +  /**
    +   * Returns a [[GroupedDataset]] where the data is grouped by the given key function.
    +   * @since 1.6.0
    +   */
    +  def groupBy[K : Encoder](func: T => K): GroupedDataset[K, T] = {
    --- End diff ---
    
    Should we have a `groupBy` that takes a column name or expression?
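    
    For example, just as a sketch for discussion (this exact signature is not part of the patch), an untyped overload along the lines of:
    
        /** Groups this Dataset by the given columns; keys come back as generic Rows. */
        def groupBy(cols: Column*): GroupedDataset[Row, T]
    
    would let users write `ds.groupBy($"department")` alongside the typed `ds.groupBy(_.department)`.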

