[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20750 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r175083026
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread safe.
+ *
+ * @param expressions that produces the resulting fields. These expressions must be bound
+ *                    to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
+  import InterpretedUnsafeProjection._
+
+  /** Number of (top level) fields in the resulting row. */
+  private[this] val numFields = expressions.length
+
+  /** Array that expression results. */
+  private[this] val values = new Array[Any](numFields)
+
+  /** The row representing the expression results. */
+  private[this] val intermediate = new GenericInternalRow(values)
+
+  /** The row returned by the projection. */
+  private[this] val result = new UnsafeRow(numFields)
+
+  /** The buffer which holds the resulting row's backing data. */
+  private[this] val holder = new BufferHolder(result, numFields * 32)
+
+  /** The writer that writes the intermediate result to the result row. */
+  private[this] val writer: InternalRow => Unit = {
+    val rowWriter = new UnsafeRowWriter(holder, numFields)
+    val baseWriter = generateStructWriter(
+      holder,
+      rowWriter,
+      expressions.map(e => StructField("", e.dataType, e.nullable)))
+    if (!expressions.exists(_.nullable)) {
+      // No nullable fields. The top-level null bit mask will always be zeroed out.
+      baseWriter
+    } else {
+      // Zero out the null bit mask before we write the row.
+      row => {
+        rowWriter.zeroOutNullBytes()
+        baseWriter(row)
+      }
+    }
+  }
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  override def apply(row: InternalRow): UnsafeRow = {
+    // Put the expression results in the intermediate row.
+    var i = 0
+    while (i < numFields) {
+      values(i) = expressions(i).eval(row)
+      i += 1
+    }
+
+    // Write the intermediate row to an unsafe row.
+    holder.reset()
+    writer(intermediate)
+    result.setTotalSize(holder.totalSize())
+    result
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedUnsafeProjection]].
+ */
+object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
+
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
+   */
+  override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    // We need to make sure that we do not reuse stateful expressions.
+    val cleanedExpressions = exprs.map(_.transform {
+      case s: Stateful => s.freshCopy()
+    })
+    new InterpretedUnsafeProjection(cleanedExpressions.toArray)
+  }
+
+  /**
+   * Generate a struct writer function. The generated function writes an [[InternalRow]] to the
+   * given buffer using the given
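The doc comment in this diff says the projection reuses the UnsafeRow it returns, so a consumer that buffers rows must copy them. A minimal Java sketch of why, assuming a hypothetical ReusingProjection that writes every result into one long[] (standing in for the reused UnsafeRow; neither class below is Spark API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a projection that reuses its output row (not Spark's API).
final class ReusingProjection {
  // Allocated once and overwritten on every call, like the projection's result UnsafeRow.
  private final long[] result;

  ReusingProjection(int numFields) {
    this.result = new long[numFields];
  }

  // "Evaluates" each field (here: doubles it) into the shared buffer and returns that buffer.
  long[] apply(long[] input) {
    for (int i = 0; i < result.length; i++) {
      result[i] = input[i] * 2;
    }
    return result;
  }
}

final class RowBuffering {
  // Buffers every projected row; copy == false reproduces the aliasing problem.
  static List<long[]> collect(ReusingProjection projection, List<long[]> rows, boolean copy) {
    List<long[]> out = new ArrayList<>();
    for (long[] row : rows) {
      long[] projected = projection.apply(row);
      out.add(copy ? projected.clone() : projected);
    }
    return out;
  }
}
```

Without the copy, every buffered entry is the same array and ends up holding the last row's values. With Spark's real classes the analogous step would be calling copy() on the returned UnsafeRow before buffering it.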
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r175065574
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174892728
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174892448
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174890611
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174873582
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala ---
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174871585
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---
@@ -328,6 +328,32 @@ trait Nondeterministic extends Expression {
   protected def evalInternal(input: InternalRow): Any
 }
 
+/**
+ * An expression that contains mutable state. A stateful expression is always non-deterministic
+ * because the results it produces during evaluation are not only dependent on the given input
+ * but also on its internal state.
+ *
+ * The state of the expressions is generally not exposed in the parameter list and this makes
+ * comparing stateful expressions problematic because similar stateful expressions (with the same
+ * parameter list) but with different internal state will be considered equal. This is especially
+ * problematic during tree transformations. In order to counter this the `fastEquals` method for
+ * stateful expressions only returns `true` for the same reference.
+ *
+ * A stateful expression should never be evaluated multiple times for a single row. This should
+ * only be a problem for interpreted execution. This can be prevented by creating fresh copies
+ * of the stateful expression before execution, these can be made using the `freshCopy` function.
+ */
+trait Stateful extends Nondeterministic {
+  /**
+   * Return a fresh uninitialized copy of the stateful expression.
+   */
+  def freshCopy(): Stateful = this
--- End diff --

I think it's better to not provide this default implementation, to avoid mistakes in the future.
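The risk of a `freshCopy() = this` default can be shown with a small Java model of the traits. Expr, Nondeterministic, Stateful, RowCounter and SharedCounter below are hypothetical simplifications, not Spark classes; SharedCounter plays the role of an expression that silently inherits the default:

```java
// Hypothetical, simplified Java model of the Scala traits under review (not Spark's API).
interface Expr {
  Object eval(Object row);
}

// Marker: results may differ between evaluations of the same input.
interface Nondeterministic extends Expr {}

// A stateful expression is always nondeterministic; freshCopy() is abstract on purpose,
// so every implementation has to decide how to produce uninitialized state.
interface Stateful extends Nondeterministic {
  Stateful freshCopy();
}

// Toy stateful expression: returns 0, 1, 2, ... across calls.
final class RowCounter implements Stateful {
  private long count = 0;

  @Override
  public Object eval(Object row) {
    return count++;
  }

  @Override
  public Stateful freshCopy() {
    return new RowCounter(); // new instance, state starts at 0
  }
}

// What a `freshCopy() = this` default would silently give you: "copies" share state.
final class SharedCounter implements Stateful {
  private long count = 0;

  @Override
  public Object eval(Object row) {
    return count++;
  }

  @Override
  public Stateful freshCopy() {
    return this; // the mistake the reviewer wants to rule out
  }
}
```

With freshCopy left abstract, a stateful expression that forgets to implement it fails to compile, instead of two projections quietly advancing one shared counter.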
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174606284
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.codegen;
+
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Base class for writing Unsafe* structures.
+ */
+public abstract class UnsafeWriter {
+  public abstract void setNullByte(int ordinal);
--- End diff --

Actually this was my mistake... I thought `Platform.setInt(0)` is different from `Platform.setFloat(0.0f)`, and that's why I introduced a `setNull` method for each primitive type.
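The observation that a null write does not depend on the primitive type can be checked directly: positive zero of float and double is encoded as all-zero bits, the same bytes as int 0 and long 0. A sketch using java.nio.ByteBuffer as a portable stand-in for Spark's Unsafe-based Platform class (an assumption made only to keep the example self-contained):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Compares the raw bytes written for "zero" of each primitive type.
// ByteBuffer stands in for Spark's Platform here; only the bit patterns matter.
final class ZeroBytes {
  static byte[] ofInt(int v) { return ByteBuffer.allocate(4).putInt(v).array(); }
  static byte[] ofFloat(float v) { return ByteBuffer.allocate(4).putFloat(v).array(); }
  static byte[] ofLong(long v) { return ByteBuffer.allocate(8).putLong(v).array(); }
  static byte[] ofDouble(double v) { return ByteBuffer.allocate(8).putDouble(v).array(); }

  // True when two encodings are byte-for-byte identical,
  // i.e. a single "zero these N bytes" write serves both types.
  static boolean sameBytes(byte[] a, byte[] b) { return Arrays.equals(a, b); }
}
```

The one exception is negative zero (-0.0f, -0.0), which has the sign bit set; a null slot never needs that value, so zeroing a fixed number of bytes is enough regardless of the column type.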
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174605769
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java ---
+public abstract class UnsafeWriter {
+  public abstract void setNullByte(int ordinal);
--- End diff --

I think `setNull1/2/4/8Bytes` would be better. It is also easy to generate in codegen: just `setNull${dt.defaultSize}Bytes`.
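A sketch of the proposed width-based naming, assuming a hypothetical packed-array writer whose elements occupy a fixed number of bytes each (the class, its fields, and the writeInt/readInt/setNullForSize helpers are illustrative, not Spark's API). The switch shows at runtime what generating `setNull${dt.defaultSize}Bytes` would select at code-generation time:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical packed-array writer; setNull1/2/4/8Bytes follow the naming proposed in the
// review, not a confirmed Spark signature. Elements are stored back to back, elementSize bytes each.
final class PackedArrayWriter {
  private final int elementSize;
  private final boolean[] isNull;
  private final byte[] data;

  PackedArrayWriter(int numElements, int elementSize) {
    this.elementSize = elementSize;
    this.isNull = new boolean[numElements];
    this.data = new byte[numElements * elementSize];
  }

  void writeInt(int ordinal, int value) {
    ByteBuffer.wrap(data).putInt(ordinal * elementSize, value);
    isNull[ordinal] = false;
  }

  int readInt(int ordinal) {
    return ByteBuffer.wrap(data).getInt(ordinal * elementSize);
  }

  boolean isNullAt(int ordinal) {
    return isNull[ordinal];
  }

  // Marks the slot null and zeroes exactly `width` bytes; the slot's type does not matter.
  private void setNullBytes(int ordinal, int width) {
    isNull[ordinal] = true;
    int offset = ordinal * elementSize;
    Arrays.fill(data, offset, offset + width, (byte) 0);
  }

  void setNull1Bytes(int ordinal) { setNullBytes(ordinal, 1); }
  void setNull2Bytes(int ordinal) { setNullBytes(ordinal, 2); }
  void setNull4Bytes(int ordinal) { setNullBytes(ordinal, 4); }
  void setNull8Bytes(int ordinal) { setNullBytes(ordinal, 8); }

  // Runtime equivalent of emitting `setNull${dt.defaultSize}Bytes` in generated code.
  void setNullForSize(int ordinal, int defaultSize) {
    switch (defaultSize) {
      case 1: setNull1Bytes(ordinal); break;
      case 2: setNull2Bytes(ordinal); break;
      case 4: setNull4Bytes(ordinal); break;
      case 8: setNull8Bytes(ordinal); break;
      default: throw new IllegalArgumentException("unsupported size: " + defaultSize);
    }
  }
}
```

Four width-based methods cover every fixed-size primitive (an int and a float column both map to setNull4Bytes), which is the reduction the comment is after.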
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174605126 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -328,6 +328,21 @@ trait Nondeterministic extends Expression { protected def evalInternal(input: InternalRow): Any } +/** + * A stateful nondeterministic expression. These expressions contain state + * that is not stored in the parameter list. + */ +trait StatefulNondeterministic extends Nondeterministic { --- End diff -- Maybe we can just call it `Stateful` while still extending `Nondeterministic`, and state in the doc that stateful expressions are implicitly nondeterministic.
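A minimal sketch of that proposal, written in Java rather than Scala for illustration. The names mirror the discussion (`Stateful`, `Nondeterministic`, `freshCopy`), but these interfaces and the toy `RowCounter` are assumptions, not the traits that were eventually merged. Because `Stateful` extends `Nondeterministic`, every stateful expression is nondeterministic by construction.

```java
// Hypothetical Java rendering of the proposed hierarchy: Stateful extends
// Nondeterministic, so being stateful implies being nondeterministic.
interface Nondeterministic {
  // Called once per partition before evaluation starts.
  void initialize(int partitionIndex);
}

interface Stateful extends Nondeterministic {
  // Returns a copy whose mutable state is not shared with this instance.
  Stateful freshCopy();
}

// Toy stateful expression: counts the rows it has evaluated in a partition.
final class RowCounter implements Stateful {
  private long count;

  @Override
  public void initialize(int partitionIndex) {
    count = 0L;
  }

  long next() {
    return count++;
  }

  @Override
  public RowCounter freshCopy() {
    return new RowCounter();
  }
}
```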
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174604738 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -328,6 +328,21 @@ trait Nondeterministic extends Expression { protected def evalInternal(input: InternalRow): Any } +/** + * A stateful nondeterministic expression. These expressions contain state + * that is not stored in the parameter list. + */ +trait StatefulNondeterministic extends Nondeterministic { --- End diff -- From the Hive docs: ``` A stateful UDF is considered to be non-deterministic, irrespective of what deterministic() returns. ``` This is corrected.
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174603757 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -328,6 +328,21 @@ trait Nondeterministic extends Expression { protected def evalInternal(input: InternalRow): Any } +/** + * A stateful nondeterministic expression. These expressions contain state + * that is not stored in the parameter list. + */ +trait StatefulNondeterministic extends Nondeterministic { --- End diff -- In Hive, stateful and deterministic are orthogonal. If we want to add this new trait, I think it's time to figure out the semantics. Shall we have a new trait called `Stateful`, or add an assumption that stateful functions must be nondeterministic?
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174594054 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala --- @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types.{UserDefinedType, _} +import org.apache.spark.unsafe.Platform + +/** + * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer + * should copy the row if it is being buffered. This class is not thread safe. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { + import InterpretedUnsafeProjection._ + + /** Number of (top level) fields in the resulting row. 
*/ + private[this] val numFields = expressions.length + + /** Array that expression results. */ + private[this] val values = new Array[Any](numFields) + + /** The row representing the expression results. */ + private[this] val intermediate = new GenericInternalRow(values) + + /** The row returned by the projection. */ + private[this] val result = new UnsafeRow(numFields) + + /** The buffer which holds the resulting row's backing data. */ + private[this] val holder = new BufferHolder(result, numFields * 32) + + /** The writer that writes the intermediate result to the result row. */ + private[this] val writer: InternalRow => Unit = { +val rowWriter = new UnsafeRowWriter(holder, numFields) +val baseWriter = generateStructWriter( + holder, + rowWriter, + expressions.map(e => StructField("", e.dataType, e.nullable))) +if (!expressions.exists(_.nullable)) { + // No nullable fields. The top-level null bit mask will always be zeroed out. + baseWriter +} else { + // Zero out the null bit mask before we write the row. + row => { +rowWriter.zeroOutNullBytes() +baseWriter(row) + } +} + } + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + override def apply(row: InternalRow): UnsafeRow = { +// Put the expression results in the intermediate row. +var i = 0 +while (i < numFields) { + values(i) = expressions(i).eval(row) + i += 1 +} + +// Write the intermediate row to an unsafe row. +holder.reset() +writer(intermediate) +result.setTotalSize(holder.totalSize()) +result + } +} + +/** + * Helper functions for creating an [[InterpretedUnsafeProjection]]. + */ +object InterpretedUnsafeProjection extends UnsafeProjectionCreator { + + /** + * Returns an [[UnsafeProjection]] for given sequence of bound Expressions. 
*/ + override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = { +// We need to make sure that we do not reuse stateful non deterministic expressions. +val cleanedExpressions = exprs.map(_.transform { + case s: StatefulNondeterministic => s.freshCopy() --- End diff -- In codegen, the state is put in the generated class: if you happen to visit the same expression twice, the state is added twice and is not shared during evaluation. In interpreted mode the Expression will be the
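To make the interpreted-mode problem concrete: if one stateful expression instance appears twice in a projection, both occurrences advance the same state, whereas the generated class keeps separate state per occurrence. The Java sketch below models this with a hypothetical `CounterExpr`; `cleaned` plays the role of the `transform { case s: StatefulNondeterministic => s.freshCopy() }` step in the diff. It is an illustration under those assumptions, not Spark code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stateful expression: returns 0, 1, 2, ... on successive evaluations.
final class CounterExpr {
  private long next;

  long eval() {
    return next++;
  }

  CounterExpr freshCopy() {
    return new CounterExpr();
  }
}

final class ProjectionSketch {
  // Evaluates every expression once for a single input row, like apply() in the diff.
  static long[] evalRow(List<CounterExpr> exprs) {
    long[] values = new long[exprs.size()];
    for (int i = 0; i < values.length; i++) {
      values[i] = exprs.get(i).eval();
    }
    return values;
  }

  // Gives every occurrence its own copy, so no state is shared between them.
  static List<CounterExpr> cleaned(List<CounterExpr> exprs) {
    List<CounterExpr> out = new ArrayList<>();
    for (CounterExpr e : exprs) {
      out.add(e.freshCopy());
    }
    return out;
  }
}
```

With a single shared instance listed twice, `evalRow` yields `[0, 1]` for one row; after `cleaned`, it yields `[0, 0]`, which matches the per-occurrence state of the codegen path.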
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174591520 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala --- @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types.{UserDefinedType, _} +import org.apache.spark.unsafe.Platform + +/** + * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer + * should copy the row if it is being buffered. This class is not thread safe. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { + import InterpretedUnsafeProjection._ + + /** Number of (top level) fields in the resulting row. 
*/ + private[this] val numFields = expressions.length + + /** Array that expression results. */ + private[this] val values = new Array[Any](numFields) + + /** The row representing the expression results. */ + private[this] val intermediate = new GenericInternalRow(values) + + /** The row returned by the projection. */ + private[this] val result = new UnsafeRow(numFields) + + /** The buffer which holds the resulting row's backing data. */ + private[this] val holder = new BufferHolder(result, numFields * 32) + + /** The writer that writes the intermediate result to the result row. */ + private[this] val writer: InternalRow => Unit = { +val rowWriter = new UnsafeRowWriter(holder, numFields) +val baseWriter = generateStructWriter( + holder, + rowWriter, + expressions.map(e => StructField("", e.dataType, e.nullable))) +if (!expressions.exists(_.nullable)) { + // No nullable fields. The top-level null bit mask will always be zeroed out. + baseWriter +} else { + // Zero out the null bit mask before we write the row. + row => { +rowWriter.zeroOutNullBytes() +baseWriter(row) + } +} + } + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + override def apply(row: InternalRow): UnsafeRow = { +// Put the expression results in the intermediate row. +var i = 0 +while (i < numFields) { + values(i) = expressions(i).eval(row) + i += 1 +} + +// Write the intermediate row to an unsafe row. +holder.reset() +writer(intermediate) +result.setTotalSize(holder.totalSize()) +result + } +} + +/** + * Helper functions for creating an [[InterpretedUnsafeProjection]]. + */ +object InterpretedUnsafeProjection extends UnsafeProjectionCreator { + + /** + * Returns an [[UnsafeProjection]] for given sequence of bound Expressions. 
*/ + override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = { +// We need to make sure that we do not reuse stateful non deterministic expressions. +val cleanedExpressions = exprs.map(_.transform { + case s: StatefulNondeterministic => s.freshCopy() --- End diff -- Why is it not a problem for the codegen version?
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174587783 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions.codegen; + +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Base class for writing Unsafe* structures. + */ +public abstract class UnsafeWriter { + public abstract void setNullByte(int ordinal); --- End diff -- See my previous discussion with @mgaido91. I am fine either way; I can also add the missing methods and be done with it, though that will make the interpreted code path a bit messier.
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174587229 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java --- @@ -83,10 +83,10 @@ private long getElementOffset(int ordinal, int elementSize) { return startingOffset + headerInBytes + ordinal * elementSize; } - public void setOffsetAndSize(int ordinal, long currentCursor, int size) { + public void setOffsetAndSize(int ordinal, long currentCursor, long size) { --- End diff -- I think we can safely change the signature to take two ints.
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174586279 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions.codegen; + +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Base class for writing Unsafe* structures. + */ +public abstract class UnsafeWriter { + public abstract void setNullByte(int ordinal); --- End diff -- This looks pretty weird. At first glance I wondered why we don't have `setBoolean/Float/Double`, then I realized we don't need them, because we just need a way to set null for 1/2/4/8 bytes. Maybe it's better to name them `setNull1/2/4/8Bytes`, and ask the `UnsafeArrayWriter` to follow suit.
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174584506 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions.codegen; + +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * Base class for writing Unsafe* structures. + */ +public abstract class UnsafeWriter { + public abstract void setNullByte(int ordinal); + public abstract void setNullShort(int ordinal); + public abstract void setNullInt(int ordinal); + public abstract void setNullLong(int ordinal); --- End diff -- Why don't we have `setNullBoolean/Float/Double`?
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r174583386 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java --- @@ -83,10 +83,10 @@ private long getElementOffset(int ordinal, int elementSize) { return startingOffset + headerInBytes + ordinal * elementSize; } - public void setOffsetAndSize(int ordinal, long currentCursor, int size) { + public void setOffsetAndSize(int ordinal, long currentCursor, long size) { --- End diff -- Shall we check that `size` fits in an integer?
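One way to do that check, sketched in Java: Spark's unsafe formats store the location of a variable-length value as a single `long`, with the relative offset in the upper 32 bits and the size in the lower 32 bits, so a `size` above `Integer.MAX_VALUE` would be silently truncated. The `OffsetAndSize` class below is hypothetical; `Math.toIntExact(size)` would be an equally valid way to fail fast.

```java
// Hypothetical helper: packs (relativeOffset, size) into one long the way the
// unsafe formats encode variable-length values, rejecting values that do not fit.
final class OffsetAndSize {
  static long pack(long relativeOffset, long size) {
    // The size occupies only the lower 32 bits, so anything larger would be truncated.
    if (size < 0 || size > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("size does not fit in an int: " + size);
    }
    if (relativeOffset < 0 || relativeOffset > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("offset does not fit in an int: " + relativeOffset);
    }
    return (relativeOffset << 32) | size;
  }

  // Upper 32 bits: offset relative to the start of the row or array.
  static int offset(long offsetAndSize) {
    return (int) (offsetAndSize >> 32);
  }

  // Lower 32 bits: size in bytes.
  static int size(long offsetAndSize) {
    return (int) offsetAndSize;
  }
}
```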
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r173146007 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java --- @@ -93,6 +93,26 @@ public void setNullAt(int ordinal) { Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L); } + @Override + public void setNullByte(int ordinal) { --- End diff -- This is pretty low-level stuff, so you should know how many bytes things contain at this point. I'd rather leave it as it is. Doing a type match on such a hot code path doesn't seem like a good idea.
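A possible middle ground between these positions, sketched in Java: do the width match once per field while the projection is being built, and keep only a direct call on the per-row path. `Writer` and `nullSetterFor` are hypothetical; the method names follow the `setNull1Byte`/`setNull2Bytes`/`setNull4Bytes`/`setNull8Bytes` spelling proposed in this thread.

```java
import java.util.function.ObjIntConsumer;

final class NullSetterResolution {
  // Hypothetical writer exposing one null setter per slot width.
  interface Writer {
    void setNull1Byte(int ordinal);
    void setNull2Bytes(int ordinal);
    void setNull4Bytes(int ordinal);
    void setNull8Bytes(int ordinal);
  }

  // Meant to be called once per field while building the projection, never per row.
  static ObjIntConsumer<Writer> nullSetterFor(int defaultSize) {
    switch (defaultSize) {
      case 1: return Writer::setNull1Byte;
      case 2: return Writer::setNull2Bytes;
      case 4: return Writer::setNull4Bytes;
      case 8: return Writer::setNull8Bytes;
      default: throw new IllegalArgumentException("unsupported slot width: " + defaultSize);
    }
  }
}
```

Per row, the caller then only invokes `setter.accept(writer, ordinal)`, with no type dispatch.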
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r173142004 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java --- @@ -93,6 +93,26 @@ public void setNullAt(int ordinal) { Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L); } + @Override + public void setNullByte(int ordinal) { --- End diff -- I am not sure it is a good idea, because then everyone writing code would need to know exactly how many bytes each type takes. I prefer the current approach. I would rather either reintroduce the `setNullAt` method with a match in the `UnsafeArrayData` implementation or leave it as it is now.
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r173141050 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java --- @@ -93,6 +93,26 @@ public void setNullAt(int ordinal) { Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L); } + @Override + public void setNullByte(int ordinal) { --- End diff -- We could also name them differently e.g.: `setNull1Byte`, `setNull2Bytes`, `setNull4Bytes` & `setNull8Bytes`
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r173132540 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java --- @@ -93,6 +93,26 @@ public void setNullAt(int ordinal) { Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L); } + @Override + public void setNullByte(int ordinal) { --- End diff -- I see, but I am not sure about having only some of these methods here. In `UnsafeArrayData` we also have `setNullDouble`, `setNullFloat`, etc. It seems a bit weird to me that some of them are defined at the parent level and others are not. It's not a big deal, but for consistency I'd prefer to have all of them. What do you think?
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r173130586 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java --- @@ -93,6 +93,26 @@ public void setNullAt(int ordinal) { Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L); } + @Override + public void setNullByte(int ordinal) { --- End diff -- These methods are needed for writing `UnsafeArrayData`: we fill the slot with 0s when we set it to null, and the slot size in `UnsafeArrayData` depends on the data type we are storing in it. I wanted to avoid writing a lot of duplicate code in the `InterpretedUnsafeProjection`; this is why I added these methods to the `UnsafeWriter` parent class, and also why they are in the `UnsafeRowWriter`.
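A simplified model of the behavior described above, in Java: each element occupies a fixed-size slot whose width depends on the element type, and setting an element to null sets its null bit and zeroes exactly that many bytes, leaving neighboring slots untouched. The layout here (null bits rounded up to 8-byte words, followed by the slots, little-endian) is an assumption for illustration, not a byte-exact copy of `UnsafeArrayData`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical fixed-width array region:
// [null bits, rounded up to 8-byte words][numElements slots of elementSize bytes].
final class ArraySlots {
  private final ByteBuffer buf;
  private final int elementSize;
  private final int nullBitsInBytes;

  ArraySlots(int numElements, int elementSize) {
    this.elementSize = elementSize;
    this.nullBitsInBytes = ((numElements + 63) / 64) * 8;
    this.buf = ByteBuffer.allocate(nullBitsInBytes + numElements * elementSize)
        .order(ByteOrder.LITTLE_ENDIAN);
  }

  // The slot position depends on the element size, i.e. on the element type.
  private int slotOffset(int ordinal) {
    return nullBitsInBytes + ordinal * elementSize;
  }

  void setInt(int ordinal, int value) {
    buf.putInt(slotOffset(ordinal), value);
  }

  int getInt(int ordinal) {
    return buf.getInt(slotOffset(ordinal));
  }

  // Null: set the ordinal's null bit, then zero exactly elementSize bytes of its slot.
  void setNull(int ordinal) {
    int byteIndex = ordinal >> 3;
    buf.put(byteIndex, (byte) (buf.get(byteIndex) | (1 << (ordinal & 7))));
    for (int i = 0; i < elementSize; i++) {
      buf.put(slotOffset(ordinal) + i, (byte) 0);
    }
  }

  boolean isNull(int ordinal) {
    return (buf.get(ordinal >> 3) & (1 << (ordinal & 7))) != 0;
  }
}
```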
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r173123311 --- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java --- @@ -93,6 +93,26 @@ public void setNullAt(int ordinal) { Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L); } + @Override + public void setNullByte(int ordinal) { --- End diff -- What is the reason these methods were introduced?
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/20750#discussion_r173110061 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InterpretedUnsafeProjection.scala --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.analysis.CleanupAliases +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GenericInternalRow, Nondeterministic, SpecializedGetters, StatefulNondeterministic, UnsafeArrayData, UnsafeMapData, UnsafeProjection, UnsafeProjectionCreator, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter} +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types.{UserDefinedType, _} +import org.apache.spark.unsafe.Platform + +/** + * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer + * should copy the row if it is being buffered. This class is not thread safe. + * + * @param expressions that produces the resulting fields. 
These expressions must be bound + *to a schema. + */ +class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { + import InterpretedUnsafeProjection._ + + /** Number of (top level) fields in the resulting row. */ + private[this] val numFields = expressions.length + + /** Array that expression results. */ + private[this] val values = new Array[Any](numFields) + + /** The row representing the expression results. */ + private[this] val intermediate = new GenericInternalRow(values) + + /** The row returned by the projection. */ + private[this] val result = new UnsafeRow(numFields) + + /** The buffer which holds the resulting row's backing data. */ + private[this] val holder = new BufferHolder(result, numFields * 32) + + /** The writer that writes the intermediate result to the result row. */ + private[this] val writer: InternalRow => Unit = { +val rowWriter = new UnsafeRowWriter(holder, numFields) +val baseWriter = generateStructWriter( + holder, + rowWriter, + expressions.map(e => StructField("", e.dataType, e.nullable))) +if (!expressions.exists(_.nullable)) { + // No nullable fields. The top-level null bit mask will always be zeroed out. + baseWriter +} else { + // Zero out the null bit mask before we write the row. + row => { +rowWriter.zeroOutNullBytes() +baseWriter(row) + } +} + } + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + override def apply(row: InternalRow): UnsafeRow = { +// Put the expression results in the intermediate row. +var i = 0 +while (i < numFields) { + values(i) = expressions(i).eval(row) + i += 1 +} + +// Write the intermediate row to an unsafe row. +holder.reset() +writer(intermediate) +result.setTotalSize(holder.totalSize()) +result + } +} + +/** + * Helper functions for creating an [[InterpretedUnsafeProjection]]. 
+ */ +object InterpretedUnsafeProjection extends UnsafeProjectionCreator { + + /** + * Returns an [[UnsafeProjection]] for given sequence of bound Expressions. + */ + override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = { +// We need to make sure that we do not reuse stateful non deterministic expressions. +val cleanedExpressions = exprs.map(_.transform { + case s: StatefulNondeterministic => s.freshCopy() +
[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173041019

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InterpretedUnsafeProjection.scala ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.CleanupAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GenericInternalRow, Nondeterministic, SpecializedGetters, StatefulNondeterministic, UnsafeArrayData, UnsafeMapData, UnsafeProjection, UnsafeProjectionCreator, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread safe.
+ *
+ * @param expressions that produces the resulting fields. These expressions must be bound
+ *                    to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
+  import InterpretedUnsafeProjection._
+
+  /** Number of (top level) fields in the resulting row. */
+  private[this] val numFields = expressions.length
+
+  /** Array that expression results. */
+  private[this] val values = new Array[Any](numFields)
+
+  /** The row representing the expression results. */
+  private[this] val intermediate = new GenericInternalRow(values)
+
+  /** The row returned by the projection. */
+  private[this] val result = new UnsafeRow(numFields)
+
+  /** The buffer which holds the resulting row's backing data. */
+  private[this] val holder = new BufferHolder(result, numFields * 32)
+
+  /** The writer that writes the intermediate result to the result row. */
+  private[this] val writer: InternalRow => Unit = {
+    val rowWriter = new UnsafeRowWriter(holder, numFields)
+    val baseWriter = generateStructWriter(
+      holder,
+      rowWriter,
+      expressions.map(e => StructField("", e.dataType, e.nullable)))
+    if (!expressions.exists(_.nullable)) {
+      // No nullable fields. The top-level null bit mask will always be zeroed out.
+      baseWriter
+    } else {
+      // Zero out the null bit mask before we write the row.
+      row => {
+        rowWriter.zeroOutNullBytes()
+        baseWriter(row)
+      }
+    }
+  }
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  override def apply(row: InternalRow): UnsafeRow = {
+    // Put the expression results in the intermediate row.
+    var i = 0
+    while (i < numFields) {
+      values(i) = expressions(i).eval(row)
+      i += 1
+    }
+
+    // Write the intermediate row to an unsafe row.
+    holder.reset()
+    writer(intermediate)
+    result.setTotalSize(holder.totalSize())
+    result
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedUnsafeProjection]].
+ */
+object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
+
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
+   */
+  override protected def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    // We need to make sure that we do not reuse stateful non deterministic expressions.
+    val cleanedExpressions = exprs.map(_.transform {
+      case s: StatefulNondeterministic => s.freshCopy()
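The class comment above says the projection reuses the `UnsafeRow` it returns, so a consumer has to copy the row before buffering it. A minimal sketch of why, assuming a simplified model in which a "row" is a plain `Array[Any]`; `RowReuseSketch` and `ReusingProjection` are hypothetical names, and `clone()` plays the role that `UnsafeRow.copy()` plays in Spark:

```scala
// Hypothetical, simplified model: a "row" is an Array[Any] that the projection
// overwrites on every call, much like InterpretedUnsafeProjection reuses the
// single UnsafeRow it returns.
object RowReuseSketch {
  final class ReusingProjection(fns: Array[Any => Any]) {
    // One output buffer, allocated once and overwritten by every apply() call.
    private val result = new Array[Any](fns.length)

    def apply(input: Any): Array[Any] = {
      var i = 0
      while (i < fns.length) {
        result(i) = fns(i)(input)
        i += 1
      }
      result // always the same array instance
    }
  }

  val proj = new ReusingProjection(Array[Any => Any](x => x, x => x.toString.length))

  // Buffering without copying: every element is the same mutable array, so all
  // of them show the values written by the last call.
  val notCopied: Seq[Array[Any]] = Seq(1, 22, 333).map(x => proj(x))

  // Buffering with a defensive copy (the role UnsafeRow.copy() plays in Spark).
  val copied: Seq[Array[Any]] = Seq(1, 22, 333).map(x => proj(x).clone())
}
```

Reusing one output buffer avoids an allocation per input row; the price is that correctness of any buffering consumer depends on it copying first.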
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173025320

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala ---
@@ -146,6 +145,18 @@ object UnsafeProjection {
     create(exprs.map(BindReferences.bindReference(_, inputSchema)))
   }
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
+   */
+  protected def createProjection(exprs: Seq[Expression]): UnsafeProjection
--- End diff --

Yeah, that was pretty stupid. It is fixed now :)
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r172634434

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala ---
@@ -146,6 +145,18 @@ object UnsafeProjection {
     create(exprs.map(BindReferences.bindReference(_, inputSchema)))
   }
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound Expressions.
+   */
+  protected def createProjection(exprs: Seq[Expression]): UnsafeProjection
--- End diff --

It seems nothing is calling this?
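One way to guarantee that an abstract hook like `createProjection` is actually reached is to have every public `create` overload funnel into it, binding first when needed. A minimal sketch with hypothetical, simplified types (`CreatorSketch`, `Expr`, `BoundRef`, `NamedRef`, `ProjectionCreator`, `InterpretedCreator`); it only illustrates the pattern and is not the code that was merged:

```scala
// Hypothetical, simplified stand-ins for Spark's Expression and UnsafeProjection.
object CreatorSketch {
  type Row = IndexedSeq[Any]
  type Projection = Row => Row

  sealed trait Expr
  final case class BoundRef(ordinal: Int) extends Expr // already bound to an input ordinal
  final case class NamedRef(name: String) extends Expr // still refers to a column by name

  trait ProjectionCreator {
    // Entry point for bound expressions: delegates straight to the hook.
    def create(exprs: Seq[Expr]): Projection = createProjection(exprs)

    // Entry point for unbound expressions: bind against the schema, then reuse
    // the overload above, so every path ends up in createProjection.
    def create(exprs: Seq[Expr], schema: Seq[String]): Projection =
      create(exprs.map(bind(_, schema)))

    // The single method a concrete creator has to implement.
    protected def createProjection(exprs: Seq[Expr]): Projection

    private def bind(e: Expr, schema: Seq[String]): Expr = e match {
      case NamedRef(n) =>
        val i = schema.indexOf(n)
        require(i >= 0, s"unknown column: $n")
        BoundRef(i)
      case b: BoundRef => b
    }
  }

  object InterpretedCreator extends ProjectionCreator {
    override protected def createProjection(exprs: Seq[Expr]): Projection = {
      val ordinals = exprs.map {
        case BoundRef(i) => i
        case other       => throw new IllegalArgumentException(s"unbound expression: $other")
      }
      row => ordinals.map(row).toIndexedSeq
    }
  }
}
```

Because the binding overload delegates to the bound one, a concrete creator cannot accidentally bypass its own `createProjection`.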
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r172539893

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala ---
@@ -154,11 +154,20 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
       expression: Expression,
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
+    checkEvalutionWithUnsafeProjection(expression, expected, inputRow, UnsafeProjection)
+    checkEvalutionWithUnsafeProjection(expression, expected, inputRow, InterpretedUnsafeProjection)
+  }
+
+  protected def checkEvalutionWithUnsafeProjection(
--- End diff --

nit: typo in `Evalution` (should be `Evaluation`).
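The helper in this diff runs the same expression check once per projection creator, so the interpreted path is held to the same expectations as the generated one. A minimal sketch of that pattern, assuming hypothetical stand-ins (`DualCheckSketch`, `Creator`, `Generated`, `Interpreted`): `Generated` plays the role of the code-generated `UnsafeProjection` and `Interpreted` the role of `InterpretedUnsafeProjection`, and neither uses Spark APIs:

```scala
// Hypothetical sketch: the same check is run once per projection creator, so the
// interpreted path is held to the same expectations as the code-generated one.
object DualCheckSketch {
  type Row = IndexedSeq[Any]
  type Expr = Row => Any // an "expression" is just an evaluation function here

  trait Creator {
    def name: String
    def create(exprs: Seq[Expr]): Row => Row
  }

  // Stand-in for the code-generated UnsafeProjection (no code generation here).
  object Generated extends Creator {
    val name = "generated"
    def create(exprs: Seq[Expr]): Row => Row = row => exprs.map(e => e(row)).toIndexedSeq
  }

  // Stand-in for InterpretedUnsafeProjection: evaluates into a buffer in a loop.
  object Interpreted extends Creator {
    val name = "interpreted"
    def create(exprs: Seq[Expr]): Row => Row = {
      val es = exprs.toIndexedSeq
      (row: Row) => {
        val out = new Array[Any](es.length)
        var i = 0
        while (i < es.length) {
          out(i) = es(i)(row)
          i += 1
        }
        out.toIndexedSeq
      }
    }
  }

  // Mirrors calling the same helper once per creator.
  def checkEvaluation(expr: Expr, expected: Any, input: Row): Unit =
    Seq(Generated, Interpreted).foreach(c => checkEvaluationWith(expr, expected, input, c))

  def checkEvaluationWith(expr: Expr, expected: Any, input: Row, creator: Creator): Unit = {
    val actual = creator.create(Seq(expr))(input).head
    assert(actual == expected, s"${creator.name}: expected $expected, got $actual")
  }
}
```

Passing the creator as a parameter keeps a single assertion path, and the failure message names the implementation that disagreed.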
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r172536692

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InterpretedUnsafeProjection.scala ---
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, Nondeterministic, SpecializedGetters, UnsafeArrayData, UnsafeMapData, UnsafeProjection, UnsafeProjectionCreator, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread safe.
+ *
+ * @param expressions that produces the resulting fields. These expressions must be bound
+ *                    to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection {
--- End diff --

The current implementation takes a two-step approach: it first evaluates the expressions and puts the results in an intermediate row, and then it converts this row to an `UnsafeRow`. We could also just create a converter from `InternalRow` to `UnsafeRow` and punt the projection work off to an `InterpretedMutableProjection`.
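The two designs in the comment above can be compared with a toy model. In this sketch every name is hypothetical: `PackedRow` loosely imitates an `UnsafeRow` (a null bitmask plus one slot per field), `twoStep` evaluates into a reused intermediate buffer and then converts, and `composed` chains a separate projection (standing in for `InterpretedMutableProjection`) with a standalone converter. Both should produce the same output:

```scala
// Hypothetical toy model of the two designs; PackedRow loosely imitates an
// UnsafeRow (a null bitmask plus one slot per field).
object TwoStepSketch {
  type Row = IndexedSeq[Any]
  type Expr = Row => Any

  final case class PackedRow(nullBits: Long, slots: Vector[Any])

  // The conversion step shared by both designs. This toy bitmask is a single
  // Long, so it handles at most 64 fields; UnsafeRow uses as many 64-bit words
  // as the field count needs.
  def toPacked(row: Row): PackedRow = {
    require(row.length <= 64, "toy bitmask supports at most 64 fields")
    var bits = 0L
    var i = 0
    while (i < row.length) {
      if (row(i) == null) bits |= 1L << i
      i += 1
    }
    PackedRow(bits, row.toVector)
  }

  // Design A (as in the PR): evaluate into a reused intermediate buffer, then convert.
  def twoStep(exprs: Seq[Expr]): Row => PackedRow = {
    val es = exprs.toIndexedSeq
    val values = new Array[Any](es.length)
    (row: Row) => {
      var i = 0
      while (i < es.length) {
        values(i) = es(i)(row)
        i += 1
      }
      toPacked(values.toIndexedSeq)
    }
  }

  // Design B (the alternative): a plain projection, standing in for
  // InterpretedMutableProjection, followed by a standalone converter.
  def project(exprs: Seq[Expr]): Row => Row = row => exprs.map(e => e(row)).toIndexedSeq
  def composed(exprs: Seq[Expr]): Row => PackedRow = project(exprs).andThen(r => toPacked(r))
}
```

Design B separates concerns (the converter can be reused on any row), while design A keeps the evaluation loop and the intermediate buffer private to one class.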
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/20750

[SPARK-23581][SQL] Add interpreted unsafe projection

## What changes were proposed in this pull request?

We can currently only create unsafe rows using code generation. This is a problem in situations where code generation fails: there is no fallback, and as a result we cannot execute the query.

This PR adds an interpreted version of `UnsafeProjection`. The implementation is modeled after `InterpretedMutableProjection`. It stores the expression results in a `GenericInternalRow`, and then uses a conversion function to convert the `GenericInternalRow` into an `UnsafeRow`.

## How was this patch tested?

I am piggybacking on the existing `UnsafeProjection` tests, and I have added an interpreted version of each of them.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-23581

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20750.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #20750

commit 4689170a68ffe8e1ac1542b41b1760027875bf24
Author: Herman van Hovell
Date: 2018-03-06T13:19:44Z

Add an interpreted unsafe projection

commit ead8335b9ab923e667b3c07ece7bfd0320ef69f0
Author: Herman van Hovell
Date: 2018-03-06T14:24:32Z

Enable tests
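The description above motivates the PR with the missing fallback when code generation fails. A minimal sketch of a "try code generation, else interpret" factory, assuming hypothetical names (`FallbackSketch`, `createWithFallback`, `Mode`) and plain functions in place of Spark's projection classes; this PR itself only adds the interpreted projection and does not claim to wire up a fallback like this:

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of a "try code generation, else interpret" factory; plain
// functions stand in for Spark's projection classes.
object FallbackSketch {
  type Row = IndexedSeq[Any]
  type Projection = Row => Row

  sealed trait Mode
  case object CodegenMode extends Mode
  case object InterpretedMode extends Mode

  // Returns the projection together with the path that produced it, so callers
  // (or tests) can tell whether the fallback was taken.
  def createWithFallback(
      codegen: () => Projection,
      interpreted: () => Projection): (Projection, Mode) =
    try (codegen(), CodegenMode)
    catch {
      case NonFatal(_) => (interpreted(), InterpretedMode)
    }
}
```

Catching only `NonFatal` errors lets genuinely fatal conditions (for example an `OutOfMemoryError`) propagate instead of being masked by the fallback.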