Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/20295#discussion_r172250020
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
---
@@ -75,28 +76,66 @@ case class FlatMapGroupsInPandasExec(
val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
val reuseWorker =
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
- val argOffsets = Array((0 until (child.output.length -
groupingAttributes.length)).toArray)
- val schema = StructType(child.schema.drop(groupingAttributes.length))
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+ // Deduplicate the grouping attributes.
+ // If a grouping attribute also appears in data attributes, then we
don't need to send the
+ // grouping attribute to Python worker. If a grouping attribute is not
in data attributes,
+ // then we need to send this grouping attribute to the Python worker.
+ //
+ // We use argOffsets to distinguish grouping attributes and data
attributes as follows:
+ //
+ // argOffsets[0] is the length of grouping attributes
+ // argOffsets[1 .. argOffsets[0]+1] is the arg offsets for grouping
attributes
+ // argOffsets[argOffsets[0]+1 .. ] is the arg offsets for data
attributes
+
+ val dupGroupingIndices = new ArrayBuffer[Int]
+ val groupingArgOffsets = new ArrayBuffer[Int]
+ val extraGroupingAttributes = new ArrayBuffer[Attribute]
+
+ val dataAttributes = child.output.drop(groupingAttributes.length)
+ groupingAttributes.foreach { attribute =>
+ val index = dataAttributes.indexWhere(
+ childAttribute => attribute.semanticEquals(childAttribute))
+ dupGroupingIndices += index
+ }
+
+ val extraGroupingSize = dupGroupingIndices.count(_ == -1)
+ (groupingAttributes zip dupGroupingIndices).foreach {
--- End diff --
Fixed.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]