Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/22355#discussion_r216114653
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * `apply` works in two passes: every expression is evaluated into an intermediate buffer
+ * first, and only afterwards are the buffered values copied into the target row. The target
+ * row starts out as a new `GenericInternalRow` with one slot per expression and can be
+ * replaced through `target`.
+ *
+ * The two-argument constructor passes every expression through
+ * `BindReferences.bindReference` together with `inputSchema` and then delegates to the
+ * primary constructor.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ * output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends
MutableProjection {
+ def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+ this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+ /**
+ * Intermediate storage with one slot per expression. `apply` fills it completely before
+ * anything is written to the target row.
+ */
+ private[this] val buffer = new Array[Any](expressions.size)
+ /**
+ * Calls `initialize(partitionIndex)` on every [[Nondeterministic]] node reached by `foreach`
+ * over each expression; all other nodes are ignored.
+ * NOTE(review): presumably `Expression.foreach` visits the whole expression tree - confirm.
+ *
+ * @param partitionIndex value forwarded unchanged to each `Nondeterministic.initialize` call.
+ */
+ override def initialize(partitionIndex: Int): Unit = {
+ expressions.foreach(_.foreach {
+ case n: Nondeterministic => n.initialize(partitionIndex)
+ case _ =>
+ })
+ }
+ /**
+ * `exprArray` is an array copy of `expressions` used for indexed access in `apply`.
+ * `mutableRow` is the row `apply` writes into; `currentValue` returns that same row.
+ */
+ private[this] val exprArray = expressions.toArray
+ private[this] var mutableRow: InternalRow = new
GenericInternalRow(exprArray.length)
+ def currentValue: InternalRow = mutableRow
+ /**
+ * Replaces the row that subsequent `apply` calls write into.
+ *
+ * @param row the new target row.
+ * @return this projection.
+ */
+ override def target(row: InternalRow): MutableProjection = {
+ mutableRow = row
+ this
+ }
+ /**
+ * Evaluates every expression against `input` into `buffer`, then copies `buffer` slot by
+ * slot into the current target row.
+ *
+ * @param input the row handed to each expression's `eval`.
+ * @return the target row (the same instance on every call until `target` is invoked).
+ */
+ override def apply(input: InternalRow): InternalRow = {
+ var i = 0
+ while (i < exprArray.length) {
+ // Store the result into buffer first, to make the projection atomic
(needed by aggregation)
+ buffer(i) = exprArray(i).eval(input)
+ i += 1
+ }
+ i = 0
+ while (i < exprArray.length) {
+ mutableRow(i) = buffer(i)
+ i += 1
+ }
+ mutableRow
+ }
+}
--- End diff --
The change looks good to me, though I'd like to address
performance and code-quality issues in follow-ups. cc: @gatorsmile
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]