[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22355


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-17 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r218042119
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala ---
@@ -69,11 +85,26 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
   test("codegen failures in the CODEGEN_ONLY mode") {
     val errMsg = intercept[ExecutionException] {
       val input = Seq(BoundReference(0, IntegerType, nullable = true))
-      val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
       withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
         FailedCodegenProjection.createObject(input)
       }
+      val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString
--- End diff --

Oh, yes. I'll remove it.


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r218041490
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala ---
@@ -69,11 +85,26 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
   test("codegen failures in the CODEGEN_ONLY mode") {
     val errMsg = intercept[ExecutionException] {
       val input = Seq(BoundReference(0, IntegerType, nullable = true))
-      val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
       withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
         FailedCodegenProjection.createObject(input)
       }
+      val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString
--- End diff --

unnecessary line?
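For context on the test above: `CodeGeneratorWithInterpretedFallback.createObject` chooses between a code-generated and an interpreted object according to `SQLConf.CODEGEN_FACTORY_MODE`. Below is a minimal, simplified Scala model of that dispatch, not Spark's implementation. `FactoryMode`, `GeneratorWithFallback`, and `AlwaysFailingCodegen` are hypothetical names, and the mode is passed as a parameter instead of being read from the SQL conf. The model shows why a failing codegen path is expected to surface as an error in `CODEGEN_ONLY` mode while the fallback mode hides it.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for the three CodegenObjectFactoryMode values.
object FactoryMode extends Enumeration {
  val FALLBACK, CODEGEN_ONLY, NO_CODEGEN = Value
}

// Hypothetical, simplified model of the fallback dispatch. The mode is a parameter here,
// whereas the real factory reads it from SQLConf.CODEGEN_FACTORY_MODE.
abstract class GeneratorWithFallback[IN, OUT] {
  protected def createCodeGeneratedObject(in: IN): OUT
  protected def createInterpretedObject(in: IN): OUT

  def createObject(in: IN, mode: FactoryMode.Value): OUT = mode match {
    // Codegen only: a codegen failure propagates to the caller.
    case FactoryMode.CODEGEN_ONLY => createCodeGeneratedObject(in)
    // No codegen: always build the interpreted object.
    case FactoryMode.NO_CODEGEN => createInterpretedObject(in)
    // Fallback: try codegen first, use the interpreted object on a non-fatal failure.
    case _ =>
      try createCodeGeneratedObject(in)
      catch { case NonFatal(_) => createInterpretedObject(in) }
  }
}

// Hypothetical generator whose codegen path always fails, similar in spirit
// to the FailedCodegenProjection used in the test above.
object AlwaysFailingCodegen extends GeneratorWithFallback[Int, String] {
  protected def createCodeGeneratedObject(in: Int): String =
    throw new IllegalStateException("codegen failed")
  protected def createInterpretedObject(in: Int): String = s"interpreted($in)"
}
```

In this model, `AlwaysFailingCodegen.createObject(1, FactoryMode.FALLBACK)` returns `"interpreted(1)"`, while the same call in `CODEGEN_ONLY` mode throws.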


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-14 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217860646
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(toBoundExprs(expressions, inputSchema))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  private[this] val validExprs = expressions.zipWithIndex.filter {
+    case (NoOp, _) => false
+    case _ => true
+  }
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+    mutableRow = row
+    this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+    validExprs.foreach { case (expr, i) =>
--- End diff --

Oh, I forgot that we should do that in performance-sensitive places. Thanks!
I'll update it.


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217841164
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(toBoundExprs(expressions, inputSchema))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  private[this] val validExprs = expressions.zipWithIndex.filter {
+    case (NoOp, _) => false
+    case _ => true
+  }
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+    mutableRow = row
+    this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+    validExprs.foreach { case (expr, i) =>
--- End diff --

Can you please use the old code? That should be much more performant than
this.
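The "old code" here is the `while` loop over a pre-built array from the earlier revision of `apply`. Below is a minimal sketch of keeping that shape while still skipping `NoOp` slots. It assumes hypothetical `Expr`, `Col`, and `NoOpExpr` types in place of Catalyst's `Expression` and `NoOp`. The filtering happens once, at construction, into two parallel arrays (the expressions and their output ordinals). The per-row path is then two index-based loops with no closure calls or tuple pattern matching. Whether this is measurably faster on a given JVM should be confirmed with a benchmark.

```scala
// Hypothetical, simplified expression model (not Catalyst's Expression).
trait Expr { def eval(input: Array[Any]): Any }
case class Col(ordinal: Int) extends Expr {
  def eval(input: Array[Any]): Any = input(ordinal)
}
// Hypothetical stand-in for NoOp: a placeholder that must never be evaluated.
case object NoOpExpr extends Expr {
  def eval(input: Array[Any]): Any = throw new UnsupportedOperationException("NoOp")
}

class WhileLoopProjection(exprs: Seq[Expr]) {
  // Filter out NoOp once, at construction time, and keep each surviving
  // expression next to the output ordinal it writes to.
  private[this] val kept = exprs.zipWithIndex.filter { case (e, _) => e != NoOpExpr }
  private[this] val exprArray: Array[Expr] = kept.map(_._1).toArray
  private[this] val ordinals: Array[Int] = kept.map(_._2).toArray
  private[this] val buffer = new Array[Any](exprArray.length)

  // Output row; slots that belong to NoOp expressions are never written.
  val row = new Array[Any](exprs.size)

  def apply(input: Array[Any]): Array[Any] = {
    // Phase 1: evaluate every kept expression into the buffer.
    var i = 0
    while (i < exprArray.length) {
      buffer(i) = exprArray(i).eval(input)
      i += 1
    }
    // Phase 2: copy the buffered values to their output ordinals.
    i = 0
    while (i < exprArray.length) {
      row(ordinals(i)) = buffer(i)
      i += 1
    }
    row
  }
}
```

For example, projecting `Seq(Col(1), NoOpExpr, Col(0))` over the input `Array(10, 20)` writes `20` to slot 0 and `10` to slot 2, and leaves slot 1 untouched.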


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-12 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217242015
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
@@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration {
  */
 abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging {
 
-  def createObject(in: IN): OUT = {
+  def createObject(in: IN): OUT =
+    createObject(in, subexpressionEliminationEnabled = false)
--- End diff --

OK, I'll do that first in another PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217241810
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
@@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration {
  */
 abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging {
 
-  def createObject(in: IN): OUT = {
+  def createObject(in: IN): OUT =
+    createObject(in, subexpressionEliminationEnabled = false)
--- End diff --

I took another look, and it seems it's not an actual config, just a method
parameter. Maybe we should remove this config in 3.0.

That said, let's keep your PR as it is.


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217233868
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
@@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration {
  */
 abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging {
 
-  def createObject(in: IN): OUT = {
+  def createObject(in: IN): OUT =
+    createObject(in, subexpressionEliminationEnabled = false)
--- End diff --

Can we do that first? I think that would be a small change, and it would also
make this PR simpler and easier to review.


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-12 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217058960
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
@@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration {
  */
 abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging {
 
-  def createObject(in: IN): OUT = {
+  def createObject(in: IN): OUT =
+    createObject(in, subexpressionEliminationEnabled = false)
--- End diff --

Ah, yes, it might be. I'll check in a follow-up.


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217054661
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
@@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration {
  */
 abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging {
 
-  def createObject(in: IN): OUT = {
+  def createObject(in: IN): OUT =
+    createObject(in, subexpressionEliminationEnabled = false)
--- End diff --

Can we eliminate it? I think we can use `SQLConf.get` directly when we need 
to access the conf. This should be done in a different PR though.
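Below is a minimal sketch of this suggestion. It uses a hypothetical thread-local `Conf` in the spirit of `SQLConf.get`; it is not Spark's `SQLConf`. Instead of threading `subexpressionEliminationEnabled` through `createObject` as a parameter, the factory reads the flag from the active conf where it is needed. A later message in this thread notes that the flag turned out to be a plain method parameter rather than a real config, so this only illustrates the pattern being proposed.

```scala
// Hypothetical, simplified conf object; not Spark's SQLConf.
final class Conf(val subexpressionEliminationEnabled: Boolean)

object Conf {
  // One active conf per thread, similar in spirit to resolving the active conf via SQLConf.get.
  private val active = new ThreadLocal[Conf] {
    override def initialValue(): Conf = new Conf(subexpressionEliminationEnabled = false)
  }

  def get: Conf = active.get()

  // Runs `body` with `conf` active on this thread, restoring the previous conf afterwards.
  def withConf[T](conf: Conf)(body: => T): T = {
    val previous = active.get()
    active.set(conf)
    try body finally active.set(previous)
  }
}

// Hypothetical factory: reads the flag at the point of use instead of taking it as a parameter.
object ProjectionFactory {
  def createObject(exprs: Seq[String]): String = {
    val subExprElim = Conf.get.subexpressionEliminationEnabled
    s"projection(${exprs.mkString(",")}, subExprElim=$subExprElim)"
  }
}
```

One trade-off of this design is that the dependency on the flag becomes implicit. In exchange, callers no longer have to forward a value they do not otherwise use.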


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-12 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217017120
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(toBoundExprs(_, inputSchema)))
--- End diff --

Yeah, haha. I've already updated it!


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-12 Thread rednaxelafx
Github user rednaxelafx commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217016675
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(toBoundExprs(_, inputSchema)))
--- End diff --

```
[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala:32: type mismatch;
[error]  found   : org.apache.spark.sql.catalyst.expressions.Expression
[error]  required: Seq[org.apache.spark.sql.catalyst.expressions.Expression]
[error] this(expressions.map(toBoundExprs(_, inputSchema)))
[error]   ^
```

It's probably `this(toBoundExprs(expressions, inputSchema))`, right?
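The compiler rejects the original line because `toBoundExprs` takes and returns a whole `Seq`, so `expressions.map(toBoundExprs(_, inputSchema))` passes it a single element. Below is a minimal sketch of the two call shapes. It uses hypothetical simplified types: `AttrRef`, `BoundRef`, and a schema of attribute names are stand-ins, not Catalyst classes.

```scala
// Hypothetical, simplified types (not Catalyst classes): an attribute referenced by name,
// and a reference bound to an input ordinal.
sealed trait Expr
case class AttrRef(name: String) extends Expr
case class BoundRef(ordinal: Int) extends Expr

object Binding {
  // Binds a single expression against the input schema (a list of attribute names).
  def bindReference(e: Expr, inputSchema: Seq[String]): Expr = e match {
    case AttrRef(name) =>
      val ordinal = inputSchema.indexOf(name)
      require(ordinal >= 0, s"Couldn't find $name in ${inputSchema.mkString("[", ",", "]")}")
      BoundRef(ordinal)
    case other => other
  }

  // Seq-to-Seq helper shaped like the toBoundExprs discussed above.
  def toBoundExprs(exprs: Seq[Expr], inputSchema: Seq[String]): Seq[Expr] =
    exprs.map(bindReference(_, inputSchema))

  // Does not compile ("found: Expr, required: Seq[Expr]"): `_` is a single Expr here.
  //   def bad(exprs: Seq[Expr], schema: Seq[String]) = exprs.map(toBoundExprs(_, schema))

  // Compiles: pass the whole sequence once.
  def good(exprs: Seq[Expr], schema: Seq[String]): Seq[Expr] = toBoundExprs(exprs, schema)
}
```

For example, `Binding.good(Seq(AttrRef("b"), AttrRef("a")), Seq("a", "b"))` yields `Seq(BoundRef(1), BoundRef(0))`.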


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-10 Thread rednaxelafx
Github user rednaxelafx commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r216525084
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
--- End diff --

use `toBoundExpr`?


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-10 Thread rednaxelafx
Github user rednaxelafx commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r216524666
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala ---
@@ -86,24 +86,12 @@ package object expressions  {
   }
 
   /**
-   * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
-   * column of the new row. If the schema of the input row is specified, then the given expression
-   * will be bound to that schema.
-   *
-   * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
-   * each time an input row is added.  This significantly reduces the cost of calculating the
-   * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
-   * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
-   * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
+   * A helper function to bound given expressions to an input schema.
--- End diff --

Spelling nitpick: s/bound/bind/


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-10 Thread rednaxelafx
Github user rednaxelafx commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r216526434
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  private[this] val exprArray = expressions.toArray
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+    mutableRow = row
+    this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+    var i = 0
+    while (i < exprArray.length) {
+      // Store the result into buffer first, to make the projection atomic (needed by aggregation)
+      buffer(i) = exprArray(i).eval(input)
+      i += 1
+    }
+    i = 0
+    while (i < exprArray.length) {
+      mutableRow(i) = buffer(i)
+      i += 1
+    }
+    mutableRow
+  }
+}
--- End diff --

+1 on the check for `NoOp`s.
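Background for the `NoOp` check: Catalyst's `NoOp` is, to my understanding, an `Unevaluable` placeholder. The quoted `apply` calls `eval` on every expression, so it would throw when a `NoOp` is present, whereas the intent is to leave that slot of the target row untouched. Below is a minimal sketch of the difference, with hypothetical types (`Lit`, `NoOpPlaceholder`, `NoOpDemo`) rather than Spark's classes.

```scala
// Hypothetical, simplified expression model (not Catalyst's Expression).
trait Expr { def eval(input: Array[Any]): Any }
case class Lit(value: Any) extends Expr { def eval(input: Array[Any]): Any = value }
// Hypothetical stand-in for NoOp: evaluating it is an error.
case object NoOpPlaceholder extends Expr {
  def eval(input: Array[Any]): Any =
    throw new UnsupportedOperationException("Cannot evaluate NoOp")
}

object NoOpDemo {
  // Evaluates every kept expression first, then writes each value to its own ordinal
  // in `target`. With skipNoOp = true, NoOp slots keep whatever `target` held before.
  def project(
      exprs: Seq[Expr],
      input: Array[Any],
      target: Array[Any],
      skipNoOp: Boolean): Array[Any] = {
    val indexed = exprs.zipWithIndex
    val kept =
      if (skipNoOp) indexed.filter { case (e, _) => e != NoOpPlaceholder } else indexed
    val values = kept.map { case (e, i) => (i, e.eval(input)) }
    values.foreach { case (i, v) => target(i) = v }
    target
  }
}
```

With `skipNoOp = true`, a target row `Array("keep", "old")` projected through `Seq(NoOpPlaceholder, Lit("new"))` becomes `Array("keep", "new")`. With `skipNoOp = false`, the same call throws.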


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-07 Thread sadhen
Github user sadhen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r216116648
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  private[this] val exprArray = expressions.toArray
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+    mutableRow = row
+    this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+    var i = 0
+    while (i < exprArray.length) {
+      // Store the result into buffer first, to make the projection atomic (needed by aggregation)
+      buffer(i) = exprArray(i).eval(input)
+      i += 1
+    }
+    i = 0
+    while (i < exprArray.length) {
+      mutableRow(i) = buffer(i)
+      i += 1
+    }
+    mutableRow
+  }
+}
--- End diff --

Should be easy to create a test case.


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-07 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r216114653
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  private[this] val exprArray = expressions.toArray
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+    mutableRow = row
+    this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+    var i = 0
+    while (i < exprArray.length) {
+      // Store the result into buffer first, to make the projection atomic (needed by aggregation)
+      buffer(i) = exprArray(i).eval(input)
+      i += 1
+    }
+    i = 0
+    while (i < exprArray.length) {
+      mutableRow(i) = buffer(i)
+      i += 1
+    }
+    mutableRow
+  }
+}
--- End diff --

The change looks good to me, though I'd like to address performance and
code-quality issues in follow-ups. cc: @gatorsmile
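The quoted `apply` stores results in `buffer` first "to make the projection atomic (needed by aggregation)". One way to see why: the target row can also be the input row. I take this to be the aggregation case the comment refers to, where a buffer row is updated in place. When that happens, writing each result immediately can clobber a slot that a later expression still needs to read. Below is a minimal sketch under that assumption. `AtomicProjectionDemo` is hypothetical, and each "expression" is reduced to the input ordinal it reads.

```scala
object AtomicProjectionDemo {
  // Single-phase: write each result straight into the row it is also reading from.
  def projectInPlaceNaive(readOrdinals: Array[Int], row: Array[Any]): Array[Any] = {
    var i = 0
    while (i < readOrdinals.length) {
      row(i) = row(readOrdinals(i)) // may read a slot that was already overwritten
      i += 1
    }
    row
  }

  // Two-phase, as in the quoted apply: evaluate everything into a buffer, then copy.
  def projectInPlaceBuffered(readOrdinals: Array[Int], row: Array[Any]): Array[Any] = {
    val buffer = new Array[Any](readOrdinals.length)
    var i = 0
    while (i < readOrdinals.length) {
      buffer(i) = row(readOrdinals(i))
      i += 1
    }
    i = 0
    while (i < readOrdinals.length) {
      row(i) = buffer(i)
      i += 1
    }
    row
  }
}
```

Swapping two columns in place (`readOrdinals = Array(1, 0)` over `Array("a", "b")`) gives `["b", "b"]` with the single-phase version and the intended `["b", "a"]` with the buffered one.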


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r215897598
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  private[this] val exprArray = expressions.toArray
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+    mutableRow = row
+    this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+    var i = 0
+    while (i < exprArray.length) {
+      // Store the result into buffer first, to make the projection atomic (needed by aggregation)
+      buffer(i) = exprArray(i).eval(input)
+      i += 1
+    }
+    i = 0
+    while (i < exprArray.length) {
+      mutableRow(i) = buffer(i)
+      i += 1
+    }
+    mutableRow
+  }
+}
--- End diff --

Good point. Do you know whether there is a test case that causes the
exception with the interpreted one?



---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-07 Thread sadhen
Github user sadhen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r215862272
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
--- End diff --

The improvement (written 3 months ago) is based on the generated Java code.


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-07 Thread sadhen
Github user sadhen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r215859282
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
--- End diff --

This is my improved version of InterpretedMutableProjection:

```
  override def apply(input: InternalRow): InternalRow = {
    var i = 0
    while (i < exprArray.length) {
      // Store the result into buffer first, to make the projection atomic (needed by aggregation)
      if (exprArray(i) != NoOp) {
        buffer(i) = exprArray(i).eval(input)
      }
      i += 1
    }
    i = 0
    while (i < exprArray.length) {
      if (exprArray(i) != NoOp) {
        mutableRow(i) = buffer(i)
      }
      i += 1
    }
    mutableRow
  }
```

`AggregationIterator` uses `NoOp`. If you replace the codegen projection with the interpreted one, you will encounter an exception from `NoOp.eval`.
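The failure mode described above can be reproduced outside Spark with a small, hypothetical model: `Expr`, `Column`, and `NoOp` below are stand-ins (assumptions, not Catalyst classes) where `NoOp.eval` throws, mirroring the exception from `NoOp.eval`. The projection skips `NoOp` slots in both loops, so those columns of the target row keep their existing values:

```scala
// Hypothetical, simplified model; these are not Catalyst's classes.
object NoOpSkippingSketch {
  sealed trait Expr { def eval(input: Array[Any]): Any }

  final case class Column(ordinal: Int) extends Expr {
    def eval(input: Array[Any]): Any = input(ordinal)
  }

  // Placeholder slot that must never be evaluated.
  case object NoOp extends Expr {
    def eval(input: Array[Any]): Any =
      throw new UnsupportedOperationException("NoOp cannot be evaluated")
  }

  final class SkippingProjection(exprs: Seq[Expr], target: Array[Any]) {
    private[this] val exprArray = exprs.toArray
    private[this] val buffer = new Array[Any](exprArray.length)

    def apply(input: Array[Any]): Array[Any] = {
      var i = 0
      while (i < exprArray.length) {
        // NoOp slots are skipped, so their eval is never called.
        if (exprArray(i) != NoOp) buffer(i) = exprArray(i).eval(input)
        i += 1
      }
      i = 0
      while (i < exprArray.length) {
        // NoOp slots keep whatever value the target row already holds.
        if (exprArray(i) != NoOp) target(i) = buffer(i)
        i += 1
      }
      target
    }
  }
}
```

For example, projecting `Seq(NoOp, Column(0))` over input `[42]` into a target `["keep", 0]` leaves the target as `["keep", 42]` without ever calling `NoOp.eval`.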


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-06 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r215659345
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 ---
@@ -178,15 +236,7 @@ object UnsafeProjection
       exprs: Seq[Expression],
       inputSchema: Seq[Attribute],
       subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
-    val unsafeExprs = toUnsafeExprs(toBoundExprs(exprs, inputSchema))
-    try {
-      GenerateUnsafeProjection.generate(unsafeExprs, subexpressionEliminationEnabled)
-    } catch {
-      case NonFatal(_) =>
-        // We should have already seen the error message in `CodeGenerator`
-        logWarning("Expr codegen error and falling back to interpreter mode")
-        InterpretedUnsafeProjection.createProjection(unsafeExprs)
-    }
+    createObject(toUnsafeExprs(toBoundExprs(exprs, inputSchema)), subexpressionEliminationEnabled)
--- End diff --

Removed the duplicated fallback logic here and reused the logic of 
`CodeGeneratorWithInterpretedFallback`.
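The refactoring above moves the try/catch out of `UnsafeProjection.create` and into one shared helper. A simplified sketch of that pattern, assuming a three-valued mode (a fallback default plus the `CODEGEN_ONLY` and `NO_CODEGEN` settings that appear in the test suite); the trait, mode, and method names here are illustrative and are not Spark's actual signatures:

```scala
import scala.util.control.NonFatal

// Simplified sketch of a codegen-with-interpreted-fallback factory.
// Names are assumptions modelled loosely on the discussion, not Spark's API.
object FallbackSketch {
  sealed trait Mode
  case object Fallback extends Mode
  case object CodegenOnly extends Mode
  case object NoCodegen extends Mode

  trait WithInterpretedFallback[IN, OUT] {
    protected def createCodeGeneratedObject(in: IN): OUT
    protected def createInterpretedObject(in: IN): OUT

    // The single place where the fallback policy lives, so each projection
    // factory does not need its own try/catch.
    def createObject(in: IN, mode: Mode): OUT = mode match {
      case CodegenOnly => createCodeGeneratedObject(in)
      case NoCodegen => createInterpretedObject(in)
      case Fallback =>
        try createCodeGeneratedObject(in)
        catch {
          case NonFatal(_) => createInterpretedObject(in)
        }
    }
  }

  // Hypothetical factory whose "code generation" always fails.
  object FailingFactory extends WithInterpretedFallback[Seq[Int], String] {
    protected def createCodeGeneratedObject(in: Seq[Int]): String =
      throw new IllegalStateException("codegen failed")
    protected def createInterpretedObject(in: Seq[Int]): String =
      s"interpreted(${in.mkString(",")})"
  }
}
```

In the fallback mode a codegen failure is absorbed and the interpreted object is returned, while the codegen-only mode lets the failure propagate, which is the behavior the suite's "codegen failures in the CODEGEN_ONLY mode" test checks for.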


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-06 Thread maropu
GitHub user maropu opened a pull request:

https://github.com/apache/spark/pull/22355

[SPARK-25358][SQL] MutableProjection supports fallback to an interpreted mode

## What changes were proposed in this pull request?
In SPARK-23711, `UnsafeProjection` gained support for falling back to an interpreted mode. This PR adds the same fallback to `MutableProjection`, based on `CodeGeneratorWithInterpretedFallback`.

## How was this patch tested?
Added tests in `CodeGeneratorWithInterpretedFallbackSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maropu/spark SPARK-25358

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22355.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22355


commit c25c4269f6127b445208c752ddadcc23ae77a578
Author: Takeshi Yamamuro 
Date:   2018-09-06T14:48:04Z

Fix




---
