Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19229#discussion_r142021142
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -2083,22 +2083,57 @@ class Dataset[T] private[sql](
        * @group untypedrel
        * @since 2.0.0
        */
    -  def withColumn(colName: String, col: Column): DataFrame = {
    +  def withColumn(colName: String, col: Column): DataFrame = 
withColumns(Seq(colName), Seq(col))
    +
    +  /**
    +   * Returns a new Dataset by adding columns or replacing the existing 
columns that have
    +   * the same names.
    +   */
    +  private[spark] def withColumns(colNames: Seq[String], cols: 
Seq[Column]): DataFrame = {
    +    require(colNames.size == cols.size,
    +      s"The size of column names: ${colNames.size} isn't equal to " +
    +        s"the size of columns: ${cols.size}")
    +    SchemaUtils.checkColumnNameDuplication(
    +      colNames,
    +      "in given column names",
    +      sparkSession.sessionState.conf.caseSensitiveAnalysis)
    +
         val resolver = sparkSession.sessionState.analyzer.resolver
         val output = queryExecution.analyzed.output
    -    val shouldReplace = output.exists(f => resolver(f.name, colName))
    -    if (shouldReplace) {
    -      val columns = output.map { field =>
    -        if (resolver(field.name, colName)) {
    -          col.as(colName)
    -        } else {
    -          Column(field)
    -        }
    +
    +    val columnMap = colNames.zip(cols).toMap
    +
    +    val replacedAndExistingColumns = output.map { field =>
    +      val dupColumn = columnMap.find { case (colName, col) =>
    +        resolver(field.name, colName)
           }
    -      select(columns : _*)
    -    } else {
    -      select(Column("*"), col.as(colName))
    +      if (dupColumn.isDefined) {
    +        val colName = dupColumn.get._1
    +        val col = dupColumn.get._2
    +        col.as(colName)
    +      } else {
    +        Column(field)
    +      }
    +    }
    +
    +    val newColumns = columnMap.filter { case (colName, col) =>
    +      !output.exists(f => resolver(f.name, colName))
    +    }.map { case (colName, col) => col.as(colName) }
    +
    +    select(replacedAndExistingColumns ++ newColumns : _*)
    +  }
    +
    +  /**
    +   * Returns a new Dataset by adding columns with metadata.
    +   */
    +  private[spark] def withColumns(
    --- End diff --
    
    Ok. We can add this when we need it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to