[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20750


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r175083026
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces; a consumer
+ * should copy the row if it is being buffered. This class is not thread safe.
+ *
+ * @param expressions the expressions that produce the resulting fields. These expressions must
+ *                    be bound to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
+  import InterpretedUnsafeProjection._
+
+  /** Number of (top level) fields in the resulting row. */
+  private[this] val numFields = expressions.length
+
+  /** Array holding the expression results. */
+  private[this] val values = new Array[Any](numFields)
+
+  /** The row representing the expression results. */
+  private[this] val intermediate = new GenericInternalRow(values)
+
+  /** The row returned by the projection. */
+  private[this] val result = new UnsafeRow(numFields)
+
+  /** The buffer which holds the resulting row's backing data. */
+  private[this] val holder = new BufferHolder(result, numFields * 32)
+
+  /** The writer that writes the intermediate result to the result row. */
+  private[this] val writer: InternalRow => Unit = {
+    val rowWriter = new UnsafeRowWriter(holder, numFields)
+    val baseWriter = generateStructWriter(
+      holder,
+      rowWriter,
+      expressions.map(e => StructField("", e.dataType, e.nullable)))
+    if (!expressions.exists(_.nullable)) {
+      // No nullable fields. The top-level null bit mask will always be zeroed out.
+      baseWriter
+    } else {
+      // Zero out the null bit mask before we write the row.
+      row => {
+        rowWriter.zeroOutNullBytes()
+        baseWriter(row)
+      }
+    }
+  }
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  override def apply(row: InternalRow): UnsafeRow = {
+    // Put the expression results in the intermediate row.
+    var i = 0
+    while (i < numFields) {
+      values(i) = expressions(i).eval(row)
+      i += 1
+    }
+
+    // Write the intermediate row to an unsafe row.
+    holder.reset()
+    writer(intermediate)
+    result.setTotalSize(holder.totalSize())
+    result
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedUnsafeProjection]].
+ */
+object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
+
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
+   */
+  override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    // We need to make sure that we do not reuse stateful expressions.
+    val cleanedExpressions = exprs.map(_.transform {
+      case s: Stateful => s.freshCopy()
+    })
+    new InterpretedUnsafeProjection(cleanedExpressions.toArray)
+  }
+
+  /**
+   * Generate a struct writer function. The generated function writes an [[InternalRow]] to the
+   * given buffer using the given
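For context, a minimal usage sketch (assuming the PR's class is on the classpath; the bound references and sample values below are made up): the projection turns a generic row into the UnsafeRow binary format without Janino codegen, and the result must be copied when buffered because the row instance is reused.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression,
  GenericInternalRow, InterpretedUnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Two bound input fields: a non-nullable int and a nullable string.
val exprs = Array[Expression](
  BoundReference(0, IntegerType, nullable = false),
  BoundReference(1, StringType, nullable = true))
val projection = new InterpretedUnsafeProjection(exprs)

val input: InternalRow = new GenericInternalRow(Array[Any](42, UTF8String.fromString("a")))
val unsafe: UnsafeRow = projection(input).copy() // copy(): the projection reuses its result row
```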

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r175065574
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174892728
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174892448
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174890611
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174873582
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174871585
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -328,6 +328,32 @@ trait Nondeterministic extends Expression {
   protected def evalInternal(input: InternalRow): Any
 }
 
+/**
+ * An expression that contains mutable state. A stateful expression is always non-deterministic
+ * because the results it produces during evaluation are not only dependent on the given input
+ * but also on its internal state.
+ *
+ * The state of the expressions is generally not exposed in the parameter list and this makes
+ * comparing stateful expressions problematic because similar stateful expressions (with the
+ * same parameter list) but with different internal state will be considered equal. This is
+ * especially problematic during tree transformations. In order to counter this, the
+ * `fastEquals` method for stateful expressions only returns `true` for the same reference.
+ *
+ * A stateful expression should never be evaluated multiple times for a single row. This should
+ * only be a problem for interpreted execution. This can be prevented by creating fresh copies
+ * of the stateful expression before execution; these can be made using the `freshCopy` function.
+ */
+trait Stateful extends Nondeterministic {
+  /**
+   * Return a fresh uninitialized copy of the stateful expression.
+   */
+  def freshCopy(): Stateful = this
--- End diff --

I think it's better to not provide this default implementation, to avoid 
mistakes in the future.
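A standalone Scala sketch of the suggestion (model code, not the PR's classes; `MonotonicCounter` is hypothetical): leaving `freshCopy` abstract turns a forgotten override into a compile error instead of a silent state-sharing bug.

```scala
// Minimal model: no default freshCopy, so every stateful expression
// must state explicitly how its state is duplicated.
trait Stateful {
  /** Return a fresh, uninitialized copy of this stateful expression. */
  def freshCopy(): Stateful
}

// Hypothetical stateful expression: produces a different value on every call.
final class MonotonicCounter extends Stateful {
  private var count: Long = 0L
  def eval(): Long = { count += 1; count }
  // Each copy starts from its own zeroed state.
  override def freshCopy(): MonotonicCounter = new MonotonicCounter
}
```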


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174606284
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.codegen;
+
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Base class for writing Unsafe* structures.
+ */
+public abstract class UnsafeWriter {
+  public abstract void setNullByte(int ordinal);
--- End diff --

Actually this was my mistake... I thought `Platform.setInt(0)` is different from `Platform.setFloat(0.0f)`, and that's why I introduced a `setNull` method for each primitive type.
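A quick JDK-only check of why the per-type methods were unnecessary: an int 0 and a float 0.0f are the same four zero bytes, so zeroing a slot works identically for either type.

```scala
// 0 (int) and 0.0f (float) share the bit pattern 0x00000000, so writing
// four zero bytes nulls the slot correctly for either type.
val intBits = 0
val floatBits = java.lang.Float.floatToIntBits(0.0f)
assert(intBits == floatBits) // both 0x00000000

// Likewise for 8-byte slots: 0L and 0.0d are the same eight zero bytes.
assert(java.lang.Double.doubleToLongBits(0.0d) == 0L)
```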


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174605769
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
+public abstract class UnsafeWriter {
+  public abstract void setNullByte(int ordinal);
--- End diff --

I feel `setNull1/2/4/8Bytes` is better. It's also easy to codegen: just `setNull${dt.defaultSize}Bytes`.
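A sketch of that codegen shortcut (`writer` and `setNullCode` are placeholders, not the PR's API): the generated call is derived directly from the type's `defaultSize`, with no per-type match.

```scala
import org.apache.spark.sql.types._

// Build the generated null-setter call from the type's fixed width.
def setNullCode(dt: DataType, ordinal: Int): String =
  s"writer.setNull${dt.defaultSize}Bytes($ordinal)"

// e.g. setNullCode(ShortType, 1)  == "writer.setNull2Bytes(1)"
// and  setNullCode(DoubleType, 2) == "writer.setNull8Bytes(2)"
```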


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174605126
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -328,6 +328,21 @@ trait Nondeterministic extends Expression {
   protected def evalInternal(input: InternalRow): Any
 }
 
+/**
+ * A stateful nondeterministic expression. These expressions contain state
+ * that is not stored in the parameter list.
+ */
+trait StatefulNondeterministic extends Nondeterministic {
--- End diff --

Maybe we can just call it `Stateful` while still extending `Nondeterministic`, and in the doc we say that a stateful expression implies it is nondeterministic.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174604738
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -328,6 +328,21 @@ trait Nondeterministic extends Expression {
   protected def evalInternal(input: InternalRow): Any
 }
 
+/**
+ * A stateful nondeterministic expression. These expressions contain state
+ * that is not stored in the parameter list.
+ */
+trait StatefulNondeterministic extends Nondeterministic {
--- End diff --

From the Hive doc:
```
A stateful UDF is considered to be non-deterministic, irrespective of what deterministic() returns.
```
This is correct.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174603757
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -328,6 +328,21 @@ trait Nondeterministic extends Expression {
   protected def evalInternal(input: InternalRow): Any
 }
 
+/**
+ * A stateful nondeterministic expression. These expressions contain state
+ * that is not stored in the parameter list.
+ */
+trait StatefulNondeterministic extends Nondeterministic {
--- End diff --

In Hive, stateful and deterministic are orthogonal. If we want to add this new trait, I think it's time to figure out the semantics. Shall we have a new trait called `Stateful`, or add an assumption that stateful functions must be nondeterministic?


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174594054
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
+  override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    // We need to make sure that we do not reuse stateful non deterministic expressions.
+    val cleanedExpressions = exprs.map(_.transform {
+      case s: StatefulNondeterministic => s.freshCopy()
--- End diff --

In codegen the state is put in the generated class; if you happen to visit the same expression twice, the state is added twice and is not shared during evaluation. In interpreted mode the Expression will be the same instance, so its state would be shared.
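A standalone illustration of the difference (`Counter` is a stand-in, not Spark's API): referencing one stateful instance twice shares its state across evaluations, while fresh copies keep state independent.

```scala
// A stand-in for a stateful expression: mutable state, plus a freshCopy.
final class Counter {
  private var n = 0L
  def eval(): Long = { n += 1; n }
  def freshCopy(): Counter = new Counter
}

val shared = new Counter
println(Seq(shared, shared).map(_.eval())) // List(1, 2): state leaks between the two slots

val copies = Seq(shared, shared).map(_.freshCopy())
println(copies.map(_.eval()))              // List(1, 1): each copy has independent state
```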

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174591520
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
+  override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    // We need to make sure that we do not reuse stateful non deterministic expressions.
+    val cleanedExpressions = exprs.map(_.transform {
+      case s: StatefulNondeterministic => s.freshCopy()
--- End diff --

Why is it not a problem for the codegen version?


---


[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174587783
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
+public abstract class UnsafeWriter {
+  public abstract void setNullByte(int ordinal);
--- End diff --

See my previous discussion with @mgaido91. I am fine either way; I can also add the missing methods and be done with it, though that will make the interpreted code path a bit messier.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174587229
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java ---
@@ -83,10 +83,10 @@ private long getElementOffset(int ordinal, int elementSize) {
     return startingOffset + headerInBytes + ordinal * elementSize;
   }
 
-  public void setOffsetAndSize(int ordinal, long currentCursor, int size) {
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
--- End diff --

I think we can safely change the signature to take two ints.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174586279
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
+public abstract class UnsafeWriter {
+  public abstract void setNullByte(int ordinal);
--- End diff --

This looks pretty weird. At first glance I was wondering why we don't have `setBoolean/Float/Double`; then I realized we don't need them, because we just need a way to set null for 1/2/4/8 bytes.

Maybe it's better to name them `setNull1/2/4/8Bytes`, and ask the `UnsafeArrayWriter` to follow suit.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174584506
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
+public abstract class UnsafeWriter {
+  public abstract void setNullByte(int ordinal);
+  public abstract void setNullShort(int ordinal);
+  public abstract void setNullInt(int ordinal);
+  public abstract void setNullLong(int ordinal);
--- End diff --

Why don't we have `setNullBoolean/Float/Double`?


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174583386
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java ---
@@ -83,10 +83,10 @@ private long getElementOffset(int ordinal, int elementSize) {
     return startingOffset + headerInBytes + ordinal * elementSize;
   }
 
-  public void setOffsetAndSize(int ordinal, long currentCursor, int size) {
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
--- End diff --

Shall we check that `size` fits in an integer?
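A sketch of the guard being asked for (`pack` is a hypothetical helper, not the writer's actual method): reject sizes that would be silently truncated when packed into the low 32 bits of the offset-and-size long.

```scala
object OffsetAndSize {
  // Pack a 32-bit relative offset and a 32-bit size into one long,
  // failing fast when the size cannot be represented as an int.
  def pack(relativeOffset: Long, size: Long): Long = {
    require(size <= Int.MaxValue, s"Element size $size does not fit in an int")
    (relativeOffset << 32) | size
  }
}
```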


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173146007
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
     Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

This is pretty low-level stuff, so you should know how many bytes things contain at this point. I'd rather leave it as it is. Doing a type match on such a hot code path doesn't seem like a good idea.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173142004
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
     Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

I am not sure it is a good idea, because then everyone writing code would need to know exactly how many bytes each type takes. I prefer the current approach. I would rather either reintroduce the `setNullAt` method with a match in the `UnsafeArrayData` implementation, or leave it as it is now. A sketch of the match-based alternative follows.
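A sketch of that match-based alternative (the width mapping below is an assumption from the types' fixed sizes, not code from the PR):

```scala
import org.apache.spark.sql.types._

// One type-driven entry point instead of size-named methods: pick how many
// bytes to zero out from the element type's fixed width.
def nullSlotBytes(dt: DataType): Int = dt match {
  case BooleanType | ByteType             => 1
  case ShortType                          => 2
  case IntegerType | FloatType | DateType => 4
  case _                                  => 8 // long, double, timestamp, offset-and-size slots
}
```

This is the per-call match that hvanhovell argues above is too costly on such a hot path.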


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173141050
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
     Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

We could also name them differently, e.g. `setNull1Byte`, `setNull2Bytes`, `setNull4Bytes` & `setNull8Bytes`.



---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173132540
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
     Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

I see, but I am not sure about having only some of these methods here. I mean, in `UnsafeArrayData` we also have `setNullDouble`, `setNullFloat`, etc. It seems a bit weird to me that some of them are defined at the parent level and some others are not. It's not a big deal, but for consistency I'd prefer to have all of them. What do you think?


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173130586
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
     Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

These methods are needed for writing `UnsafeArrayData`: we fill the slot 
with 0s when we set it to null, and the slot size in `UnsafeArrayData` depends 
on the data type we are storing in it.

I wanted to avoid writing a lot of duplicate code in 
`InterpretedUnsafeProjection`, which is why I added these methods to the 
`UnsafeWriter` parent class, and also why they are in the 
`UnsafeRowWriter`.
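
As a rough sketch of the zero-fill described above (the class and its two 
protected members are assumptions for illustration; `Platform` is Spark's 
existing unsafe-memory helper):

import org.apache.spark.unsafe.Platform

// Illustrative only: null out a slot with exactly the width of the type
// stored in it, as described above for UnsafeArrayData.
abstract class SlotNullingWriter {
  protected def buffer: AnyRef                 // assumed: backing object
  protected def getOffset(ordinal: Int): Long  // assumed: slot address

  def setNullByte(ordinal: Int): Unit  = Platform.putByte(buffer, getOffset(ordinal), 0.toByte)
  def setNullShort(ordinal: Int): Unit = Platform.putShort(buffer, getOffset(ordinal), 0.toShort)
  def setNullInt(ordinal: Int): Unit   = Platform.putInt(buffer, getOffset(ordinal), 0)
  def setNullLong(ordinal: Int): Unit  = Platform.putLong(buffer, getOffset(ordinal), 0L)
}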


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173123311
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
 ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
     Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

What is the reason these methods have been introduced?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173110061
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InterpretedUnsafeProjection.scala
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.CleanupAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GenericInternalRow, Nondeterministic, SpecializedGetters, StatefulNondeterministic, UnsafeArrayData, UnsafeMapData, UnsafeProjection, UnsafeProjectionCreator, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread safe.
+ *
+ * @param expressions that produce the resulting fields. These expressions must be bound
+ *                    to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
+  import InterpretedUnsafeProjection._
+
+  /** Number of (top level) fields in the resulting row. */
+  private[this] val numFields = expressions.length
+
+  /** Array that holds the expression results. */
+  private[this] val values = new Array[Any](numFields)
+
+  /** The row representing the expression results. */
+  private[this] val intermediate = new GenericInternalRow(values)
+
+  /** The row returned by the projection. */
+  private[this] val result = new UnsafeRow(numFields)
+
+  /** The buffer which holds the resulting row's backing data. */
+  private[this] val holder = new BufferHolder(result, numFields * 32)
+
+  /** The writer that writes the intermediate result to the result row. */
+  private[this] val writer: InternalRow => Unit = {
+    val rowWriter = new UnsafeRowWriter(holder, numFields)
+    val baseWriter = generateStructWriter(
+      holder,
+      rowWriter,
+      expressions.map(e => StructField("", e.dataType, e.nullable)))
+    if (!expressions.exists(_.nullable)) {
+      // No nullable fields. The top-level null bit mask will always be zeroed out.
+      baseWriter
+    } else {
+      // Zero out the null bit mask before we write the row.
+      row => {
+        rowWriter.zeroOutNullBytes()
+        baseWriter(row)
+      }
+    }
+  }
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  override def apply(row: InternalRow): UnsafeRow = {
+    // Put the expression results in the intermediate row.
+    var i = 0
+    while (i < numFields) {
+      values(i) = expressions(i).eval(row)
+      i += 1
+    }
+
+    // Write the intermediate row to an unsafe row.
+    holder.reset()
+    writer(intermediate)
+    result.setTotalSize(holder.totalSize())
+    result
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedUnsafeProjection]].
+ */
+object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
+
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
+   */
+  override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    // We need to make sure that we do not reuse stateful nondeterministic expressions.
+    val cleanedExpressions = exprs.map(_.transform {
+      case s: StatefulNondeterministic => s.freshCopy()

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173041019
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InterpretedUnsafeProjection.scala
 ---

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173025320
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 ---
@@ -146,6 +145,18 @@ object UnsafeProjection {
     create(exprs.map(BindReferences.bindReference(_, inputSchema)))
   }
 
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
+   */
+  protected def createProjection(exprs: Seq[Expression]): UnsafeProjection
--- End diff --

Yeah, that was pretty stupid. It is fixed now :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r172634434
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 ---
@@ -146,6 +145,18 @@ object UnsafeProjection {
     create(exprs.map(BindReferences.bindReference(_, inputSchema)))
   }
 
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
+   */
+  protected def createProjection(exprs: Seq[Expression]): UnsafeProjection
--- End diff --

seems no place is calling this?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-06 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r172539893
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 ---
@@ -154,11 +154,20 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
       expression: Expression,
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
+    checkEvalutionWithUnsafeProjection(expression, expected, inputRow, UnsafeProjection)
+    checkEvalutionWithUnsafeProjection(expression, expected, inputRow, InterpretedUnsafeProjection)
+  }
+
+  protected def checkEvalutionWithUnsafeProjection(
--- End diff --

nit: typo in Evalu(a)tion


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-06 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r172536692
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InterpretedUnsafeProjection.scala
 ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, Nondeterministic, SpecializedGetters, UnsafeArrayData, UnsafeMapData, UnsafeProjection, UnsafeProjectionCreator, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread safe.
+ *
+ * @param expressions that produce the resulting fields. These expressions must be bound
+ *                    to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
--- End diff --

The current implementation takes a two-step approach: first it evaluates 
the expressions and puts the results in an intermediate row, and then it 
converts this row to an `UnsafeRow`. We could also just create a converter from 
`InternalRow` to `UnsafeRow` and punt the projection work off to an 
`InterpretedMutableProjection`.
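
A rough sketch of that alternative, assuming a row-to-unsafe converter can 
be derived from the expressions' data types (`toUnsafe` and the class itself 
are hypothetical; `InterpretedMutableProjection` is the existing interpreted 
projection):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedMutableProjection, UnsafeProjection, UnsafeRow}

// Sketch: evaluation is punted to the existing InterpretedMutableProjection,
// and this class shrinks to a single InternalRow -> UnsafeRow conversion step.
class ConverterBasedUnsafeProjection(
    exprs: Seq[Expression],
    toUnsafe: InternalRow => UnsafeRow)  // hypothetical converter built from the schema
  extends UnsafeProjection {
  private[this] val evaluate = new InterpretedMutableProjection(exprs)
  override def apply(row: InternalRow): UnsafeRow = toUnsafe(evaluate(row))
}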


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-06 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/20750

[SPARK-23581][SQL] Add interpreted unsafe projection

## What changes were proposed in this pull request?
We currently can only create unsafe rows using code generation. This is a 
problem for situations in which code generation fails. There is no fallback, 
and as a result we cannot execute the query.

This PR adds an interpreted version of `UnsafeProjection`. The 
implementation is modeled after `InterpretedMutableProjection`. It stores the 
expression results in a `GenericInternalRow`, and then uses a conversion 
function to convert the `GenericInternalRow` into an `UnsafeRow`.
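
As a usage-level sketch of what such a fallback could look like for callers 
(the selection logic below is illustrative rather than code in this PR; 
`boundExprs` and `inputRows` are assumed inputs, and the interpreted `create` 
entry point is assumed from the `UnsafeProjectionCreator` interface):

import scala.util.control.NonFatal

// Hypothetical caller: try the codegen projection first and fall back to
// the interpreted one if compilation fails. Buffered rows must be copied
// because the returned UnsafeRow is reused between calls.
val projection =
  try {
    UnsafeProjection.create(boundExprs) // codegen path
  } catch {
    case NonFatal(_) => InterpretedUnsafeProjection.create(boundExprs)
  }
val buffered = inputRows.map(row => projection(row).copy())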

## How was this patch tested?
I am piggybacking on the existing `UnsafeProjection` tests, and I have added 
an interpreted version for each of these.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-23581

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20750


commit 4689170a68ffe8e1ac1542b41b1760027875bf24
Author: Herman van Hovell 
Date:   2018-03-06T13:19:44Z

Add an interpreted unsafe projection

commit ead8335b9ab923e667b3c07ece7bfd0320ef69f0
Author: Herman van Hovell 
Date:   2018-03-06T14:24:32Z

Enable tests




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org