[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18416#discussion_r124712549 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala --- @@ -339,6 +340,28 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { LHMapClass(LHMap(1 -> 2)) -> LHMap("test" -> MapClass(Map(3 -> 4 } + test("arbitrary sets") { --- End diff -- Added a test for it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18416#discussion_r124712535 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -834,6 +834,140 @@ case class CollectObjectsToMap private( } } +object CollectObjectsToSet { + private val curId = new java.util.concurrent.atomic.AtomicInteger() + + /** + * Construct an instance of CollectObjectsToSet case class. + * + * @param function The function applied on the collection elements. + * @param inputData An expression that when evaluated returns a collection object. + * @param collClass The type of the resulting collection. + */ + def apply( + function: Expression => Expression, + inputData: Expression, + collClass: Class[_]): CollectObjectsToSet = { +val id = curId.getAndIncrement() +val loopValue = s"CollectObjectsToSet_loopValue$id" +val loopIsNull = s"CollectObjectsToSet_loopIsNull$id" +val arrayType = inputData.dataType.asInstanceOf[ArrayType] +val loopVar = LambdaVariable(loopValue, loopIsNull, arrayType.elementType) +CollectObjectsToSet( + loopValue, loopIsNull, function(loopVar), inputData, collClass) + } +} + +/** + * Expression used to convert a Catalyst Array to an external Scala `Set`. + * The collection is constructed using the associated builder, obtained by calling `newBuilder` + * on the collection's companion object. + * + * Notice that when we convert a Catalyst array which contains duplicated elements to an external + * Scala `Set`, the elements will be de-duplicated. 
+ * + * @param loopValue the name of the loop variable that is used when iterating over the value + * collection, and which is used as input for the `lambdaFunction` + * @param loopIsNull the nullability of the loop variable that is used when iterating over + *the value collection, and which is used as input for the + *`lambdaFunction` + * @param lambdaFunction A function that takes the `loopValue` as input, and is used as + *a lambda function to handle collection elements. + * @param inputData An expression that when evaluated returns an array object. + * @param collClass The type of the resulting collection. + */ +case class CollectObjectsToSet private( +loopValue: String, +loopIsNull: String, +lambdaFunction: Expression, +inputData: Expression, +collClass: Class[_]) extends Expression with NonSQLExpression { + + override def nullable: Boolean = inputData.nullable + + override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil + + override def eval(input: InternalRow): Any = +throw new UnsupportedOperationException("Only code-generated evaluation is supported") + + override def dataType: DataType = ObjectType(collClass) + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +// The data with PythonUserDefinedType are actually stored with the data type of its sqlType. 
+def inputDataType(dataType: DataType) = dataType match { + case p: PythonUserDefinedType => p.sqlType + case _ => dataType +} + +val arrayType = inputDataType(inputData.dataType).asInstanceOf[ArrayType] +val loopValueJavaType = ctx.javaType(arrayType.elementType) +ctx.addMutableState("boolean", loopIsNull, "") +ctx.addMutableState(loopValueJavaType, loopValue, "") +val genFunction = lambdaFunction.genCode(ctx) + +val genInputData = inputData.genCode(ctx) +val dataLength = ctx.freshName("dataLength") +val loopIndex = ctx.freshName("loopIndex") +val builderValue = ctx.freshName("builderValue") + +val getLength = s"${genInputData.value}.numElements()" +val getLoopVar = ctx.getValue(genInputData.value, arrayType.elementType, loopIndex) + +// Make a copy of the data if it's unsafe-backed +def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) = + s"$value instanceof ${clazz.getSimpleName}? $value.copy() : $value" +val genFunctionValue = + lambdaFunction.dataType match { +case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value) +case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value) +case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value) +case _ => genFunction.value + } + +val loopNullCheck = s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);"
[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18416#discussion_r124332920 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetPrimitiveSuite.scala --- @@ -339,6 +340,28 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext { LHMapClass(LHMap(1 -> 2)) -> LHMap("test" -> MapClass(Map(3 -> 4 } + test("arbitrary sets") { --- End diff -- Better to test null cases? ``` Seq(Seq(Some(1), None), Seq(Some(2))).toDF("c").as[Set[Int]] ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18416#discussion_r124332023 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -834,6 +834,140 @@ case class CollectObjectsToMap private( } } +object CollectObjectsToSet { + private val curId = new java.util.concurrent.atomic.AtomicInteger() + + /** + * Construct an instance of CollectObjectsToSet case class. + * + * @param function The function applied on the collection elements. + * @param inputData An expression that when evaluated returns a collection object. + * @param collClass The type of the resulting collection. + */ + def apply( + function: Expression => Expression, + inputData: Expression, + collClass: Class[_]): CollectObjectsToSet = { +val id = curId.getAndIncrement() +val loopValue = s"CollectObjectsToSet_loopValue$id" +val loopIsNull = s"CollectObjectsToSet_loopIsNull$id" +val arrayType = inputData.dataType.asInstanceOf[ArrayType] +val loopVar = LambdaVariable(loopValue, loopIsNull, arrayType.elementType) +CollectObjectsToSet( + loopValue, loopIsNull, function(loopVar), inputData, collClass) + } +} + +/** + * Expression used to convert a Catalyst Array to an external Scala `Set`. + * The collection is constructed using the associated builder, obtained by calling `newBuilder` + * on the collection's companion object. + * + * Notice that when we convert a Catalyst array which contains duplicated elements to an external + * Scala `Set`, the elements will be de-duplicated. 
+ * + * @param loopValue the name of the loop variable that is used when iterating over the value + * collection, and which is used as input for the `lambdaFunction` + * @param loopIsNull the nullability of the loop variable that is used when iterating over + *the value collection, and which is used as input for the + *`lambdaFunction` + * @param lambdaFunction A function that takes the `loopValue` as input, and is used as + *a lambda function to handle collection elements. + * @param inputData An expression that when evaluated returns an array object. + * @param collClass The type of the resulting collection. + */ +case class CollectObjectsToSet private( +loopValue: String, +loopIsNull: String, +lambdaFunction: Expression, +inputData: Expression, +collClass: Class[_]) extends Expression with NonSQLExpression { + + override def nullable: Boolean = inputData.nullable + + override def children: Seq[Expression] = lambdaFunction :: inputData :: Nil + + override def eval(input: InternalRow): Any = +throw new UnsupportedOperationException("Only code-generated evaluation is supported") + + override def dataType: DataType = ObjectType(collClass) + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +// The data with PythonUserDefinedType are actually stored with the data type of its sqlType. 
+def inputDataType(dataType: DataType) = dataType match { + case p: PythonUserDefinedType => p.sqlType + case _ => dataType +} + +val arrayType = inputDataType(inputData.dataType).asInstanceOf[ArrayType] +val loopValueJavaType = ctx.javaType(arrayType.elementType) +ctx.addMutableState("boolean", loopIsNull, "") +ctx.addMutableState(loopValueJavaType, loopValue, "") +val genFunction = lambdaFunction.genCode(ctx) + +val genInputData = inputData.genCode(ctx) +val dataLength = ctx.freshName("dataLength") +val loopIndex = ctx.freshName("loopIndex") +val builderValue = ctx.freshName("builderValue") + +val getLength = s"${genInputData.value}.numElements()" +val getLoopVar = ctx.getValue(genInputData.value, arrayType.elementType, loopIndex) + +// Make a copy of the data if it's unsafe-backed +def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) = + s"$value instanceof ${clazz.getSimpleName}? $value.copy() : $value" +val genFunctionValue = + lambdaFunction.dataType match { +case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value) +case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value) +case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value) +case _ => genFunction.value + } + +val loopNullCheck = s"$loopIsNull = ${genInputData.value}.isNullAt($loopIndex);"
[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18416#discussion_r123920728 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -992,6 +1123,128 @@ case class ExternalMapToCatalyst private( } } +object ExternalSetToCatalystArray { + private val curId = new java.util.concurrent.atomic.AtomicInteger() + + def apply( + inputSet: Expression, + elementType: DataType, + elementConverter: Expression => Expression, + elementNullable: Boolean): ExternalSetToCatalystArray = { +val id = curId.getAndIncrement() +val elementName = "ExternalSetToCatalystArray_element" + id +val elementIsNull = "ExternalSetToCatalystArray_element_isNull" + id + +ExternalSetToCatalystArray( + elementName, + elementIsNull, + elementType, + elementConverter(LambdaVariable(elementName, elementIsNull, elementType, elementNullable)), + inputSet +) + } +} + +/** + * Converts a Scala/Java set object into catalyst array format, by applying the converter when + * iterate the set. + * + * @param element the name of the set element variable that used when iterate the set, and used as + *input for the `elementConverter` + * @param elementIsNull the nullability of the element variable that used when iterate the set, and + *used as input for the `elementConverter` + * @param elementType the data type of the element variable that used when iterate the set, and + * used as input for the `elementConverter` + * @param elementConverter A function that takes the `element` as input, and converts it to catalyst + * array format. + * @param child An expression that when evaluated returns the input set object. 
+ */ +case class ExternalSetToCatalystArray private( +element: String, +elementIsNull: String, +elementType: DataType, +elementConverter: Expression, +child: Expression) + extends UnaryExpression with NonSQLExpression { + + override def foldable: Boolean = false + + override def dataType: ArrayType = ArrayType( +elementType = elementConverter.dataType, containsNull = elementConverter.nullable) + + override def eval(input: InternalRow): Any = +throw new UnsupportedOperationException("Only code-generated evaluation is supported") + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val inputSet = child.genCode(ctx) +val genElementConverter = elementConverter.genCode(ctx) +val length = ctx.freshName("length") +val index = ctx.freshName("index") + +val iter = ctx.freshName("iter") +val (defineIterator, defineElement) = child.dataType match { + case ObjectType(cls) if classOf[java.util.Set[_]].isAssignableFrom(cls) => +val javaIteratorCls = classOf[java.util.Iterator[_]].getName --- End diff -- I'd prefer to leave java set support to other PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18416: [SPARK-21204][SQL][WIP] Add support for Scala Set...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/18416 [SPARK-21204][SQL][WIP] Add support for Scala Set collection types in serialization ## What changes were proposed in this pull request? Currently we can't produce a `Dataset` containing `Set` in SparkSQL. There's no corresponding internal data type in SparkSQL for a `Set`. A `Set` will be serialized to an array. ## How was this patch tested? Added unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 SPARK-21204 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18416.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18416 commit db1b91ed3fd0979841d40a20e7db0f64c62a947d Author: Liang-Chi Hsieh Date: 2017-06-25T10:21:27Z Add support for Scala Set collection types in serialization. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org