GitHub user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11431#discussion_r54478929
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -574,6 +647,162 @@ class Dataset[T] private[sql](
         sample(withReplacement, fraction, Utils.random.nextLong)
       }
     
    +  /**
    +   * Filters rows using the given condition.
    +   * {{{
    +   *   // The following are equivalent:
    +   *   peopleDs.filter($"age" > 15)
    +   *   peopleDs.where($"age" > 15)
    +   * }}}
    +   * @since 2.0.0
    +   */
     +  def filter(condition: Column): Dataset[T] = withPlan(Filter(condition.expr, _))
    +
    +  /**
    +   * Filters rows using the given SQL expression.
    +   * {{{
    +   *   peopleDs.filter("age > 15")
    +   * }}}
    +   * @since 2.0.0
    +   */
    +  def filter(conditionExpr: String): Dataset[T] = {
    +    filter(Column(sqlContext.sqlParser.parseExpression(conditionExpr)))
    +  }
    +
    +  /**
    +   * Filters rows using the given condition. This is an alias for `filter`.
    +   * {{{
    +   *   // The following are equivalent:
    +   *   peopleDs.filter($"age" > 15)
    +   *   peopleDs.where($"age" > 15)
    +   * }}}
    +   * @since 2.0.0
    +   */
    +  def where(condition: Column): Dataset[T] = filter(condition)
    +
    +  /**
    +   * Filters rows using the given SQL expression.
    +   * {{{
    +   *   peopleDs.where("age > 15")
    +   * }}}
    +   * @since 2.0.0
    +   */
    +  def where(conditionExpr: String): Dataset[T] = filter(conditionExpr)
    +
    +  /**
     +   * Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function
     +   * and `head` is that `head` returns an array while `limit` returns a new [[Dataset]].
    +   * @since 2.0.0
    +   */
    +  def limit(n: Int): Dataset[T] = withPlan(Limit(Literal(n), _))
    +
    +  /**
     +   * Returns a new [[Dataset]] with each partition sorted by the given expressions.
    +   *
    +   * This is the same operation as "SORT BY" in SQL (Hive QL).
    +   *
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
     +  def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] = {
    +    sortWithinPartitions((sortCol +: sortCols).map(Column(_)) : _*)
    +  }
    +
    +  /**
     +   * Returns a new [[Dataset]] with each partition sorted by the given expressions.
    +   *
    +   * This is the same operation as "SORT BY" in SQL (Hive QL).
    +   *
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
    +  def sortWithinPartitions(sortExprs: Column*): Dataset[T] = {
    +    sortInternal(global = false, sortExprs)
    +  }
    +
    +  /**
     +   * Returns a new [[Dataset]] sorted by the specified column, all in ascending order.
    +   * {{{
    +   *   // The following 3 are equivalent
    +   *   ds.sort("sortcol")
    +   *   ds.sort($"sortcol")
    +   *   ds.sort($"sortcol".asc)
    +   * }}}
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
    +  def sort(sortCol: String, sortCols: String*): Dataset[T] = {
    +    sort((sortCol +: sortCols).map(apply) : _*)
    +  }
    +
    +  /**
     +   * Returns a new [[Dataset]] sorted by the given expressions. For example:
    +   * {{{
    +   *   ds.sort($"col1", $"col2".desc)
    +   * }}}
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
    +  def sort(sortExprs: Column*): Dataset[T] = {
    +    sortInternal(global = true, sortExprs)
    +  }
    +
    +  /**
    +   * Returns a new [[Dataset]] sorted by the given expressions.
    +   * This is an alias of the `sort` function.
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
     +  def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols : _*)
    +
    +  /**
    +   * Returns a new [[Dataset]] sorted by the given expressions.
    +   * This is an alias of the `sort` function.
    +   * @since 2.0.0
    +   */
    +  @scala.annotation.varargs
    +  def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs : _*)
    +
    +  /**
    +   * Randomly splits this [[Dataset]] with the provided weights.
    +   *
     +   * @param weights weights for splits, will be normalized if they don't sum to 1.
    +   * @param seed Seed for sampling.
    +   * @since 2.0.0
    +   */
     +  def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] = {
     +    // It is possible that the underlying Dataset doesn't guarantee the ordering of rows in its
     +    // constituent partitions each time a split is materialized, which could result in
     +    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
     +    // ordering deterministic.
     +    val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
     +    val sum = weights.sum
     +    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     +    normalizedCumWeights.sliding(2).map { x =>
     +      new Dataset(sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted)())
    --- End diff --
    
    Do we need to pass the encoder into the newly created Datasets here?
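    
    For context: if `new Dataset(sqlContext, ...)` resolves its encoder
    implicitly, it may not pick up this Dataset's encoder. A minimal sketch of
    what threading it through explicitly might look like, assuming
    (hypothetically) a constructor overload that accepts an `Encoder[T]`:
    
        // Hypothetical sketch: reuse the parent Dataset's Encoder[T] for each
        // split, so every split deserializes rows to T the same way the parent
        // does. `encoder` stands for the parent's Encoder[T], assumed in scope.
        normalizedCumWeights.sliding(2).map { x =>
          new Dataset[T](
            sqlContext,
            Sample(x(0), x(1), withReplacement = false, seed, sorted)(),
            encoder)
        }.toArray
    
    As a usage note, the weights are normalized, so e.g.
    `ds.randomSplit(Array(2.0, 8.0), seed)` yields splits of roughly 20% and 80%.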

