Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20295#discussion_r172250093
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
    @@ -75,28 +76,66 @@ case class FlatMapGroupsInPandasExec(
         val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
         val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
         val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
    -    val argOffsets = Array((0 until (child.output.length - 
groupingAttributes.length)).toArray)
    -    val schema = StructType(child.schema.drop(groupingAttributes.length))
         val sessionLocalTimeZone = conf.sessionLocalTimeZone
         val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
     
    +    // Deduplicate the grouping attributes.
    +    // If a grouping attribute also appears in data attributes, then we don't need to send the
    +    // grouping attribute to Python worker. If a grouping attribute is not in data attributes,
    +    // then we need to send this grouping attribute to python worker.
    +    //
    +    // We use argOffsets to distinguish grouping attributes and data attributes as following:
    +    //
    +    // argOffsets[0] is the length of grouping attributes
    +    // argOffsets[1 .. argOffsets[0]+1] is the arg offsets for grouping attributes
    +    // argOffsets[argOffsets[0]+1 .. ] is the arg offsets for data attributes
    +
    +    val dupGroupingIndices = new ArrayBuffer[Int]
    +    val groupingArgOffsets = new ArrayBuffer[Int]
    +    val extraGroupingAttributes = new ArrayBuffer[Attribute]
    +
    +    val dataAttributes = child.output.drop(groupingAttributes.length)
    +    groupingAttributes.foreach { attribute =>
    +      val index = dataAttributes.indexWhere(
    +        childAttribute => attribute.semanticEquals(childAttribute))
    +      dupGroupingIndices += index
    +    }
    --- End diff --
    
    Fixed. Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to